Kafka Connect is designed to run as a service and provides a REST API for managing connectors. By default, the API listens on port 8083. In distributed mode, the REST API is the primary interface to the cluster. You can send requests to any worker in the cluster, and the REST API automatically forwards them to the appropriate worker when necessary.
In this tutorial, we will explore the features of the Connect REST API using simple command line examples.
Getting Essential Connect Cluster Information
To obtain basic Connect cluster information, including the worker version, commit details, and Kafka cluster ID, execute the following command:
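The command itself is not shown in this text, so here is a minimal sketch of the request. It assumes a worker reachable at `http://localhost:8083`; the `CONNECT_URL` variable and the `connect_info` function name are illustrative, not part of Kafka Connect.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# GET / returns the worker version, commit, and Kafka cluster ID as JSON.
connect_info() {
  curl -s "${CONNECT_URL}/"
}
```

Calling `connect_info` should print a small JSON object with `version`, `commit`, and `kafka_cluster_id` fields.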
Note that the cluster ID identifies the Kafka cluster that this worker is connected to, which helps distinguish this Connect cluster from others that are backed by a different Kafka cluster and run a separate set of connectors.
Listing Installed Plugins
The command below displays the plugins installed on the worker. Keep in mind that plugins must be installed beforehand to be used at runtime.
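A sketch of the plugin listing request, assuming a worker at `http://localhost:8083` (the `list_plugins` name is a hypothetical helper):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# GET /connector-plugins lists the connector plugins installed on this worker.
list_plugins() {
  curl -s "${CONNECT_URL}/connector-plugins"
}
```

The response is a JSON array in which each entry describes one plugin, including its `class`, `type`, and `version`.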
While Kafka does come with a few built-in plugins, it’s generally necessary to install additional plugins. The ideal place to find them is Confluent Hub, which offers a wide range of plugins and a command line tool to facilitate installation. For our two Connect workers, we utilized Docker containers with commands to download and install four connector plugins from Confluent Hub.
Formatting the Result of the Installed Plugin List
Identifying the various plugins in the raw command output can be a bit tricky. However, you can rerun the command and pipe the output to the jq command to obtain a more readable format.
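One way to do this, sketched under the assumption that `jq` is installed and a worker is reachable at `http://localhost:8083` (the function name is illustrative):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Same request as before; the identity filter '.' makes jq pretty-print the JSON.
list_plugins_pretty() {
  curl -s "${CONNECT_URL}/connector-plugins" | jq '.'
}
```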
Now, it becomes much simpler to view the details of each available plugin. These plugins must be installed on all workers in the Connect cluster so that they are accessible when a connector instance or task is reassigned to a worker due to rebalancing.
Creating a Connector Instance
To create a connector instance, send a JSON file containing the connector’s configuration to a REST endpoint on your Connect worker using either the PUT or the POST method. PUT is slightly more straightforward because it creates the connector if it doesn’t already exist and updates it if it does. If the submitted configuration is identical to the existing one, PUT doesn’t return an error, which also makes it the preferred approach for updating a connector.
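The PUT variant can be sketched as follows. The helper name `put_connector`, its arguments, and the worker address are assumptions made for illustration; the endpoint is `PUT /connectors/{name}/config`, which takes the flat configuration object as its body.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: put_connector NAME CONFIG_FILE
# PUT /connectors/NAME/config creates the connector, or updates it if it exists.
# CONFIG_FILE holds the flat JSON configuration (keys such as "connector.class").
put_connector() {
  curl -s -X PUT -H "Content-Type: application/json" \
    --data @"$2" "${CONNECT_URL}/connectors/$1/config"
}
```

For instance, a file containing `{"connector.class": "org.apache.kafka.connect.file.FileStreamSourceConnector", "file": "/tmp/input.txt", "topic": "demo-topic", "tasks.max": "1"}` could be submitted with `put_connector my-file-source config.json`. The connector name, file path, and topic here are placeholders, and the class must be among the plugins installed on your workers.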
Listing Connector Instances
To obtain a list of all existing connectors, use the following command:
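A minimal sketch, assuming a worker at `http://localhost:8083` (the function name is hypothetical):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# GET /connectors returns a JSON array of connector names.
list_connectors() {
  curl -s "${CONNECT_URL}/connectors"
}
```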
Inspecting Configuration and Status for a Connector
To inspect the configuration of a specific connector, use the following command:
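As a sketch, with the connector name passed as an argument (the helper name and the worker address are assumptions):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: connector_config NAME
# GET /connectors/NAME/config returns the configuration currently stored for NAME.
connector_config() {
  curl -s "${CONNECT_URL}/connectors/$1/config"
}
```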
You can also check a connector’s status. While the config command displays the connector’s static configuration, the status shows the connector as a runtime entity, including its current state and the state of each of its tasks.
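The status request might look like this, again assuming a worker at `http://localhost:8083` and a hypothetical helper name:

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: connector_status NAME
# GET /connectors/NAME/status reports the connector state and each task's state.
connector_status() {
  curl -s "${CONNECT_URL}/connectors/$1/status"
}
```

The response contains a `connector` object and a `tasks` array, each with a `state` (such as `RUNNING` or `FAILED`) and the `worker_id` it is assigned to.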
Deleting a Connector
If you encounter issues with your setup and believe that modifying the configuration won’t solve the problem, or if you no longer require a particular connector to run, you can delete it based on its name.
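A sketch of the delete request; the helper name and worker address are assumptions, and the connector name is supplied by the caller:

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: delete_connector NAME
# DELETE /connectors/NAME stops the connector's tasks and removes its configuration.
delete_connector() {
  curl -s -X DELETE "${CONNECT_URL}/connectors/$1"
}
```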
Alternatively, you can build an interactive delete list with the peco tool. Pipe the list of connectors into peco, then use xargs to make a cURL call to the delete API for each connector you select.
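The pipeline can be sketched like this. It assumes `jq` and `peco` are installed and a worker is reachable at `http://localhost:8083`; the function name is illustrative, and connector names are assumed to need no URL encoding.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# jq -r '.[]' prints one raw connector name per line, peco lets you pick from
# that list, and xargs substitutes each selected name for {} in the DELETE call.
delete_interactively() {
  curl -s "${CONNECT_URL}/connectors" \
    | jq -r '.[]' \
    | peco \
    | xargs -I{} curl -s -X DELETE "${CONNECT_URL}/connectors/{}"
}
```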
The interactive list of connector instances will appear. To delete an instance, highlight it using the arrow keys and press Enter. When finished, simply press Ctrl-C to exit the interactive list.
Updating a Connector
As mentioned earlier, to update a connector you can use the PUT method to modify its configuration (see the “Creating a Connector Instance” section above). Because PUT both creates and updates connectors, it’s the command you’ll use most of the time. This also means that you don’t have to completely rewrite your configurations to apply a change.
Inspecting Task Details
The following command retrieves the status of a connector:
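When troubleshooting, it can help to reduce the status response to just the states. The following is a sketch, assuming `jq` is installed and a worker is reachable at `http://localhost:8083`; the `status_summary` name is hypothetical.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: status_summary NAME
# Fetch GET /connectors/NAME/status and keep only the connector state
# plus the id and state of every task.
status_summary() {
  curl -s "${CONNECT_URL}/connectors/$1/status" \
    | jq '{connector: .connector.state, tasks: [.tasks[] | {id, state}]}'
}
```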
If a connector fails, the details of the failure belong to its tasks. To identify the source of the problem, you need to find the stack trace for the failed task. The task is the entity that actually runs the connector and converter code, so that is where the stack trace is reported.
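To pull out just the stack trace for one task, something like the following could be used. It is a sketch that assumes `jq` is installed and a worker is reachable at `http://localhost:8083`; the helper name and its arguments are illustrative.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: task_trace NAME TASK_ID
# GET /connectors/NAME/tasks/TASK_ID/status includes a "trace" field when the
# task has failed; '// empty' prints nothing if no trace is present.
task_trace() {
  curl -s "${CONNECT_URL}/connectors/$1/tasks/$2/status" | jq -r '.trace // empty'
}
```

For example, `task_trace my-connector 0` would print the trace of task 0 of a connector named `my-connector` (a placeholder name).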
Restarting the Connector and Tasks
After inspecting a task and resolving the underlying issue (for example, by restarting a database that was down), you can restart the connector as follows:
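A sketch of the restart request, assuming a worker at `http://localhost:8083` and a hypothetical helper name:

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: restart_connector NAME
# POST /connectors/NAME/restart restarts the connector instance itself.
restart_connector() {
  curl -s -X POST "${CONNECT_URL}/connectors/$1/restart"
}
```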
Please note that restarting the connector alone won’t restart all of its tasks. You must also restart the failed task and subsequently retrieve its status using the following command:
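The two steps might be sketched as a pair of helpers. The function names and the worker address are assumptions; the task ID is the numeric `id` shown in the connector status.

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: restart_task NAME TASK_ID
# POST /connectors/NAME/tasks/TASK_ID/restart restarts a single task.
restart_task() {
  curl -s -X POST "${CONNECT_URL}/connectors/$1/tasks/$2/restart"
}

# Usage: task_status NAME TASK_ID
# GET /connectors/NAME/tasks/TASK_ID/status shows whether the task is running again.
task_status() {
  curl -s "${CONNECT_URL}/connectors/$1/tasks/$2/status"
}
```

On Apache Kafka 3.0 and later, the connector restart endpoint also accepts the `includeTasks` and `onlyFailed` query parameters, so `POST /connectors/{name}/restart?includeTasks=true&onlyFailed=true` can restart a connector together with its failed tasks in one call.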
Pausing and Resuming a Connector
When you pause a connector, all of its tasks are paused. However, this process occurs asynchronously, meaning that you can’t rely on all tasks pausing simultaneously. Since the tasks run in a thread pool, there isn’t a mechanism in place to synchronize their pausing.
You can pause a connector and its tasks using the following command:
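A minimal sketch of the pause request (helper name and worker address are assumptions):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: pause_connector NAME
# PUT /connectors/NAME/pause asks the connector and its tasks to pause.
pause_connector() {
  curl -s -X PUT "${CONNECT_URL}/connectors/$1/pause"
}
```

Because pausing is asynchronous, the request returns right away with an empty body; checking the connector status afterwards shows when the tasks have actually reached the `PAUSED` state.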
Similarly, you can resume a connector and its tasks with ease:
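The resume request mirrors the pause request. As before, this is a sketch with an illustrative helper name and an assumed worker address:

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: resume_connector NAME
# PUT /connectors/NAME/resume asks a paused connector and its tasks to resume.
resume_connector() {
  curl -s -X PUT "${CONNECT_URL}/connectors/$1/resume"
}
```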
Displaying All Tasks of a Connector
The following command conveniently displays all tasks associated with a connector:
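Sketched with a hypothetical helper and an assumed worker address of `http://localhost:8083`:

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: connector_tasks NAME
# GET /connectors/NAME/tasks returns one entry per task, each with its id and config.
connector_tasks() {
  curl -s "${CONNECT_URL}/connectors/$1/tasks"
}
```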
This information is similar to what you can obtain from other APIs, but it is organized by task, with individual configurations provided.
Obtaining a List of Topics Used by a Connector
Starting from Apache Kafka 2.5, it’s possible to retrieve a list of topics used by a connector:
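A sketch of the request, assuming a worker at `http://localhost:8083` running Apache Kafka 2.5 or later (the helper name is illustrative):

```shell
# Assumption: a Connect worker is reachable here (8083 is the default port).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Usage: connector_topics NAME
# GET /connectors/NAME/topics returns the topics the connector has used so far.
connector_topics() {
  curl -s "${CONNECT_URL}/connectors/$1/topics"
}
```

The response is a JSON object keyed by the connector name, with a `topics` array inside it.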
This shows the topics that the connector consumes from or produces to. It may not be particularly valuable for connectors that interact with a single topic, but it is very useful when a connector’s topics are specified with a regular expression. It is also helpful when a source connector uses Single Message Transforms (SMTs) to dynamically change the destination topic names.