Kafka Integration
Note: This method for integrating with Kafka is deprecated. Use the Kafka Driver for all new integrations. If you choose to use Event Correlators to integrate with Kafka, note that each extension and dependency listed below may have additional sub-dependencies which must be installed as well.
This tutorial explains how to use Event Correlators to get events from Kafka sources.
Dependencies
Kafka sources require the following dependencies to be installed.
Extensions must be installed in the extensions directory specified in the Event Correlators plugin properties:
io-kafka extension (versions 5.x and above, with group id io.siddhi.extension.*)
siddhi-map-json extension (versions 5.x and above, with group id io.siddhi.extension.*)
The kafka-clients library and extension dependencies must be installed to the appropriate sub-directory of the Iotellect installation folder:
kafka-clients library
WSO2 Carbon Siddhi Metrics Core (v3.0.57)
Jackson FasterXML libraries
Note: Information on Downloading and Installing Siddhi Extensions could be helpful if you’re having trouble correctly installing the necessary dependencies.
About Kafka Sources
In Kafka, messages are created by producers. For example, a device that sends temperature data can be a producer. Once a message from a producer reaches a Kafka cluster, it is stored and published in a topic. Consumers can subscribe to these topics to get the stored messages.
Each topic has one or more partitions. A partition acts as a separate feed within a topic that can be accessed by its identifier. Partitions are numbered starting from 0. Messages are assigned to a specific partition based on the value of the key field stored in the message. For example, if several devices include their identifier in the messages, each device can be assigned its own partition.
The correlator engine acts as a consumer for Kafka topics. A Kafka stream can subscribe to a topic and receive data from all its partitions. Once the stream receives events, you can use it like a regular correlator stream. For example, you can filter the input stream or detect patterns in the incoming events.
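For example, the following sketch matches a pair of readings from the same device where the temperature rises (the host name, topic, and stream names here are illustrative, and the @Source syntax is covered in the next section):
@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    @map(type='json'))
define stream InTemperature (device string, temperature int);
from every e1=InTemperature -> e2=InTemperature[device == e1.device and temperature > e1.temperature]
select e1.device as device, e1.temperature as previous, e2.temperature as current
insert into RisingTemperature;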
Kafka Sources
Just like any other stream, Kafka streams use decorators to define stream parameters. To get more information about how event correlator streams work, see Event Correlator Scripts.
An example of a Kafka source stream:
@Source(
type="kafka",
bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
topic.list="example", group.id="correlator",
threading.option="single.thread",
@map(type='json'))
define stream InKafkaTemperature (device string, temperature int);
Kafka @Source Stream Parameters:
Parameter | Optional | Description
type | No | Source stream type. For Kafka sources, the value is always kafka.
bootstrap.servers | No | Comma-separated list of Kafka hosts. The correlator engine receives events from the specified hosts.
topic.list | No | Comma-separated list of topics. The stream receives events from all specified topics.
group.id | No | Name of the consumer group. If you want to split the messages of a single topic between several input streams, use the same group name for all of them. This ensures that these streams do not receive duplicate events (see the sketch at the end of this page).
threading.option | No | Defines the way this source is processed. Possible values: single.thread (a single thread processes all topics and partitions), topic.wise (a dedicated thread for each topic), and partition.wise (a dedicated thread for each partition).
partition.no.list | Yes | Comma-separated list of partition numbers to read from the subscribed topics. This parameter is optional; partition 0 is read by default.
seq.enabled | Yes | This option is used when the sequence of received events matters. In this case, the stream must define an attribute that acts as a sequence identifier. This parameter is optional and false by default.
is.binary.message | Yes | If events are in binary format, this parameter must be set to true. This parameter is optional and false by default.
topic.offset.map | Yes | Defines topic offsets in the <topic> = <offset>, <topic> = <offset> format. An offset determines how many messages are skipped from the beginning of the topic. For example, if the offset value is 100, the first 100 messages are skipped when the correlator engine reads from the topic (illustrated in the sketch after this table). This parameter is optional; no messages are skipped by default.
optional.configuration | Yes | Defines all other parameters of the consumer configuration in the parameter:value, parameter:value format. This parameter is optional.
@map | No | Mapper for the inbound data. The Kafka extension supports inbound data in text, xml, json, and binary formats. The format is specified in the mapper parameter. For example, to receive data in json format, specify a @map(type='json') mapper for the source stream.
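As a sketch of the optional parameters above, the following source reads seven partitions with a dedicated thread per partition, skips the first 100 messages of the temperature topic, and passes an extra setting to the underlying Kafka consumer (the host names, topic, offset, and the session.timeout.ms value are illustrative assumptions):
@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
    topic.list="temperature", group.id="correlator",
    threading.option="partition.wise",
    partition.no.list="0,1,2,3,4,5,6",
    topic.offset.map="temperature=100",
    optional.configuration="session.timeout.ms:30000",
    @map(type='json'))
define stream InKafkaTemperature (device string, temperature int);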
Example
The following script receives data from seven partitions (0-6) of the temperature topic. The stream connects to two servers from the Kafka cluster: kafka1.example.com:9091 and kafka2.example.com:9092. The incoming stream has two fields: device and temperature. For each event with a temperature value over 100, an event is generated in the output stream.
@Source(
type="kafka",
bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
topic.list="temperature", group.id="correlator",
threading.option="single.thread",
partition.no.list="0,1,2,3,4,5,6",
seq.enabled="false",
is.binary.message="false",
@map(type='json'))
define stream InKafkaTemperature (device string, temperature int);
@Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))
define stream OutKafkaTemperature (message string, temperature int);
from InKafkaTemperature[temperature > 100]
select
device as message,
temperature as temperature
insert into OutKafkaTemperature;
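To illustrate the group.id parameter described above, here is a sketch of two streams that share a consumer group (the host name, topic, and stream names are assumptions). Because both streams use the same group name, each message from the temperature topic is delivered to only one of them:
@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    @map(type='json'))
define stream InTemperatureA (device string, temperature int);
@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    @map(type='json'))
define stream InTemperatureB (device string, temperature int);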