HTTP Sink Connector¶
The HTTP sink connector allows you to export data from Kafka topics to HTTP-based APIs. The connector polls data from Kafka and writes it to the API based on its topic subscription.
Quick Start - Poor man's Replicator¶
Prerequisites:
- Confluent Platform is installed and services are running. This quick start assumes that you are
using cp-demo <https://github.com/confluentinc/cp-demo>, but standalone installations are also supported.
Confluent Replicator is a fully featured solution to replicate messages between topics and clusters. To see the basic functionality of the HTTP connector, we’ll be creating our own replicator using the HTTP Connector to produce messages from a source topic to the REST Proxy instance in cp-demo.
Create the source and sink topics¶
Before we can replicate data we need to create source and destination topics and create some input data.
From inside a cp-demo broker container (docker-compose exec kafka1 bash):
Create topics:
kafka-topics --zookeeper zookeeper:2181 --topic jsontest.source --create --replication-factor 1 --partitions 1
kafka-topics --zookeeper zookeeper:2181 --topic jsontest.replica --create --replication-factor 1 --partitions 1
Input some source data:
kafka-console-producer --broker-list localhost:10091 --topic jsontest.source
>{"foo1":"bar1"}
>{"foo2":"bar2"}
>{"foo3":"bar3"}
>{"foo4":"bar4"}
>{"foo5":"bar5"}
Start a console consumer to monitor the output from the connector:
kafka-console-consumer --bootstrap-server localhost:10091 --topic jsontest.replica --from-beginning
Load the HTTP Sink Connector¶
Now we submit the HTTP connector to the cp-demo connect instance:
From outside the container:
curl -X POST -H "Content-Type: application/json" --data '{
  "name": "http-sink",
  "config": {
    "connector.class":"uk.co.threefi.connect.http.HttpSinkConnector",
    "tasks.max":"1",
    "http.api.url":"https://restproxy:8086/topics/jsontest.replica",
    "topics":"jsontest.source",
    "request.method":"POST",
    "headers":"Content-Type:application/vnd.kafka.json.v2+json|Accept:application/vnd.kafka.v2+json",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "batch.prefix":"{\"records\":[",
    "batch.suffix":"]}",
    "batch.max.size":"5",
    "regex.patterns":"^~$",
    "regex.replacements":"{\"value\":~}",
    "regex.separator":"~"
  }
}' http://localhost:8083/connectors
Your output should resemble:
{
  "name":"http-sink",
  "config":{
    "connector.class":"uk.co.threefi.connect.http.HttpSinkConnector",
    "tasks.max":"1",
    "http.api.url":"https://restproxy:8086/topics/jsontest.replica",
    "topics":"jsontest.source",
    "request.method":"POST",
    "headers":"Content-Type:application/vnd.kafka.json.v2+json|Accept:application/vnd.kafka.v2+json",
    "value.converter":"org.apache.kafka.connect.storage.StringConverter",
    "batch.prefix":"{\"records\":[",
    "batch.suffix":"]}",
    "batch.max.size":"5",
    "regex.patterns":"^~$",
    "regex.replacements":"{\"value\":~}",
    "regex.separator":"~",
    "name":"http-sink"},
  "tasks":[],
  "type":null
}
Tip
Note the regex configurations. REST Proxy expects data to be wrapped in a structure as below:
{"records":[{"value":{"foo1":"bar1"}},{"value":{"foo2":"bar2"}}]}
The regex configurations and batching parameters create this structure around the original messages.
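The following Python sketch illustrates how the quick start settings could produce that structure. It is not the connector's code. It assumes that regex.patterns and regex.replacements are split on regex.separator and applied pairwise, in order, to each message, and that the batch separator defaults to a comma (the quick start does not set batch.separator). Python's re module stands in for the connector's Java regex engine, so edge cases may differ.

```python
import json
import re

# Values copied from the quick start connector configuration.
REGEX_PATTERNS = "^~$"
REGEX_REPLACEMENTS = '{"value":~}'
REGEX_SEPARATOR = "~"
BATCH_PREFIX = '{"records":['
BATCH_SUFFIX = "]}"
BATCH_SEPARATOR = ","  # assumption: not set in the quick start config


def apply_regexes(message: str) -> str:
    """Apply each pattern/replacement pair to one message, in order."""
    patterns = REGEX_PATTERNS.split(REGEX_SEPARATOR)          # ["^", "$"]
    replacements = REGEX_REPLACEMENTS.split(REGEX_SEPARATOR)  # ['{"value":', "}"]
    for pattern, replacement in zip(patterns, replacements):
        message = re.sub(pattern, replacement, message)
    return message


def build_body(messages) -> str:
    """Join the rewritten messages and wrap them in the batch prefix and suffix."""
    rewritten = [apply_regexes(m) for m in messages]
    return BATCH_PREFIX + BATCH_SEPARATOR.join(rewritten) + BATCH_SUFFIX


body = build_body(['{"foo1":"bar1"}', '{"foo2":"bar2"}'])
print(body)
json.loads(body)  # raises ValueError if the body is not valid JSON
```

The pattern ^ matches the empty position at the start of the message and $ matches the end, so the two replacements wrap each message in {"value": and } without altering its content.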
Confirm the results¶
In the console consumer you opened earlier, you should see the following:
{"foo1":"bar1"}
{"foo2":"bar2"}
{"foo3":"bar3"}
{"foo4":"bar4"}
{"foo5":"bar5"}
Tip
In this example we have configured batch.max.size to 5. This means that if you produce more than 5 messages in a way that lets Connect see them in a single fetch (for example, by producing them before starting the connector), you will see batches of 5 messages submitted as single calls to the HTTP API.
Features¶
Key/Topic substitutions¶
The special strings ${key} and ${topic} can be used in the http.api.url and regex.replacements properties to inject metadata into the destination API.
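As a rough illustration of the substitution, the Python sketch below replaces both placeholders in a URL template. The function name, the example host and the record values are hypothetical, and the sketch does no URL encoding; whether the connector encodes keys or topic names is not stated here.

```python
def substitute(template: str, key: str, topic: str) -> str:
    """Replace the ${key} and ${topic} placeholders with record metadata."""
    return template.replace("${key}", key).replace("${topic}", topic)


# Hypothetical http.api.url value; the host and path are illustrative only.
url_template = "https://api.example.com/${topic}/items/${key}"

url = substitute(url_template, key="order-42", topic="orders")
print(url)  # https://api.example.com/orders/items/order-42
```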
Regex Replacements¶
The HTTP Sink connector can take a number of regex patterns and replacement strings that are applied to the message before it is submitted to the destination API. For more information, see the configuration options regex.patterns, regex.replacements and regex.separator.
Batching¶
The HTTP Sink connector batches up requests submitted to HTTP APIs for efficiency. Batches can be built with custom separators, prefixes and suffixes. For more information, see the configuration options batch.prefix, batch.suffix and batch.separator.
You can also control when batches are submitted by configuring the maximum size of a batch. For more information, see the configuration option batch.max.size.
All regex options mentioned above still apply when batching and are applied to individual messages before they are added to the batch.
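The effect of a maximum batch size can be sketched in Python as follows. This is only an approximation: it assumes all messages are available at once, whereas the connector can only batch what Connect delivers to it in a single fetch. The helper names and the newline-delimited format (empty prefix and suffix, newline separator) are hypothetical choices for illustration.

```python
def chunk(messages, max_size):
    """Yield consecutive groups of at most max_size messages."""
    for start in range(0, len(messages), max_size):
        yield messages[start:start + max_size]


def render(batch, prefix, separator, suffix):
    """Build one request body from a group of messages."""
    return prefix + separator.join(batch) + suffix


# Seven small JSON messages, batched with a maximum size of 5.
messages = ['{"n":%d}' % i for i in range(7)]

bodies = [
    render(batch, prefix="", separator="\n", suffix="")
    for batch in chunk(messages, max_size=5)
]

for body in bodies:
    print(repr(body))
```

With seven messages and a maximum size of 5, the sketch produces two request bodies: one containing five messages and one containing the remaining two.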