Tại Serving Layer, chúng ta kết hợp kết quả xử lý của cả Batch Layer (batch views) và Speed Layer (realtime views) nhằm đáp ứng các truy vấn từ người dùng. Ở đây, ta sử dụng Akka Http để tạo một REST API giúp người dùng có thể truy xuất kết quả phân tích từ hệ thống bằng cách truy cập đường dẫn /getHashtagCount (các bạn tham khảo thêm cách viết chương trình Akka Http tại bài tutorial Sử dụng Akka Http):

Trong hàm getViews() chúng ta sẽ tiến hành đọc kết quả phân tích của cả Batch Layer (hashtag_batchview table) và Speed Layer (hashtag_realtimeview table) rồi chuyển đổi dữ liệu này thành một list (hashtagList) của Hashtag objects như sau (ta tiến hành truy xuất riêng từng table do Cassandra chưa hỗ trợ các queries như JOIN hoặc UNION):

Tuy nhiên, trong hashtag_batchview table và hashtag_realtimeview table thường chứa dữ liệu có chung giá trị “hashtag” (giá trị “count” thường khác nhau). Do đó trong hashtagList sẽ chứa các Hashtag object với giá trị “value” giống nhau và giá trị “count” có thể giống nhau hoặc khác nhau. Vì vậy nhiệm vụ tiếp theo chúng ta cần thực hiện là gộp các object có chung giá trị “value” này thành một object duy nhất với giá trị “count” là tổng giá trị “count” của các object được gộp. Việc này được thực hiện bằng cách sử dụng hàm .groupBy(), .map() và .sum như sau:

Như vậy full code của Serving Layer được viết như sau:

Chạy AkkaServer.start() rồi truy cập vào đường dẫn http://localhost:8080/getHashtagCount, ta được kết quả sau:

Như đã trình bày trong bài Xử lý dữ liệu dưới dạng Batch Processing chúng ta chạy Batch Processing 30 phút một lần và trong khoảng thời gian này thì giá trị trong hashtag_batchview table là không đổi. Tuy vậy số lượng hashtag count vẫn luôn được cập nhật do sự thay đổi trong hashtag_realtimeview table. Vì vậy, nếu ta đợi một lúc rồi truy cập lại vào đường dẫn http://localhost:8080/getHashtagCount thì ta sẽ thấy số lượng hashtag cũng như giá trị count của một số hashtag sẽ tăng lên

Tháng Ba 17, 2019
ITechSeeker