Nếu các bạn đã sử dụng ngôn ngữ lập trình Python hoặc tham gia các cuộc thi trên Kaggle thì các bạn đã tương đối quen thuộc với thư viện Pandas. Pandas hỗ trợ rất nhiều hàm chức năng để người dùng có thể dễ dàng xử lý và phân tích dữ liệu đầu vào. Spark DataFrame cũng hỗ trợ các chức năng gần tương tự Pandas, nhưng dữ liệu được xử lý theo kiểu phân tán, qua đó giúp tăng tốc độ và hiệu quả xử lý dữ liệu. Mặc dù các hàm chức năng của Spark DataFrame không thể phong phú và sử dụng thuận tiện như Pandas, nhưng Spark đang dần cải thiện và nâng cao chức năng của các API này. (Lưu ý: chúng ta có thể chuyển đổi giữa Spark DataFrame và Pandas DataFrame bằng cách sử dụng Apache Arrow).

Trong bài tutorial này, chúng ta sẽ sử dụng Spark DataFrame để xử lý và phân tích dữ liệu với các bước tương tự như khi sử dụng Pandas. Chúng ta sẽ sử dụng dữ liệu đầu vào là dữ liệu của cuộc thi Porto Segura’s Safe Driver Prediction trên Kaggle và làm theo các bước như trong bài phân tích Data Preparation & Exploration. Chỉ khác là chúng ta sẽ sử dụng Spark DataFrame thay vì Pandas.

Trước hết chúng ta tạo một Scala Object với tên DFProcessing và thiết lập một phiên làm việc của Spark tương tự như các bài tutorial trước:

Ta giải nén dữ liệu download được từ Porto Segura’s Safe Driver Prediction và tiến hành đọc file train.csv như hướng dẫn trong bài Đọc dữ liệu từ nhiều nguồn khác nhau (trong Pandas chúng ta chỉ cần sử dụng pd.read_csv() nhưng trong Spark chúng ta cần phải thiết lập tùy chọn để có kết quả theo định dạng mong muốn)

Chúng ta có thể sử dụng .head hoặc .show(5) để liệt kể 5 dòng đầu của dữ liệu (hiện tại Spark vẫn chưa hỗ trợ việc sử dụng tail())

Ta sử dụng dòng lệnh sau để kiểm tra số lượng dữ liệu (row), số lượng biến (column) và kiểm tra xem có dòng dữ liệu nào bị trùng lặp hay không (giống như .shape và .drop_duplicates() trong pandas). Số lượng dòng dữ liệu trước và sau khi sử dụng .dropDuplicates() không thay đổi cho ta biết không có dữ liệu nào bị trùng lặp trong dữ liệu đầu vào.

Việc sử dụng hàm .printSchema() trong Spark cũng trả về kết quả tương tự như hàm .info() trong Pandas .

Để truy xuất Meta data của từng biến, ta sử dụng đoạn code sau với logic tương tự như trong bài phân tích gốc, chỉ khác là ta chuyển hóa python code thành Spark code: phân loại các biến trong dữ liệu theo nhiệm vụ (target, id và input); theo giá trị (binary: có giá trị là 0 hoặc 1; norminal (category number): có giá trị là các số tự nhiên, đại diện cho phân mục nhất định; interval: có giá trị là các số thực ; ordinal: có giá trị là các số tự nhiên, đại diện cho thứ hạng nhất định); phân loại theo định dạng (Integer, Double,..)

Tiếp theo, ta liệt kê các biến nominal bằng cách sử dụng hàm .filter() của DataFrame như sau (lưu ý chúng ta cần sử dụng “===” thay vì “==” như thông thường)

Để thống kê số lượng các biến theo role và level, ta sử dụng hàm .groupby() và .count() như sau:

Ở phần Descriptive Statistics, ta dùng hàm .describe() để truy xuất giá trị count, mean, std, min, max của từng biến theo từng hạng mục đã chia. Với Pandas, ta chỉ cần 2 dòng lệnh là có thể lấy được thông tin này nhưng trong Spark ta cần phải thực hiện một số bước phức tạp hơn một chút. Do đó để thuận tiên, ta viết riêng một hàm chức năng với tên getVarDF() để thực hiện nhiệm vụ này. Trước hết, ta dùng hàm .filter() để liệt kê các biến theo từng hạng mục (binary, interval, nominal,..) rồi thực hiện thêm các bước: chuyển giá trị của các dòng trong một cột thành một List giá trị String, chuyển giá trị String List này thành Column List rồi dùng hàm .select để lựa chọn riêng các biến cần thiết trong toàn bộ dữ liệu đầu vào (ở đây ta sử dụng hàm col() để chuyển String thành dạng Column. Ngoài ra, còn rất nhiều hàm chức năng khác mà DataFrame functions cung cấp để thuận tiện cho việc viết chương trình. Các bạn có thể tham khảo chi tiết các hàm này tại đây).

Sử dụng hàm getVarDF() ở trên để truy xuất thông tin của các biến: interval, ordinal và binary, ta được kết quả sau:

Để kiểm tra số lượng missing value của từng biến, ta sử dụng hàm .filter() và .count() như sau (lưu ý: kết quả trả về sẽ khác với kết quả trong bài phân tích Data Preparation & Exploration do ta không thực hiện quá trình undersampling)

Việc liệt kê số lượng giá trị của từng biến catergorical variable được thực hiện sử dụng hàm .distinct().count() như sau:

Như vậy ta đã hoàn thành việc viết một chương trình sử dụng Spark để tiến hành phân tích và thống kê dữ liệu đầu vào ( làm theo tương tự các bước như trong bài phân tích Data Preparation & Exploration nhưng không thực hiện phần Data Visualization). Các bạn có thể tham khảo full code của chương trình dưới đây.

Lưu ý: Các bạn có thể tham khảo thêm một số hàm về thống kê dữ liệu của Spark trong phần Spark Machine Learning.

Tháng Mười Hai 18, 2018
ITechSeeker