Couchbase source and sink connector

Couchbase

The source connector streams documents from Couchbase Server using the high-performance Database Change Protocol (DCP) and publishes the latest version of each document to a Kafka topic in near real-time.

The sink connector subscribes to Kafka topics and writes the messages to Couchbase Server.

If you already have a publicly accessible deployment of Couchbase, you can already configure and install a connector using the Axual Self-Service UI. Use the configuration instructions below (source / sink ).

If you don’t have one available, follow the next section to deploy a publicly available instance of Couchbase.

Deploying a Couchbase instance

Deploy a trial Couchbase cloud cluster.

Follow Couchbase’s instructions and import the travel-sample dataset. While the dataset is getting imported, let’s set up the connectivity to the cluster:

  1. Click "Clusters" in the left-side menu

  2. Click on the name of your cluster

  3. Click the Connect tab

  4. Note down the Connection → Wide Area Network URL : cb.exampleuuid.cloud.couchbase.com

  5. Click Manage allowed IP

  6. Add permanent IP: 0.0.0.0/0

  7. Click BACK in the top-left to return to the previous page of the Connect tab. Open Database Access → Database Credentials → Manage Credentials

  8. Click Create Database Credentials

  9. Provide a username and password. Example: mytest / ABCdef123!

  10. Bucket level access: All Buckets / All Scopes / Read/Write

  11. Click Create

Configuring a new source connector

Follow the 2022.1@axual::self-service/stream-management.html.adoc#creating-streams documentation in order to create one stream and deploy it onto an environment.

The name of the stream will be <bucketName>.<scopeName>.<collectionName>, for every collection we’ll be watching. In our example, that name turns out to be travel-sample.inventory.airport. The key/value types will be String.

Follow the Configuring Connector-Applications documentation to set up a new connector application. The values you will need to supply as configuration will be listed in this section.

For advanced configuration, see the official source connector documentation, as well as the source configuration options and this source-example file.

Provide the following minimal configuration in order to connect to the previously configured Couchbase instance.

couchbase.bucket

travel-sample

couchbase.enable.tls

true

couchbase.trust.certificate.path

/home/kafka/certificates/couchbase.pem

Axual operators have whitelisted Couchbase’s self-signed certificate in advance and made it available to users at this fixed location within the Axual-Connect cluster.

couchbase.username

mytest

couchbase.password

ABCdef123!

couchbase.seed.nodes

The Connection-Wide Area Network URL you noted down at the beginning.

couchbase.source.handler

com.couchbase.connect.kafka.handler.source.RawJsonSourceHandler

couchbase.collections

inventory.airport

topic.creation.enable

This property-key is not prefilled in the configuration by default, so you’ll have to add it as a custom property.

false

You can now start the source Connector-Application.

Configuring a new sink connector

To keep things simple, we’ll consume the topic we just created, and publish everything into a new bucket in couchbase.

Go to you couchbase project.

  • Click Clusters and then the Buckets tab, and click the Create Bucket button. Let’s call it "travel-copy". Use all the default settings.

  • Within the bucket, create a scope called "inventory"

  • Within the scope, create a collection called "airport" with a TTL of 2629800 (1 month)

Follow the Configuring Connector-Applications documentation to set up a new connector application. Authorise this new Connector-Application to consume the stream you created in the previous step. The values you will need to supply as configuration for the Connector-Application will be listed in this section.

For advanced configuration, see the official sink connector documentation, as well as the sink configuration options and this sink-example file.

Provide the following minimal configuration in order to connect to the previously configured Couchbase instance.

topics

travel-sample.inventory.airport

couchbase.bucket

travel-copy

couchbase.topic.to.collection

travel-sample.inventory.airport=inventory.airport

couchbase.enable.tls

true

couchbase.trust.certificate.path

/home/kafka/certificates/couchbase.pem

Axual operators have whitelisted Couchbase’s self-signed certificate in advance, and made it available to users at this location within the Axual-Connect cluster.

couchbase.username

mytest

couchbase.password

ABCdef123!

couchbase.seed.nodes

The Connection-Wide Area Network URL you noted down at the beginning.

You can now start the sink Connector-Application.

Cleanup

Once you are done, stop the connector applications and cleanup the unused axual resources.

Don’t forget to return to the Cluster overview in Couchbase and delete your cluster if you’re not using it anymore.

License

Couchbase connectors are licensed under the Apache License, Version 2.0.

Source code

The source code for the connector can be found on github.com/couchbase/kafka-connect-couchbase