Trong bài tutorial này, chúng ta sẽ cùng tìm hiểu cách thức tích hợp Kafka với Spark và viết một chương trình sử dụng Spark để lấy dữ liệu từ một Kafka topic rồi tiến hành một số thao tác cơ bản để xử lý dữ liệu đó. Thông thường có 2 cách để tích hợp Kafka với Spark là sử dụng DStream (API cũ của Spark) hoặc sử dụng Structured Streaming (API mới, xem thêm tại bài tutorial về Structured Streaming). Trong bài tutorial này, chúng ta sẽ cùng tìm hiểu cách thức tích hợp thứ nhất sử dụng DStream còn cách thức tích hợp sử dụng Structured Streaming sẽ được nghiên cứu trong bài tutorial tiếp theo (Tích hợp Kafka với Spark sử dụng Structured Streaming).
Trước hết, để có thể tích hợp Kafka và Spark ta cần phải thêm dependency sau vào file pom.xml ( Ở đây jason-databind dependency được thêm vào để tránh lỗi “jackson.databind.JsonMappingException: Incompatible Jackson version: 2.9.6” khi chạy chương trình sau này).
1 2 3 4 5 6 7 8 9 10 11 12 13 |
<!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-kafka-0-10 --> <dependency> <groupId>org.apache.spark</groupId> <artifactId>spark-streaming-kafka-0-10_2.12</artifactId> <version>2.4.0</version> </dependency> <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind --> <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.6.7</version> </dependency> |
Tiếp theo, ta tạo một Java class với tên KafkaSpark và tạo một Streaming Context với batch interval là 10 giây như sau:
1 2 3 4 5 6 |
//Set log level to warn Logger.getLogger("org").setLevel(Level.OFF); // Create a local StreamingContext and batch interval of 10 second SparkConf conf = new SparkConf().setMaster("local").setAppName("Kafka Spark Integration"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10)); |
Ta thiết lập các thông số của Kafka như sau (các bạn có thế tham khảo thêm các thông số này tại phần Consumer Configs trên trang chủ của Kafka)
1 2 3 4 5 6 7 8 9 |
//Define Kafka parameter Map<String, Object> kafkaParams = new HashMap<String, Object>(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "0"); // Automatically reset the offset to the earliest offset kafkaParams.put("auto.offset.reset", "earliest"); kafkaParams.put("enable.auto.commit", false); |
Tiếp theo ta tạo một DStream (Discretized Stream) nhằm lấy dữ liệu từ Kafka topic (ở đây là topic ‘test’):
1 2 3 4 5 6 |
//Define a list of Kafka topic to subscribe Collection<String> topics = Arrays.asList("test"); //Create an input Dstream which consume message from Kafka topics JavaInputDStream<ConsumerRecord<String, String>> stream; stream = KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)); |
Sau khi lấy được dữ liệu, ta sử dụng các hàm của Spark để xử lý thông tin như đã hướng dẫn trong các bài tutorials về Spark. Ở đây chúng ta sẽ sử dụng Lambda expression để đếm các từ trong dữ liệu nhận về như sau:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
// Read value of each message from Kafka JavaDStream<String> lines = stream.map((Function<ConsumerRecord<String, String>, String>) kafkaRecord -> kafkaRecord.value()); // Split message into words JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator()); // Take every word and return Tuple with (word,1) JavaPairDStream<String,Integer> wordMap = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word,1)); // Count occurance of each word JavaPairDStream<String,Integer> wordCount = wordMap.reduceByKey((Function2<Integer, Integer, Integer>) (first, second) -> first+second); //Print the word count wordCount.print(); // Start the computation jssc.start(); jssc.awaitTermination(); |
Lưu ý: trong trường hợp IntelliJ báo lỗi “lambda expression is not supported at language level 5”, các bạn cần phải chỉnh lại language level của chương trình bằng cách nhấn chuột phải vào tên project, chọn Open Module Settings. Trong phần Language level bạn chọn level 8 để hỗ trợ lambda expression rồi nhấn OK.
Ta tiến hành chạy thử chương trình trên (nếu báo lỗi, các bạn cần xóa bỏ <scope>provided</scope> của spark-streaming trong file pom.xml) . Sau đó mở một cửa sổ terminal và chạy một Producer để giả lập dữ liệu truyền về topic ‘test’.
Kết quả trên IntelliJ cho thấy, cứ sau 10 giây (10000 ms) chương trình tiến hành thu thập, xử lý và trả về kết của là số lần xuất hiện của từng từ trong đoạn dữ liệu nhận về.
Như vậy ta đã hoàn thành việc viết một chương trình tích hợp Kafka và Spark. Các bạn có thể tham khảo full code của chương trình dưới đây.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 |
import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.spark.SparkConf; import org.apache.spark.api.java.function.FlatMapFunction; import org.apache.spark.api.java.function.Function; import org.apache.spark.api.java.function.Function2; import org.apache.spark.api.java.function.PairFunction; import org.apache.spark.sql.streaming.StreamingQueryException; import org.apache.spark.streaming.Durations; import org.apache.spark.streaming.api.java.JavaDStream; import org.apache.spark.streaming.api.java.JavaInputDStream; import org.apache.spark.streaming.api.java.JavaPairDStream; import org.apache.spark.streaming.api.java.JavaStreamingContext; import org.apache.spark.streaming.kafka010.ConsumerStrategies; import org.apache.spark.streaming.kafka010.KafkaUtils; import org.apache.spark.streaming.kafka010.LocationStrategies; import scala.Tuple2; import java.util.*; import org.apache.log4j.Logger; import org.apache.log4j.Level; public class KafkaSpark { public static void main(String[] args) throws InterruptedException, StreamingQueryException { //Set log level to warn Logger.getLogger("org").setLevel(Level.OFF); // Create a local StreamingContext and batch interval of 10 second SparkConf conf = new SparkConf().setMaster("local").setAppName("Kafka Spark Integration"); JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10)); //Define Kafka parameter Map<String, Object> kafkaParams = new HashMap<String, Object>(); kafkaParams.put("bootstrap.servers", "localhost:9092"); kafkaParams.put("key.deserializer", StringDeserializer.class); kafkaParams.put("value.deserializer", StringDeserializer.class); kafkaParams.put("group.id", "0"); // Automatically reset the offset to the earliest offset kafkaParams.put("auto.offset.reset", "earliest"); kafkaParams.put("enable.auto.commit", false); //Define a list of Kafka topic to subscribe Collection<String> topics = Arrays.asList("test"); //Create an input Dstream which consume message from Kafka topics JavaInputDStream<ConsumerRecord<String, String>> stream; stream = KafkaUtils.createDirectStream(jssc,LocationStrategies.PreferConsistent(),ConsumerStrategies.Subscribe(topics, kafkaParams)); // Read value of each message from Kafka JavaDStream<String> lines = stream.map((Function<ConsumerRecord<String, String>, String>) kafkaRecord -> kafkaRecord.value()); // Split message into words JavaDStream<String> words = lines.flatMap((FlatMapFunction<String, String>) line -> Arrays.asList(line.split(" ")).iterator()); // Take every word and return Tuple with (word,1) JavaPairDStream<String,Integer> wordMap = words.mapToPair((PairFunction<String, String, Integer>) word -> new Tuple2<>(word,1)); // Count occurance of each word JavaPairDStream<String,Integer> wordCount = wordMap.reduceByKey((Function2<Integer, Integer, Integer>) (first, second) -> first+second); //Print the word count wordCount.print(); // Start the computation jssc.start(); jssc.awaitTermination(); } } |