Advanced features

Custom fragments

Cumulocity IoT APIs let you structure your data freely. In Apama EPL, you do this by adding entries to params, which has the type dictionary<string, any>. Each Cumulocity IoT event in the com.apama.cumulocity package (such as Alarm, Event, Measurement or Operation) has a params field, whose entries are translated to fragments or optional fields. Thus, when receiving events, your code should look up entries in the params field. When sending events, you can provide fragment values either as instances of event types that you define or as dictionary<string, any> values. When receiving events, nested fragment values arrive as dictionary<any, any>.

Note that EPL is strongly typed, so if you are creating an event with no fragments, a new dictionary<string, any> expression is required. If you provide entries inline with a dictionary literal, EPL determines the dictionary type from the first key-value pair. Thus, to get a dictionary<string, any>, cast the first value to the any type with the <any> cast operator:

send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;
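On the receiving side, looking up a fragment in params could look like the following minimal sketch. The monitor name, the event type c8y_EntranceEvent and the fragment name "fragment" are placeholders chosen for illustration. The sketch assumes that the monitor subscribes to Event.SUBSCRIBE_CHANNEL to receive events.

```epl
using com.apama.cumulocity.Event;

monitor ReadCustomFragment {
	action onload() {
		// Subscribe so that this monitor receives Cumulocity IoT events.
		monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

		on all Event(type = "c8y_EntranceEvent") as e {
			// "fragment" is a hypothetical fragment name; check that it is present first.
			if e.params.hasKey("fragment") {
				// Values in params have the type any; valueToString() renders the contained value.
				log "fragment = " + e.params["fragment"].valueToString() at INFO;
			}
		}
	}
}
```

Checking hasKey before the lookup avoids an exception when an event does not carry the fragment.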

The MeasurementValue type is provided for the measurements in the Measurement type. MeasurementValue has value and unit fields and params for other fragments.

Example 1:

send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
	"c8y_TemperatureMeasurement": {
		"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
		"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
		"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
		"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
		"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
	}
}, new dictionary<string,any>) to Measurement.SEND_CHANNEL;

This will result in the following JSON structure:

{
	"type": "c8y_TemperatureMeasurement",
	"time": "...",
	"source": {
		"id": "12345"
	},
	"c8y_TemperatureMeasurement": {
		"T1": {
			"value": 1,
			"unit": "C"
		},
		"T2": {
			"value": 2,
			"unit": "C"
		},
		"T3": {
			"value": 3,
			"unit": "C"
		},
		"T4": {
			"value": 4,
			"unit": "C"
		},
		"T5": {
			"value": 5,
			"unit": "C"
		}
	}
}

Measurement fragments

A measurement can be broken into individual measurement fragments. This can be done for each fragment and series present in the measurement. See Cumulocity IoT’s domain model in the Concepts guide for more information on measurement fragments.

Listen for events of type com.apama.cumulocity.MeasurementFragment when you require filtering based on measurement fragments or series, instead of listening for Measurement events and looking inside the measurements dictionary. For more information, see Using measurement fragments in the Apama documentation. To listen for measurement fragments, you must change the default behavior for the measurement format as described below.

To customize the measurement format, change the tenant options by sending a REST request to Cumulocity IoT. The request specifies a category, a key, and a value.

For example, specify the following to set the measurement format so that both complete measurements and measurement fragments are sent:

{
   "category": "apama",
   "key": "measurementFormat",
   "value": "BOTH"
}

See Changing the tenant options for more information.
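Once the measurement format is set to BOTH, a listener can filter directly on the fragment, the series and the value. The following is a minimal sketch. The monitor name, the series name "T" and the threshold 100.0 are illustrative assumptions, and the field names valueFragment, valueSeries, value and unit are taken to be those of com.apama.cumulocity.MeasurementFragment as described in the Apama documentation.

```epl
using com.apama.cumulocity.MeasurementFragment;

monitor HighTemperatureWatcher {
	action onload() {
		// Measurement fragments are delivered on their own channel.
		monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

		// Filter on fragment, series and value in the listener itself,
		// instead of inspecting the measurements dictionary of a Measurement.
		on all MeasurementFragment(type = "c8y_TemperatureMeasurement",
				valueFragment = "c8y_TemperatureMeasurement",
				valueSeries = "T",
				value > 100.0) as mf {
			log "High temperature " + mf.value.toString() + " " + mf.unit
				+ " from source " + mf.source at INFO;
		}
	}
}
```

Because the filter is part of the event template, only matching fragments reach the listener body.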

Changing the tenant options

You can customize the settings of Apama EPL Apps and Apama Analytics Builder by sending REST requests to Cumulocity IoT to change the relevant tenant options. See also Tenants in the Reference guide.

The key names that you can use with the REST requests are listed in Keys for Apama EPL Apps and in the Apama Analytics Builder documentation under Configuration.

A category name is needed along with the key name; this is either apama for Apama EPL Apps or analytics.builder for Apama Analytics Builder.

You can find some concrete examples under Using curl commands for setting various tenant options. However, you can use any tool you like.

To change the tenant options, you need ADMIN permission for “Option management” in Cumulocity IoT. See Managing permissions in the User guide for more information.

Info: After you have changed a tenant option using a REST request, the Apama correlator will automatically restart.

Keys for Apama EPL Apps

The category name that is to be used with the key names listed below is always apama.

Key name             Description
measurementFormat    The measurement format. Possible values are MEASUREMENT_ONLY (default) and BOTH. See Measurement fragments for more information.

Using curl commands for setting various tenant options

You can set or change various tenant options by sending POST requests to Cumulocity IoT. The information below explains how you can do this using the curl command-line tool. See the curl documentation for detailed information on curl.

The syntax of the curl command depends on the environment in which you are working. The syntax for a Bash UNIX shell, for example, is as follows:

curl --user <username> -X POST -H 'Content-Type: application/json' -d '{"category": "<categoryname>", "key": "<keyname>", "value": "<value>"}' -k https://<hostname>/tenant/options


For example (Bash shell):

curl --user User123 -X POST -H 'Content-Type: application/json' -d '{"category": "apama", "key": "measurementFormat", "value": "BOTH"}' -k https://mytenant/tenant/options


Listeners

Triggering a statement by an arriving event is not the only possibility. The following sections cover other ways to combine listeners. Refer to Defining Event Listeners in the Apama documentation for full details.


Filters

Filters restrict a listener to events with specific field values, and pattern operators let you trigger on combinations or sequences of events. If you have a listener like this

on all Event() as e { ... }

you can add a filter to the pattern:

on all Event(type = "c8y_EntranceEvent") as e { ... }

You can listen for more than one event:

on Event() as e and Alarm() as a { ... }

This triggers once, after both an Event and an Alarm have been received, in either order. The first of each is captured.

You can also trigger by sequences:

on all (Event() as e -> Alarm() as a) { ... }

This will trigger for every pair “Event followed by Alarm”. On receiving an event, it will stop listening for further events and start listening for alarms instead. Once an alarm is received, it will start listening for events again.


Timers

You can also trigger listeners based on time. You can trigger at a fixed interval, for example, every 5 minutes (300 seconds):

on all wait(300.0) { ... }

Or you can have a listener fire at certain times of the day, with similar functionality to Unix’s cron scheduler:

// at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59

on all at(*, *, *, *, *) {} // trigger every minute

// "all" is required for repeated triggering; without it, the listener fires only once
on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every Monday, Wednesday and Friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on each of the first 7 days of every month

You can also combine timer patterns with other patterns. For example, you can check if there was an event within a certain time after another event:

on Event() -> wait(600.0) and not Alarm() { ... }

This will trigger if there is an event and within 10 minutes (600 seconds) there is no alarm. Note the use of not which terminates the listener if the event occurs.
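As a fuller sketch of the same idea, the monitor below starts a separate 10-minute check for every event and only considers alarms from the same source. The monitor name and the event type c8y_EntranceEvent are illustrative, and the sketch assumes that events and alarms are received through Event.SUBSCRIBE_CHANNEL and Alarm.SUBSCRIBE_CHANNEL.

```epl
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;

monitor MissingAlarmDetector {
	action onload() {
		monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
		monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);

		on all Event(type = "c8y_EntranceEvent") as e {
			// One short-lived listener per event: it fires after 600 seconds
			// unless an alarm from the same source arrives first, in which
			// case "not" terminates it.
			on wait(600.0) and not Alarm(source = e.source) {
				log "No alarm within 10 minutes of event from source " + e.source at WARN;
			}
		}
	}
}
```

Nesting the timer inside the outer listener means that overlapping events are tracked independently of each other.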

Streams - windows

Streams give you the possibility to operate on windows of events. Streams use the from keyword instead of on and define a window to operate over, and select what output they want from that window using aggregates. Windows can be restricted by two means:

  1. Windows for a certain time - use the within keyword.

    from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

  2. Windows with a certain number of events - use the retain keyword.

    from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

Streams - outputting periodically

Streams can also control how frequently they evaluate, using the every specifier.

// outputs the most recently received measurement value once every minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }

// outputs the first of every 20 measurements received
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }

// outputs the average of each batch of 20 measurements once the 20th has arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

See the Apama documentation for built-in aggregate functions.
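The stream queries above have empty bodies. The following sketch shows what a body might do with the aggregated value: it raises an alarm when the one-hour average exceeds a threshold. The monitor name, the alarm type c8y_HighAverageTemperature, the source ID "12345" and the threshold 30.0 are made up for illustration. The using declaration for avg assumes that the built-in aggregates are in the com.apama.aggregates package, and the query assumes that every matching measurement contains the series "T".

```epl
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Alarm;
using com.apama.aggregates.avg;

monitor HourlyAverageAlarm {
	action onload() {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

		// Average over a sliding one-hour window; the body runs whenever the
		// query produces a new output value.
		from m in all Measurement(type = "c8y_TemperatureMeasurement") within 3600.0
			select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue {
			if avgValue > 30.0 {
				// Arguments: id, type, source, time, text, status, severity, count, params.
				send Alarm("", "c8y_HighAverageTemperature", "12345", currentTime,
					"Average temperature is " + avgValue.toString(),
					"ACTIVE", "MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
			}
		}
	}
}
```

This sketch averages across all devices. To keep devices apart, add a source filter to the event template, or use partition by or group by as mentioned under Spawning monitor instances and contexts.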

Creating own event types

As well as the predefined event types, you can define your own event types. These can be useful to detect patterns of events occurring which trigger other parts of the same module.

event MyEvent {
	Measurement m1;
	Measurement m2;
}

on Measurement() as m1 -> Measurement() as m2 {
	route MyEvent(m1, m2);
}

Info: Cumulocity IoT deploys each module into its own namespace, so event definitions from one module cannot be used in other modules. This prevents dependencies between modules.
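To show how a routed event can trigger another part of the same module, here is a minimal sketch with one listener that routes a custom event and a second listener that reacts to it. The event and monitor names are illustrative. The sketch assumes that measurements are received through Measurement.SUBSCRIBE_CHANNEL.

```epl
using com.apama.cumulocity.Measurement;

// Custom event that carries two consecutive measurements.
event MeasurementPair {
	Measurement m1;
	Measurement m2;
}

monitor PairDetector {
	action onload() {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

		// Producer: route a MeasurementPair for every pair of consecutive measurements.
		on all (Measurement() as m1 -> Measurement() as m2) {
			route MeasurementPair(m1, m2);
		}

		// Consumer: routed events are processed within the same context.
		on all MeasurementPair() as pair {
			log "Pair received: " + pair.m1.type + " followed by " + pair.m2.type at INFO;
		}
	}
}
```

Splitting detection and reaction in this way keeps each listener small, and several consumers can react to the same routed event.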

Creating own actions

Typically, you will structure a monitor using actions (much like functions in Java), as shown in the following examples.

Increasing the given severity:

action upgradeSeverity(string old) returns string {
	if old = "WARNING" { return "MINOR"; }
	if old = "MINOR"   { return "MAJOR"; }
	if old = "MAJOR"   { return "CRITICAL"; }
	return old;
}

Calculating the distance between two geo-coordinates:

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	// Haversine formula; returns the great-circle distance in meters
	float R := 6371000.0; // mean Earth radius in meters
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lon2-lon1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) + lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}


Variables

You can define variables in your modules.

string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];

If you define a monitor-scope variable (that is, inside a monitor but not within any of its actions), a listener can assign the matched event to it if you use a colon (:) instead of as for the event coassignment. Thus, the example below logs the latest event every 10 seconds:

monitor MyMonitor {
    // monitor scope:
    Event e;
    action onload() {
        on all Event():e {}
        on all wait(10.0) {
            log e.toString();
        }
    }
}

When a listener starts, it takes a copy of all of the local variables. The example below thus logs each event after a 10 second delay, even if other events come in between.

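monitor MyMonitor {
    action onload() {
        // e is local to this listener, so each nested listener keeps its own copy
        on all Event() as e {
            // fires once, 10 seconds after the event arrived
            on wait(10.0) {
                log e.toString();
            }
        }
    }
}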

Spawning monitor instances and contexts

While it is possible to handle multiple devices in a single monitor (for example, using group by and partition by in streams, or maintaining a dictionary keyed on the device ID for other state), it is often useful to separate processing of different devices into separate monitor instances.

New monitor instances can be created using the spawn statement. This takes a copy of the monitor’s monitor scope variables and runs the named action in a new monitor instance. No listeners are copied into the new monitor. It is also possible to specify a context to spawn the new monitor instance in. Different contexts can run concurrently with each other, and also help isolate different monitors from each other. When constructing a context, supply a name to identify the context, and a boolean to control if the context is public - that is, it receives the Cumulocity IoT events by default (sent to the default channel).

This pattern is often used with the unmatched keyword to identify events that are not matched by any other listeners in that context. By using a separate context for each monitor, the unmatched behavior is scoped to that monitor. For example:

monitor PerDeviceMeasurementTracker {
	action onload() {
		spawn factory() to context("PerDeviceMeasurementTracker", true);
	}
	action factory() {
		// only measurements that no per-device instance has claimed yet
		on all unmatched Measurement() as m {
			spawn perDevice(m);
		}
	}

	dictionary<string, Measurement> latestMeasurementByType; // measurements for this device

	action perDevice(Measurement m) {
		processMeasurement(m); // the measurement that caused the spawn
		on all Measurement(source = m.source) as latest {
			processMeasurement(latest);
		}
	}
	action processMeasurement(Measurement m) {
		latestMeasurementByType[m.type] := m;
	}
}