Event Correlator Scripts

Correlator scripts use Streaming SQL Language.

Because Streaming SQL is an extensive language on its own, this section will explain the basic concepts of the language as they relate to Iotellect. To get more information about syntax, operators and capabilities of Streaming SQL language, refer to the Streaming SQL Guide.

Main concepts of Streaming SQL language are:

  • Stream. A stream is a series of events ordered in time. Streams are divided into the input streams and output streams. A correlator listens to its input streams and outputs events to its output streams.
  • Query. A query is an expression that can take events from one or more streams, process these events in a streaming manner, and output an event to an output stream.
  • Function. A function is an encapsulated complex execution logic that produces operations on event data and returns the resulting data. You can call functions to perform operations on the event data.
  • Filter. A filter is a conditional expression defined for a stream. As a result, only the events that match the defined expression will be processed.
  • Window. A window is a subset of data by a specific criterion that is taken from a stream. Windows are dynamic. As new events are generated in a stream, the window updates itself with new data.
  • Pattern. A pattern is an expression that defines correlation logic for events. With patterns you can correlate events from one or more input streams to each other and generate output events with data taken from these events.

Below is an example script. It takes values from events generated by a virtual device and outputs them into the output stream along with an additional message.

@Source(type = 'internalEvent', context='users.admin.devices.virtual', event='event1', @map(type='internalEvent'))
define stream SourceStream (string string, int int);

@Sink(type = 'internalEvent', event='test', @map(type='internalEvent'))
define stream OutStream (message string, str string, num double);

from SourceStream
select 'Event 1 detected' as message, string as str, cast(int, "double") as num
insert into OutStream;

In this example, SourceStream is an input stream, OutStream is an output stream. @Source and @Sink are stream annotations as explained below. The from ... select ... insert into is a query. The cast(int, "float") is a built-in function that converts integer values to double. There are no filters, windows, and patterns in this example. They are described in their own sections.

Defining Streams

This is done with the define stream directive followed by the stream attribute definitions:

define stream TestStream (message string, num double);

Above, the TestStream stream has two attibutes: message of string type and num of double type.

Attribute names must correspond to event field names. For example, if an event in Iotellect is named message, you cannot use a different field name for it in the correlator script. If the used names do not match, the event correlator may ignore some events. This can lead to the loss of data.

Binding Streams to Iotellect Events

The definition of a stream requires an annotation to be useful. The mechanism that binds Iotellect events to correlator engine streams uses annotations to do so.

The @Source annotation specifies that the stream definition following it is an input stream.

Example of an annotated input stream:

@Source(type = 'internalEvent', context='users.admin.devices.virtual', event='event1', @map(type='internalEvent'))
define stream SourceStream (string string, int int);

@Source annotation attributes:

  • type defines the input stream type. This parameter defines the protocol that will be used to create this input stream. By default, only the internalEvent type is supported. This type corresponds to events generated within Iotellect. You can extend the supported stream types by installing extensions. For example, you can use MQTT and RabbitMQ stream types by installing the corresponding correlator engine extensions.
  • context is a custom attribute for the internalEvent stream type. It defines a context that has an event that the correlator must listen to.
  • event is a custom attribute for the internalEvent stream type. It defines the name of the event in the specified context. The correlator will listen to events with this name generated by the context.
  • @map() is an input format mapper for correlator engine. The default internalEvent mapper maps the format of the events generated within Iotellect to the format that the correlator engine can process.

The @Sink annotation specifies that the stream definition following it is an output stream.

Example of an annotated output stream:

@Sink(type = 'internalEvent', context='users.admin.devices.virtual' event='event2', @map(type='internalEvent'))
define stream OutStream (string string, int int);

@Sink annotation attributes:

  • type defines the output stream type. This attribute is the same as the type attribute of the @Source annotation.
  • context is a custom attribute for the internalEvent stream type. It defines a context that will generate the output events. If this attribute is not specified, the events will be generated by the correlator context itself.
  • event is a custom attribute for the internalEvent stream type. It defines the name of the event in the specified context that will be generated. If a correlator context generates the events, its Output Event Format property specifies the event format. If it is a different context, the event with the specified name must already exist in the specified context. In both cases the event field names must match the names of the stream attributes. If you need a subset of fields from an event, you can specify stream attributes only for these fields.
  • @map() is an output format mapper for the correlator engine. The default internalEvent mapper maps the format of the events generated by the correlator engine to the format used by Iotellect.

Processing Event Data

To process event data, use queries. Queries are statements that collect data from one or more input streams and output data to an output stream.

A simple query looks like this:

from SourceStream 
select int as num, string as message
insert into OutStream;

This query selects fields with names int and string from the input stream. The value of the int field is put to the num field. The value of the string field is put to the message field. The resulting event has two fields: num and message. It is sent to the output stream.

The following query executes functions to calculate the field values for the outbound event:

from SourceStream
select ifThenElse(regex:matches("(.*)bbb(.*)", string), "match", "no match") as message, cast(int, 'double') as num
insert into OutStream;

This query uses three functions to calculate field values. The ifThenElse() function is a built-in function of the Streaming SQL language. It provides conditional logic. The cast() function is a built-in function of the the Streaming SQL language. It converts its parameter to a different type. In this example, an integer value stored in the int field is converted to a double value that is written to the num field in the output stream. The regex:matches() function matches the string field of the input event to a regular expression. The regex:matches() function is an extension function. To use extension functions, you need to install the corresponding extensions.

Extension functions must be called using a namespace. In the example above, regex is the namespace of the matches function.

For more information about built-in functions, see Streaming SQL API documentation. For more information about extension functions, see correlator engine extensions documentation.

Filters

A filter is a condition defined for a stream. Only the events that pass the conditional will be processed. All other events will be ignored.

The following filter selects events from a specific user:

@Source(type = 'internalEvent', context='users.admin.models.actionHandler', event='action', @map(type='internalEvent'))
define stream UserActions (user string, action string, level int);``

from UserActions[user == 'administrator']
select action as message
insert into OutStream;

You can use logic operators inside filters to combine conditionals. The following example selects events form a specific user that are above a certain severity level.

from UserActions[user == 'administrator' and level > 1]
select action as message
insert into OutStream;

Windows

A window is a subset of data by a specific criterion that is taken from a stream. As time goes and more data is added via the input stream, the data provided by a window updates as well. Like with an input stream, you can process the data inside a window and send it to the output stream.

For example, you may use a window to keep the last ten temperature values detected by a device and check all other incoming events against these values. As new events come, the data in this window will be updated accordingly. As another example, you can use a window to select the last five minutes of activity for a certain device. As time goes, the data in this window will be updated, so the events contained in the window will always be up-to-date.

Windows are defined on a stream by using a #window prefix followed by the dot and window type with  parameters. The following window gets 10 last events from a stream:

from SourceStream#window.length(10)

Window definitions can be combined with filters. The following window gets 10 last events from a stream that belong to a specific user:

from SourceStream#window.length(10)[user == 'administrator']

The following query uses a window and reports the maximum temperature for the last 10 events. An output event is generated each time a new event is received:

from SourceStream#window.length(10)
select "Maximum temperature" as message, cast(max(int), 'double') as num
insert into OutStream;

Windows can be sliding and tumbling.

A sliding window updates with every new event that matches the window criteria. A tumbling window updates when the full length of the window is filled with matching events. For example, a window fits three last events. A new event is generated. A sliding window will update. It will have two old events and one new event. A tumbling window will not update until two more events come. When it does update, it will contain three new events.

To define a sliding window, use length and time window types. The length type will select a number of last events. The time type will select all events for a specified amount of time.

-- select last 10 events from SourceStream
from SourceStream#window.length(10)

-- select all events that came in the last 10 minutes from SourceStream
from SourceStream#window.time(10 min)

To define a tumbling window, use lengthBatch and timeBatch window types. The lengthBatch type will update every specified number of events. The timeBatch type will update the window every specified amount of time.

-- select every 10 events from SourceStream
from SourceStream#window.lengthBatch(10)

-- select all events that come every 10 minutes from SourceStream
from SourceStream#window.timeBatch(10 min)

For example, if the length of the lengthBatch window is 10, it will generate an outbound event for every 10 inbound events. A timeBatch window with 10-minute period will generate an outbound event every 10 minutes.

The following example will generate an event every 10 minutes. This event will contain the maximum value of the int field from all events that are received in the last 10 minutes. If no events are received, no event will be generated.

from SourceStream#window.timeBatch(10 min)
select 'Maximum temperature' as message, cast(max(int), 'double') as num
insert into OutStream;

Patterns

With patterns, you can correlate events to each other and implement complex event-based logic.

Pattern syntax is very flexible and customizable. The following example is just a small subset of what you can do with patterns. For the full reference, see Streaming SQL Guide.

A basic pattern defines a sequence of events that must follow each other separated by a -> operator. The following example demonstrates such pattern. If an emergency alarm is activated and within 10 minutes it is followed by a temperature increase above 80 degrees, then fire extinguishers are activated.

from InStreamAlarms[alarm == 'emergency'] ->
InStreamTemperature[temp > 80]
within 10 min
select
"activateExtinguishers" as action
insert into OutSystemControl;

As another example, there are two input streams. One is from a sensor device that sends information about the level of fuel in a tank. Another one signals when the operator goes away and comes back. The output stream controls the pump.

-- 0 to 100
@Source(type = 'internalEvent', context='users.admin.devices.fuelSensor', event='fuelLevel', @map(type='internalEvent'))
define stream SourceSensor (level int);

-- "isAway", "isPresent"
@Source(type = 'internalEvent', context='users.admin.models.operator', event='action', @map(type='internalEvent'))
define stream SourceOperator (status string, name string);

-- 1 - enable, 0 - disable
@Sink(type = 'internalEvent', context='users.admin.models.pumpController', event='control', @map(type='internalEvent'))
define stream OutPumpControl (action int, message string, operator string, value int);

If the operator went away, did not come back within 10 minutes, and during this time fuel level approached 5%, enable the pump. This pattern uses event references (e1 and e2) to refer to data from the related events.

from every( e1=SourceOperator[status == 'isAway'] ) ->
not SourceOperator[status == 'isPresent' and name == e1.name] for 10 min and e2=SourceSensor[level <= 5]
select
1 as action,
"Automatic activation (operator not present)" as message,
e1.name as operator,
e2.level as value
insert into OutPumpControl;

If at some point the fuel level rises to a value above 95% and does not drop below this level for the next 2 minutes, stop the pump. There is no need to start looking for this pattern for each event with level above 95% (no need for every keyword). The first event with level above 95% will start the pattern and all other events will be processed as a part of it.

from e1=SourceSensor[level > 95] -> 
not SourceSensor[level < e1.level] for 2 min
select
0 as action,
"Automatic pump shutdown" as message,
"auto" as operator,
level as value
insert into OutPumpControl;

Was this page helpful?