Kafka Integration

This method for integrating with Kafka is deprecated. Use the Kafka Driver for all new integrations.

If you choose to use Event Correlators to integrate with Kafka, note that each extension and dependency listed below may have additional sub-dependencies which must be installed as well.

This tutorial explains how to use Event Correlators to get events from Kafka sources.

Dependencies

Kafka sources require the following dependencies to be installed.

Extensions must be installed in the extensions directory specified in the Event Correlators plugin properties:

The kafka-clients library and the extension dependencies must be installed in the <installation folder>\lib sub-directory of the Iotellect installation:

If you have trouble installing the necessary dependencies, see Downloading and Installing Siddhi Extensions.

About Kafka Sources

In Kafka, messages are created by producers. For example, a device that sends temperature data can be a producer. Once a message from a producer reaches a Kafka cluster, it is stored and published in a topic. Consumers can subscribe to these topics to get the stored messages.

Each topic has one or more partitions. A partition acts as a separate feed within a topic that can be accessed by its identifier. Partitions are numbered starting from 0. Messages are assigned to a specific partition based on the value of the key field stored in the message. For example, if several devices include their identifier in their messages, each device can be assigned its own partition.
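If only specific partitions are relevant, a source can list them explicitly through the partition.no.list parameter, which also appears in the full example at the end of this page. A minimal sketch, assuming a hypothetical temperature topic whose partition 0 carries the readings of interest:

@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    partition.no.list="0",
    @map(type='json'))
define stream InPartitionZero (device string, temperature int);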

The correlator engine acts as a consumer for Kafka topics. A Kafka stream can subscribe to a topic and receive data from all its partitions. Once the stream receives events, you can use it like a regular correlator stream. For example, you can filter the input stream or detect patterns in the incoming events.
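For instance, the following minimal sketch flags two consecutive readings from the same device where the temperature keeps rising; the plain stream definitions and the threshold of 90 are hypothetical:

define stream InKafkaTemperature (device string, temperature int);
define stream RisingTemperature (device string, temperature int);

-- Match a reading above 90 followed by a hotter reading from the same device.
from every e1=InKafkaTemperature[temperature > 90] ->
     e2=InKafkaTemperature[device == e1.device and temperature > e1.temperature]
select e1.device as device, e2.temperature as temperature
insert into RisingTemperature;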

Kafka Sources

Just like any other stream, Kafka streams use decorators to define stream parameters. For more information about how event correlator streams work, see Event Correlator Scripts.

An example of a Kafka source stream:

@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
    topic.list="example", group.id="correlator",
    threading.option="single.thread",
    @map(type='json'))
define stream InKafkaTemperature (device string, temperature int);

Kafka @Source Stream Parameters:

type (required)
    Source stream type. For Kafka sources, the value is always kafka.

bootstrap.servers (required)
    Comma-separated list of Kafka hosts. The correlator engine receives events from the specified hosts.

topic.list (required)
    Comma-separated list of topics. The stream receives events from all specified topics.

group.id (required)
    Name of a consumer group. To split the messages of a single topic between several input streams, give those streams the same group name. This ensures that the streams do not receive duplicate events; see the first sketch after this list.

threading.option (required)
    Defines how this source is processed. Possible values:

      • single.thread. The source runs on a single thread.

      • topic.wise. Each topic runs on a separate thread.

      • partition.wise. A separate thread is used for each partition.

partition.no.list (optional)
    Comma-separated list of partition numbers to receive from. In the example below, partitions 0-6 of the temperature topic are listed.

seq.enabled (optional, default: false)
    Set to true when the sequence of received events matters. In this case, the stream must define an attribute that acts as a sequence identifier.

is.binary.message (optional, default: false)
    Set to true if events arrive in binary format.

topic.offset.map (optional)
    Defines topic offsets in the <topic>=<offset>, <topic>=<offset> format. The offset determines how many messages are skipped from the beginning of the topic: for example, with an offset of 100, the first 100 messages are skipped when the correlator engine reads the topic. By default, no messages are skipped; see the second sketch after this list.

optional.configuration (optional)
    Defines any other parameters of the consumer configuration, in the parameter:value, parameter:value format.

@map (required)
    Mapper for the inbound data. The Kafka extension supports inbound data in text, xml, json, and binary formats. For example, to receive data in json format, specify a @map(type='json') mapper for the source stream; a sample json message follows this list.
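As noted for group.id, two sources that share a group name split the messages of a topic between them instead of each receiving every message. A minimal sketch with two hypothetical input streams reading the same topic:

@Source(type="kafka", bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread", @map(type='json'))
define stream InTemperatureA (device string, temperature int);

@Source(type="kafka", bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread", @map(type='json'))
define stream InTemperatureB (device string, temperature int);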
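topic.offset.map and optional.configuration can be combined to skip already-processed messages and to pass consumer settings that have no dedicated parameter. A minimal sketch; the offset of 100 and the session.timeout.ms value (an ordinary Kafka consumer property) are illustrative:

@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    topic.offset.map="temperature=100",
    optional.configuration="session.timeout.ms:30000",
    @map(type='json'))
define stream InKafkaTemperature (device string, temperature int);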
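With @map(type='json'), the default Siddhi JSON mapping expects each Kafka message to wrap the stream attributes in an event object. For the InKafkaTemperature stream used on this page, a matching message would look roughly like this (a sketch of the default mapping; custom message layouts can be handled with an @attributes mapping):

{"event": {"device": "sensor-1", "temperature": 72}}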

Example

The following script receives data from seven partitions (0-6) of the temperature topic. The stream connects to two servers of the Kafka cluster: kafka1.example.com:9091 and kafka2.example.com:9092. The incoming stream has two fields, device and temperature. For each event with a temperature value over 100, an event is generated in the output stream.

@Source(
    type="kafka",
    bootstrap.servers="kafka1.example.com:9091, kafka2.example.com:9092",
    topic.list="temperature", group.id="correlator",
    threading.option="single.thread",
    partition.no.list="0,1,2,3,4,5,6",
    seq.enabled="false",
    is.binary.message="false",
    @map(type='json'))
define stream InKafkaTemperature (device string, temperature int);

@Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))
define stream OutKafkaTemperature (message string, temperature int);

from InKafkaTemperature[temperature > 100]
select 
  device as message,
  temperature as temperature
insert into OutKafkaTemperature;
