Connectors between Kafka and external databases are normally divided into Source Connectors and Sink Connectors. A Source Connector reads data from a database and publishes it to the Kafka broker, while a Sink Connector writes data from Kafka to a database.

In this tutorial, we will learn how to connect Kafka with a Cassandra Sink to save Kafka data to a Cassandra table, using a library from Landoop Lenses. The connection can be established with the following steps:

1. Download the Cassandra Connector here.

2. Configure the Kafka Connect plugin

– Create a directory named kafka/plugins in /usr/local/share and copy the .jar file downloaded above into this plugins directory.

– Kafka Connect can run in either Standalone mode (on one machine) or Distributed mode (across several machines). Standalone mode uses the connect-standalone.properties file and Distributed mode uses the connect-distributed.properties file (both files are in kafka_2.12-2.1.0/config). To enable Kafka to use the Cassandra Connector, we set plugin.path in connect-standalone.properties (or connect-distributed.properties) to the path of the plugins directory.
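The plugin.path setting can be sketched as follows. This is a minimal example rather than the original listing: CONNECT_CONFIG and PLUGIN_DIR are illustrative variable names, and CONNECT_CONFIG defaults to a file in the current directory so the snippet is safe to try. For a real setup, point it at kafka_2.12-2.1.0/config/connect-standalone.properties.

```shell
# Assumed paths (illustrative variable names); adjust to your installation.
CONNECT_CONFIG="${CONNECT_CONFIG:-./connect-standalone.properties}"
PLUGIN_DIR="${PLUGIN_DIR:-/usr/local/share/kafka/plugins}"

# Make sure the file exists, then drop any old plugin.path lines
# (including commented-out ones) before appending the new setting.
touch "$CONNECT_CONFIG"
grep -v 'plugin\.path=' "$CONNECT_CONFIG" > "$CONNECT_CONFIG.tmp" || true
echo "plugin.path=$PLUGIN_DIR" >> "$CONNECT_CONFIG.tmp"
mv "$CONNECT_CONFIG.tmp" "$CONNECT_CONFIG"

# Show the resulting setting.
grep '^plugin\.path=' "$CONNECT_CONFIG"
```

Rewriting the line instead of blindly appending keeps the file at exactly one plugin.path entry, so the snippet can be re-run without side effects.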

* Note: In some other tutorials, the kafka-connect-cassandra-1.2.0-2.0.0-all.jar file is copied directly to kafka_2.12-2.1.0/libs instead of to the ‘plugins’ directory as above. However, that approach often causes the error “java.lang.InstantiationError: com.typesafe.scalalogging.Logger”.

3. Set up the connector configuration file

– Download the cassandra-sink.properties file here and copy it to kafka_2.12-2.1.0/config.

– Open the cassandra-sink.properties file and change the parameters according to our requirements (see the example below).

To check the connection between Kafka and the Cassandra Sink, we will write the data of Kafka’s “employee-topic” topic to the emp table in Cassandra (the emp table was created in the tutorial Install and interact with Cassandra using CQL Shell) as follows:

1. Change the parameters in config/cassandra-sink.properties.
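A configuration for this test might look like the sketch below. The property names follow the Stream Reactor Cassandra sink as far as we know for this connector version, so treat them as assumptions and compare them with the downloaded file. The keyspace my_keyspace, the connector name, and the credentials are placeholders, not values from this tutorial. SINK_CONFIG is an illustrative variable that defaults to the current directory.

```shell
# Illustrative output path; copy the result to kafka_2.12-2.1.0/config for real use.
SINK_CONFIG="${SINK_CONFIG:-./cassandra-sink.properties}"

# my_keyspace, the connector name, and the credentials are placeholders.
cat > "$SINK_CONFIG" <<'EOF'
name=cassandra-sink-employee
connector.class=com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector
tasks.max=1
topics=employee-topic
connect.cassandra.kcql=INSERT INTO emp SELECT * FROM employee-topic
connect.cassandra.contact.points=localhost
connect.cassandra.port=9042
connect.cassandra.key.space=my_keyspace
connect.cassandra.username=cassandra
connect.cassandra.password=cassandra
EOF

# Show the two settings that tie the topic to the table.
grep -E '^(topics|connect\.cassandra\.kcql)=' "$SINK_CONFIG"
```

The kcql line carries the mapping: it names the target table (emp) and the source topic (employee-topic), and the topic must also appear in the topics setting.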

2. Check the existing data in the emp table so that we can see the difference after connecting to Kafka.
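One way to check the table from the command line is sketched below. It assumes cqlsh is on the PATH and Cassandra is running locally. KEYSPACE is an illustrative variable and my_keyspace is a placeholder for the keyspace that actually holds emp.

```shell
# my_keyspace is a placeholder; use the keyspace that contains the emp table.
KEYSPACE="${KEYSPACE:-my_keyspace}"
QUERY="SELECT * FROM ${KEYSPACE}.emp;"

if command -v cqlsh >/dev/null 2>&1; then
  # -e executes a single statement and exits.
  cqlsh -e "$QUERY"
else
  echo "cqlsh not found on PATH; run this query manually: $QUERY" >&2
fi
```

The same query can be re-run after publishing data to compare the table contents.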

3. Run the Kafka Cassandra Connector in Standalone mode.
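A sketch of the Standalone launch is shown below. connect-standalone.sh takes the worker configuration first, followed by the connector configuration. KAFKA_HOME is an illustrative variable, assumed here to be the kafka_2.12-2.1.0 directory, and the script only runs if it is actually found there.

```shell
# Assumed install location; point KAFKA_HOME at your own Kafka directory.
KAFKA_HOME="${KAFKA_HOME:-kafka_2.12-2.1.0}"
WORKER_CONFIG="$KAFKA_HOME/config/connect-standalone.properties"
SINK_CONFIG="$KAFKA_HOME/config/cassandra-sink.properties"

if [ -x "$KAFKA_HOME/bin/connect-standalone.sh" ]; then
  # Worker config first, then the connector config. Runs in the foreground.
  "$KAFKA_HOME/bin/connect-standalone.sh" "$WORKER_CONFIG" "$SINK_CONFIG"
else
  echo "connect-standalone.sh not found in $KAFKA_HOME/bin" >&2
fi
```

The process stays in the foreground and prints the connector log, so keep this terminal open for the next steps.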

4. Run the Kafka Producer and publish a JSON record.
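Publishing a record can be sketched as below. The field names in MESSAGE are hypothetical and must be replaced with the actual columns of the emp table. The broker address localhost:9092 is also an assumption. The example further assumes that the worker's JSON converter is configured with schemas.enable=false; with schemas enabled, the message would need the schema/payload envelope instead of plain JSON.

```shell
# Assumed install location; point KAFKA_HOME at your own Kafka directory.
KAFKA_HOME="${KAFKA_HOME:-kafka_2.12-2.1.0}"

# Hypothetical record: replace the field names with the columns of your emp table.
MESSAGE='{"emp_id": 10, "emp_name": "John Doe", "emp_city": "London"}'

if [ -x "$KAFKA_HOME/bin/kafka-console-producer.sh" ]; then
  # The console producer reads one message per line from standard input.
  echo "$MESSAGE" | "$KAFKA_HOME/bin/kafka-console-producer.sh" \
    --broker-list localhost:9092 --topic employee-topic
else
  echo "kafka-console-producer.sh not found in $KAFKA_HOME/bin" >&2
fi
```

Running the producer without the pipe opens an interactive prompt instead, where each line typed is sent as one message.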

If the data has been published successfully, the result appears on the terminal from step 3.

Re-checking the data in the emp table, we can see that the data from Kafka’s employee-topic has been inserted into this table.

5. As in step 4, we publish more data to employee-topic and check that it has been inserted successfully into the emp table in Cassandra.

So, we have finished connecting Kafka with the Cassandra Sink to save Kafka data into a Cassandra table. With the help of Landoop Lenses, the connection is established without writing any code (we only need to specify parameters in the configuration file). In addition, we can run several connector configuration files simultaneously by passing them all to a single Kafka Connect command (see more here).
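Running several connectors at once can be sketched as below. connect-standalone.sh accepts one worker configuration followed by any number of connector configurations. The second file, another-sink.properties, is a hypothetical name used only for illustration, and KAFKA_HOME is assumed to be the kafka_2.12-2.1.0 directory.

```shell
# Assumed install location; point KAFKA_HOME at your own Kafka directory.
KAFKA_HOME="${KAFKA_HOME:-kafka_2.12-2.1.0}"
WORKER_CONFIG="$KAFKA_HOME/config/connect-standalone.properties"
FIRST_SINK="$KAFKA_HOME/config/cassandra-sink.properties"
# Hypothetical second connector configuration.
SECOND_SINK="$KAFKA_HOME/config/another-sink.properties"

if [ -x "$KAFKA_HOME/bin/connect-standalone.sh" ]; then
  # One worker config, then one properties file per connector.
  "$KAFKA_HOME/bin/connect-standalone.sh" "$WORKER_CONFIG" "$FIRST_SINK" "$SECOND_SINK"
else
  echo "connect-standalone.sh not found in $KAFKA_HOME/bin" >&2
fi
```

Each connector file needs its own unique name setting, since the worker identifies connectors by name.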

February 20, 2019
ITechSeeker