HTTP Sink Connector

The HTTP sink connector allows you to export data from Kafka topics to HTTP based APIS. The connector polls data from Kafka to write to the API based on the topics subscription.

Quick Start - Poor mans’s Replicator

Prerequisites:

  • Confluent Platform is installed and services are running. This quick start assumes that you are

using cp-demo <https://github.com/confluentinc/cp-demo>, but standalone installations are also supported.

Confluent Replicator is a fully featured solution to replicate messages between topics and clusters. To see the basic functionality of the HTTP connector, we’ll be creating our own replicator using the HTTP Connector to produce messages from a source topic to the REST Proxy instance in cp-demo.

Create the source and sink topics

Before we can replicate data we need to create source and destination topics and create some input data. From inside a cp-demo broker container (docker-compose exec kafka1 bash):

  1. Create topics:

    kafka-topics --zookeeper zookeeper:2181 --topic jsontest.source --create --replication-factor 1 --partitions 1
    kafka-topics --zookeeper zookeeper:2181 --topic jsontest.replica --create --replication-factor 1 --partitions 1
    
  2. Input some source data:

    kafka-console-producer --broker-list localhost:10091 --topic jsontest.source
    >{"foo1":"bar1"}
    >{"foo2":"bar2"}
    >{"foo3":"bar3"}
    >{"foo4":"bar4"}
    >{"foo5":"bar5"}
    
  3. Start a console consumer to monitor the output from the connector:

    kafka-console-consumer --bootstrap-server localhost:10091 --topic jsontest.replica --from-beginning
    

Load the HTTP Sink Connector

Now we submit the HTTP connector to the cp-demo connect instance:

  1. From outside the container:

    curl -X POST -H "Content-Type: application/json" --data '{ \
    "name": "http-sink", \
    "config": { \
            "connector.class":"uk.co.threefi.connect.http.HttpSinkConnector", \
            "tasks.max":"1", \
            "http.api.url":"https://restproxy:8086/topics/jsontest.replica", \
            "topics":"jsontest.source", \
            "request.method":"POST", \
            "headers":"Content-Type:application/vnd.kafka.json.v2+json|Accept:application/vnd.kafka.v2+json", \
            "value.converter":"org.apache.kafka.connect.storage.StringConverter", \
            "batch.prefix":"{\"records\":[", \
            "batch.suffix":"]}", \
            "batch.max.size":"5", \
            "regex.patterns":"^~$", \
            "regex.replacements":"{\"value\":~}", \
            "regex.separator":"~" }}' \
    http://localhost:8083/connectors
    

    Your output should resemble:

    { \
    "name":"http-sink", \
    "config":{ \
            "connector.class":"uk.co.threefi.connect.http.HttpSinkConnector", \
            "tasks.max":"1", \
            "http.api.url":"https://restproxy:8086/topics/jsontest.replica", \
            "topics":"jsontest.source", \
            "request.method":"POST", \
            "headers":"Content-Type:application/vnd.kafka.json.v2+json|Accept:application/vnd.kafka.v2+json", \
            "value.converter":"org.apache.kafka.connect.storage.StringConverter", \
            "batch.prefix":"{\"records\":[", \
            "batch.suffix":"]}", \
            "batch.max.size":"5", \
            "regex.patterns":"^~$", \
            "regex.replacements":"{\"value\":~}", \
            "regex.separator":"~", \
            "name":"http-sink"}, \
    "tasks":[], \
    "type":null \
    }
    

Tip

Note the regex configurations. REST Proxy expects data to be wrapped in a structure as below:

{"records":[{"value":{"foo1":"bar1"}},{"value":{"foo2":"bar2"}}]}

The regex configurations and batching parameters create this structure around the original messages.

Confirm the results

In your earlier opened console consumer you should see the following:

{"foo1":"bar1"}
{"foo2":"bar2"}
{"foo3":"bar3"}
{"foo4":"bar4"}
{"foo5":"bar5"}

Tip

In this example we have configured batch.max.size to 5. This means, if you produce more than 5 messages in a way in which connect will see them in a signle fetch (e.g. by producing them before starting the connector. You will see batches of 5 messages submitted as single calls to the HTTP API.

Features

Key/Topic substitutions

The special strings ${key} and ${topic} can be used in the http.api.url and regex.replacements property to inject metadata into the destinationAPI.

Regex Replacements

The HTTP Sink connector can take a number of regex patterns and replacement strings that are applied the the message before being submitted to the destination API. For more information see the configuration options regex.patterns, regex.replacements and regex.separator

Batching

The HTTP Sink connector batches up requests submitted to HTTP APIs for efficiency. Batches can be built with custom separators, prefixes and suffixes. For more information see the configuration options batch.prefix, batch.suffix and batch.separator.

You can also control when batches are submitted with configuration for maximum size of a batch. For more information see the configuration option batch.max.size

All regex options mentioned above still apply when batching and will be applied to individual messages before being submitted to the batch.