Tại Speed Layer, chúng ta sẽ sử dụng Spark Structured Streaming để đọc và xử lý dữ liệu từ topic “TwitterStreaming” của Kafka trong thời gian thực. Công việc này được thực hiện qua ba bước sau:

– Kết nối Spark với Kafka: thực hiện tương tự như trong bài tutorial Xử lý dữ liệu Twitter trong thời gian thực

– Phân tích hashtag: thực hiện tương tự như trong Batch Layer ở bài Xử lý dữ liệu dưới dạng Batch Processing. Ở đây chúng ta viết hàm restartQuery() để đảm bảo rằng các Streaming Query sẽ được khởi động lại trong mỗi lần chạy Batch Processing.

– Lưu kết quả phân tích và Cassandra: hiện tại, Spark Structured Streaming chỉ hỗ trợ một số loại output sinks như File sink, Kafka sink, Foreach sink, Console sink nhưng chưa hỗ trợ Cassandra sink (xem thêm tại đây). Do đó để lưu dữ liệu Streaming DataFrame/Dataset vào Cassandra, ta có thể sử dụng DSE phiên bản 6.0 trở lên hoặc tự viết một Cassandra Sink sử dụng Foreach Sink như hướng dẫn tại đây. Tuy nhiên có một cách đơn giản hơn là chúng ta sử dụng ForeachBatch trong Spark Streaming. ForeachBatch() cho phép người dùng thực hiện một hàm chức năng trên dữ liệu đầu ra là mỗi micro-batch của streaming query với cấu trúc như sau:

Ở đây, hàm chức năng chúng ta sử dụng là hàm ghi dữ liệu từ DataFrame thông thường (không phải Streaming DataFrame) vào Cassandra. Do đó đoạn code để lưu Streaming DataFrame vào Cassandra sử dụng ForeachBatch được viết như sau:

Như vậy, chương trình RealtimeProcessingActor để thực hiện việc xử lý thông tin trong thời gian thực tại Speed layer có thể được viết như sau:

Chạy chương trình trên và kiểm tra lại việc lưu kết quả xử lý trong Cassandra, ta được kết quả:

Như vậy, ban đầu hashtag_realtimeview table không có chứa dữ liệu nào. Tuy nhiên, kết quả của việc xử lý thông tin tại Speed Layer dần được ghi vào table này và khi chúng ta restart lại query thì toàn bộ dữ liệu trong table bị xóa để ghi kết quả của query mới vào.

Lưu ý:

– Trong chương trình trên, việc sử dụng Runner với Akka Scheduler (restart lại streaming query 3 phút một lần) nhằm giúp bạn đọc có thể kiểm tra được kết quả của việc chạy streaming query và việc restart lại query này. Tuy nhiên, khi hoàn thiện project thì Runner sẽ bị loại bỏ do chúng ta sẽ gửi trực tiếp StartProcessing message từ Batch Processor để restart lại streaming query và hashtag_realtimeview table mỗi khi bắt đầu chạy một Batch Processing mới

– Twitter4j Streaming trả về kết quả là các tweets trong thời gian thực, mới được đăng tải gần đây. Tuy nhiên trong nhiều trường hợp, khi ta thực hiện việc Crawling thông thường thì có thể bài viết đã được đăng tải từ cách đây nhiều ngày nhưng bây giờ mới được crawl về. Trong trường hợp này chúng ta cần phải sử dụng cả hàm filter() để lọc lấy bài đăng thời gian gần đây nhất.

Tháng Ba 17, 2019
ITechSeeker