Cassandra source and sink Connector, version 4.0.0

Cassandra

If you already have a publicly accessible deployment of Cassandra, you can skip ahead and Configure and install a connector, using the Axual Self-Service UI. Use the configuration instructions below.
If you don’t have one available, the following section instructs you to deploy a publicly available instance of Cassandra.

Deploying a Cassandra instance

We’ll deploy a Cassandra instance using Google Cloud Services.
If you have a Google account, you can sign up for a free trial of Google cloud.

Google lists multiple solutions for creating a Cassandra instance. In this guide we’ll be using Cassandra Cluster packaged by Bitnami.
Let’s get started.

  1. Create a new Cassandra instance here.

    1. Deployment name: my-cassandra

    2. Zone: europe-west1-b
      The zone isn’t very relevant, but usually you would select a region geographically closest to the Connect-Cluster.

    3. Your cluster name: my_cassandra_cluster

    4. Disk size in GB: 10

    5. Cassandra nodes - Number of nodes to create: 2

    6. Machine type:

      • Machine family: GENERAL-PURPOSE

      • Series: N1

      • Machine type: n1-standard-1 (1vCPU, 3.75 GB memory)

      • Data disk type: Standard persistent disk

    7. Accept the terms

    8. Click Deploy.+ You can wait for the operation to complete, or continue to the next step and return here once you’ve opened up the firewall.

    9. Go to the deployment list, click my-cassandra, and check your connectivity information (on the right side):

      • Cassandra Primary Node IP: You will use this later as the value for the connect.cassandra.contact.points config

      • Cassandra Password: You will use this later as the value for the connect.cassandra.password config.

  2. We’ll now open the firewall.

    1. Click + CREATE FIREWALL RULE, on the bar at the top of the screen.

      • Name: allow-access-to-Cassandra

      • Description: "Allows access to Cassandra from the internet"

      • Logs: off

      • Network: default

      • Priority: 900

      • Direction of traffic: ingress

      • Action on match: allow

      • Targets: All instances in the network

      • Source filter: IPv4 ranges

      • Source IPv4 ranges: 0.0.0.0/0

      • Second source filter: None

      • Protocols and ports: Specified protocols and ports

        • Check TCP

        • Ports: 9042

    2. Click Create
      The Cassandra cluster now accepts incoming connections from the internet.

  3. Let’s put some initial data on it.

    1. Return to the deployment list and click my-cassandra to open this deployment.

    2. Click SSH TO INITIAL PRIMARY SERVER. A window will pop up. A terminal is simulated.

      • Type down in the terminal window: cqlsh -u cassandra

      • You will be prompted for the Cassandra password. Paste it.

      • You now have an open session with your Cassandra cluster. We’ll use the Cassandra query language (CQL) in order to create our initial data. Paste down the following text block:

        CREATE KEYSPACE IF NOT EXISTS my_keyspace_name
        WITH replication = {
          'class': 'SimpleStrategy',
          'replication_factor': 2
        };
        
        USE my_keyspace_name;
        
        CREATE TABLE long_tutorials(
           id             int      PRIMARY KEY,
           name           text                ,
           description    text
        );
        
        CREATE TABLE long_tutorials_duplicate(
           id             int      PRIMARY KEY,
           name           text                ,
           description    text
        );
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (1, 'deploying cassandra', 'deploying a cassandra cluster on a public cloud');
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (2, 'updating firewall rules', 'whitelisting incoming trafic towards your cluster');
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (3, 'populating cassandra', 'creating some initial data to play with');
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (4, 'deploying kafka connect', 'thank god someone else did this part');
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (5, 'configuring a source connector', 'importing data from cassandra to kafka');
        
        INSERT INTO long_tutorials (id, name, description) VALUES
        (6, 'cleaning up resources', 'deleting unused resources to avoid unwanted charges.');
        
        SELECT * FROM long_tutorials;
      • You can now close this "SSH" / "terminal" window.

You now have a publicly available Cassandra cluster, inclusing sample data.

Configuring a new source Connector

  1. Follow the Creating streams documentation in order to create one stream and deploy it onto an environment.
    The name of the stream will be long_tutorials_kafka.
    The key/value types will be String/String.

  2. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_cassandra_source.
    The plugin name is com.datamountaineer.streamreactor.connect.cassandra.source.CassandraSourceConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

    For advanced configuration, see the official source connector documentation.

  3. Provide the following minimal configuration in order to connect to the previously configured Cassandra instance.

    connect.cassandra.contact.points
    This is the IP address used to connect to the cluster. You noted this down after finishing your Google Cloud deployment.

    Example value:
    123.123.123.123

    connect.cassandra.port

    9042

    connect.cassandra.username

    cassandra

    connect.cassandra.password
    This is the password used to connect to the cluster. You noted this down after finishing your Google Cloud deployment.

    Example value:
    passWORD1234

    connect.cassandra.key.space

    my_keyspace_name

    connect.cassandra.kcql

    INSERT INTO long_tutorials_kafka
    SELECT * FROM long_tutorials PK id
  4. Authorize the my_cassandra_source source Connector-Application to produce to the long_tutorials_kafka stream.

  5. You can now start the source Connector-Application.

  6. You can now check the long_tutorials_kafka stream to see the events published by the Connector.

    Known bug: With this minimalist example, the Connector might insert some duplicates of the Cassandra-entries into kafka.

Configuring a new sink Connector

To keep things simple, we’ll consume the topic we just used for the source connector and publish everything into another Cassandra table.
We already created an additional table within Cassandra during the preparation phase which we’ll use for this step.

  1. Follow the Configure and install a connector documentation to set up a new Connector-Application.
    Let’s call it my_cassandra_sink.
    The plugin name is com.datamountaineer.streamreactor.connect.cassandra.sink.CassandraSinkConnector.
    If a plugin isn’t available, ask a platform operator to install plugins.

    The configuration from the table above, which you provided for the source Connector, is still valid, so reuse all of that.
    We are only going to change the KCQL command and add a new config.
    For advanced configuration, see the official source connector documentation.

    connect.cassandra.kcql

    INSERT INTO long_tutorials_duplicate SELECT * FROM long_tutorials_kafka PK id

    topics

    long_tutorials_kafka

  2. Authorize the my_cassandra_sink sink Connector-Application to consume the long_tutorials_kafka stream.

  3. You can now start the sink Connector-Application.

  4. Let’s use the SSH / terminal functionality again to check the contents of the long_tutorials_duplicate table in Cassandra:

    • Open your cassandra-instance page in google cloud.

    • Click SSH TO INITIAL PRIMARY SERVER. A window will pop up. A terminal is simulated.

      • Type down in the terminal window: cqlsh -u cassandra

      • You will be prompted for the Cassandra password. Paste it.

      • Paste down the following text block:

        SELECT * FROM my_keyspace_name.long_tutorials_duplicate;

        You should see that the original entries have completed their round trip back into Cassandra, on a different table.

Cleanup

Once you are done, stop the two Connector-Applications and cleanup the unused axual resources.
In case you deployed your resources via Google Cloud, don’t forget to delete your deployment and your firewall rule once you are done.

License

Cassandra source and sink Connectors are licensed under the Apache License, Version 2.0.

Source code

The source code for the Connect-Plugin can be found at github.com/lensesio/stream-reactor.