Trong bài này chúng ta sẽ viết một chương trình WordCount để đếm số lần xuất hiện của từng từ trong một đoạn dữ liệu được chuyển về từ Data server. Chúng ta sẽ sử dụng Netcat để mô phỏng Data server và chương trình WordCount sẽ sử dụng Structured Streaming để thực hiện chức năng đếm từ.

Trước hết, ta tạo một WordCount object và tạo một Spark session như trong bài trước với đoạn code sau:

Tiếp theo ta tạo một Streaming DataFrame từ dữ liệu nhận được từ Data server (Data server được mô phỏng là localhost với cổng 9999) bằng cách sử dụng spark.readStream. Chúng ta có thể thiết lập loại nguồn dữ liệu đầu vào bằng cách sử dụng spark.readStream.format(). Spark hỗ trợ bốn loại nguồn dữ liệu là: File source, Socket source, Kafka source và Rate Source. File source là các nguồn thông tin từ các file đã được lưu trữ ở định dạng text, csv, json, orc và parquet. Socket source là nguồn dữ liệu từ các kết nối socket và Kafka source là nguồn dữ liệu lấy từ Kafka. Rate source là nguồn dữ liệu chủ yếu được dùng cho mục đích thử nghiệm. Đây là nguồn dự liệu do Spark tạo ra bằng các thiết lập tốc độ tạo dữ liệu trong 1 giây. Đối với mỗi nguồn dữ liệu lại có các option khác nhau. Các bạn có thể tham khảo chi tiết các Option của từng nguồn dữ liệu tại đây.

Biến lines là một DataFrame biểu diễn dưới dạng một bảng (table) có một cột với tên là “value”. Đây là một table không có giới hạn và mỗi dòng của bảng này sẽ là một dòng của stream text. Sau khi nhận được dữ liệu stream từ Data server, nhiệm vụ tiếp theo là ta cần phải thực hiện việc tách từ trong từng câu. Việc này có thể được thực hiện bằng cách sử dụng flatMap method. Tuy nhiên biến lines là một DataFrame nên dữ liệu từng dòng ở dạng Row (Un-typed).  Do đó trước hết ta cần phải chuyển biến lines từ DataFrame thành một biến Dataset với String Type rồi tiến hành tách từ như sau(ở đây _.split(” “) chính là cách viết ngắn gọn của line -> line.split(” “) ) :

Sau khi tách từ xong, ta thực hiện đếm số lần xuất hiện của từng từ bằng hàm count():

Cuối cùng ta in kết quả ra màn hình để kiểm tra xem chương trình có thực hiện đúng chức năng không. Ở đây ta có thể lựa chọn một trong ba outputMode là Complete Mode, Append Mode và Update Mode. Complete Mode sẽ cập nhập lại toàn bộ kết quả tính tới thời điểm hiện tại trong khi Append Mode chỉ hiển thị kết quả mới chưa được xử lý. Update Mode thì chỉ hiện thị các kết quả khác so với kết quả tại lần cập nhật trước đó. Sau khi khởi chạy query, ta sử dụng hàm awaitTermination() để đảm bảo rằng chương trình không bị dừng khi query vẫn đang ở trạng thái active)

Như vậy ta đã hoàn thành việc viết chương trình WordCount. Để kiểm tra hoạt động của chương trình này, ta mở một cửa sổ terminal (với hệ điều hành Linux) và sử dụng Netcat với dòng lệnh nc -lk 9999 để mô phỏng Data server rồi sau đó chạy chương trình WordCount (nhấn Ctl+Shift+F10).

Ta nhập một dòng text bên cửa sổ Netcat và kiểm tra xem chương trình WordCount có đếm đúng từ hay không

Như vậy ta đã hoàn thành việc viết chương trình và chạy thử chương trình WordCount. Các bạn có thể tham khảo full code của chương trình dưới đây.

Tháng Mười Một 20, 2018
ITechSeeker