Kafka Connect Made Convenient
Working with connectors can be easier with kcctl
This post assumes you already know how to stand up a working Kafka environment, i.e., you have at least a Kafka cluster running Kafka Connect, or a working service hosted somewhere. For simplicity's sake, you can sign up for a free trial account at Aiven, where setting up a Kafka service is as trivial as a few mouse clicks.
I have recently begun working with data pipelines that use Kafka to move data between various sources and sinks. For example, you might use a MySQL DB as a source of data and use Kafka to propagate that data into S3 for long-term storage, or send Kafka metrics to Influx so operations can track the server's health. I'm sure you can come up with scenarios that make sense to you personally without much help. All of these sources and sinks are most conveniently tied together using Kafka Connect.
Kafka Connect relies on a REST API for the configuration of connectors. I love REST APIs. They are convenient to use, allow a clean separation of architectural boundaries, and are agnostic about what interfaces with them. When it comes to ease of development, though, I personally don't enjoy them, primarily because I either have to construct URIs by hand for every call or find some way of avoiding that task, like pulling up the closest command from shell history and tweaking it. Nothing insanely painful, but it would be much easier if I had a CLI tool.
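To make the chore concrete, here is a minimal Python sketch of the URI assembly the REST API forces on you; the worker address and connector name are placeholders, not values from a real cluster:

```python
# Hand-rolling Connect REST URIs -- the chore kcctl takes off your hands.
BASE = "http://localhost:8083"  # assumed local Connect worker


def endpoint(*parts: str) -> str:
    """Build a Connect REST URI such as /connectors/<name>/status."""
    return "/".join((BASE, *parts))


# A few of the URIs you would otherwise assemble by hand or fish out of history:
print(endpoint("connectors"))                            # list connectors
print(endpoint("connectors", "my-connector", "status"))  # connector status
print(endpoint("connector-plugins"))                     # installed plugins
```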
This is where kcctl comes in. The tool provides a CLI wrapper around the REST API, letting you interact with Kafka Connect more conveniently; it takes the responses and prints them to the terminal in an easily readable format. The installation instructions for kcctl can be found at the GitHub repo linked above, with one caveat: as of this writing, the brew install method was not working on my Mac M1 laptop. I suspect this is another M1 issue, because installing with Homebrew works fine on my Linux laptop. My suggestion for anyone having difficulty on a Mac M1 machine is to either build from source or try the precompiled pre-release version; you can find a link to it in the repo README file.
Once kcctl is installed, you set the server context for the tool. A context holds the server URL and credentials for your cluster. There are two ways to do this, shown below.
kcctl config set-context my-cluster --cluster http://localhost:8083 --username myusername --password mypassword
kcctl config set-context --cluster=https://user-name:password@localhost:443 my-context
Both commands above set the URL and credentials of my cluster, as well as a name for the context (my-cluster and my-context, respectively). The context name is required because kcctl remembers multiple contexts and needs a way to reference them. The sharp-eyed will notice that the context name is a positional argument whose position relative to the options doesn't matter. To address the potential security concern of storing passwords in shell history, you can instead set them via a configuration file. The contexts you provide are stored in JSON format in a dotfile in your home directory, ~/.kcctl, which you can edit to set sensitive information outside of the shell. One caveat on contexts: the tool is still young, so there's no convenient way of removing a context at the moment. You just have to delete it from the dotfile, but the JSON format makes that fairly intuitive to do.
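For illustration, the dotfile holds one JSON object per context plus a pointer to the current one. The exact keys may differ between kcctl versions, so treat this as a sketch rather than the definitive schema:

```json
{
  "currentContext": "my-context",
  "my-context": {
    "cluster": "https://localhost:443",
    "username": "user-name",
    "password": "password"
  }
}
```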
We can verify the cluster and any credentials using the kcctl info command, which returns some basic information about the Kafka Connect cluster. The version number (and git commit) identify the Connect worker serving the API, and the cluster ID identifies the Kafka cluster it's connected to.
URL: https://<USERNAME>:<PASSWORD>@localhost:443
Version: 3.2.0
Commit: 54454035f8b22b67
Kafka Cluster ID: tArO_G21TQyIPS4A3BE1nw
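kcctl info is essentially a friendly wrapper over the root endpoint of the Connect REST API, which returns this same information as a small JSON document. Here's a sketch of that mapping using a canned response rather than a live call:

```python
import json

# Canned payload in the shape a Connect worker's root endpoint returns
raw = '{"version": "3.2.0", "commit": "54454035f8b22b67", "kafka_cluster_id": "tArO_G21TQyIPS4A3BE1nw"}'
info = json.loads(raw)

# Reformat the raw JSON into the kind of summary kcctl prints
print(f"Version:          {info['version']}")
print(f"Commit:           {info['commit']}")
print(f"Kafka Cluster ID: {info['kafka_cluster_id']}")
```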
Now that I've confirmed I'm connected, I can get a list of available connector plugins with the get plugins command.
kcctl get plugins
TYPE CLASS VERSION
source com.couchbase.connect.kafka.CouchbaseSourceConnector 4.1.6
sink com.couchbase.connect.kafka.CouchbaseSinkConnector 4.1.6
...
Let's say I have a Postgres DB that I'd like to tie to Kafka as a data source. First, I'll need to set up my configuration information.
{
  "name": "debezium-connector",
  "config": {
    "name": "debezium-connector",
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.server.name": "demo-pg",
    "database.hostname": "demo-pg.host.com",
    "database.port": "28577",
    "database.user": "USER",
    "database.password": "PASSWORD",
    "database.dbname": "DB-NAME",
    "database.history.kafka.topic": "DB-NAME-schema",
    "database.history.kafka.bootstrap.servers": "demo-kafka.host.com:28579"
  }
}
We then use the apply command to apply the configuration. Before we do, though, we should check the validity of the configuration just to be safe, which we can do by adding the --dry-run flag to the command. If all goes well, we should see the following:
kcctl apply --dry-run --file register_debezium_connector.json
The configuration is valid!
Now we can apply the configuration to register our new connector. We simply remove the --dry-run flag.
kcctl apply --file register_debezium_connector.json
Created connector debezium-connector
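For comparison, this is roughly the HTTP call kcctl apply issues on your behalf. The sketch below builds (but does not send) the request using only the Python standard library; the worker URL is assumed and the config is abbreviated:

```python
import json
import urllib.request

# Abbreviated connector config; the full version lives in
# register_debezium_connector.json above.
config = {
    "name": "debezium-connector",
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        # ... remaining database.* settings omitted for brevity
    },
}

# Construct the POST to the Connect REST API's connectors endpoint.
req = urllib.request.Request(
    "http://localhost:8083/connectors",  # assumed local worker
    data=json.dumps(config).encode(),
    headers={"Content-Type": "application/json"},
    method="POST",
)
# urllib.request.urlopen(req)  # uncomment only against a live cluster
print(req.get_method(), req.full_url)
```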
We can check on the connector by executing kcctl get connectors, which will return a response that looks like the following.
NAME TYPE STATE TASKS
test-connector source RUNNING 0: RUNNING
debezium-connector source RUNNING 0: RUNNING
If the connector were failing, the command would report that as well. We can simulate a failure by simply shutting down Postgres.
kcctl get connectors
NAME TYPE STATE TASKS
test-connector source RUNNING 0: RUNNING
debezium-connector source RUNNING 0: FAILED
If we use the describe command, we can get a lot of detail about the connector as well as the failure.
Name: debezium-connector
Type: source
State: RUNNING
Worker ID: demo-kafka-2.aiven.local:3000
Config:
connector.class: io.debezium.connector.postgresql.PostgresConnector
incrementing.column.name: id
...
Tasks:
0:
State: FAILED
Worker ID: demo-kafka-2.aiven.local:3000
Trace: io.debezium.DebeziumException: Couldn't obtain encoding for database star_trek
at io.debezium.connector.postgresql.connection.PostgresConnection.getDatabaseCharset(PostgresConnection.java:476)
at io.debezium.connector.postgresql.PostgresConnectorTask.start(PostgresConnectorTask.java:75)
at io.debezium.connector.common.BaseSourceTask.start(BaseSourceTask.java:130)
at io.debezium.connector.common.BaseSourceTask.startIfNeededAndPossible(BaseSourceTask.java:207)
at io.debezium.connector.common.BaseSourceTask.poll(BaseSourceTask.java:148)
at org.apache.kafka.connect.runtime.WorkerSourceTask.poll(WorkerSourceTask.java:305)
at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:249)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: org.postgresql.util.PSQLException: Connection to demo-pg-cratliff1-demo.aivencloud.com:28577 refused. Check that the hostname and port are correct and that the postmaster is accepting TCP/IP connections.
...
I've left out a great deal of detail, but you get the picture. I much prefer working with Kafka Connect in this fashion. If you have a need or desire to work with Kafka Connect and your primary interface is the CLI, I urge you to take a few moments and look over the repo.