Movie Data Statistics with Apache Spark

Over the last couple of months, here at SOUTHWORKS we have been involved in several Big Data projects. We have been building complete, large-scale ETL and processing pipelines that leverage some of the trending tools in the Big Data world: Apache Spark and its close relative, Databricks.

During a break between phases on one of our projects, we took the opportunity to put together this article and a reference implementation, doing our bit to give back to the big community of software and data engineers, or anyone who wants to take their first steps into this huge world.

Before we get into it, I don’t want to skip mentioning the great group of people who brought this sample to life and contributed feedback while putting it together: Lucas Nicolas Cabello, Santiago Calle, Pablo Costantini, Mauro Ezequiel Della Vecchia, Derek Nicolas Fernandez, Juan Manso & Alejandro Tizzoni.

We hope that you like it 😃

Overview

In this introductory article we are going to cover the initial steps you might take to start working with Apache Spark (Spark).

First, we will go over some introductory concepts, Spark’s main use cases and advantages, and then introduce a complete sample application that uses Spark’s Dataframe API to process large datasets in a distributed way.

For this application, we perform some data analysis over the MovieLens dataset[¹], which consists of 25 million ratings given to 62,000 movies by 162,000 users, in order to obtain a set of statistics.

Such statistics include:

  • Best films by rating
  • Global ratings given by each user per category
  • Most rated films
  • Users with overall lowest ratings given
  • Genres by rating
  • Most commonly correlated genres

Lastly, we share some benchmarking we performed using different worker configurations and talk about distributed processing.

Introduction to Spark

What Is Apache Spark?

Apache Spark is a fast, distributed data processing engine most commonly used for big data due to its capability of splitting data into chunks and distributing them across computational resources (workers).

Spark Architecture

Spark processing occurs completely in-memory (whenever possible), avoiding the overhead of I/O calls. It incorporates libraries with APIs for several different use cases. To mention some, non-exhaustively:

  • Machine learning (MLlib)
  • Graph processing (GraphX)
  • Stream processing (for interacting with real-time data) - Spark Structured Streaming & Spark Streaming (RDDs)
  • SQL for interactive queries (Spark SQL)

Spark APIs

Some of the main advantages that Spark provides are:

  • High speed for data querying, transformation and analysis with large datasets
  • In-memory processing
  • Iterative algorithms optimized through lazy evaluation of the chained transformations
  • Ease of use
  • Readability
  • Easily maintainable
  • Supports multiple languages (Scala, Python, Java, R, and .NET via .NET for Apache Spark)
  • Integrates seamlessly with distributed data sources such as Amazon S3, NoSQL databases and raw data in data lakes

What is a Dataframe?

A Dataframe is a data abstraction or a domain-specific language (DSL) for working with structured and semi-structured data, i.e. datasets with a schema. In a few words, it is a collection of rows with a schema, produced as the result of the structured query it describes.

It uses the immutable, in-memory, resilient, distributed and parallel capabilities of RDDs, and applies a structure (called a schema) to the data. The key difference between Dataframes and RDDs is that, when working with Dataframes, you have much more information about the structure of the data: column names, data types, etc. are already known, which enables a whole set of optimizations that would not be possible with RDDs. Thus, the manual parallelization work required with RDDs is not needed when working with Dataframes, since they are already optimized. Apart from that, the Spark Dataframe API provides a much easier way to perform data analysis on potentially big amounts of data, by running SQL-like operations in a dev-friendly manner.
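
To make this more concrete, here is a minimal, self-contained sketch (not taken from the sample application) of how a Dataframe with an explicit schema can be built in Java from a couple of in-memory rows:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.List;

public class DataframeIllustration {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("dataframe-illustration")
                .master("local[*]")
                .getOrCreate();

        // The schema tells Spark the column names and types up front,
        // which is what enables the optimizations described above.
        StructType schema = new StructType()
                .add("movieId", DataTypes.IntegerType)
                .add("title", DataTypes.StringType);

        List<Row> rows = Arrays.asList(
                RowFactory.create(1, "Toy Story (1995)"),
                RowFactory.create(2, "Jumanji (1995)"));

        Dataset<Row> movies = spark.createDataFrame(rows, schema);
        movies.show(); // prints the two rows with their schema-defined columns

        spark.stop();
    }
}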

Our scenario

As we previously mentioned, this sample application shows how to perform simple data analysis over a big dataset consisting of more than 25 million movie/rating pairs.

The scope of this application is to perform all the operations needed to calculate a set of metrics in the most efficient way possible. To achieve the best performance, we use the Dataframe API, which, as we’ve just mentioned, is heavily optimized for SQL-like operations over Dataframes.

Note: In this repository you can find the entire implementation of the sample application in Java, Python, Scala and .NET.

If you want to run the code locally, please follow the steps in the README.md file.

Dataset structure

The Spark job reads two CSV files: movies.csv and ratings.csv. Their contents are first loaded into two Dataframes and then manipulated to obtain the proposed metrics, reducing the original datasets into a single, small dataset per metric.

Expected results

  • Best films by rating: Compute the average rating for each film and list the 10 with the highest average. Only films rated at least 5,000 times are considered.
  • Most rated films: Compute how many times each movie has been rated and list the 10 most rated ones.
  • Global ratings by user, by category: Compute the average score given by each user per category, considering only users who rated at least 250 movies.
  • Users with overall lowest ratings given: Compute the average rating given by each user and list the 10 users with the lowest averages, along with the average value itself. Only users who rated at least 100 movies are considered.
  • Genres by overall rating: Compute the average score for each film, generalize it to each of the film’s genres and finally return the existing genres ordered by overall score.
  • Most commonly correlated genres: Since movies usually carry multiple genre labels, for each genre return the genre it is most often paired with (e.g. thriller/mystery, romance/drama, etc.)

Entry point and configuration

The Spark job entry point is the main method. The parameters accepted by this method should be appended to the spark-submit command used for execution, as explained in the README.md; a small parsing sketch follows the parameter list below.

Accepted parameters:

  • logLevel=<INFO|ERROR|WARN|FINE>
  • metrics=<1-6>[,1-6]\*
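
The actual argument handling lives in the repository; the snippet below is only a rough sketch of how such key=value parameters could be parsed inside main, with the class name and default values being illustrative:

import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: parse key=value arguments appended to spark-submit,
// e.g. "logLevel=ERROR metrics=1,3,5" (defaults shown here are illustrative).
public class JobArgumentsSketch {
    public static void main(String[] args) {
        Map<String, String> params = new HashMap<>();
        for (String arg : args) {
            String[] parts = arg.split("=", 2);
            if (parts.length == 2) {
                params.put(parts[0], parts[1]);
            }
        }
        String logLevel = params.getOrDefault("logLevel", "ERROR");
        String[] metrics = params.getOrDefault("metrics", "1,2,3,4,5,6").split(",");
        System.out.println("logLevel=" + logLevel + ", metrics=" + Arrays.toString(metrics));
    }
}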

Run the Job

Within the run method, a SparkSession (the entry point to all Spark functionality) is created, and then we read the datasets into Dataframes. Once the Dataframes are ready, we proceed with the analysis, calculating the metrics defined above and printing the results.

Java

You can find the equivalent code examples in Scala, Python and .NET in the repository.
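
The embedded snippet is not reproduced here, so the following is a minimal sketch of that flow; method and variable names are illustrative and the real implementation is in the repository:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

// Sketch of the job flow: build a SparkSession, load both CSVs into
// Dataframes and hand them to the metric calculations.
public class MovieMetricsJobSketch {
    public void run(String moviesPath, String ratingsPath) {
        SparkSession spark = SparkSession.builder()
                .appName("movie-data-statistics")
                .getOrCreate();

        Dataset<Row> movies = readCsvIntoDataframe(spark, moviesPath);
        Dataset<Row> ratings = readCsvIntoDataframe(spark, ratingsPath);

        // ... calculate the selected metrics and print the results ...

        spark.stop();
    }

    private Dataset<Row> readCsvIntoDataframe(SparkSession spark, String path) {
        return spark.read().option("header", "true").csv(path);
    }
}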

Read dataset from CSV

To read data from the CSV files, we created a private method called readCsvIntoDataframe, which wraps the SparkSession.read() method. As our datasets are comma-separated value files, the format that best suits our needs is csv. To see all available formats, check the ‘What is a Dataframe?’ section above.

Java

You can find the equivalent code examples in Scala, Python and .NET in the repository.
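
As before, the embedded code lives in the repository; below is a minimal sketch of what a readCsvIntoDataframe-style helper could look like. The options shown are standard Spark CSV options, and the exact ones used by the sample may differ:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

final class CsvReaderSketch {
    // Wraps SparkSession.read() and loads a comma-separated file into a Dataframe.
    static Dataset<Row> readCsvIntoDataframe(SparkSession spark, String path) {
        return spark.read()
                .format("csv")                 // the datasets are CSV files
                .option("header", "true")      // the first line holds the column names
                .option("inferSchema", "true") // let Spark derive the column types
                .load(path);
    }
}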

Metrics calculation

First of all, we want to give you a high-level overview of all the operators used within the queries; a small combined example follows the list.

Operators used

  • agg: Aggregates with or without grouping (i.e. over an entire Dataset)
  • groupBy: Used for untyped aggregates using DataFrames. Grouping is described using column expressions or column names
  • filter: Returns a Dataframe that only contains the entries that satisfy the condition
  • join: Correlates data from two dataframes (which may or may not be the same) through a common key, appending the columns of the joined dataframe to the right
  • orderBy: Orders the Dataframe by its argument's value, in either descending or ascending order
  • limit: Takes only the first N rows (the top N when combined with orderBy)
  • withColumnRenamed: Renames a column for expressiveness; also needed to avoid duplicate column names when joining a dataframe with itself
  • split: Splits a string column into an array of items using a given delimiter
  • explode: Expands an array column (e.g. the result of split) into one row per element
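
To make the last three operators more concrete, here is a small illustrative helper (not the sample’s exact code) that turns the pipe-separated genres column of movies.csv into one row per movie/genre pair:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class GenreExpansionSketch {
    // movies is the Dataframe loaded from movies.csv (movieId, title, genres).
    static Dataset<Row> expandGenres(Dataset<Row> movies) {
        return movies
                .withColumn("genre", explode(split(col("genres"), "\\|")))
                .withColumnRenamed("title", "movieTitle") // rename for expressiveness
                .select("movieId", "movieTitle", "genre");
    }
}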

Best films by rating

Execution plan

Code implementation (Java code sample)
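
The full Java implementation is in the repository; a simplified sketch of this query, assuming the movies and ratings Dataframes loaded earlier, could look like this:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.count;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class BestFilmsByRatingSketch {
    // ratings: (userId, movieId, rating, timestamp); movies: (movieId, title, genres)
    static Dataset<Row> bestFilmsByRating(Dataset<Row> ratings, Dataset<Row> movies) {
        return ratings
                .groupBy("movieId")
                .agg(avg("rating").alias("avgRating"),
                     count("rating").alias("timesRated"))
                .filter(col("timesRated").geq(5000)) // only films rated at least 5,000 times
                .join(movies, "movieId")             // bring in the title
                .orderBy(col("avgRating").desc())
                .select("avgRating", "title")
                .limit(10);
    }
}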

Results

Winner: The Shawshank Redemption

Best films by overall rating
List obtained in: 17.24s

Average Score Title
4.413576 Shawshank Redemption, The (1994)
4.324336 Godfather, The (1972)
4.284353 Usual Suspects, The (1995)
4.261759 Godfather: Part II, The (1974)
4.254769 Seven Samurai (Shichinin no samurai) (1954)
4.247579 Schindler's List (1993)
4.243014 12 Angry Men (1957)
4.237948 Rear Window (1954)
4.228311 Fight Club (1999)
4.218662 One Flew Over the Cuckoo's Nest (1975)

Where is Pulp Fiction?

Genres by average rating

Execution plan

Code implementation (.NET code sample)
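
The embedded sample for this metric is written in .NET; to keep the sketches in this article in a single language, here is a simplified, equivalent version in Java, again assuming the movies and ratings Dataframes loaded earlier:

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.split;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

final class GenresByRatingSketch {
    static Dataset<Row> genresByAverageRating(Dataset<Row> ratings, Dataset<Row> movies) {
        // Average rating per film, then generalize each film's average to its genres.
        Dataset<Row> filmAverages = ratings
                .groupBy("movieId")
                .agg(avg("rating").alias("filmAvgRating"));

        return filmAverages
                .join(movies, "movieId")
                .withColumn("genre", explode(split(col("genres"), "\\|")))
                .groupBy("genre")
                .agg(avg("filmAvgRating").alias("avgRating"))
                .orderBy(col("avgRating").desc());
    }
}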

Results

Winner: Film-Noir

Genres by average rating
Results obtained in: 39.21s

Average Rating Genre
3.925728 Film-Noir
3.791466 War
3.705281 Documentary
3.685044 Crime
3.677185 Drama
3.670169 Mystery
3.614946 Animation
3.603712 IMAX
3.585755 Western
3.554716 Musical
3.542712 Romance
3.522964 Thriller
3.517445 Adventure
3.511589 Fantasy
3.478143 Sci-Fi
3.466592 Action
3.432507 Children
3.423993 Comedy
3.326379 (no genres listed)
3.293563 Horror

We all love Film-Noir movies, don’t we?

Most commonly correlated genres

Execution plan

Code implementation (Python code sample)
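
The embedded sample here is written in Python; the sketch below shows one possible way to express the same idea in Java, exploding the genres, self-joining on movieId and using a window function to keep, for each genre, the genre it is paired with most often. It is an illustration rather than the sample’s exact implementation:

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.row_number;
import static org.apache.spark.sql.functions.split;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.Window;
import org.apache.spark.sql.expressions.WindowSpec;

final class CorrelatedGenresSketch {
    static Dataset<Row> mostCommonlyCorrelatedGenres(Dataset<Row> movies) {
        // One row per (movieId, genre) pair.
        Dataset<Row> genres = movies
                .withColumn("genre", explode(split(col("genres"), "\\|")))
                .select("movieId", "genre");

        // Self-join on movieId to get every genre pairing within the same movie.
        Dataset<Row> pairs = genres
                .join(genres.withColumnRenamed("genre", "relatedGenre"), "movieId")
                .filter(col("genre").notEqual(col("relatedGenre")));

        // Count pairings and keep, per genre, the most frequent related genre.
        WindowSpec byGenre = Window.partitionBy("genre").orderBy(col("count").desc());
        return pairs
                .groupBy("genre", "relatedGenre")
                .count()
                .withColumn("rank", row_number().over(byGenre))
                .filter(col("rank").equalTo(1))
                .drop("rank");
    }
}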

Results

Winner: Drama/Romance

Genre combinations
List obtained in: 9.95s

Times related Genre Most related genre
2406 Action Drama
1652 Adventure Action
1015 Animation Children
1169 Children Comedy
4603 Comedy Drama
2996 Crime Drama
245 Documentary Drama
4654 Drama Romance
836 Fantasy Comedy
258 Film-Noir Drama
2181 Horror Thriller
100 IMAX Action
519 Musical Comedy
1466 Mystery Thriller
4654 Romance Drama
1185 Sci-Fi Action
3510 Thriller Drama
1348 War Drama
300 Western Action

Drama and romance? Wasn’t it obvious from the beginning?

Benchmarking

We obtained six different metrics from the same data origin and benchmarked executions running with three different worker configurations:

  • * workers: uses as many worker threads as there are logical processors available to the Java virtual machine (Spark relies on Runtime.getRuntime().availableProcessors() to determine the number). For our benchmarking tests, * represents 8 cores (see the master configuration sketch after this list).
  • 2 workers
  • 1 worker (no parallel processing)
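
The worker configuration is controlled through Spark’s master setting. For local runs it is typically passed to spark-submit as --master local[*], --master local[2] or --master local[1]; the same value can also be set programmatically, as in this small illustrative sketch:

import org.apache.spark.sql.SparkSession;

public class MasterConfigSketch {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("movie-data-statistics-benchmark")
                // local[*]: one worker thread per logical core;
                // local[2]: two threads; local[1]: a single thread (no parallelism)
                .master("local[*]")
                .getOrCreate();

        // Shows how many partitions Spark will use by default for this setting.
        System.out.println("Default parallelism: "
                + spark.sparkContext().defaultParallelism());
        spark.stop();
    }
}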

As the table below shows, every metric achieved its fastest execution time with the largest number of workers, while the slowest execution for every metric was the one with a single worker.

Metric                             Workers    Elapsed time (min:sec)
Best films by rating                     *                 00:17.244
Best films by rating                     2                 00:27.314
Best films by rating                     1                 00:39.630
Most rated films                         *                 00:14.066
Most rated films                         2                 00:19.650
Most rated films                         1                 00:28.065
Global ratings by user, by category      *                 00:58.422
Global ratings by user, by category      2                 01:06.391
Global ratings by user, by category      1                 01:35.265
Users with overall lowest ratings given  *                 00:17.027
Users with overall lowest ratings given  2                 00:24.413
Users with overall lowest ratings given  1                 00:40.309
Genres by overall rating                 *                 00:15.859
Genres by overall rating                 2                 00:26.676
Genres by overall rating                 1                 00:41.311
Most commonly correlated genres          *                 00:09.958
Most commonly correlated genres          2                 00:10.976
Most commonly correlated genres          1                 00:17.365

Specs used for benchmarking

  • CPU: Intel i7-10510U 1.80GHz
  • 4 cores (8 logical processors)
  • RAM: 16GB

Conclusion

Throughout this article, we have seen how to use Spark to perform data analysis in Java, Python, Scala and .NET, applying simple SQL-like operations to obtain six different metrics from a big dataset and correlating data from different sources.

We benchmarked the execution on regular computers with three different worker-thread configurations:

  • Free # of workers (limited by # of logical processors)
  • Two workers
  • Single worker

Comparing those results, we confirmed that all six metrics ran faster with an unbounded number of workers. Thus, we can conclude that Spark takes advantage of parallel processing out-of-the-box when using the Dataframe API, which is heavily optimized for performance with little to no manual tweaking. There is no universally recommended number of workers, as it depends on each specific case, mostly on the dataset size and the operations performed to reach the expected results.

When would you want to use Apache Spark?

There’s no golden rule regarding when to use or not to use Spark, but let’s list some of the most common go-to scenarios:

  • When you need to perform intensive data analysis over huge datasets, where typical libs & tools wouldn’t suffice
  • When there’s a need to process (near) real-time streaming data, especially if a response needs to be produced quickly
  • Performing complex data analysis based on patterns, such as clustering
  • Machine learning tasks in general (from training ML models to running predictions)

Finally, let’s mention two scenarios where Spark is not the go-to choice:

  • Working with small datasets: since there’s no gain in using Spark to process small amounts of data compared to traditional, lightweight libraries, it is wiser to stick to one of those instead
  • Multi-user systems with shared memory: this is not possible due to Spark’s architecture itself, as worker nodes don’t communicate with each other

Appendix A: Benchmarking results

Free workers (local[*])

Two workers (local[2])

Single worker (local[1])

References

[¹]: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. https://doi.org/10.1145/2827872

Originally published by Mauro Krikorian for SOUTHWORKS on Medium 11 November 2020