In this tutorial, we will write a program that uses the Spark Cassandra Connector to connect Apache Spark to Cassandra. First, we add the following dependencies to the pom.xml file:
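The dependency listing itself is not reproduced on this page. As a minimal sketch, assuming Spark 2.4.0 and Spark Cassandra Connector 2.4.0 built for Scala 2.11 (the version numbers are assumptions based on the date of this post and should be adjusted to your environment), the dependencies might look like this:

```xml
<!-- Versions below are assumptions; keep the _2.11 suffix consistent with your Scala SDK -->
<dependencies>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
  <dependency>
    <groupId>com.datastax.spark</groupId>
    <artifactId>spark-cassandra-connector_2.11</artifactId>
    <version>2.4.0</version>
  </dependency>
</dependencies>
```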

The Spark Cassandra Connector lets us access a Cassandra database through both RDDs and DataFrames (click here for more details). However, in this tutorial we will only use DataFrames because they are simpler and more efficient than RDDs (more details on Spark DataFrames can be found in the tutorial A basic Spark application using DataFrame).

To process Cassandra data, we need to create a Spark session as follows:
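A minimal sketch of this step, assuming a single Cassandra node running locally: the application name, the `local[*]` master and the host address `127.0.0.1` are assumptions, and `SessionSketch` is a hypothetical wrapper object used only for illustration.

```scala
import org.apache.spark.sql.SparkSession

object SessionSketch {
  // Builds (or reuses) a SparkSession and tells the connector where Cassandra runs.
  // App name, master URL and host address are assumptions for a local setup.
  def createSession(): SparkSession =
    SparkSession.builder()
      .appName("SparkCassandraExample")
      .master("local[*]")
      .config("spark.cassandra.connection.host", "127.0.0.1")
      .getOrCreate()
}
```

The `spark.cassandra.connection.host` setting is read by the connector whenever it opens a connection, so it only needs to be set once on the session.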

Next, we use CassandraConnector() to execute CQL queries from a Spark application as follows:
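As a hedged sketch of this step, the code below uses `CassandraConnector` and its `withSessionDo` method to run a few CQL statements. The keyspace `test_keyspace`, the table `emp`, its columns and the sample row are all hypothetical, as is the wrapper object `CqlSketch`.

```scala
import com.datastax.spark.connector.cql.CassandraConnector
import org.apache.spark.sql.SparkSession

object CqlSketch {
  // Runs CQL statements through a session managed by the connector.
  // Keyspace, table, column names and the inserted row are hypothetical.
  def prepareTable(spark: SparkSession): Unit = {
    val connector = CassandraConnector(spark.sparkContext.getConf)
    connector.withSessionDo { session =>
      session.execute(
        "CREATE KEYSPACE IF NOT EXISTS test_keyspace " +
        "WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1}")
      session.execute(
        "CREATE TABLE IF NOT EXISTS test_keyspace.emp " +
        "(emp_id int PRIMARY KEY, emp_name text, salary int)")
      session.execute(
        "INSERT INTO test_keyspace.emp (emp_id, emp_name, salary) " +
        "VALUES (1, 'Alice', 60000)")
    }
  }
}
```

`withSessionDo` opens the session, passes it to the block and releases it afterwards, so the application does not need to manage the connection itself.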

We use the read() function to load Cassandra data into a Spark DataFrame as follows (see more in the tutorial Loading data from various sources):
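A sketch of the load step, assuming a keyspace `test_keyspace` and a table `emp` (both names are hypothetical, as is the wrapper object `ReadSketch`). The data source is selected with the format string `org.apache.spark.sql.cassandra`:

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object ReadSketch {
  // Loads a Cassandra table as a DataFrame.
  // "test_keyspace" and "emp" are hypothetical names.
  def loadEmployees(spark: SparkSession): DataFrame =
    spark.read
      .format("org.apache.spark.sql.cassandra")
      .options(Map("keyspace" -> "test_keyspace", "table" -> "emp"))
      .load()
}
```

Calling `ReadSketch.loadEmployees(spark).show()` would then print the rows of the table.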

Running the above code gives the following result:

After loading the data into a DataFrame, we can process and analyze it with the same steps described in the tutorials Spark SQL, Dataset and DataFrames. In this example, we use the select() and filter() functions to find all employees whose salaries are more than $50,000.
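The select-and-filter step can be sketched as below. The column names `emp_id`, `emp_name` and `salary` are assumptions about the table schema, and `FilterSketch` is a hypothetical wrapper object.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

object FilterSketch {
  // Keeps three columns and only the rows with a salary above 50000.
  // Column names are assumptions about the table schema.
  def highEarners(employees: DataFrame): DataFrame =
    employees
      .select("emp_id", "emp_name", "salary")
      .filter(col("salary") > 50000)
}
```

Because the function only depends on a DataFrame, it can be tried on any DataFrame with these columns, not just one loaded from Cassandra.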

To save the processed data into the Cassandra database, we use the createCassandraTable() function to create a new Cassandra table, then use the write() function to save the data into this table (the Format Helper is used for cleaner code).
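A hedged sketch of the save step follows. It assumes the target table does not exist yet and that `emp_id` is a suitable partition key; the table name `emp_high_salary`, the keyspace `test_keyspace` and the wrapper object `SaveSketch` are hypothetical.

```scala
import com.datastax.spark.connector._    // adds createCassandraTable to DataFrames
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.cassandra._  // adds the cassandraFormat helper

object SaveSketch {
  // Creates a table whose schema is derived from the DataFrame, then writes the rows.
  // Keyspace, table name and partition key column are hypothetical.
  def saveHighEarners(highEarners: DataFrame): Unit = {
    highEarners.createCassandraTable(
      "test_keyspace",
      "emp_high_salary",
      partitionKeyColumns = Some(Seq("emp_id")))

    highEarners.write
      .cassandraFormat("emp_high_salary", "test_keyspace")
      .save()
  }
}
```

Here `cassandraFormat(table, keyspace)` is the Format Helper: it replaces the longer `format("org.apache.spark.sql.cassandra")` plus `options(...)` form.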

Next, we check the save operation by using the read() function to read the data from the newly created table and display it on the screen:
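The check can be sketched with the same Format Helper on the read side. The table name `emp_high_salary` and keyspace `test_keyspace` are hypothetical, as is the wrapper object `VerifySketch`.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.cassandra._  // adds the cassandraFormat helper

object VerifySketch {
  // Reads the saved table back and prints its rows to the console.
  // Keyspace and table name are hypothetical.
  def showSaved(spark: SparkSession): Unit =
    spark.read
      .cassandraFormat("emp_high_salary", "test_keyspace")
      .load()
      .show()
}
```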

So, we have finished writing a program that uses Spark Cassandra Connector to connect Spark to Cassandra. The full code of this tutorial is provided below.

Note: Below are some common errors that may occur when running the program:

* Error due to version incompatibility between Apache Spark and the Spark Cassandra Connector: at the time of this post (02/2019), the Spark Cassandra Connector only supports Scala 2.11, so we need to configure our project to use Scala 2.11 instead of a newer version such as Scala 2.12 (details on version compatibility can be found here).

The above error occurs when we use Scala 2.12 instead of Scala 2.11. To fix it, we right-click on the project => Open Module Settings. In the Global Libraries section, we choose scala-sdk-2.11.12 (we can click ‘-’ to delete other Scala versions). If there is still an error, we click File => Invalidate Caches / Restart… to clear IntelliJ's caches.

* Error due to an incompatible Java version: this error occurs when we use a Java version newer than Java 1.8 (as explained in the tutorial Install and interact with Cassandra using CQL Shell). To fix this error, we click on Project => Open Module Settings and select Java 1.8.

* Error due to incompatibility between cassandra-driver-core and spark-cassandra-connector: to fix this error, we just need to remove the cassandra-driver-core dependency from the pom.xml file.

February 9, 2019
ITechSeeker