Nếu các bạn đã sử dụng ngôn ngữ lập trình Python hoặc tham gia các cuộc thi trên Kaggle thì các bạn đã tương đối quen thuộc với thư viện Pandas. Pandas hỗ trợ rất nhiều hàm chức năng để người dùng có thể dễ dàng xử lý và phân tích dữ liệu đầu vào. Spark DataFrame cũng hỗ trợ các chức năng gần tương tự Pandas, nhưng dữ liệu được xử lý theo kiểu phân tán, qua đó giúp tăng tốc độ và hiệu quả xử lý dữ liệu. Mặc dù các hàm chức năng của Spark DataFrame không thể phong phú và sử dụng thuận tiện như Pandas, nhưng Spark đang dần cải thiện và nâng cao chức năng của các API này. (Lưu ý: chúng ta có thể chuyển đổi giữa Spark DataFrame và Pandas DataFrame bằng cách sử dụng Apache Arrow).
Trong bài tutorial này, chúng ta sẽ sử dụng Spark DataFrame để xử lý và phân tích dữ liệu với các bước tương tự như khi sử dụng Pandas. Chúng ta sẽ sử dụng dữ liệu đầu vào là dữ liệu của cuộc thi Porto Segura’s Safe Driver Prediction trên Kaggle và làm theo các bước như trong bài phân tích Data Preparation & Exploration. Chỉ khác là chúng ta sẽ sử dụng Spark DataFrame thay vì Pandas.
Trước hết chúng ta tạo một Scala Object với tên DFProcessing và thiết lập một phiên làm việc của Spark tương tự như các bài tutorial trước:
1 2 3 4 5 6 7 8 9 10 |
//Define Spark Session val spark=SparkSession.builder() .appName("Spark DataFrame Example") .master("local") .getOrCreate() //Implicit methods available in Scala for converting common Scala objects into DataFrames import spark.implicits._ //Set the Log file level spark.sparkContext.setLogLevel("WARN") |
Ta giải nén dữ liệu download được từ Porto Segura’s Safe Driver Prediction và tiến hành đọc file train.csv như hướng dẫn trong bài Đọc dữ liệu từ nhiều nguồn khác nhau (trong Pandas chúng ta chỉ cần sử dụng pd.read_csv() nhưng trong Spark chúng ta cần phải thiết lập tùy chọn để có kết quả theo định dạng mong muốn)
1 2 3 4 5 6 7 8 9 |
//The file path val file="E:/Workspace/BigData/Data/train.csv" //Create a Data Frame var df=spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load(file) |
Chúng ta có thể sử dụng .head hoặc .show(5) để liệt kể 5 dòng đầu của dữ liệu (hiện tại Spark vẫn chưa hỗ trợ việc sử dụng tail())
1 2 3 |
//Print the fist five rows println("The first five rows: ") df.show(5) |
Ta sử dụng dòng lệnh sau để kiểm tra số lượng dữ liệu (row), số lượng biến (column) và kiểm tra xem có dòng dữ liệu nào bị trùng lặp hay không (giống như .shape và .drop_duplicates() trong pandas). Số lượng dòng dữ liệu trước và sau khi sử dụng .dropDuplicates() không thay đổi cho ta biết không có dữ liệu nào bị trùng lặp trong dữ liệu đầu vào.
1 2 3 4 5 6 7 8 9 |
//Get the number of row and columns print("The number of row and columns: ") println(df.count()+","+df.columns.length) //Check if there are duplicate rows df=df.dropDuplicates(); val totalRow=df.count() print("The number of row and columns after removing duplicate rows: ") println(totalRow+","+df.columns.length) |
Việc sử dụng hàm .printSchema() trong Spark cũng trả về kết quả tương tự như hàm .info() trong Pandas .
1 2 |
println("\nThe type of each column variable") df.printSchema() |
Để truy xuất Meta data của từng biến, ta sử dụng đoạn code sau với logic tương tự như trong bài phân tích gốc, chỉ khác là ta chuyển hóa python code thành Spark code: phân loại các biến trong dữ liệu theo nhiệm vụ (target, id và input); theo giá trị (binary: có giá trị là 0 hoặc 1; norminal (category number): có giá trị là các số tự nhiên, đại diện cho phân mục nhất định; interval: có giá trị là các số thực ; ordinal: có giá trị là các số tự nhiên, đại diện cho thứ hạng nhất định); phân loại theo định dạng (Integer, Double,..)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
//Initialize the value of role, level, keep and dtype var role = "" var level="" var keep=true var dtype="" val data=new util.ArrayList[Row]() for (col <- df.columns) { //Define the role if(col.contains("target")) role="target" else if(col.contains("id")) role="id" else role="input" //Define the level dtype=df.select(col).dtypes(0)._2 if(col.contains("bin")|| col.equals("target")) level="binary" else if(col.contains("cat")||col.equals("id")) level="nominal" else if(dtype.equals("DoubleType")) level="interval" else if(dtype.equals("IntegerType")) level="ordinal" //Set True to all variables except id keep=true if(col.equals("id")) keep=false //Add Row to the Arraylist data.add(Row(col,role,level,keep,dtype)) } //Define a DataFrame Schema val schema = StructType( List( StructField("varname", StringType, true), StructField("role", StringType, true), StructField("level", StringType, true), StructField("keep", BooleanType, true), StructField("dtype", StringType, true) ) ) //Create meta DataFrame val meta = spark.createDataFrame(data,schema ) //Show the value of meta DataFrame println("The metadata of the dataset") meta.show(df.columns.length) |
Tiếp theo, ta liệt kê các biến nominal bằng cách sử dụng hàm .filter() của DataFrame như sau (lưu ý chúng ta cần sử dụng “===” thay vì “==” như thông thường)
1 2 3 |
//Extract all nominal variables that are not dropped println("All nominal variables that are not dropped: ") meta.filter($"level"==="nominal" && $"keep").select("varname").show() |
Để thống kê số lượng các biến theo role và level, ta sử dụng hàm .groupby() và .count() như sau:
1 2 3 |
//Count the number of variables per role and level println("The number of variables per role and level: ") meta.groupBy("role","level").count().show() |
Ở phần Descriptive Statistics, ta dùng hàm .describe() để truy xuất giá trị count, mean, std, min, max của từng biến theo từng hạng mục đã chia. Với Pandas, ta chỉ cần 2 dòng lệnh là có thể lấy được thông tin này nhưng trong Spark ta cần phải thực hiện một số bước phức tạp hơn một chút. Do đó để thuận tiên, ta viết riêng một hàm chức năng với tên getVarDF() để thực hiện nhiệm vụ này. Trước hết, ta dùng hàm .filter() để liệt kê các biến theo từng hạng mục (binary, interval, nominal,..) rồi thực hiện thêm các bước: chuyển giá trị của các dòng trong một cột thành một List giá trị String, chuyển giá trị String List này thành Column List rồi dùng hàm .select để lựa chọn riêng các biến cần thiết trong toàn bộ dữ liệu đầu vào (ở đây ta sử dụng hàm col() để chuyển String thành dạng Column. Ngoài ra, còn rất nhiều hàm chức năng khác mà DataFrame functions cung cấp để thuận tiện cho việc viết chương trình. Các bạn có thể tham khảo chi tiết các hàm này tại đây).
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 |
def getVarDF(varName : String, metaDF: DataFrame, dataFrame: DataFrame, describe:Boolean =true) : DataFrame ={ //Get the list of Variables val varCols = metaDF.filter($"level" === varName && $"keep").select("varname").map(r => r.getString(0)).collect.toList //Convert List of String to List of Column val colNames = varCols.map(name => col(name)) // Create a new DataFrame with a specified columns from the original data frame val varDF = dataFrame.select(colNames: _*) //Print the descripion of the DataFrame if the boolean value is true if(describe==true) { println(varName+" variables: ") varDF.describe().show() } return varDF } |
Sử dụng hàm getVarDF() ở trên để truy xuất thông tin của các biến: interval, ordinal và binary, ta được kết quả sau:
1 2 3 4 5 6 |
//Interval variables getVarDF("interval",meta,df) //Ordinal variables getVarDF("ordinal",meta,df) //Binary variables getVarDF("binary",meta,df) |
Để kiểm tra số lượng missing value của từng biến, ta sử dụng hàm .filter() và .count() như sau (lưu ý: kết quả trả về sẽ khác với kết quả trong bài phân tích Data Preparation & Exploration do ta không thực hiện quá trình undersampling)
1 2 3 4 5 6 7 8 9 10 11 12 13 |
//Checking missing value var vars_with_missing=new util.ArrayList[String]() var missing=0.0 for (column <- df.columns) { missing=df.filter(col(column) === -1).count() if(missing>0) { println(column+" has "+missing+"/"+totalRow+" record with missing values") vars_with_missing.add(column) } } println("Totally, there are "+vars_with_missing.size()+" variables with missing values") |
Việc liệt kê số lượng giá trị của từng biến catergorical variable được thực hiện sử dụng hàm .distinct().count() như sau:
1 2 3 4 5 6 7 8 9 |
//Checking the cardinality of the categorical variables //Get the list of categorical Variables val catDF = getVarDF("nominal",meta,df,false) println("\nDistinct values of each categorical variable") for (column <- catDF.columns) { //Count the number of distinct values of each variable println(column+" has "+df.select(column).distinct().count()+" distinct values") } |
Như vậy ta đã hoàn thành việc viết một chương trình sử dụng Spark để tiến hành phân tích và thống kê dữ liệu đầu vào ( làm theo tương tự các bước như trong bài phân tích Data Preparation & Exploration nhưng không thực hiện phần Data Visualization). Các bạn có thể tham khảo full code của chương trình dưới đây.
Lưu ý: Các bạn có thể tham khảo thêm một số hàm về thống kê dữ liệu của Spark trong phần Spark Machine Learning.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 |
package main.scala import java.util import org.apache.spark.sql.types.{BooleanType, StringType, StructField, StructType} import org.apache.spark.sql.{DataFrame, Row, SparkSession} import org.apache.spark.sql.functions.col object DFProcessing { //Define Spark Session val spark=SparkSession.builder() .appName("Spark DataFrame Example") .master("local") .getOrCreate() //Implicit methods available in Scala for converting common Scala objects into DataFrames import spark.implicits._ //Set the Log file level spark.sparkContext.setLogLevel("WARN") def main(args: Array[String]): Unit = { //The file path val file="E:/Workspace/BigData/Data/train.csv" //Create a Data Frame var df=spark.read.format("csv") .option("sep", ",") .option("inferSchema", "true") .option("header", "true") .load(file) //Print the fist five rows println("The first five rows: ") df.show(5) //Get the number of row and columns print("The number of row and columns: ") println(df.count()+","+df.columns.length) //Check if there are duplicate rows df=df.dropDuplicates(); val totalRow=df.count() print("The number of row and columns after removing duplicate rows: ") println(totalRow+","+df.columns.length) println("\nThe type of each column variable") df.printSchema() //Initialize the value of role, level, keep and dtype var role = "" var level="" var keep=true var dtype="" val data=new util.ArrayList[Row]() for (col <- df.columns) { //Define the role if(col.contains("target")) role="target" else if(col.contains("id")) role="id" else role="input" //Define the level dtype=df.select(col).dtypes(0)._2 if(col.contains("bin")|| col.equals("target")) level="binary" else if(col.contains("cat")||col.equals("id")) level="nominal" else if(dtype.equals("DoubleType")) level="interval" else if(dtype.equals("IntegerType")) level="ordinal" //Set True to all variables except id keep=true if(col.equals("id")) keep=false //Add Row to the Arraylist data.add(Row(col,role,level,keep,dtype)) } //Define a DataFrame Schema val schema = StructType( List( StructField("varname", StringType, true), StructField("role", StringType, true), StructField("level", StringType, true), StructField("keep", BooleanType, true), StructField("dtype", StringType, true) ) ) //Create meta DataFrame val meta = spark.createDataFrame(data,schema ) //Show the value of meta DataFrame println("The metadata of the dataset") meta.show(df.columns.length) //Extract all nominal variables that are not dropped println("All nominal variables that are not dropped: ") meta.filter($"level"==="nominal" && $"keep").select("varname").show() //Count the number of variables per role and level println("The number of variables per role and level: ") meta.groupBy("role","level").count().show() //Interval variables getVarDF("interval",meta,df) //Ordinal variables getVarDF("ordinal",meta,df) //Binary variables getVarDF("binary",meta,df) //Checking missing value var vars_with_missing=new util.ArrayList[String]() var missing=0.0 for (column <- df.columns) { missing=df.filter(col(column) === -1).count() if(missing>0) { println(column+" has "+missing+"/"+totalRow+" record with missing values") vars_with_missing.add(column) } } println("Totally, there are "+vars_with_missing.size()+" variables with missing values") //Checking the cardinality of the categorical variables //Get the list of categorical Variables val catDF = getVarDF("nominal",meta,df,false) println("\nDistinct values of each categorical variable") for (column <- catDF.columns) { //Count the number of distinct values of each variable println(column+" has "+df.select(column).distinct().count()+" distinct values") } } def getVarDF(varName : String, metaDF: DataFrame, dataFrame: DataFrame, describe:Boolean =true) : DataFrame ={ //Get the list of Variables val varCols = metaDF.filter($"level" === varName && $"keep").select("varname").map(r => r.getString(0)).collect.toList //Convert List of String to List of Column val colNames = varCols.map(name => col(name)) // Create a new DataFrame with a specified columns from the original data frame val varDF = dataFrame.select(colNames: _*) //Print the descripion of the DataFrame if the boolean value is true if(describe==true) { println(varName+" variables: ") varDF.describe().show() } return varDF } } |