As briefly explained in Overview of the Project, we will use the Twitter4j library to collect Twitter’s stream data, publish this data to Kafka, then save it in a Cassandra database. The details of how to publish Twitter’s data to Kafka can be found in the tutorials Publishing Twitter’s data to a Kafka topic and Real-time Twitter Analysis, while the details of how to connect Kafka with Cassandra can be found in the tutorial Connecting Kafka to Cassandra Sink. (The dependencies of this project can be found here. Note: in this project we use Scala 2.11, as the Spark Cassandra Connector does not yet support Scala 2.12, as already mentioned in the tutorial Connecting Spark to Cassandra.)

In the tutorials mentioned above, we mainly ran the Kafka server, Zookeeper and Cassandra by typing commands in a terminal window. In this project, however, we will write code that runs these commands directly from Scala with Runtime.getRuntime().exec(). We will use Akka Actor to create four actors that run four different commands in foreground mode instead of background mode. This can be done with the EnvRunner class as follows (more details about Akka Actor can be found in the tutorial Sending and receiving messages between Akka Actors):
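A minimal sketch of this idea, using plain threads in place of the project’s Akka actors to keep it self-contained (the four command strings and install paths below are assumptions):

```scala
import scala.io.Source

// Simplified stand-in for EnvRunner: each command runs on its own thread
// in foreground mode, mirroring the four actors in the project.
// All paths and arguments below are assumptions.
object EnvRunner {
  val commands: Seq[(String, String)] = Seq(
    "zookeeper" -> "kafka/bin/zookeeper-server-start.sh kafka/config/zookeeper.properties",
    "kafka"     -> "kafka/bin/kafka-server-start.sh kafka/config/server.properties",
    "cassandra" -> "cassandra/bin/cassandra -f",
    "connect"   -> "kafka/bin/connect-standalone.sh kafka/config/connect-standalone.properties kafka/config/cassandra-sink.properties"
  )

  def start(): Unit = commands.foreach { case (name, cmd) =>
    new Thread(new Runnable {
      def run(): Unit = {
        val process = Runtime.getRuntime.exec(cmd.split(" "))
        // Stream the process output so it shows up as it would in a terminal
        Source.fromInputStream(process.getInputStream)
          .getLines().foreach(line => println(s"[$name] $line"))
      }
    }, name).start()
  }
}
```

Running each command on a dedicated thread (or actor) is what lets all four processes stay in the foreground at once, with their logs interleaved in a single console.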

Running the EnvRunner.start() function, we get the same result as running the commands in a terminal:

Note: before running EnvRunner.start(), we need to configure the properties of the Kafka Cassandra Sink in kafka/config/ as follows (make sure the names of the Kafka topic and the Cassandra keyspace are the same as those specified in CassandraDB and KafkaTwitterStreaming, described in the next section):
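As a hedged example, such a sink configuration might look like the following; the connector class and property names are those of the DataMountaineer stream-reactor Cassandra Sink connector used in the tutorial above, and the file name and values are assumptions:

```properties
# kafka/config/cassandra-sink.properties (file name and values are assumptions)
name=cassandra-sink
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=TwitterStreaming
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=lambda_architecture
connect.cassandra.kcql=INSERT INTO master_dataset SELECT * FROM TwitterStreaming
```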

After starting Kafka, Cassandra and Kafka Cassandra Sink successfully, we will create Cassandra tables to store data as follows:

– Create a keyspace named lambda_architecture

– Create the master_dataset table to store the data sent from Kafka to Cassandra

– Create the hashtag_batchView table to store the results of batch processing in the Batch Layer

– Create the hashtag_realtimeView table to store the results of real-time processing in the Speed Layer

These steps are carried out by CassandraDB with the following code (see more at Interacting with Cassandra using its API):
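A sketch of the CQL these four steps correspond to, held as plain strings to stay self-contained (the master_dataset columns follow the seven tweet fields; the view-table schemas and replication settings are assumptions, and in the project the statements would be executed through the Cassandra driver rather than printed):

```scala
object CassandraDB {
  // CQL for the four steps above; view-table schemas are assumptions.
  val statements: Seq[String] = Seq(
    """CREATE KEYSPACE IF NOT EXISTS lambda_architecture
      |WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};""".stripMargin,
    """CREATE TABLE IF NOT EXISTS lambda_architecture.master_dataset (
      |  tweet_id bigint PRIMARY KEY, user_id bigint, user_name text,
      |  user_loc text, content text, hashtag text, created_date text);""".stripMargin,
    """CREATE TABLE IF NOT EXISTS lambda_architecture.hashtag_batchview (
      |  hashtag text PRIMARY KEY, count bigint);""".stripMargin,
    """CREATE TABLE IF NOT EXISTS lambda_architecture.hashtag_realtimeview (
      |  hashtag text PRIMARY KEY, count bigint);""".stripMargin
  )

  def runDB(): Unit =
    // In the project each statement would be passed to the driver,
    // e.g. cluster.connect().execute(stmt); printing keeps this sketch runnable.
    statements.foreach(stmt => println(stmt))
}
```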

After running CassandraDB.runDB(), we use bin/cqlsh to verify that the Cassandra tables were created:

Thus, we have finished connecting Kafka to Cassandra with EnvRunner.start() and CassandraDB.runDB(). The next step is to collect Twitter’s data and send it to Kafka using KafkaTwitterStreaming, with the following code (see more at Real-time Twitter Analysis):
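A condensed sketch of what KafkaTwitterStreaming does, assuming the twitter4j and kafka-clients dependencies from the tutorials above; the bootstrap server, keywords and the pipe-separated message format are assumptions (in the project these settings come from AppConfiguration, described below):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import twitter4j.{FilterQuery, Status, StatusAdapter, TwitterStreamFactory}

object KafkaTwitterStreaming {
  // Assumed values; the project reads these from its configuration
  val topic    = "TwitterStreaming"
  val keywords = Seq("bigdata", "kafka")

  def run(): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)

    // OAuth keys are picked up by Twitter4j from its own configuration
    val stream = new TwitterStreamFactory().getInstance()
    stream.addListener(new StatusAdapter {
      override def onStatus(status: Status): Unit =
        if (status.getLang == "en") {                 // keep English tweets only
          val hashtags = status.getHashtagEntities.map(_.getText).mkString(",")
          val value = Seq(status.getId, status.getUser.getId, status.getUser.getName,
            status.getUser.getLocation, status.getText, hashtags,
            status.getCreatedAt).mkString("|")        // assumed message format
          producer.send(new ProducerRecord[String, String](topic, value))
        }
    })
    stream.filter(new FilterQuery().track(keywords: _*))
  }
}
```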

Here we use AppConfiguration, a Scala object in the main_package, to retrieve the configurations specified in the /resources/application.conf file (e.g. the Kafka topic, keywords, Twitter keys, etc.).

By default, ConfigFactory.load() looks for a file named application.conf on the classpath, so we need to create this file in the /resources directory and set the configurations as follows:
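A hedged example of what application.conf might contain; the key names and values below are assumptions (only the topic name is fixed by the project), and the Twitter credentials are placeholders:

```
# /resources/application.conf — key names and values are assumptions
kafka {
  topic             = "TwitterStreaming"
  bootstrap-servers = "localhost:9092"
  keywords          = ["bigdata", "spark", "kafka"]
}

twitter {
  consumer-key        = "..."
  consumer-secret     = "..."
  access-token        = "..."
  access-token-secret = "..."
}

cassandra {
  keyspace = "lambda_architecture"
}
```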

In this project we only collect English tweets, and for each tweet we extract only the following seven fields: tweet_id, user_id, user_name, user_loc, content, hashtag and created_date.
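The record can be modelled as a small case class; the helper below is not part of the original project but shows how hashtags could be pulled straight out of the tweet text (in practice Twitter4j’s getHashtagEntities already provides them):

```scala
object TweetRecord {
  // The seven fields extracted from each English tweet
  case class Tweet(
    tweetId: Long, userId: Long, userName: String, userLoc: String,
    content: String, hashtag: String, createdDate: String)

  // Illustrative helper: extract hashtag names from raw tweet text
  def extractHashtags(text: String): List[String] =
    """#(\w+)""".r.findAllMatchIn(text).map(_.group(1)).toList
}
```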

Running the function, we get the following result:

We re-check the insertion of Twitter’s data into Cassandra using cqlsh as follows:

So, we have finished implementing the first part of our system (data collection and storage). This part is responsible for collecting Twitter’s data, publishing it to Kafka’s “TwitterStreaming” topic and saving it in the master_dataset table of Cassandra. In the next post, we will implement the Batch Layer of the Lambda Architecture to analyze the data stored in the master_dataset table with batch processing.

March 15, 2019