Trong bài trước (Viết chương trình cơ bản), chúng ta đã thực hành viết một chương trình giả lập Kafka Producer và Consumer (SimpleProducer và SimpleConsumer). Chương trình SimpleProducer được dùng để gửi dữ liệu (số tự nhiên từ 0-9) tới topic ‘test’ của Kafka và chương trình SimpleConsumer đọc các dữ liệu từ topic ‘test’. Trong bài tutorial này, chúng ta sẽ thực hành viết một chương trình sử dụng Kafka để truyền dữ liệu từ Twiter bằng cách kết hợp code của SimpleProducer và code trong bài Phân tích bài đăng trên Twitter.
Trước hết ta tạo một Kafka Topic với tên TwitterData như trong hướng dẫn ở bài Cài đặt Apache Kafka với dòng lệnh sau:
1 |
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic TwitterData |
Tiếp theo, ta tạo một Java class có tên KafkaTwitter và thêm dependency của twitter4j như hướng dẫn trong bài Phân tích bài đăng trên Twitter. Ta cũng sẽ sử dụng lại hàm getTweets(String topic) để thu thập thông tin bài đăng về một chủ đề nhất định.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 |
public static ArrayList<String> getTweets(String topic) { //Config Twitter API key to access Twitter API //The String keys here are only examples and not valid. //You need to use your own keys ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey("OlaPFScciNp7C83UhyuqYKyOQ") .setOAuthConsumerSecret("yix4knWhTIuI1rb0ypagg1ERmtwNPqPXcQmzYOpu7koMOojP6k") .setOAuthAccessToken("1060702756430729216-avkRRZtWfBb7y29XUVo2aAvIBMVZyo") .setOAuthAccessTokenSecret("bXQ5a4iB75q1fohiHA3VfCEvQsZuFdKAErbpxZGatnPFz"); Twitter twitter = new TwitterFactory(cb.build()).getInstance(); ArrayList<String> tweetList = new ArrayList<String>(); try { //Set query for our topic Query query = new Query(topic); QueryResult result; do { //Run the query result = twitter.search(query); List<Status> tweets = result.getTweets(); //Add the result to the list of tweets for (Status tweet : tweets) { tweetList.add(tweet.getText()); } } while ((query = result.nextQuery()) != null); } catch (TwitterException te) { te.printStackTrace(); System.out.println("Failed to search tweets: " + te.getMessage()); } return tweetList; } |
Trong hàm main của KafkaTwitter class, ta tạo một Producer với các thuộc tính tương tự ở bài SimpleProducer và thiết lập Kafka Topic là TwitterData:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 |
//The Kafka Topic String topicName="TwitterData"; // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //Define a Kafka Producer Producer<String, String> producer = new KafkaProducer<String, String>(props); |
Tiếp theo ta sử dụng hàm getTweets(String topic) để thu thập các bài đăng liên quan đến “Natural Language Processing” như sau:
1 2 3 4 5 6 |
//Set the topic for Twitter String topic = "Natural Language Processing"; //Get tweets of the topicafka ArrayList<String> tweets; tweets= getTweets(topic); |
Cuối cùng, ta truyền dữ liệu của các bài đăng trên Twitter tới TwitterData topic như sau:
1 2 3 4 5 6 7 8 9 |
int i=1; for(String tweet : tweets) { //Send the tweet to a particualar Kafka Topic producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), tweet)); i++; } System.out.println("Message sent successfully"); producer.close(); |
Nhấn Ctrl+Shift+F10 để chạy chương trình trên IntelliJ. Sau khi chương trình chạy xong, sẽ hiện ra đoạn tin nhắn: “Message sent successfully” báo rằng chúng ta đã chạy thành công chương trình.
Để kiểm tra dữ liệu nhận được, ta mở một cửa sổ terminal và chạy Consumer với topic là TwitterData như hướng dẫn trong bài Viết chương trình cơ bản, ta được kết quả:
1 |
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic TwitterData --from-beginning |
Như vậy ta đã hoàn thành việc viết một chương trình sử dụng Kafka để truyền dữ liệu là các bài đăng liên quan đến “Natural Language Processing” trên Twitter. Các bạn có thể tham khảo full code của chương trình dưới đây.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 |
import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.ProducerRecord; import twitter4j.Query; import twitter4j.QueryResult; import twitter4j.Status; import twitter4j.Twitter; import twitter4j.TwitterException; import twitter4j.TwitterFactory; import twitter4j.conf.ConfigurationBuilder; public class KafkaTwitter { public static void main(String[] args) { //The Kafka Topic String topicName="TwitterData"; // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); //Define a Kafka Producer Producer<String, String> producer = new KafkaProducer<String, String>(props); //Set the topic for Twitter String topic = "Natural Language Processing"; //Get tweets of the topicafka ArrayList<String> tweets; tweets= getTweets(topic); int i=1; for(String tweet : tweets) { //Send the tweet to a particualar Kafka Topic producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), tweet)); i++; } System.out.println("Message sent successfully"); producer.close(); } /** * Get tweets from Twitter using twitter4j library * * @param topic : the topic we need to search on Twitter * @return : String list of Twitter related to our topic */ public static ArrayList<String> getTweets(String topic) { //Config Twitter API key to access Twitter API //The String keys here are only examples and not valid. //You need to use your own keys ConfigurationBuilder cb = new ConfigurationBuilder(); cb.setDebugEnabled(true) .setOAuthConsumerKey("OlaPFScciNp7C83UhyuqYKyOQ") .setOAuthConsumerSecret("yix4knWhTIuI1rb0ypagg1ERmtwNPqPXcQmzYOpu7koMOojP6k") .setOAuthAccessToken("1060702756430729216-avkRRZtWfBb7y29XUVo2aAvIBMVZyo") .setOAuthAccessTokenSecret("bXQ5a4iB75q1fohiHA3VfCEvQsZuFdKAErbpxZGatnPFz"); Twitter twitter = new TwitterFactory(cb.build()).getInstance(); ArrayList<String> tweetList = new ArrayList<String>(); try { //Set query for our topic Query query = new Query(topic); QueryResult result; do { //Run the query result = twitter.search(query); List<Status> tweets = result.getTweets(); //Add the result to the list of tweets for (Status tweet : tweets) { tweetList.add(tweet.getText()); } } while ((query = result.nextQuery()) != null); } catch (TwitterException te) { te.printStackTrace(); System.out.println("Failed to search tweets: " + te.getMessage()); } return tweetList; } } |