Trong bài Viết chương trình sử dụng DataFrame, chúng ta đã sử dụng DataFrame để đọc một file json (people.json) và tiến hành truy suất một số thông tin của file đó. Trong bài này, chúng ta sẽ tìm hiểu chi tiết hơn về cách sử dụng Spark để đọc dữ liệu từ nhiều nguồn khác nhau.

Spark SQL hỗ trợ việc thực hiện các thao tao trên các nguồn dữ liệu này thông qua DataFrame vì từ DataFrame chúng ta có thể chuyển thành một SQL temporary view rồi sử dụng hàm .sql() để chạy các truy vấn SQL như hướng dẫn trong bài Viết chương trình sử dụng DataFrame. Từ DataFrame chúng ta cũng có thể chuyển thành Dataset bằng cách sử dụng hàm .as[class] (chúng ta sẽ tìm hiểu trong bài tutorial tiếp theo).

Trước hết ta tạo một Scala Object với tên DataLoader và tạo một phiên làm việc của Spark như sau:

Tiếp theo chúng ta sử dụng hàm spark.read() để đọc dữ liệu từ các định dạng file khác nhau. Để thuận tiên cho việc lấy dữ liệu, ta nhấn chuột phải vào thư mục src/main/resources, chọn New -> Directory để tạo một thư mục có tên data_source. Sau đó ta vào thư mục spark\examples\src\main\resources (spark chính là thư mục giải nén của file Spark, download trên trang chủ của Spark) để copy ba file là employees.json, people.csv và users.parquet vào thư mục data_source vừa tạo ra.

Việc sử dụng hàm sử dụng hàm spark.read() yêu cầu đường dẫn của file muốn đọc dữ liệu. Do đó ta tạo một hàm chức năng với tên getFilePath để lấy đường dẫn này như sau.

Spark sử dụng định dạng file mặc định là .parquet (các bạn có thể cấu hình định dạng file mặc định bằng cách sử dụng spark.sql.sources.default). Để đọc dữ liệu của file này ta chỉ cần sử dụng hàm spark.read.load():

Chạy dòng lệnh trên ta được kết quả:

Để đọc file ở các định dạng khác ta có thể sử dụng .format() để thiết lập định dạng file. Đối với một số nguồn dữ liệu phổ biến (json, parquet, jdbc, orc, libsvm, csv, text) đã được Spark tích hợp sẵn nên ta có thể sử dụng short name của các định dạng này thay vì sử dụng .format()

Ngoài ra, Spark cũng cho phép ta cấu hình các tùy chọn trong việc đọc dữ liệu. Mỗi một nguồn dữ liệu có các tùy chọn khác nhau. Ở đây chúng ta sẽ sử dụng ba tùy chọn là ‘sep’, ‘inferSchema’ và ‘header’ để đọc file csv (Các bạn có thể tham khảo các option của từng data source tại đây).

Như vậy theo mặc định, dữ liệu trả về ở dưới dạng dạng thô. Khi ta cấu hình các tùy chọn thì dữ liệu trả về được sắp sếp một cách rõ ràng hơn (tương tự như kết quả đọc file .json)

Cuối cùng ta sử dụng hàm .write() để lưu DataFrame ở định dạng mong muốn (sử dụng .format() hoặc short name tương tự như trong .read()). Ngoài ra ta cũng có thể sử dụng hàm write.mode() để cấu hình cách thức lưu dữ liệu (Save Mode). Spark hỗ trợ bốn loại Save Mode sau:

– SaveMode.ErrorIfExists: Nếu file đã tồn tại thì sẽ trả về một Exception

– SaveMode.Append: Nếu file đã tồn tại thì ghi thêm dữ liệu vào file này

– SaveMode.Overwrite: Nếu file đã tồn tại thì ghi đè lên file này (dữ liệu sẽ bị xóa trước rồi mới ghi dữ liệu mới sau)

– SaveMode.Ignore: Không lưu hoặc thay đổi dữ liệu nếu có tồn tại file này.

Khi chạy dòng lệnh trên, dữ liệu sẽ được lưu trong thư mục people_csvDF1 mới được tạo ra trong thư mục data_source ( khác với việc lưu file thông thường, Spark sử dụng Hadoop File Format nên dữ liệu sẽ được lưu theo từng partition)

Lưu ý: Trong khi chạy nếu các bạn xảy ra lỗi “IllegalArgumentException: Illegal pattern component: XXX” thì có thể do thư viện commons-lang3 chưa được cập nhật phiên bản mới nhất. Do đó bạn thêm dependency của commons-lang3 như sau:

Như vậy ta đã hoàn thành việc viết một chương trình sử dụng Spark để đọc dữ liệu từ nhiều nguồn khác nhau. Các bạn có thể tham khảo full code của chương trình dưới đây.

Tháng Mười Hai 13, 2018
ITechSeeker