Scalable Real Time Data Analysis with Apache Spark Structured Streaming

In a previous article we discussed how to compute “Movie Data Statistics with Apache Spark”, showing you how to leverage Spark for batch processing.

Today, as we (at SOUTHWORKS) have continued working on Big Data and distributed-task projects, I want to show you how you can mix and match Spark's streaming and batch-processing capabilities to perform real-time analytics on the chunks of data flowing through a system.

With the growing need to process big datasets and run real-time analytics, you need to orchestrate the right solution. By combining both flavors Spark offers, you bring to life a Lambda Architecture capable of handling those massive quantities of data by taking advantage of both batch and stream-processing methods.

Before we move on, I want to mention the group of people who collaborated on this article, built the reference implementation linked to it and/or provided valuable feedback that allows me to share it with you: Lucas Nicolas Cabello, Santiago Calle, Pablo Costantini, Mauro Ezequiel Della Vecchia, Derek Nicolas Fernandez & Alejandro Tizzoni.

We all hope you enjoy it 😃

Overview

This time we are going to look at the underlying architecture you need to perform near real-time video analysis with Spark, supporting both VOD and live video. The goal is to show you how to handle complex workloads in a scalable and distributed way.

The solution accepts, as input, video streams from different sources such as video files, webcams, or security camera feeds. For the analysis itself, it runs two pre-trained neural networks and an unsupervised learning model.

We’ll delve deeper into the architecture and each of the models later on.

Apache Spark Structured Streaming

As the streaming tool we chose Apache Spark Structured Streaming. We are not going to explain how Spark works or what a DataFrame is, but you can check that out in our previous article.

Spark Structured Streaming is a bit harder to understand but just as simple to develop with. It is a stream processing engine built on top of the Spark SQL engine, which allows us to express streaming computation the same way we would express a batch computation on static data. The Structured Streaming engine executes a series of small batch jobs (often referred to as micro-batches) over the data streams, and uses write-ahead logs and checkpointing to provide exactly-once fault-tolerance guarantees: each record is processed once and only once, even when there is a failure.

In short, as the official documentation puts it: ‘Structured Streaming provides fast, scalable, fault-tolerant, end-to-end exactly-once stream processing without the user having to reason about streaming’.
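To make that batch/streaming parity concrete, here is a minimal sketch of the classic word-count example from the official programming guide (it is not part of the reference implementation, and the local socket source on port 9999 is just an assumption for illustration):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, split

spark = SparkSession.builder.appName("StructuredWordCount").getOrCreate()

# Streaming DataFrame: each row is a line of text received on the socket
lines = (spark.readStream
         .format("socket")
         .option("host", "localhost")
         .option("port", 9999)
         .load())

# Exactly the same DataFrame API we would use on static data
words = lines.select(explode(split(lines.value, " ")).alias("word"))
counts = words.groupBy("word").count()

# Each micro-batch updates the running counts; checkpointing provides fault tolerance
query = (counts.writeStream
         .outputMode("complete")
         .format("console")
         .option("checkpointLocation", "/tmp/wordcount-checkpoint")
         .start())

query.awaitTermination()
```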

Context & solution

Scenario

As we previously mentioned, the scenario targets a pipeline for real-time video analysis with a high degree of parallelization; video is used here simply to get a constant flow of streamed data, and it is treated as exactly that. Video sent from different sources gets processed by several state-of-the-art deep learning models, which perform real-time analysis over chunks of data (frames).

For this purpose, we carefully selected three models (more details below) for the following use-cases:

  • Emotion recognition
  • Object detection
  • Color palette extraction

Disclaimer: although we describe the AI models of choice below, the focus of this article is not on the models nor the logic chosen to process the ingested data, but on how to build a real-time ingestion/analysis pipeline that does the job.

Goal

The objective is to achieve highly scalable live analysis and inference on the frames extracted from the input video.

To keep track of how our solution performs, we defined some metrics:

  • Number of frames extracted from video
  • Video ingestion time (frames’ extraction from source)
  • Frame resizing time
  • Batch inference time

The solution

The solution heavily relies on Spark's capabilities for distributed processing, along with Kafka, the go-to option for real-time streaming data architectures, which also allows delivering messages to multiple subscribers.

In addition, Cassandra is used as the persistence layer: a high-velocity NoSQL database that can handle both structured and unstructured data.

  • The API is the entry point for our solution: a user can make a request to start a video analysis, or query the analysis status, its results, or its metrics.
  • After a user starts a new video analysis the input data is stored in Cassandra and a message is queued in the videos Kafka topic.
  • Then, this message is consumed by the Video ingestion job, which is in charge of extracting frames from the supplied video and also queueing these frames in the frames Kafka topic. The rest of the Spark jobs will consume this frames topic.
  • On one side, there is a Video analysis job, which uses deep learning models to run inference on every extracted frame and, finally, persists these inferences in Cassandra (a sketch of this job follows the list).
  • And on the other side, there is a Frame storage job, which will take care of storing each extracted frame in the filesystem, to be used later on by the API to show the results.
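The sketch below gives a feel for how the Video analysis job can be wired together. It is a simplified illustration, not the actual code from the repository: the message schema follows the Kafka section further down, `run_models` is a placeholder for the inference step, and it assumes the Kafka and Cassandra connector packages are available to Spark.

```python
from pyspark.sql import Row, SparkSession
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import IntegerType, StringType, StructType

spark = SparkSession.builder.appName("VideoAnalysisJob").getOrCreate()

# Assumed shape of the messages queued in the 'frames' topic
frame_schema = (StructType()
                .add("video_id", StringType())
                .add("frame_number", IntegerType())
                .add("buffer", StringType())          # frame encoded in base64
                .add("video_timestamp", StringType())
                .add("models", StringType()))

frames = (spark.readStream
          .format("kafka")
          .option("kafka.bootstrap.servers", "kafka:9092")
          .option("subscribe", "frames")
          .load()
          .select(from_json(col("value").cast("string"), frame_schema).alias("f"))
          .select("f.*"))

def run_models(frame_row):
    # Placeholder: decode frame_row.buffer and run the configured models here
    return Row(video_id=frame_row.video_id, frame_number=frame_row.frame_number,
               inference_model="stub", inference_result="{}", inference_time=0.0)

def process_batch(batch_df, batch_id):
    # Run inference on the micro-batch and persist results through the connector
    results = batch_df.rdd.map(run_models).toDF()
    (results.write
        .format("org.apache.spark.sql.cassandra")   # spark-cassandra-connector
        .options(keyspace="videoanalysis", table="analysis")
        .mode("append")
        .save())

query = (frames.writeStream
         .foreachBatch(process_batch)
         .option("checkpointLocation", "/tmp/analysis-checkpoint")
         .start())

query.awaitTermination()
```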

Note: In this repository you can find the entire reference implementation.

If you want to run the code locally please follow steps on the README file.

Solution Deployment

Deployment is done by leveraging a Docker environment, with a container created for each component of the solution.

Solution Data Models

Kafka

There are two Kafka topics for queueing messages, and we push only the minimum needed information for each step into the topics:

  • The first one in the lifecycle is the videos topic. It contains the video id, the URL to the source and the list of inference models to run.
  • The second topic is the frames one. It contains the video id, the frame number, the image encoded in base64 (buffer), the frame timestamp within the video, and the list of models to run (see the producer sketch after this list).
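As an illustration of what ends up in the frames topic, the following sketch extracts frames from a video with OpenCV and publishes them with kafka-python. The field names follow the description above; the Kafka address, the frame-step parameter and the function itself are assumptions for illustration, not the repository's code.

```python
import base64
import json

import cv2
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers="kafka:9092",
    value_serializer=lambda m: json.dumps(m).encode("utf-8"),
)

def publish_frames(video_id, video_url, models, frame_step=1):
    cap = cv2.VideoCapture(video_url)
    frame_number = 0
    while True:
        ok, frame = cap.read()
        if not ok:
            break
        if frame_number % frame_step == 0:
            # Encode the frame as JPEG, then base64, so it travels as text in the message
            _, jpeg = cv2.imencode(".jpg", frame)
            producer.send("frames", {
                "video_id": video_id,
                "frame_number": frame_number,
                "buffer": base64.b64encode(jpeg.tobytes()).decode("ascii"),
                "video_timestamp": cap.get(cv2.CAP_PROP_POS_MSEC),
                "models": models,
            })
        frame_number += 1
    cap.release()
    producer.flush()
```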

Cassandra

We defined a keyspace named ‘videoanalysis’. In it, we defined three tables to store all the data generated by the analysis:

  • videos: contains video_id, which is the primary key; video_url; status; and error, a user-friendly error message to display if anything goes wrong.
  • metrics: contains metric_id, which is the primary key; video_id, which references the videos table; metric_name; and metric_value.
  • analysis: contains video_id, which references the videos table; frame_number; video_timestamp; inference_model, which together with video_id and frame_number composes the primary key; inference_result; and inference_time.
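A CQL sketch of that schema, executed here through the Python driver, could look like the following. The keyspace, table names and keys reflect the description above; the column types and replication settings are assumptions.

```python
from cassandra.cluster import Cluster

session = Cluster(["cassandra"]).connect()

session.execute("""
    CREATE KEYSPACE IF NOT EXISTS videoanalysis
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS videoanalysis.videos (
        video_id uuid PRIMARY KEY,
        video_url text,
        status text,
        error text
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS videoanalysis.metrics (
        metric_id uuid PRIMARY KEY,
        video_id uuid,
        metric_name text,
        metric_value double
    )
""")

session.execute("""
    CREATE TABLE IF NOT EXISTS videoanalysis.analysis (
        video_id uuid,
        frame_number int,
        inference_model text,
        video_timestamp double,
        inference_result text,
        inference_time double,
        PRIMARY KEY ((video_id), frame_number, inference_model)
    )
""")
```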

AI Models

We decided to go with pre-trained models for our video analysis tool for two main reasons:

  • Pre-trained models are really easy to integrate, even with the modifications everyone will surely need to make so that the model suits their needs.
  • Pre-trained models also produce consistent results across multiple runs, meaning that we can expect the same or better results each time we run them.

But we did not only use pre-trained models: we also used an unsupervised learning algorithm to accomplish one of the functionalities. In this case we use unsupervised learning for cluster analysis, which groups or segments data points with shared or similar attributes so that relationships within the dataset can be estimated.

YOLOv3

YOLOv3 is a real-time object detection system. The model applies a single neural network to the full image: the network divides the image into regions and predicts bounding boxes and probabilities for each of those regions. Bounding boxes are then weighted by the predicted probabilities of the regions they belong to.

A simpler way to put it: if we feed in a video frame, YOLO calculates the relative coordinates of the objects it recognizes and assigns a probability to each prediction. Anyone can then decide which probability is acceptable for a result to be considered correct, and keep or discard the results accordingly.
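As a rough idea of what that looks like in code, here is a sketch using OpenCV's dnn module with the publicly available YOLOv3 config and weights. It is not the exact code from the reference implementation; the file paths and the 0.5 confidence threshold are assumptions.

```python
import cv2
import numpy as np

# Standard YOLOv3 files from the official site; paths are assumptions
net = cv2.dnn.readNetFromDarknet("yolov3.cfg", "yolov3.weights")
output_layers = net.getUnconnectedOutLayersNames()

def detect_objects(frame, confidence_threshold=0.5):
    height, width = frame.shape[:2]
    blob = cv2.dnn.blobFromImage(frame, 1 / 255.0, (416, 416), swapRB=True, crop=False)
    net.setInput(blob)

    detections = []
    for output in net.forward(output_layers):
        for row in output:
            scores = row[5:]
            class_id = int(np.argmax(scores))
            confidence = float(scores[class_id])
            if confidence >= confidence_threshold:
                # YOLO outputs are relative to the image size
                cx, cy = row[0] * width, row[1] * height
                w, h = row[2] * width, row[3] * height
                detections.append({
                    "class_id": class_id,
                    "confidence": confidence,
                    "box": [int(cx - w / 2), int(cy - h / 2), int(w), int(h)],
                })
    return detections
```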

For a more detailed view, consider visiting the official webpage.

Joy Detection

To detect joy we had to use three models, not just one:

  • Face detection model
  • Gender detection model
  • Emotion detection model

First, the face detection model recognizes faces within a grayscale version of the original image, and returns their coordinates.

The original image is set aside, and two images are generated for each detected face:

  • 64x64 RGB image
  • 48x48 grayscale image

The Gender detection model and the Emotion detection model have no inter-dependency and can run in any order. The Gender detection model runs over the RGB images, while the Emotion detection model runs over the grayscale images.
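A simplified sketch of that chain follows, assuming an OpenCV Haar cascade for the face detection step and two Keras models; the model file names and label orderings are illustrative only, not those of the models actually used.

```python
import cv2
import numpy as np
from tensorflow.keras.models import load_model

# Model files and label lists are assumptions for illustration
face_detector = cv2.CascadeClassifier(
    cv2.data.haarcascades + "haarcascade_frontalface_default.xml")
gender_model = load_model("gender_model.hdf5")
emotion_model = load_model("emotion_model.hdf5")
GENDERS = ["woman", "man"]
EMOTIONS = ["angry", "disgust", "fear", "happy", "sad", "surprise", "neutral"]

def analyze_faces(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    results = []
    for face_id, (x, y, w, h) in enumerate(face_detector.detectMultiScale(gray)):
        # 64x64 colour crop for gender, 48x48 grayscale crop for emotion
        rgb_face = cv2.resize(frame[y:y + h, x:x + w], (64, 64)) / 255.0
        gray_face = cv2.resize(gray[y:y + h, x:x + w], (48, 48)) / 255.0

        gender = gender_model.predict(rgb_face[np.newaxis, ...])
        emotion = emotion_model.predict(gray_face[np.newaxis, ..., np.newaxis])

        results.append({
            "face_id": face_id,
            "coordinates": [int(x), int(y), int(w), int(h)],
            "gender": GENDERS[int(np.argmax(gender))],
            "emotion": EMOTIONS[int(np.argmax(emotion))],
        })
    return results
```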

Finally, a single output is generated, containing for each face detected:

  • Face ID
  • Coordinates
  • Gender
  • Emotion

For further information, make sure to check the official models’ repositories: Face detection, Gender and emotion detection.

Color extraction with K-means

To extract the color palette of the frames that are streamed into our solution, we chose an unsupervised learning algorithm for clustering: K-Means.

Let’s say you have a set X that contains n data points. Running the algorithm consists of the following steps:

  • Pick K random points (called centroids) from X
  • Assign every point to its closest centroid. Each newly formed group of points is called a cluster.
  • For each cluster, compute a new centroid as the center (mean) of its points
  • Repeat steps 2–3 until the centroids stop changing

This can be applied to group the pixels of a frame by color, and then extract the RGB value of each cluster’s centroid to obtain the color palette of the clusters, and therefore the color palette of the frame itself.
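A compact sketch of that idea with scikit-learn (the cluster count of 5 is an arbitrary choice, and the function name is ours):

```python
import numpy as np
from sklearn.cluster import KMeans

def extract_palette(frame, k=5):
    # Treat every pixel as a 3-dimensional point (B, G, R) and cluster them
    pixels = frame.reshape(-1, 3).astype(np.float32)
    kmeans = KMeans(n_clusters=k, n_init=10).fit(pixels)

    # Each centroid is the average colour of its cluster: together they form the palette
    palette = kmeans.cluster_centers_.astype(int).tolist()

    # Fraction of the frame covered by each palette colour
    counts = np.bincount(kmeans.labels_, minlength=k)
    weights = (counts / counts.sum()).tolist()
    return list(zip(palette, weights))
```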

Modules

API

The API represents the entry point for this reference implementation. Its endpoints allow us to submit videos for analysis, query both status and results, and view specific frames alongside the results of their analysis.

This API is powered by Flask (a minimal sketch of the start endpoint follows the list of endpoints below).

  • /analysis/start: Submits the path to a video and starts the analysis lifecycle. Returns the path to the analysis status endpoint together with the video id.
  • /analysis/<video_id>: Given a video id, returns the status of the analysis.
  • /analysis/<video_id>/results: Given a video id, returns the analysis results.
  • /analysis/<video_id>/frame/<frame_number>: Given a video id, and a frame number, returns the stored frame with the bounding boxes drawn.
  • /analysis/<video_id>/frame/<frame_number>/results: Given a video id, and a frame number, returns a JSON array with the analysis results.
  • /analysis/<video_id>/metrics: Given a video id, returns the metrics gathered to measure performance.
  • /analysis/rtmp-endpoint: Requests an RTMP endpoint for submitting a video stream.
  • /analysis/videos: Returns a JSON array containing all the completed videos.
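To give an idea of how the entry point ties the pieces together, here is a minimal sketch of the /analysis/start endpoint. The request fields, Kafka address and Cassandra statement are illustrative assumptions; see the repository for the real implementation.

```python
import json
import uuid

from cassandra.cluster import Cluster
from flask import Flask, jsonify, request
from kafka import KafkaProducer

app = Flask(__name__)
session = Cluster(["cassandra"]).connect("videoanalysis")
producer = KafkaProducer(bootstrap_servers="kafka:9092",
                         value_serializer=lambda m: json.dumps(m).encode("utf-8"))

@app.route("/analysis/start", methods=["POST"])
def start_analysis():
    body = request.get_json()
    video_id = uuid.uuid4()

    # Persist the request so its status can be queried later
    session.execute(
        "INSERT INTO videos (video_id, video_url, status) VALUES (%s, %s, %s)",
        (video_id, body["video_url"], "PENDING"))

    # Queue the video for the ingestion job
    producer.send("videos", {
        "video_id": str(video_id),
        "video_url": body["video_url"],
        "models": body.get("models", []),
    })
    return jsonify({"status_url": f"/analysis/{video_id}"}), 202
```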

RTMP Streaming

The RTMP endpoint allows submitting live video for analysis. To start streaming, a request must be sent to /analysis/rtmp-endpoint, indicating the desired models to be executed. The response will contain the address where the bitstream must be pushed.
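For example, requesting an endpoint could look like this (the host and the payload shape are assumptions for illustration):

```python
import requests

# Hypothetical payload: the models we want the pipeline to run on the live stream
response = requests.post("http://localhost:5000/analysis/rtmp-endpoint",
                         json={"models": ["yolo", "joy_detection", "color_palette"]})

# The response contains the RTMP address the bitstream must be pushed to,
# e.g. with ffmpeg or OBS
print(response.json())
```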

Client Application

The ReactJS client application provides a graphical interface to make the user experience simpler.

The app consists of 3 tabs:

  1. The landing tab asks the user for a video URL and a video name, triggers the analysis pipeline, and shows the status and results of the pipeline’s progress. Optionally, the user can provide the step between frames to be analyzed; if not provided, every frame will be analyzed.
  2. The webcam tab lets the user record footage from a webcam, or a camera connected to the PC where the solution is running, and use that footage. The user will also be asked for a video name. Optionally, a frame step can be provided; if not provided, every frame will be analyzed.
  3. The History tab lets the user browse previously submitted videos and see the analyzed frames and the analysis metrics.

This app can be found within this repository folder along with the rest of the solution. Instructions on how to run it can be found on this README file.

Conclusion

This is neither a straightforward nor a simple solution; nevertheless, it accomplishes what I wanted to show you: how to leverage Spark Structured Streaming for real-time analytics at scale. The reference implementation focuses on showing Spark’s parallelization power to handle constant chunks of data concurrently, applying different processes to each one and building, piece by piece, an interesting result.

While building the scenario, the team decided to use pre-trained models and brought AI into the picture to make it more compelling, although the models themselves (i.e. the processes applied to the data) are not what matters most.

As final words, we see that the solution performs quite well even though there is still room for improvement; in the end, the pipeline’s performance will depend on the logic applied to your data. With time and effort even the models could be tweaked to work seamlessly across the pipeline, but to demonstrate that Spark Structured Streaming can be combined with Machine Learning, this is more than enough.

Originally published by Mauro Krikorian for SOUTHWORKS on Medium 26 January 2021