Thông thường việc kết nối Kafka với các loại cơ sở dữ liệu(CSDL) được chia thành hai loại là Source Connector và Sink Connector. Source Connector được sử dụng để đọc dữ liệu từ CSDL và ghi vào Kafka broker còn Sink Connector được dùng để ghi dữ liệu từ Kafka vào CSDL. Trong bài tutorial này, chúng ta sẽ cùng tìm hiểu cách kết nối Kafka với Cassandra Sink để ghi dữ liệu từ Kafka topic vào Cassandra table bằng cách sử dụng thư viện của Landoop lenses.

Để thực hiện việc kết nối này, ta làm các bước sau:

1. Download Cassandra Connector tại đây.

2. Cấu hình Kafka Connect Plugin

– Tạo thư mục kafka/plugins trong /usr/local/share rồi tiến hành copy file .jar vừa download ở trên vào thư mục plugins này.

– Kafka Connector cho phép người dùng chạy chương trình dưới dạng Standalone mode (chạy trên một máy) hoặc Distributed mode(chạy phân tán trên nhiều máy). Để chạy dưới dạng Standalone mode ta sử dụng file connect-standalone.properties và sử dụng file connect-distributed.properties để chạy Distributed mode (cả hai file này đều có sẵn trong thư mục kafka_2.12-2.1.0/config). Để cấu hình Kafka sử dụng Cassandra Connector ta phải cấu hình plugin.path trong file connect-standalone.properties (hoặc connect-distributed.properties) bằng cách thêm đường dẫn của thư mục plugins vào plugin.path như sau:

* Lưu ý: một số trang web hướng dẫn cách copy trực tiếp file kafka-connect-cassandra-1.2.0-2.0.0-all.jar vào thư mục kafka_2.12-2.1.0/libs thay vì copy vào thư mục plugins và thiết lập plugin.path như hướng dẫn ở trên. Tuy nhiên cách copy trực tiếp này thường sẽ tạo ra lỗi “java.lang.InstantiationError: com.typesafe.scalalogging.Logger”.

3. Cấu hình Connector Configuration file.

– Tiến hành download file cassandra-sink.properties tại đây rồi copy vào thư mục kafka_2.12-2.1.0/config.

– Mở file cassandra-sink.properties rồi tiến thành thay đổi các tham số cho phù hợp với từng yêu cầu cụ thể.

Để kiểm tra lại việc cài đặt kết nối giữa Kafka và Cassandra Sink, chúng ta sẽ thử ghi dữ liệu từ “employee-topic” của Kafka vào emp table của Cassandra (emp table đã được tạo ra trong bài tutorial Cài đặt và chạy thử Cassandra) như sau:

1. Thay đổi các tham số của file config/cassandra-sink.properties:

2. Kiểm tra dữ liệu có sẵn trong emp table để so sánh sự thay đổi sau khi kết nối với Kafka.

3. Chạy Kafka Cassandra Connector bằng dòng lệnh sau (Standalone mode)

4. Chạy Kafka Producer và truyền thử một Json data như sau:

Sau khi truyền dữ liệu Json ở trên, ta thấy trên cửa sổ terminal ở bước 3 xuất hiện thông báo đã truyền thành công 1 record

Kiểm tra lại dữ liệu trong emp table ta thấy rằng dữ liệu truyền từ Kafka employee-topic đã được ghi vào trong table này.

5. Tương tự bước 4, ta truyền thêm một số dư liệu vào employee-topic và kiểm tra việc ghi dữ liệu vào emp table trong Cassandra.

Như vậy chúng ta đã hoàn thành việc cài đặt kết nối giữa Kafka và Cassandra Sink để ghi dữ liệu từ Kafka topic vào Cassandra table. Với việc sử dụng thư viện Landoop lenses, việc kết nối diễn ra hoàn toàn tự động mà không cần người dùng phải viết bất kỳ đoạn code nào (chỉ cần thiết lập tham số trong configuration file). Ngoài ra, chúng ta có thể chạy nhiều configuration files theo cú pháp sau (tham khảo thêm tại đây):

Tháng Hai 20, 2019
ITechSeeker