Cassandra source and sink Connector, version 4.0.0
Cassandra
If you already have a publicly accessible deployment of Cassandra, you can skip ahead to Configure and install a connector using the Axual Self-Service UI, and use the configuration instructions below.
If you don’t have one available, the following section walks you through deploying a publicly accessible instance of Cassandra.
Deploying a Cassandra instance
We’ll deploy a Cassandra instance on Google Cloud.
If you have a Google account, you can sign up for a free trial of Google Cloud.
Google lists multiple solutions for creating a Cassandra instance.
In this guide we’ll be using Cassandra Cluster packaged by Bitnami.
Let’s get started.
- Create a new Cassandra instance here.
- Deployment name: my-cassandra
- Zone: europe-west1-b
  The zone isn’t very relevant, but usually you would select a region geographically closest to the Connect-Cluster.
- Your cluster name: my_cassandra_cluster
- Disk size in GB: 10
- Cassandra nodes - Number of nodes to create: 2
- Machine type:
  - Machine family: GENERAL-PURPOSE
  - Series: N1
  - Machine type: n1-standard-1 (1 vCPU, 3.75 GB memory)
- Data disk type: Standard persistent disk
- Accept the terms.
- Click Deploy. You can wait for the operation to complete, or continue to the next step and return here once you’ve opened up the firewall.
- Go to the deployment list, click my-cassandra, and check your connectivity information (on the right side):
  - Cassandra Primary Node IP: you will use this later as the value for the connect.cassandra.contact.points config.
  - Cassandra Password: you will use this later as the value for the connect.cassandra.password config.
We’ll now open the firewall.
- Click + CREATE FIREWALL RULE, on the bar at the top of the screen.
- Name: allow-access-to-cassandra (firewall rule names must be lowercase)
- Description: "Allows access to Cassandra from the internet"
- Logs: Off
- Network: default
- Priority: 900
- Direction of traffic: Ingress
- Action on match: Allow
- Targets: All instances in the network
- Source filter: IPv4 ranges
- Source IPv4 ranges: 0.0.0.0/0 (this accepts connections from any address; acceptable for a short-lived tutorial, but not something you would do in production)
- Second source filter: None
- Protocols and ports: Specified protocols and ports
  - Check TCP
  - Ports: 9042
- Click Create.
The Cassandra cluster now accepts incoming connections from the internet. If you prefer the command line, an equivalent gcloud command is sketched below.
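As an alternative to the console, the same rule can be created with the gcloud CLI. This is only a sketch: it assumes gcloud is installed and authenticated against the project that hosts the deployment, and it reuses the same values as the console steps above.

# Create the ingress rule that lets the internet reach Cassandra's CQL port (9042)
gcloud compute firewall-rules create allow-access-to-cassandra \
    --description="Allows access to Cassandra from the internet" \
    --network=default \
    --priority=900 \
    --direction=INGRESS \
    --action=ALLOW \
    --rules=tcp:9042 \
    --source-ranges=0.0.0.0/0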
Let’s put some initial data on it.
- Return to the deployment list and click my-cassandra to open this deployment.
- Click SSH TO INITIAL PRIMARY SERVER. A window will pop up with a simulated terminal.
- Type the following in the terminal window:
  cqlsh -u cassandra
- You will be prompted for the Cassandra password. Paste it.
- You now have an open session with your Cassandra cluster. We’ll use the Cassandra Query Language (CQL) to create our initial data. Paste the following text block:

CREATE KEYSPACE IF NOT EXISTS my_keyspace_name
  WITH replication = { 'class': 'SimpleStrategy', 'replication_factor': 2 };

USE my_keyspace_name;

CREATE TABLE long_tutorials (
  id int PRIMARY KEY,
  name text,
  description text
);

CREATE TABLE long_tutorials_duplicate (
  id int PRIMARY KEY,
  name text,
  description text
);

INSERT INTO long_tutorials (id, name, description) VALUES (1, 'deploying cassandra', 'deploying a cassandra cluster on a public cloud');
INSERT INTO long_tutorials (id, name, description) VALUES (2, 'updating firewall rules', 'whitelisting incoming traffic towards your cluster');
INSERT INTO long_tutorials (id, name, description) VALUES (3, 'populating cassandra', 'creating some initial data to play with');
INSERT INTO long_tutorials (id, name, description) VALUES (4, 'deploying kafka connect', 'thank god someone else did this part');
INSERT INTO long_tutorials (id, name, description) VALUES (5, 'configuring a source connector', 'importing data from cassandra to kafka');
INSERT INTO long_tutorials (id, name, description) VALUES (6, 'cleaning up resources', 'deleting unused resources to avoid unwanted charges.');

SELECT * FROM long_tutorials;

- You can now close this "SSH" / terminal window.
You now have a publicly available Cassandra cluster, including sample data.
Configuring a new source Connector
- Follow the Creating streams documentation to create one stream and deploy it onto an environment. The name of the stream will be long_tutorials_kafka. The key/value types will be String/String.
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let’s call it my_cassandra_source. The plugin name is com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector. If a plugin isn’t available, ask a platform operator to install plugins. For advanced configuration, see the official source connector documentation.
- Provide the following minimal configuration in order to connect to the previously configured Cassandra instance (a consolidated sketch follows this list):
  - connect.cassandra.contact.points: the IP address used to connect to the cluster. You noted this down after finishing your Google Cloud deployment. Example value: 123.123.123.123
  - connect.cassandra.port: 9042
  - connect.cassandra.username: cassandra
  - connect.cassandra.password: the password used to connect to the cluster. You noted this down after finishing your Google Cloud deployment. Example value: passWORD1234
  - connect.cassandra.key.space: my_keyspace_name
  - connect.cassandra.kcql: INSERT INTO long_tutorials_kafka SELECT * FROM long_tutorials PK id
    This KCQL statement tells the Connector to copy rows from the long_tutorials table into the long_tutorials_kafka stream.
- Authorize the my_cassandra_source source Connector-Application to produce to the long_tutorials_kafka stream.
- You can now start the source Connector-Application.
- You can now check the long_tutorials_kafka stream to see the events published by the Connector. Known bug: with this minimalist example, the Connector might insert some duplicates of the Cassandra entries into Kafka.
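For reference, a consolidated view of the source Connector configuration might look like the sketch below, shown in properties style (in the Self-Service UI each key is entered as a separate field). The contact point and password are the placeholder values from the table above; substitute the ones from your own deployment.

# Connection details for the Cassandra cluster deployed earlier (placeholder values)
connect.cassandra.contact.points=123.123.123.123
connect.cassandra.port=9042
connect.cassandra.username=cassandra
connect.cassandra.password=passWORD1234
connect.cassandra.key.space=my_keyspace_name
# Copy rows from the long_tutorials table into the long_tutorials_kafka stream
connect.cassandra.kcql=INSERT INTO long_tutorials_kafka SELECT * FROM long_tutorials PK id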
Configuring a new sink Connector
To keep things simple, we’ll consume the topic we just used for the source connector and publish everything into another Cassandra table.
We already created an additional table within Cassandra during the preparation phase which we’ll use for this step.
- Follow the Configure and install a connector documentation to set up a new Connector-Application. Let’s call it my_cassandra_sink. The plugin name is com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector. If a plugin isn’t available, ask a platform operator to install plugins.
- The configuration from the table above, which you provided for the source Connector, is still valid, so reuse all of that. We are only going to change the KCQL command and add a new config (a consolidated sketch follows this list). For advanced configuration, see the official sink connector documentation.
  - connect.cassandra.kcql: INSERT INTO long_tutorials_duplicate SELECT * FROM long_tutorials_kafka PK id
  - topics: long_tutorials_kafka
- Authorize the my_cassandra_sink sink Connector-Application to consume the long_tutorials_kafka stream.
- You can now start the sink Connector-Application.
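As with the source Connector, a consolidated view of the sink configuration might look like the sketch below (properties style, same placeholder contact point and password as before; substitute your own values).

# Same connection details as the source Connector (placeholder values)
connect.cassandra.contact.points=123.123.123.123
connect.cassandra.port=9042
connect.cassandra.username=cassandra
connect.cassandra.password=passWORD1234
connect.cassandra.key.space=my_keyspace_name
# Read from the long_tutorials_kafka topic and write into the long_tutorials_duplicate table
connect.cassandra.kcql=INSERT INTO long_tutorials_duplicate SELECT * FROM long_tutorials_kafka PK id
topics=long_tutorials_kafka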
Let’s use the SSH / terminal functionality again to check the contents of the long_tutorials_duplicate table in Cassandra:
- Open your Cassandra instance page in Google Cloud.
- Click SSH TO INITIAL PRIMARY SERVER. A window will pop up with a simulated terminal.
- Type the following in the terminal window:
  cqlsh -u cassandra
- You will be prompted for the Cassandra password. Paste it.
- Paste the following text block:
  SELECT * FROM my_keyspace_name.long_tutorials_duplicate;
- You should see that the original entries have completed their round trip back into Cassandra, in a different table.
Cleanup
Once you are done, stop the two Connector-Applications and clean up the unused Axual resources.
If you deployed your resources via Google Cloud, don’t forget to also delete your deployment and your firewall rule.
License
Cassandra source and sink Connectors are licensed under the Apache License, Version 2.0.
Source code
The source code for the Connect-Plugin can be found at github.com/lensesio/stream-reactor.