In this tutorial, we will use the Twitter Streaming API to get Twitter’s tweets in real time and publish them to a Kafka topic. We then use Spark as a consumer to read the Twitter data from Kafka and analyse it. For more background on this tutorial, please refer to the previous two tutorials: Publishing Twitter’s data to a Kafka topic and Integrating Kafka with Spark using Structured Streaming.
In this tutorial, we will use both Java and Scala to write the following two programs:
– KafkaTwitterStreaming.java: a Java program that is used to collect Twitter’s data in real time (Twitter Streaming Data) and publish it to a Kafka topic
– KafkaSparkSS.scala: a Scala program that uses Spark to read data from the Kafka broker and analyse it.
Similar to the tutorial Publishing Twitter’s data to a Kafka topic, we will use the twitter4j library to collect Twitter’s data. However, for real-time data we need to use twitter4j-stream instead of twitter4j-core, so we add the following dependency to the pom.xml file:
<!-- https://mvnrepository.com/artifact/org.twitter4j/twitter4j-stream -->
<dependency>
    <groupId>org.twitter4j</groupId>
    <artifactId>twitter4j-stream</artifactId>
    <version>4.0.7</version>
</dependency>
We configure the Kafka properties and connect the program to our Twitter account (more details in the tutorial Publishing Twitter’s data to a Kafka topic) with the following two methods (we add .setJSONStoreEnabled(true) so that the data is published to Kafka in JSON format):
private static Properties getKafkaProp() {
    //Create an instance of Properties to access producer configs
    Properties props = new Properties();

    //Assign localhost id
    props.put("bootstrap.servers", "localhost:9092");

    //Set acknowledgements for producer requests
    props.put("acks", "all");

    //If the request fails, the producer can automatically retry
    props.put("retries", 0);

    //Specify buffer size in config
    props.put("batch.size", 16384);

    //Wait up to 1 ms before sending so that records can be batched together
    props.put("linger.ms", 1);

    //buffer.memory controls the total amount of memory available to the producer for buffering
    props.put("buffer.memory", 33554432);

    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    return props;
}

private static Configuration getTwitterConf() {
    //Configure the Twitter API keys used to access the Twitter API
    //The String keys here are only examples and not valid.
    //You need to use your own keys
    ConfigurationBuilder cb = new ConfigurationBuilder();
    cb.setDebugEnabled(true)
            .setJSONStoreEnabled(true)
            .setOAuthConsumerKey("Fljmu9Wp1YVNXhqfmDHDyEAz9")
            .setOAuthConsumerSecret("7CZDMiqhaeV7FOsUTYLgi9utt4eYEVaxqVuKZj5VGHLYqO0mLU")
            .setOAuthAccessToken("1060702756430729216-1L9lL05TdEbanhGDFETkKMknmbw70w")
            .setOAuthAccessTokenSecret("Qu41ydcAzTxClfVW4BMU6UjziS6Lv9Kkwz1zBXKh3JWrx");

    return cb.build();
}
Next, we write the getStreamTweets() method to collect Twitter data in real time as follows:
public static void getStreamTweets(final Producer<String, String> producer, String topicName) {
    TwitterStream twitterStream = new TwitterStreamFactory(getTwitterConf()).getInstance();

    StatusListener listener = new StatusListener() {
        public void onStatus(Status status) {
            //Convert the Status object to a JSON String
            String statusJson = DataObjectFactory.getRawJSON(status);

            //Publish the tweet to the Kafka topic 'TwitterStreaming'
            ProducerRecord<String, String> data =
                    new ProducerRecord<String, String>("TwitterStreaming", statusJson);
            System.out.println(statusJson);

            //Send data
            producer.send(data);
        }

        public void onException(Exception ex) {
            ex.printStackTrace();
        }

        public void onDeletionNotice(StatusDeletionNotice statusDeletionNotice) { }

        public void onTrackLimitationNotice(int numberOfLimitedStatuses) { }

        public void onScrubGeo(long userId, long upToStatusId) { }

        public void onStallWarning(StallWarning warning) { }
    };

    twitterStream.addListener(listener);

    //Only track tweets that contain the given keyword (here topicName is used as the filter keyword)
    twitterStream.filter(topicName);
}
The main method of our KafkaTwitterStreaming class is as follows (here “Big Data” is the keyword used to filter tweets, while the tweets themselves are published to the Kafka topic “TwitterStreaming”):
public static void main(String[] args) {
    //The keyword used to filter tweets
    String topicName = "Big Data";

    //Define a Kafka Producer
    Producer<String, String> producer = new KafkaProducer<String, String>(getKafkaProp());

    getStreamTweets(producer, topicName);
}
Running KafkaTwitterStreaming in IntelliJ, we get the following result (the returned result is in JSON format, containing information about each tweet such as its ID, content (text), posted date (created_at), etc.):
Opening a terminal and starting a Kafka consumer, we get the same result as above (below is the fourth tweet, with ID 1081476078546370560):
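For reference, a console consumer subscribed to the TwitterStreaming topic can be started as sketched below (the exact path of the script depends on where Kafka is installed on your machine):

# Read the tweets published by KafkaTwitterStreaming from the beginning of the topic
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TwitterStreaming --from-beginning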
So, we have finished writing a Java program (KafkaTwitterStreaming.java) that collects Twitter’s tweets in real time and publishes them to Kafka (the number of tweets increases over time as the program collects data continuously). The next step is to write a Scala program (KafkaSparkSS.scala) that reads data from the Kafka broker and performs some analysis on it. To do so, we use Spark and connect it with Kafka as described in the tutorial Integrating Kafka with Spark using Structured Streaming.
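As a reminder from that tutorial, the Spark–Kafka connector must be on the classpath. A minimal sketch of the Maven dependency is shown below; the artifact suffix and version must match your own Spark and Scala versions, so treat these values as an example:

<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-sql-kafka-0-10 -->
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql-kafka-0-10_2.11</artifactId>
    <version>2.4.0</version>
</dependency>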
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

//Define a Spark session
val spark = SparkSession.builder().appName("Spark Kafka Integration using Structured Streaming")
  .master("local")
  .getOrCreate()

//Set the log level
spark.sparkContext.setLogLevel("WARN")

//Implicit methods available in Scala for converting common Scala objects into DataFrames
import spark.implicits._

//Subscribe Spark to the topic 'TwitterStreaming'
val df = spark.readStream.format("kafka")
  .option("kafka.bootstrap.servers", "localhost:9092")
  .option("subscribe", "TwitterStreaming")
  .load()
As mentioned above, the returned result is in JSON format, but Spark Structured Streaming cannot extract the schema of the Kafka data automatically. Therefore, we have to pre-define the schema of the received data. For data with a simple schema, we can define the schema manually as described in the tutorial Statistical analysis with Spark DataFrame. However, the schema of Twitter’s data is so complex that we can hardly define it by hand. Consequently, we first use Spark to extract the schema from a sample of the data automatically and then use this schema for the real-time data. To do so, we copy a sample record received from running KafkaTwitterStreaming.java and save it as twitter.json. We then use the following code to extract the schema:
//Extract the schema from a sample of Twitter data
val twitterData = spark.read.json("src/main/resources/data_source/twitter.json").toDF()
val twitterDataScheme = twitterData.schema
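To check that the schema has been inferred correctly, we can print it; this is just a quick sanity check and not part of the tutorial’s original code:

//Print the schema inferred from the sample data
twitterData.printSchema()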
Next, we use the above schema to read the streaming data from Twitter as follows:
//Read the streaming JSON data with its schema
val twitterStreamData = df.selectExpr("CAST(value AS STRING) as jsonData")
  .select(from_json($"jsonData", schema = twitterDataScheme).as("data"))
  .select("data.*")
We display the result of Spark with the code below:
//Display the output (all columns)
val query = twitterStreamData
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

//Display the output (only a few columns)
val query2 = twitterStreamData.select("created_at", "user.name", "text", "user.lang")
  .writeStream
  .outputMode("append")
  .format("console")
  .start()

query.awaitTermination()
query2.awaitTermination()
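Note that query.awaitTermination() blocks until the first query stops, so the second call is only reached afterwards. If you prefer to wait on all running queries at once, Spark’s StreamingQueryManager offers an alternative; a minimal sketch that would replace the two awaitTermination() calls above:

//Block until any of the running streaming queries terminates
spark.streams.awaitAnyTermination()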
Running KafkaSparkSS.scala, we get the following result:
So, we have finished writing a program that makes use of the Twitter Streaming API, Kafka and Spark to collect and analyze Twitter’s data in real time. You can download the full code of this tutorial from the GitHub of ITechSeeker.