Over the last couple of months, here at SOUTHWORKS we have been involved in several Big Data projects, building complete large-scale ETL and processing pipelines that leverage some of the trending tools in the Big Data world: Apache Spark and its close relative, Databricks.
During a break between project phases, we took the opportunity to put together this article and reference implementation, doing our bit for the big community of software & data engineers, and for anyone who wants to take their first steps into this huge world.
Before we get into it, I don’t want to skip mentioning the great group of people who brought this sample to life and contributed feedback while putting it together: Lucas Nicolas Cabello, Santiago Calle, Pablo Costantini, Mauro Ezequiel Della Vecchia, Derek Nicolas Fernandez, Juan Manso & Alejandro Tizzoni.
We hope that you like it 😃
In this introductory article we cover the initial steps you might take to start working with Apache Spark (Spark).
First, we go over some introductory concepts, its main use-cases and advantages, and then introduce a complete sample application that uses Spark’s Dataframe API to process large datasets in a distributed way.
For this application, we perform some data analysis over the MovieLens dataset[¹], which consists of 25 million ratings given to 62,000 movies by 162,000 users, and compute some statistics from it.
Such statistics include:
Best films by rating
Global ratings given by each user per category
Most rated films
Users with overall lowest ratings given
Genres by rating
Most commonly correlated genres
Lastly, we share some benchmarking we performed using different worker configurations and talk about distributed processing.
Introduction to Spark
What Is Apache Spark?
Apache Spark is a fast, distributed data processing engine most commonly used for big data due to its capability of splitting data into chunks and distributing them across computational resources (workers).
Spark processing occurs completely in-memory (when possible), avoiding the overhead of I/O calls. It incorporates libraries with APIs for several different use-cases. To mention some non-exhaustively: Spark SQL for structured data, Structured Streaming for stream processing, MLlib for machine learning and GraphX for graph processing.
It integrates seamlessly with distributed data sources such as Amazon S3, NoSQL databases and raw data in Data Lakes.
What is a Dataframe?
A Dataframe is a data abstraction, or a domain-specific language (DSL), for working with structured and semi-structured data, i.e. datasets with a schema. In a few words, it’s a collection of rows with a schema, the result of the structured query it describes.
It builds on the immutable, in-memory, resilient, distributed, and parallel capabilities of RDDs, and applies a structure (called a schema) to the data. The key difference between Dataframes and RDDs is that, when working with Dataframes, you have much more information about the structure of the data. Column names, data types and so on are known up front, which enables a whole bunch of optimizations that would not be possible with RDDs. Thus, the manual parallelization work you would perform over RDDs isn’t necessary with Dataframes, since they are already optimized. Apart from that, the Spark Dataframe API provides a much easier way to perform data analysis on potentially large amounts of data, through SQL-like operations in a dev-friendly manner.
As we previously mentioned, this sample application shows how to perform simple data analysis over a big dataset consisting of 25M+ movie/rating entries.
The scope of this application is to perform all the operations needed to calculate a set of metrics as efficiently as possible. To achieve the best performance, we use the Dataframe API, which, as we’ve just mentioned, is heavily optimized for SQL-like operations over Dataframes.
Note: In this repository you can find the entire implementation of the sample application in Java, Python, Scala and .NET.
If you want to run the code locally please follow steps on the README.md file.
The Spark job reads two CSV files: movies.csv and ratings.csv. Their contents are first loaded into two Dataframes, which are then manipulated to obtain the proposed metrics, reducing the original datasets into a single small dataset per metric.
Best films by rating: Compute the average rating for each film and list the 10 with the highest average. Only films rated at least 5,000 times are considered.
Most rated films: Compute the total number of times each movie has been rated and list the 10 most rated ones.
Global ratings by user, by category: Compute the average score given by each user who rated at least 250 movies.
Users with overall lowest ratings given: Compute the average rating given by each user and list the 10 users with the lowest averages, along with the average value itself. Only users who rated at least 100 movies are considered.
Genres by overall rating: Compute the average score for each film, generalize it to each of the film’s genres and finally return the existing genres ordered by global score.
Most commonly correlated genres: Since movies usually have multiple genre labels, for each genre return the genre it is most often related with (e.g. thriller/mystery, romance/drama, etc.)
Entry point and configuration
The Spark job entry point is the main method. The parameters accepted by this method should be appended to the spark-submit command used for execution, as explained in the README.md.
Run the Job
Within the run method, a SparkSession (the entry point for all Spark functionality) is created, and then we read the datasets into Dataframes. Once the Dataframes are ready, we perform the analysis, calculating the metrics defined above and printing the results.
To read data from CSV files, we created a private method called readCsvIntoDataframe. It wraps the SparkSession.read() method. As our datasets are comma-separated value files, the format that best suits our needs is csv. To see all available formats, check the ‘What is a Dataframe?’ section above.
First of all, we want to give you a high-level overview of all operators used within the queries.
agg: Aggregates with or without grouping (i.e. over an entire Dataset)
groupBy: Used for untyped aggregations over Dataframes; grouping is described using column expressions or column names
filter: Returns a Dataframe containing only the entries that satisfy a condition
join: Correlates data from two (possibly different) dataframes by a common correlation id, appending all columns from the joined table on the right
orderBy: Orders the Dataframe by the given columns’ values, in either descending or ascending order
limit: Takes the first N rows
withColumnRenamed: Renames a column for expressiveness; may be needed to avoid repeated names when joining a dataframe with itself
split: Creates a collection of items by dividing a string at a given character
explode: Turns the previously split items into multiple rows
Best films by rating
Code implementation (Java code sample)
Best films by overall rating: The Shawshank Redemption. List obtained in: 17.24s

| Average Score | Title |
| --- | --- |
| 4.413576 | Shawshank Redemption, The (1994) |
| 4.324336 | Godfather, The (1972) |
| 4.284353 | Usual Suspects, The (1995) |
| 4.261759 | Godfather: Part II, The (1974) |
| 4.254769 | Seven Samurai (Shichinin no samurai) (1954) |
| 4.247579 | Schindler's List (1993) |
| 4.243014 | 12 Angry Men (1957) |
| 4.237948 | Rear Window (1954) |
| 4.228311 | Fight Club (1999) |
| 4.218662 | One Flew Over the Cuckoo's Nest (1975) |
Where is Pulp Fiction?
Genres by average rating
Code implementation (.NET code sample)
Genres by average rating. Results obtained in 39.21s

| Average Rating | Genre |
| --- | --- |
| 3.925728 | Film-Noir |
| 3.791466 | War |
| 3.705281 | Documentary |
| 3.685044 | Crime |
| 3.677185 | Drama |
| 3.670169 | Mystery |
| 3.614946 | Animation |
| 3.603712 | IMAX |
| 3.585755 | Western |
| 3.554716 | Musical |
| 3.542712 | Romance |
| 3.522964 | Thriller |
| 3.517445 | Adventure |
| 3.511589 | Fantasy |
| 3.478143 | Sci-Fi |
| 3.466592 | Action |
| 3.432507 | Children |
| 3.423993 | Comedy |
| 3.326379 | (no genres listed) |
| 3.293563 | Horror |
We all love Film-Noir movies, don’t we?
Most commonly correlated genres
Code implementation (Python code sample)
Genres combinations. List obtained in: 9.95s

| Times related | Genre | Most related genre |
| --- | --- | --- |
| 2406 | Action | Drama |
| 1652 | Adventure | Action |
| 1015 | Animation | Children |
| 1169 | Children | Comedy |
| 4603 | Comedy | Drama |
| 2996 | Crime | Drama |
| 245 | Documentary | Drama |
| 4654 | Drama | Romance |
| 836 | Fantasy | Comedy |
| 258 | Film-Noir | Drama |
| 2181 | Horror | Thriller |
| 100 | IMAX | Action |
| 519 | Musical | Comedy |
| 1466 | Mystery | Thriller |
| 4654 | Romance | Drama |
| 1185 | Sci-Fi | Action |
| 3510 | Thriller | Drama |
| 1348 | War | Drama |
| 300 | Western | Action |
Drama and romance? Wasn’t it obvious from the beginning?
We obtained six different metrics from the same data origin and benchmarked executions running with three different worker configurations:
* workers: uses as many threads as the number of processors available to the Java virtual machine (it calls Runtime.getRuntime.availableProcessors() to determine the number). For our benchmarking tests, * represents 8 cores.
2 workers
1 worker (no parallel processing)
As you can appreciate in the table below, every calculation had its fastest execution time with the largest number of workers, while the slowest execution for every metric is the one with a single worker.
| Metric | Workers | Elapsed time (min:sec) |
| --- | --- | --- |
| Best films by rating | * | 00:17.244 |
| Best films by rating | 2 | 00:27.314 |
| Best films by rating | 1 | 00:39.630 |
| Most rated films | * | 00:14.066 |
| Most rated films | 2 | 00:19.650 |
| Most rated films | 1 | 00:28.065 |
| Global ratings by user, by category | * | 00:58.422 |
| Global ratings by user, by category | 2 | 01:06.391 |
| Global ratings by user, by category | 1 | 01:35.265 |
| Users with overall lowest ratings given | * | 00:17.027 |
| Users with overall lowest ratings given | 2 | 00:24.413 |
| Users with overall lowest ratings given | 1 | 00:40.309 |
| Genres by overall rating | * | 00:15.859 |
| Genres by overall rating | 2 | 00:26.676 |
| Genres by overall rating | 1 | 00:41.311 |
| Most commonly correlated genres | * | 00:09.958 |
| Most commonly correlated genres | 2 | 00:10.976 |
| Most commonly correlated genres | 1 | 00:17.365 |
Specs used for benchmarking
CPU: Intel i7-10510U 1.80GHz
4 cores (8 logical processors)
Throughout this article, we have seen how to use Spark to perform data analysis with Java, Python, Scala and .NET, performing simple SQL-like operations to obtain six different metrics from a big dataset, while also correlating data from different sources.
We benchmarked the execution on regular computers with three different worker-thread configurations:
Free # of workers (limited by # of logical processors)
2 workers
1 worker
Comparing those results, we confirmed that all six metrics ran fastest with a free number of workers. Thus, we can conclude that Spark takes advantage of parallel processing out-of-the-box through the Dataframe API, which is heavily optimized towards performance with little-to-no manual tweaking. There is no recommended number of workers, as it depends on each specific case, mostly on the dataset size and the operations performed to reach the expected results.
When would you want to use Apache Spark?
There’s no golden rule regarding when to use Spark or not, but let’s list some of the most common go-to scenarios:
When you need to perform intensive data analysis over huge datasets, where typical libraries & tools wouldn’t suffice
When there’s a need to process (near) real-time streaming data, especially if a response needs to be yielded
Performing complex data analysis based on patterns, such as clustering
Machine learning tasks in general (from training ML models to running predictions)
Finally, let’s mention two scenarios Spark is not well suited for:
Working with small datasets. Since there’s no gain in using Spark to process small amounts of data compared to traditional lightweight libraries, it’s wiser to stick to one of those instead
Multi-user systems with shared memory. Spark’s architecture does not support this, as worker nodes don’t communicate with each other
Appendix A: Benchmarking results
Free workers (local[*])
Two workers (local[2])
Single worker (local)
[¹]: F. Maxwell Harper and Joseph A. Konstan. 2015. The MovieLens Datasets: History and Context. ACM Transactions on Interactive Intelligent Systems (TiiS) 5, 4: 19:1–19:19. https://doi.org/10.1145/2827872
Originally published by Mauro Krikorian for SOUTHWORKS on Medium 11 November 2020