Shell Sink Connector

The Shell sink connector allows you to run shell commands triggered by messages consumed from Kafka. The connector polls data from Kafka and allows substitution of message elements (topic,key,value) into the resulting shell command.

Quick Start - Writing a File

Prerequisites:

  • Confluent Platform <https://docs.confluent.io/current/installation/installing_cp/index.html> is installed and services are running. This quick start assumes that you are using cp-demo <https://github.com/confluentinc/cp-demo>, but standalone installations are also supported.

In this demo we’ll use the Shell Sink connector to consume messages from Kafka and write their contents to a new file.

Create the Kafka topic

Before we can produce any messages we need to create a topic to hold the message data. From inside a cp-demo broker container (docker-compose exec kafka1 bash):

  1. Create the topic:

    kafka-topics --zookeeper zookeeper:2181 --topic file.data --create --replication-factor 1 --partitions 1
    
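Optionally, confirm that the topic was created. The standard --describe flag lists the topic's partition and replica assignments:

    kafka-topics --zookeeper zookeeper:2181 --describe --topic file.data
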

Produce messages to the topic

From inside a cp-demo broker container (docker-compose exec kafka1 bash):

  1. Create a test file:

    echo "{\"schema\":null,\"payload\":\"test message\"}" > /etc/kafka/secrets/test_message
    
  2. Produce the test message to the topic:

    kafka-console-producer --broker-list kafka1:10091 --topic file.data < /etc/kafka/secrets/test_message
    
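To verify that the message reached the topic, you can consume it back with the console consumer (assuming the same broker listener as the producer step; press Ctrl-C to stop):

    kafka-console-consumer --bootstrap-server kafka1:10091 --topic file.data --from-beginning
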

Load the Shell Sink Connector

Now we submit the Shell Sink connector to the cp-demo Connect instance. The connector will consume the previously produced messages from Kafka and append each value to a file:

  1. From outside the container in the cp-demo root directory:

    curl -X POST -H "Content-Type: application/json" \
    --cert scripts/security/connect.certificate.pem \
    --key scripts/security/connect.key \
    --tlsv1.2 \
    --cacert scripts/security/snakeoil-ca-1.crt \
    --data '{
    "name": "shell-sink",
    "config": {"connector.class":"uk.co.threefi.connect.shell.ShellSinkConnector",
            "tasks.max":"1",
            "shell.command":"echo \"${value}\" >> /etc/kafka/secrets/out.file",
            "topics":"file.data"
            }}' \
    https://localhost:8083/connectors
    
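Once the connector is submitted, its state can be checked through the standard Kafka Connect REST status endpoint, reusing the same cp-demo client certificates:

    curl -X GET \
    --cert scripts/security/connect.certificate.pem \
    --key scripts/security/connect.key \
    --tlsv1.2 \
    --cacert scripts/security/snakeoil-ca-1.crt \
    https://localhost:8083/connectors/shell-sink/status
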

Confirm the results

From inside a cp-demo Connect container (docker-compose exec connect bash):

cat /etc/kafka/secrets/out.file

Features

The special strings ${key}, ${topic}, and ${value} can be used in the shell.command property to inject message data into the command that is run.
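
As a local sketch of the substitution (plain shell, not the connector itself), a shell.command of echo "${topic} ${key} ${value}" applied to a record on topic file.data with hypothetical key k1 and value test message would run roughly as:

    # hypothetical record fields, substituted into the command before it runs
    topic="file.data"; key="k1"; value="test message"
    echo "${topic} ${key} ${value}"

which prints file.data k1 test message.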