Như đã giới thiệu khái quát trong phần Tổng quan về Project, chúng ta sẽ sử dụng thư viện Twitter4j để thu thập các tweets trên Twitter rồi truyền về Kafka và sau đó lưu vào Cassandra. Ở đây các bạn có thể tham khảo cách truyền dữ liệu từ Twitter vào Kafka tại bài tutorial Sử dụng Kafka với Twitter và bài tutorial Xử lý dữ liệu Twitter trong thời gian thực. Việc kết nối Kafka với Cassandra có thể được thực hiện bằng cách sử dụng Kafka Cassandra Sink như đã hướng dẫn tại bài tutorial Kết nối Kafka với Cassandra Sink (Các bạn tham khảo các dependencies của project này tại đây. Lưu ý: trong project này chúng ta hoàn toàn sử dụng scala 2.11 do Spark Cassandra Connector chưa hỗ trợ Scala 2.12 như đã đề cập trong bài Kết nối Spark với Cassandra)

Trong các bài tutorials trên chúng ta chủ yếu chạy Kafka server, Zookeeper và Cassandra bằng cách sử dụng cửa sổ terminal. Tuy nhiên trong project này chúng ta sẽ viết chương trình chạy dòng lệnh trực tiếp từ scala bằng cách sử dụng hàm Runtime.getRuntime().exec(). Ở đây chúng ta sẽ sử dụng Akka Actor để tạo 4 actors chạy 4 dòng lệnh khác nhau (mục đích của việc này là để chạy mỗi dòng lệnh ở dạng foreground thay vì background). Công việc này được thực hiện bởi EnvRunner như sau (các bạn tham khảo thêm về cách sử dụng Akka Actor tại bài tutorial Truyền nhận message giữa các Akka Actors)

Khi chạy hàm EnvRunner.start() ta được kết quả tương tự như chạy trên terminal:

Lưu ý: trước khi chạy EnvRunner.start(), các bạn thiết lập cấu hình trong file kafka/config/cassandra-sink.properties như sau (để thống nhất với Kafka topic và Cassandra database được thiết lập trong CassandraDB và KafkaTwitterStreaming như trình bày dưới đây):

Sau khi thực hiện việc khởi chạy Kafka, Cassandra và Kafka Cassandra Sink thành công, chúng ta sẽ tiến hành tạo các table để lưu trữ dữ liệu trong Cassandra với các bước sau:

– Tạo keyspace với tên gọi lambda_architecture

– Tạo master_dataset table để lưu trữ dữ liệu từ Kafka truyền về Cassandra

– Tạo hashtag_batchView table để lưu kết quả của Batch Processing ở Batch Layer

– Tạo hashtag_realtimeView table để lưu kết quả của Real time Processing ở Speed Layer

Các bước này được thực hiện bởi CassandraDB với đoạn code sau (các bạn tham khảo thêm bài tutorial Viết chương trình Cassandra)

Sau khi chạy hàm CassandraDB.runDB(), ta sử dụng bin/cqlsh để kiểm tra việc tạo các Cassandra tables như sau:

Như vậy, với việc chạy CmdRunner.run() và CassandraDB.runDB(), chúng ta đã hoàn thành việc thiết lập môi trường và kết nối Kafka với Cassandra. Tiếp theo chúng ta tiến hành thu thập dữ liệu Twitter và truyền về Kafka bằng KafkaTwitterStreaming như sau (các bạn tham khảo thêm bài Xử lý dữ liệu Twitter trong thời gian thực):

Ở đây chúng ta sử dụng AppConfiguration (object trong main_package) để truy xuất các thiết lập được quy định trong /resources/application.conf (ví dụ như Kafka topic, kafka keywords, Twitter keys …).

Mặc định, ConfigFactory.load() sẽ tìm kiếm và truy xuất thông tin từ file application.conf. Do đó ta tạo file này trong thư mục /resources và tiến hành các thiết lập như sau:

Trong project này, chúng ta chỉ thu thập các tweet viết bằng tiếng Anh và đối với từng tweet ta chỉ trích xuất 7 thông tin cơ bản là : tweet_id, user_id, user_name, user_loc, content, hashtag và created_date.

Chạy KafkaTwitterStreaming.run() ta được kết quả:

Ta kiểm tra lại việc lưu kết quả vào Cassandra bằng cách sử dụng cqlsh như sau:

Như vậy chúng ta đã hoàn thành việc thiết lập môi trường và các kết nối cần thiết cho việc thu thập dữ liệu từ Twitter rồi truyền về topic “TwitterStreaming” của Kafka và sau đó lưu vào master_dataset table của Cassandra. Trong phần tiếp theo, chúng ta sẽ viết chương trình cho Batch layer của Lambda Architecture để xử lý dữ liệu lưu trong master_dataset dưới dạng Batch Processing

Tháng Ba 15, 2019
ITechSeeker