Automatically restarting failed Kafka Connectors and Connect Tasks
Here’s a way to automatically restart Kafka Connect connectors if they fail. Restarting automatically only makes sense if it’s a transient failure; if there’s a problem with your pipeline (e.g. bad records or a mis-configured server) then you don’t gain anything from this. You might want to check out Kafka Connect’s error handling and dead letter queues too.
Let’s say you’ve got a couple of connectors set up, pulling data from MySQL with Debezium and streaming it to any application. The MySQL source is a bit flaky and goes offline periodically.
An Ansible deployment is available on GitHub, but if you want to get hands-on, go with the commands below. jq needs to be installed on the system.
Replace <USERNAME:PASSWORD> and <SERVER_NAME> with your credentials and Kafka Connect server. You can then view the status of all your connectors and tasks:
curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors" | jq '.[]' | \
xargs -I{connector_name} curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors/{connector_name}/status" | \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | column -s : -t | sed 's/\"//g' | sort
sink-elastic-orders-00 | RUNNING | RUNNING
ora-ba1-aerospike-testing-2 | RUNNING | FAILED
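To see what the jq step in this pipeline works with, here is a minimal offline sketch that applies a similar filter to one hand-written status document. The sample JSON is illustrative (trimmed to the fields used here, with a made-up worker_id), and summarise_status is a hypothetical helper name, not part of Kafka Connect.

```shell
#!/usr/bin/env bash
# Illustrative sample of what GET /connectors/<name>/status returns,
# trimmed to the fields used below. The values are made up.
status_json='{
  "name": "ora-ba1-aerospike-testing-2",
  "connector": {"state": "RUNNING", "worker_id": "10.0.0.1:8083"},
  "tasks": [{"id": 0, "state": "FAILED", "worker_id": "10.0.0.1:8083"}]
}'

# Hypothetical helper: read one status document on stdin and print
# "name | connector state | task states..." on a single line.
summarise_status() {
  jq -r '[.name, .connector.state, .tasks[].state] | join(" | ")'
}

printf '%s\n' "$status_json" | summarise_status
# prints: ora-ba1-aerospike-testing-2 | RUNNING | FAILED
```

A connector with several tasks gets one extra column per task, because .tasks[].state expands to the state of every task in the array.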
If any of a connector’s tasks have failed, you can restart them using the REST API:
curl -X POST --user "<USERNAME:PASSWORD>" https://<SERVER_NAME>:8083/connectors/ora-ba1-aerospike-testing-2/tasks/0/restart
After the restart, the task comes back up:
curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors" | \
jq '.[]' | \
xargs -I{connector_name} curl --user "<USERNAME:PASSWORD>" -s "https://<SERVER_NAME>:8083/connectors/{connector_name}/status" | \
jq -c -M '[.name,.connector.state,.tasks[].state]|join(":|:")' | \
column -s : -t | sed 's/\"//g' | sort
sink-elastic-orders-00 | RUNNING | RUNNING
ora-ba1-aerospike-testing-2 | RUNNING | RUNNING
But manually watching and restarting tasks isn’t fun, so let’s automate it. Here’s a bit of bash that will restart any failed tasks. It’s the same pattern as above for iterating through the connectors on Kafka Connect’s REST API, coupled with jq’s ability to filter data (select(.tasks[].state=="FAILED")).
You can find the script at the link below. In this script I restart any connector that is in the FAILED state, and after that the script restarts the connector’s failed tasks as well.
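For readers who want to see the shape of such a script inline, the following is a minimal sketch of the approach described, not the linked script itself. It assumes bash, curl and jq, uses the SERVER_NAME, USERNAME and PASSWORD variables mentioned in this post, and introduces two hypothetical function names (failed_restart_paths, restart_failed) plus a --run flag that exists only so the file can be sourced without making any REST calls.

```shell
#!/usr/bin/env bash
# Sketch only, NOT the linked script. Function names, default values and
# the --run flag are illustrative assumptions; the REST paths are Kafka
# Connect's connector and task restart endpoints.
SERVER_NAME="${SERVER_NAME:-localhost}"
USERNAME="${USERNAME:-user}"
PASSWORD="${PASSWORD:-secret}"
BASE_URL="https://${SERVER_NAME}:8083"

# Read one connector status document on stdin and print the REST path of
# everything that needs a restart: the connector first, then its tasks.
failed_restart_paths() {
  jq -r '
    .name as $n
    | (select(.connector.state == "FAILED") | "/connectors/\($n)/restart"),
      (.tasks[]? | select(.state == "FAILED") | "/connectors/\($n)/tasks/\(.id)/restart")
  '
}

# List all connectors, fetch each status, and POST to every restart path.
restart_failed() {
  curl -s --user "${USERNAME}:${PASSWORD}" "${BASE_URL}/connectors" |
    jq -r '.[]' |
    while read -r name; do
      curl -s --user "${USERNAME}:${PASSWORD}" "${BASE_URL}/connectors/${name}/status" |
        failed_restart_paths
    done |
    while read -r path; do
      echo "$(date '+%F %T') restarting ${path}"
      curl -s -X POST --user "${USERNAME}:${PASSWORD}" "${BASE_URL}${path}"
    done
}

# Only talk to the server when explicitly asked to.
if [ "${1:-}" = "--run" ]; then
  restart_failed
fi
```

With this sketch, the restarts only happen when the file is invoked with --run as its first argument. Keeping the jq filter in its own function means it can be checked against a saved status document without touching a live Kafka Connect cluster.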
Change these variables in the script above and save it as a shell script, connector_auto_start.sh:
SERVER_NAME={{ inventory_hostname }}
USERNAME={{ kafka_user }}
PASSWORD={{ kafka_password }}
As any admin will know, this can be scheduled to run with a crontab entry such as this:
*/5 * * * * /app/connectors/connector_auto_start.sh >> /app/connectors/connector_auto_start.log 2>&1
Now every five minutes the script will look for any FAILED tasks and send a REST call to restart them.
After this, we also need to set up log rotation for the log file above so that it is rotated weekly. Put this in a file under /etc/logrotate.d:
/app/connectors/connector_auto_start.log {
    weekly
    copytruncate
    create 644 root root
    dateext
    rotate 4
    compress
    missingok
}