In this post, we implement the third part of our system: the Speed Layer of the Lambda architecture. We use Spark Structured Streaming to read data from Kafka’s “TwitterStreaming” topic and analyze it in real time. The process consists of the following three steps:

– Connect Spark to Kafka: this is done in the same way as described in the tutorial Real-time Twitter Analysis.

– Analyze the hashtags: this is done in the same way as the Batch Processing in the Batch Layer. Here we use restartQuery() to make sure that the streaming query is restarted on each Batch Processing run.

– Store the processing results in Cassandra: Spark Structured Streaming currently supports only a few built-in output sinks (File sink, Kafka sink, Foreach sink, Console sink) and does not yet provide a Cassandra sink (see more here). To save a streaming DataFrame/Dataset to Cassandra, we can use DataStax Enterprise (DSE) 6.0 or later, or write a custom Cassandra sink on top of the Foreach sink as explained here. However, the simplest way to save a streaming DataFrame to Cassandra is to use foreachBatch. foreachBatch() allows users to specify a function that is executed on the output of every micro-batch of a streaming query, with the following syntax:
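As a hedged illustration of that syntax, the sketch below follows the foreachBatch pattern available since Spark 2.4. The object name, the `handleBatch` and `startQuery` helpers, and the built-in `rate` test source are assumptions made for this example and are not part of the project code. Passing a named method with an explicit `Unit` return type can also avoid the overload ambiguity that a bare lambda may trigger under Scala 2.12.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.StreamingQuery

object ForeachBatchSyntax {
  // Called once per micro-batch; batchDF is a normal (non-streaming) DataFrame.
  def handleBatch(batchDF: DataFrame, batchId: Long): Unit =
    println(s"micro-batch $batchId contains ${batchDF.count()} rows")

  // Starts a streaming query whose output is handed to handleBatch.
  def startQuery(spark: SparkSession): StreamingQuery = {
    // Assumption: the built-in "rate" source stands in for the Kafka stream.
    val streamingDF = spark.readStream
      .format("rate")
      .option("rowsPerSecond", "5")
      .load()

    streamingDF.writeStream
      .foreachBatch(handleBatch _)
      .start()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("ForeachBatchSyntax")
      .master("local[*]")
      .getOrCreate()

    val query = startQuery(spark)
    query.awaitTermination(10000) // let a few micro-batches run
    query.stop()
    spark.stop()
  }
}
```

Inside `handleBatch`, any batch writer can be used, because each micro-batch arrives as an ordinary DataFrame.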

In our project, the function to be executed is the write() function, which saves a normal DataFrame (not a streaming DataFrame) to Cassandra. The code for saving a streaming DataFrame to Cassandra using foreachBatch is as follows:
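A minimal sketch of this idea, assuming the Spark Cassandra Connector is on the classpath and `spark.cassandra.connection.host` is configured. The keyspace name `lambda_architecture` and the helper names are hypothetical; only the table name `hashtag_realtimeview` comes from this post.

```scala
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.apache.spark.sql.streaming.StreamingQuery

object CassandraForeachBatch {
  // Assumption: the keyspace name is illustrative; the table name is from the post.
  val Keyspace = "lambda_architecture"
  val Table = "hashtag_realtimeview"

  // Saves one micro-batch (a normal DataFrame) through the Cassandra connector.
  def saveToCassandra(batchDF: DataFrame, batchId: Long): Unit =
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> Keyspace, "table" -> Table))
      .mode(SaveMode.Append)
      .save()

  // Attaches the batch writer to a streaming DataFrame.
  def writeStreamToCassandra(streamingDF: DataFrame): StreamingQuery =
    streamingDF.writeStream
      .outputMode("update") // for aggregations, emit only the rows that changed
      .foreachBatch(saveToCassandra _)
      .start()
}
```

Because Cassandra writes are upserts on the primary key, appending the updated rows of each micro-batch overwrites the earlier values for the same key.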

The RealtimeProcessingActor, which performs the real-time analysis in the Speed Layer, can then be written as follows:
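A minimal sketch of such an actor, under several assumptions that are not confirmed by this post: each Kafka message value is the plain tweet text, the broker runs on `localhost:9092`, the keyspace is called `lambda_architecture`, the table has the columns `hashtag` and `count`, and the table is emptied with a CQL `TRUNCATE` on restart. The body of `restartQuery()` and the hashtag extraction are illustrative stand-ins for the project's own logic.

```scala
import akka.actor.{Actor, ActorLogging, ActorSystem, Props}
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, explode, lower, split}
import org.apache.spark.sql.streaming.StreamingQuery
import scala.concurrent.duration._

case object StartProcessing

class RealtimeProcessingActor(spark: SparkSession) extends Actor with ActorLogging {
  // Assumptions: keyspace name, broker address, and plain-text tweet payloads.
  private val keyspace = "lambda_architecture"
  private val table = "hashtag_realtimeview"
  private var activeQuery: Option[StreamingQuery] = None

  override def receive: Receive = {
    case StartProcessing =>
      log.info("Restarting the real-time hashtag query")
      activeQuery = Some(restartQuery(activeQuery))
  }

  override def postStop(): Unit = activeQuery.foreach(_.stop())

  // Stops the running query (if any), empties the view, and starts a new query.
  private def restartQuery(previous: Option[StreamingQuery]): StreamingQuery = {
    previous.foreach(_.stop())
    CassandraConnector(spark.sparkContext.getConf).withSessionDo { session =>
      session.execute(s"TRUNCATE $keyspace.$table")
    }
    startQuery()
  }

  private def startQuery(): StreamingQuery = {
    val tweets = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", "localhost:9092")
      .option("subscribe", "TwitterStreaming")
      .load()
      .selectExpr("CAST(value AS STRING) AS text")

    // Split each tweet into tokens, keep the hashtags, and count them.
    val hashtagCounts = tweets
      .select(explode(split(col("text"), "\\s+")).as("token"))
      .filter(col("token").startsWith("#"))
      .groupBy(lower(col("token")).as("hashtag"))
      .count()

    hashtagCounts.writeStream
      .outputMode("update") // emit only the counts that changed
      .foreachBatch(saveBatch _)
      .start()
  }

  // Writes one micro-batch (a normal DataFrame) to Cassandra.
  private def saveBatch(batchDF: DataFrame, batchId: Long): Unit =
    batchDF.write
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> keyspace, "table" -> table))
      .mode(SaveMode.Append)
      .save()
}

object Runner extends App {
  val spark = SparkSession.builder()
    .appName("SpeedLayer")
    .master("local[*]")
    .config("spark.cassandra.connection.host", "localhost")
    .getOrCreate()

  val system = ActorSystem("SpeedLayer")
  val processor = system.actorOf(Props(new RealtimeProcessingActor(spark)), "realtimeProcessor")

  import system.dispatcher // execution context for the scheduler
  // Test-only schedule: restart the streaming query every 3 minutes.
  system.scheduler.schedule(0.seconds, 3.minutes, processor, StartProcessing)
}
```

The `schedule` call matches the Akka 2.5 API; on Akka 2.6 it is deprecated, and `scheduleWithFixedDelay` takes the same arguments.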

Running the code gives the following result:

Initially, the hashtag_realtimeview table doesn’t contain any data. The processing results of the Speed Layer are then gradually inserted into the table. When we restart the query, all existing data in the table is deleted so that the table holds only the results of the new query.

Note:

– In the above code, the Runner with the Akka Scheduler (which restarts the streaming query every 3 minutes) is only there to help readers check the results of running and restarting the streaming query. Once the project is complete, the Runner will be removed: the Batch Processor will send the StartProcessing message directly, so that the streaming query and the hashtag_realtimeview table are reset each time a new Batch Processing run starts.

– Twitter4j Streaming helps us collect tweets in real time (tweets that have just been posted). However, in many cases when we crawl data from websites, the data might be collected much later than the date it was posted. In that case, we need to use the filter() function to select only the recent posts.
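The filter() idea from the second note can be sketched as follows. The column name `posted_at`, the 24-hour window, and the `RecentPosts` object are assumptions chosen for illustration; the actual timestamp field depends on the crawled data.

```scala
import java.sql.Timestamp
import java.time.Instant
import java.time.temporal.ChronoUnit

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, current_timestamp, expr}

object RecentPosts {
  // Keeps only rows whose timestamp column is at most maxAgeHours old.
  def recentOnly(posts: DataFrame, timeColumn: String, maxAgeHours: Int): DataFrame =
    posts.filter(
      col(timeColumn) >= current_timestamp() - expr(s"INTERVAL $maxAgeHours HOURS")
    )

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("RecentPosts")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    // Hypothetical sample data: one post from 10 minutes ago, one from 3 days ago.
    val now = Instant.now()
    val posts = Seq(
      ("fresh post", Timestamp.from(now.minus(10, ChronoUnit.MINUTES))),
      ("stale post", Timestamp.from(now.minus(3, ChronoUnit.DAYS)))
    ).toDF("text", "posted_at")

    // Only posts from the last 24 hours pass the filter.
    recentOnly(posts, "posted_at", 24).show(truncate = false)
    spark.stop()
  }
}
```

The same `recentOnly` helper can be applied to a streaming DataFrame before the aggregation step, since it is an ordinary row-level filter.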

March 17, 2019
ITechSeeker