Trong bài này, chúng ta sẽ bắt đầu tiến hành lập trình Spark với các ví dụ đơn giản sử dụng DataFrame. Trước hết, các bạn tạo một Scala Object mới với tên DataFrameEx như hướng dẫn ở bài trước. Tiếp theo, để sử dụng Spark ta cần phải thiết lập một phiên làm việc của Spark bằng cách sử dụng SparkSession.builder().getOrCreate(). Ngoài ra, ta cũng có thể cấu hình Spark với các thuộc tính khác nhau như số lượng core, số lượng thread, dung lượng memory,…Tuy nhiên, trong ví dụ này để đơn giản chúng ta chỉ cấu hình hai thuộc tính là tên của chương trình Spark và cluster manager (ở đây “local” nghĩa là chạy Spark trên máy nội bộ với một worker thread duy nhất, “local[K]” là chạy Spark trên máy nội bộ với K worker thread còn “local[*]” là chạy Spark trên máy nội bộ với số lượng worker thread mà máy tính đó hỗ trợ). Các bạn có thể tham khảo chi tiết danh sách các thuộc tính và cách cấu hình các thuộc tính tại đây .
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 |
import org.apache.spark.sql.SparkSession object SparkSql { def main(args: Array[String]): Unit = { //Define Spark Session val spark=SparkSession.builder() .appName("Spark DataFrame Example") .config("spark.master","local") .getOrCreate() //Implicit methods available in Scala for converting common Scala objects into DataFrames import spark.implicits._ //Set the Log file level spark.sparkContext.setLogLevel("WARN") } } |
Tiếp theo, ta tạo ra một DataFrame từ nội dung của một file json và thực hiện một số thao tác cơ bản trên DataFrame này như sau (file people.json là file json có sẵn trong thư mục spark/example sau khi các bạn giải nén file Spark download trên trang chủ của Spark):
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 |
//The file path val file="E:\\WorkPlace\\BigData\\Spark\\examples\\src\\main\\resources\\people.json" //Create a Data Frame val df=spark.read.json(file) // Displays the content of the DataFrame df.show() // Select only the "name" column df.select("name").show() // Select everybody, but increment the age by 1 df.select($"name", $"age" + 1).show() //Find people older than 20 df.filter($"age" >20).show() |
Chạy chương trình trên, ta được kết quả:
Ngoài ra, ta cũng có thể biến DataFrame thành một SQL temporary view rồi sử dụng hàm .sql() để chạy các truy vấn SQL như sau:
1 2 3 4 5 6 |
// Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") //Run SQL query val sqlDF=spark.sql("Select * from people") println("Running SQL queries using Spark ") sqlDF.show() |
Chạy dòng lệnh trên, ta được kết quả:
Như vậy ta đã hoàn thành việc viết một chương trình cơ bản sử dụng DataFrame để trích xuất thông tin từ một tập dữ liệu cho trước. Các bạn có thể tham khảo toàn bộ code trong bài hướng dẫn này ở bên dưới. Lưu ý: đây chỉ là một ví dụ đơn giản, bước đầu giúp các bạn hình dung được cách viết một chương trình Spark. Ở các bài tutorials tiếp theo, chúng ta sẽ từng bước viết các chương trình phức tạp, sử dụng kết hợp nhiều hàm chức năng của Spark hơn.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 |
import org.apache.spark.sql.SparkSession object SparkSql { def main(args: Array[String]): Unit = { //Define Spark Session val spark=SparkSession.builder() .appName("Spark SQL Example") .config("spark.master","local") .getOrCreate() //Implicit methods available in Scala for converting common Scala objects into DataFrames import spark.implicits._ //Set the Log file level spark.sparkContext.setLogLevel("WARN") //The file path val file="E:\\WorkPlace\\BigData\\Spark\\examples\\src\\main\\resources\\people.json" //Create a Data Frame val df=spark.read.json(file) // Displays the content of the DataFrame println("The content of the DataFrame:") df.show() // Select only the "name" column println("The column Name:") df.select("name").show() // Select everybody, but increment the age by 1 println("Increasing age by 1:") df.select($"name", $"age" + 1).show() //Find people older than 20 println("People older than 20:") df.filter($"age" >20).show() // Register the DataFrame as a SQL temporary view df.createOrReplaceTempView("people") //Run SQL query val sqlDF=spark.sql("Select * from people") println("Running SQL queries using Spark ") sqlDF.show() } } |