Automatically restarting failed Kafka Connectors and Connect Tasks

Sandeep Dwivedi
May 31, 2022

Here’s a way to automatically restart Kafka Connect connectors and tasks if they fail. Restarting automatically only makes sense for transient failures; if there’s a problem with your pipeline (e.g., bad records or a misconfigured server), restarting gains you nothing. You might also want to look at Kafka Connect’s error handling and dead letter queues.

Let’s say you’ve got a couple of connectors set up, pulling data from MySQL with Debezium and streaming it to any application. The MySQL source is a bit flaky and goes offline periodically. You can view the status of all your connectors and tasks with the Kafka Connect REST API.

An Ansible deployment is available on GitHub, but if you want to get hands-on, use the commands below. They need jq installed on the system.

Replace USERNAME, PASSWORD and SERVER_NAME with your credentials and the hostname of your Kafka Connect worker:

curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors" | jq '.[]' | \
xargs -I{connector_name} curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors/{connector_name}/status" | \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | column -s : -t | sed 's/\"//g' | sort

sink-elastic-orders-00 | RUNNING | RUNNING
ora-ba1-aerospike-testing-2 | RUNNING | FAILED

If any of a connector’s tasks have failed, you can restart them using the REST API:

curl -X POST --user "<USERNAME:PASSWORD>" "https://<SERVER_NAME>:8083/connectors/ora-ba1-aerospike-testing-2/tasks/0/restart"

after which the task comes back up:

curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors" | \
jq '.[]' | \
xargs -I{connector_name} curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors/{connector_name}/status" | \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | \
column -s : -t | sed 's/\"//g' | sort

sink-elastic-orders-00 | RUNNING | RUNNING
ora-ba1-aerospike-testing-2 | RUNNING | RUNNING
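On Apache Kafka 3.0 and later (KIP-745), the connector restart endpoint also accepts `includeTasks` and `onlyFailed` query parameters, so a single call can restart a connector together with only those of its instances that are FAILED. A minimal sketch, assuming a 3.0+ Connect worker and shell variables named SERVER_NAME, USERNAME and PASSWORD (assumed here) holding the host and credentials; the function name `restart_connector_and_failed_tasks` is hypothetical:

```shell
# Sketch: restart a connector and only its FAILED instances in one REST call.
# Assumes Apache Kafka 3.0+ (KIP-745) and the Connect REST API on port 8083 over HTTPS.
# SERVER_NAME, USERNAME and PASSWORD are assumed to be set by the caller.
restart_connector_and_failed_tasks() {
  local connector="$1"
  # includeTasks=true: restart the connector's tasks as well as the connector
  # onlyFailed=true:   restart only the instances currently in the FAILED state
  curl -s -X POST --user "${USERNAME}:${PASSWORD}" \
    "https://${SERVER_NAME}:8083/connectors/${connector}/restart?includeTasks=true&onlyFailed=true"
}

# Example, using the failed connector from the status listing above:
# restart_connector_and_failed_tasks ora-ba1-aerospike-testing-2
```

On older workers the per-task endpoint shown above is the way to go.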

But manually watching and restarting tasks isn’t fun, so let’s automate it with a bit of bash that restarts any failed tasks. It uses the same pattern as above for iterating through the connectors on Kafka Connect’s REST API, combined with jq’s ability to filter data (select(.tasks[].state=="FAILED")).
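The filter runs on the JSON returned by `GET /connectors/<name>/status`. Because `.tasks[].state=="FAILED"` yields one boolean per task, `select(...)` emits the whole status object once for every failed task; iterating over the tasks first and selecting on each one gives exactly the task IDs to restart. A small sketch of that, fed with a hand-written status document rather than a live worker (the helper name `failed_tasks` is hypothetical):

```shell
# Print "<connector> <task id>" for every FAILED task in a connector status
# document read from stdin (the JSON returned by GET /connectors/<name>/status).
failed_tasks() {
  jq -r '.name as $name | .tasks[] | select(.state == "FAILED") | "\($name) \(.id)"'
}

# Example with an illustrative, hand-written status document:
status='{"name":"ora-ba1-aerospike-testing-2",
         "connector":{"state":"RUNNING","worker_id":"w1:8083"},
         "tasks":[{"id":0,"state":"FAILED","worker_id":"w1:8083"},
                  {"id":1,"state":"RUNNING","worker_id":"w1:8083"}]}'
printf '%s\n' "$status" | failed_tasks
# prints: ora-ba1-aerospike-testing-2 0
```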

You can find the script at the link below. It restarts any connector that is in the FAILED state, and then restarts failed connector tasks as well.
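A minimal sketch of such a script, assuming bash, curl and jq on the host, the Connect REST API at https://$SERVER_NAME:8083 with basic authentication, and the standard `POST /connectors/<name>/restart` and `POST /connectors/<name>/tasks/<id>/restart` endpoints. It is not the linked script, and the helper names `connect_api`, `restart_failed` and `main` are hypothetical:

```shell
#!/usr/bin/env bash
# connector_auto_start.sh (sketch): restart FAILED connectors, then FAILED tasks.
# Assumes curl and jq are installed and the Connect REST API listens on port 8083 over HTTPS.
SERVER_NAME="${SERVER_NAME:-}"   # Connect worker host; hard-code it or export it
USERNAME="${USERNAME:-}"
PASSWORD="${PASSWORD:-}"

# Thin wrapper around curl: connect_api <METHOD> <path>
connect_api() {
  curl -sSf -X "$1" --user "${USERNAME}:${PASSWORD}" "https://${SERVER_NAME}:8083$2"
}

restart_failed() {
  local connector status task
  while read -r connector; do
    # Skip connectors whose status cannot be fetched (e.g. deleted meanwhile).
    status="$(connect_api GET "/connectors/${connector}/status")" || continue
    # Restart the connector instance itself if it is FAILED.
    if [[ "$(jq -r '.connector.state' <<<"$status")" == "FAILED" ]]; then
      echo "$(date '+%F %T') restarting connector ${connector}"
      connect_api POST "/connectors/${connector}/restart" >/dev/null
    fi
    # Restart each FAILED task individually.
    for task in $(jq -r '.tasks[] | select(.state == "FAILED") | .id' <<<"$status"); do
      echo "$(date '+%F %T') restarting task ${connector}/${task}"
      connect_api POST "/connectors/${connector}/tasks/${task}/restart" >/dev/null
    done
  done < <(connect_api GET /connectors | jq -r '.[]')
}

main() {
  if [[ -z "$SERVER_NAME" || -z "$USERNAME" || -z "$PASSWORD" ]]; then
    echo "SERVER_NAME, USERNAME and PASSWORD must be set" >&2
    return 1
  fi
  restart_failed
}

# Run only when executed directly, not when sourced.
if [[ "${BASH_SOURCE[0]}" == "$0" ]]; then
  main "$@"
fi
```

Keeping every HTTP call in the small `connect_api` function makes the loop easy to dry-run by redefining that one function, and the `BASH_SOURCE` check lets the file be sourced for that purpose without triggering any restarts.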

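Change the following variables in the script to your own values (in the Ansible deployment they are filled in from template variables) and save it as the shell script connector_auto_start.sh: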

SERVER_NAME={{ inventory_hostname }}
USERNAME={{ kafka_user }}
PASSWORD={{ kafka_password }}

As any admin will know, the script can then be scheduled to run with a crontab entry such as this:

*/5 * * * * /app/connectors/connector_auto_start.sh >> /app/connectors/connector_auto_start.log 2>&1

Now, every five minutes, the script looks for any FAILED connectors or tasks and sends a REST call to restart them.

Finally, set up log rotation for this log file so that it is rotated weekly. Put the following in a file under /etc/logrotate.d:

/app/connectors/connector_auto_start.log {
    weekly
    copytruncate
    dateext
    rotate 4
    compress
    missingok
}
