Trong bài tutorial này, chúng ta sẽ thực hành viết chương trình sử dụng Spark Cassandra Connector để kết nối Spark với Cassandra.

Trước hết các bạn thêm dependencies sau vào file pom.xml:

Spark Cassandra Connector cho phép người dùng sử dụng cả RDD và DataFrame để truy xuất dữ liệu từ Cassandra (các bạn tham khảo chi tiết tại đây). Tuy nhiên trong bài này chúng ta sẽ chỉ sử dụng DataFrame do ưu điểm về tính đơn giản và tốc độ xử lý nhanh của nó so với RDD (xem thêm về Spark DataFrame trong bài tutorial Viết chương trình sử dụng DataFrame).

Để xử lý dữ liệu Cassandra, ta cần tạo một phiên làm việc cho Spark như sau:

Tiếp theo ta sử dụng CassandraConnector() để thực thi các lệnh CQL từ một chương trình Spark như sau:

Ta sử dụng hàm read để đọc dữ liệu từ Cassandra dưới dạng DataFrame như sau(các bạn tham khảo thêm về cách đọc dữ liệu của Spark trong bài tutorial Đọc dữ liệu từ nhiều nguồn khác nhau):

Chạy chương trình trên ta được kết quả:

Sau khi đọc dữ liệu từ Cassandra sử dụng DataFrame, ta có thể tiến hành các thao tác xử lý và phân tích dữ liệu tương tự như hướng dẫn trong các bài tutorials về Spark SQL, DataSet và DataFrames. Ở đây ta sử dụng hàm select() và filter() để tìm tất cả các cá nhân có thu nhập cao (>50.000)

Để tiến hành lưu trữ dữ liệu đã được xử lý vào Cassandra, ta có thể sử dụng hàm createCassandraTable() để tạo một Cassandra table mới, rồi dùng hàm write() để lưu dữ liệu này(ở đây chúng ta sử dụng Format Helper để đoạn code trở nên đơn giản và dễ đọc hơn)

Tiếp theo, ta kiểm tra lại việc lưu trữ dữ liệu bằng cách sử dụng hàm read() để đọc dữ liệu từ table mới tạo ra và hiển thị kết quả lên màn hình:

Như vậy chúng ta đã hoàn thành việc viết chương trình sử dụng Spark Cassandra Connector để kết nối Spark với Cassandra. Các bạn có thể tham khảo full code của chương trình dưới đây.

Lưu ý một số lỗi có thể xảy ra khi chạy chương trình:

* Lỗi do không phù hợp phiên bản giữa Apache Spark và Spark Cassandra Connector: Tính tới thời điểm hiện tại (02/2019) thì Spark Cassandra Connector chỉ hỗ trợ Scala 2.11, do đó các bạn phải thiết lập Project sử dụng thư viện Scala 2.11 thay vì các phiên bảo mới hơn như Scala 2.12 (xem thêm các phiên bản tương ứng giữa Apache Spark và Spark Cassandra Connector tại đây) . Ví dụ lỗi:

Lỗi này thường gặp phải khi bạn thiết lập Project sử dụng thư viện Scala 2.12 thay vì Scala 2.11. Để khắc phục lỗi này, các bạn nhấn chuột phải vào Project => Open Module Settings. Trong phần Global Libraries, các bạn chỉ sử dụng thư viện scala-sdk-2.11.12 và nhấn nút ‘-’ để xóa bỏ các thư viện Scala khác (nếu vẫn xảy ra lỗi, các bạn cần phải vào File => Invalidate Caches/ Restart… để xóa caches của IntelliJ)

* Lỗi do không đúng phiên bản java: Lỗi này xảy ra khi các bạn sử dụng phiên bản Java mới hơn Java 1.8 (đã được đề cập trong bài tutorial Cài đặt và chạy thử Cassandra). Do đó các bạn cần cấu hình lại Project bằng cách nhấn chuột phải vào Project => Open Module Settings và chọn Java 1.8).

* Lỗi do không tương thích giữa cassandra-driver-core và spark-cassandra-connector: để khắc phục lỗi này các bạn chỉ cần xóa dependency của cassandra-driver-core trong file pom.xml

Tháng Hai 9, 2019
ITechSeeker