Trước hết các bạn tạo một Java Maven Project trong IntelliJ bằng cách vào File -> New -> Project, chọn Maven rồi click Next. Tiếp theo, ta điền thông tin vào GroupId, ArtifactId và cuối cùng nhấn Finish để hoàn tất việc tạo một Project mới.
Sau đó các bạn mở file pom.xml rồi thêm dependency như sau vào dưới phần <version>
1 2 3 4 5 6 7 8 |
<dependencies> <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka --> <dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka_2.12</artifactId> <version>2.0.0</version> </dependency> </dependencies> |
Tiếp theo các bạn click chuột phải vào src/main/java chọn New -> Java class, nhập SimpleProducer vào phần Name rồi ấn finish để tạo file SimpleProducer.java. Các bạn copy đoạn code sau vào file java này (đoạn code này được lấy từ trang Tutorialspoint.com)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 |
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer” public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", "localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", "all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println("Message sent successfully"); producer.close(); } } |
Khi nhấn Ctrl+Shift+F10 để chạy chương trình ta được kết quả:
Kết quả này hiện ra là do ta chưa truyền tham số đầu vào cho chương trình (ở đây là tên của topic). Để thiết lập tham số này, ta nhấp chuột vào SimpleProducer -> Edit Configurations ở phía trên bên phải như hình sau:
Cửa sổ Configuration hiện ra, trong mục Program arguments ta điền tên của topic mà ta đã tạo trong bài trước (trong trường hợp này tên topic là test)
Sau đó các bạn nhấn Apply rồi đóng của sổ và chạy lại chương trình ta được kết quả sau:
Chúng ta có thể mở một cửa sổ Consumer như phần trước để kiểm tra xem liệu Consumer có nhận được kết quả từ 0-9 như sau hay không.
Tương tự ta tạo một SimpleConsumer với đoạn code sau (đoạn code này cũng được lấy từ trang Tutorialspoint.com).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)); //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } } |
Ta tiến hành thiết lập Edit Configurations như trong phần SimpleProducer rồi chạy thử chương trình, ta được kết quả sau:
Mở một cửa sổ Producer như phần trước và nhập một tin nhắn bất kỳ vào Producer, ta sẽ thấy nó hiện ra trên phần output của SimpleConsumer.