Examples

IMPORTANT

The functionality described in this CEL analytics guide is deprecated. All new Cumulocity IoT installations will use the Apama CEP engine. Software AG will terminate support for using CEL (Esper) in Cumulocity IoT on 31 Dec 2020 following its deprecation in 2018.

For further information on using Apama's Event Processing Language in Cumulocity IoT refer to the Streaming Analytics guide.

For details on migration, refer to Migrating from CEL (Esper) to Apama in the Streaming analytics guide.

Calculating an hourly median of measurements

We are assuming the input data looks like this:

{
  "c8y_TemperatureMeasurement": {
    "T": {
      "value": ...,
      "unit": "C"
    }
  },
  "time":"...",
  "source": {
    "id":"..."
  },
  "type": "c8y_TemperatureMeasurement"
}

To create the median we need the following parts in the module:

Module:

create context HourlyAvgMeasurementDeviceContext
  partition measurement.source.value from MeasurementCreated;

@Name("Creating_hourly_measurement")
context HourlyAvgMeasurementDeviceContext
insert into CreateMeasurement
select
  m.measurement.source as source,
  current_timestamp().toDate() as time,
  "c8y_AverageTemperatureMeasurement" as type,
  {
    "c8y_AverageTemperatureMeasurement.T.value", avg(cast(getNumber(m, "c8y_TemperatureMeasurement.T.value"), double)),
    "c8y_AverageTemperatureMeasurement.T.unit", getString(m, "c8y_TemperatureMeasurement.T.unit")
  } as fragments
from MeasurementCreated.win:time(1 hours) m
where getObject(m, "c8y_TemperatureMeasurement") is not null
output last every 1 hours;

Creating alarm if the operation was not executed

Operations usually run to a fixed sequence when handled by the device.

An operation that does not reach SUCCESSFUL or FAILED within a certain time usually indicates an issue (like device lost connection or device got stuck while handling). Even if the operation was not handled successfully the device should update the operation as FAILED. For this example we will use 10 minutes as a acceptable duration for operation handling. We will check for the following sequence:

If the second part does not appear we will create a new alarm:

@Name("handle_not_finished_operation")
insert into CreateAlarm  
select
    o.operation.deviceId as source,
    CumulocitySeverities.MAJOR as severity,
    CumulocityAlarmStatuses.ACTIVE as status,
    "c8y_OperationNotFinishedAlarm" as type,
    current_timestamp().toDate() as time,
    replaceAllPlaceholders("The device has not finished the operation #{id} within 10 minutes", o.operation) as text
from pattern [
    every o = OperationCreated
    	-> (timer:interval(10 minutes)
    	and not OperationUpdated(
    		operation.id.value = o.operation.id.value
    		and (operation.status in (OperationStatus.SUCCESSFUL, OperationStatus.FAILED))
    	))
];

Creating alarms from bit measurements

Devices often keep alarm statuses in registers and can not interpret the meaning of alarms. In this example, we assume that a device just sends the entire register as a binary value in a measurement. A rule must identify the bits and create the respective alarm.

We create three expressions to resolve alarm text, type, and severity for each of the bits.

create expression String getFaultRegisterAlarmType(position) [
    switch (position) {
        case 0:
          "c8y_HighTemperatureAlarm";
          break;
        case 1:
          "c8y_ProcessingAlarm";
          break;
        case 2:
          "c8y_DoorOpenAlarm";
          break;
        case 3:
          "c8y_SystemFailureAlarm";
          break;
        default:
          "c8y_FaultRegister" + position + "Alarm";
          break;
    };
];

create expression CumulocitySeverities getFaultRegisterAlarmSeverity(position) [
    importClass(com.cumulocity.model.event.CumulocitySeverities);
    switch (position) {
        case 0:
          CumulocitySeverities.MAJOR;
          break;
        case 1:
          CumulocitySeverities.WARNING;
          break;
        case 2:
          CumulocitySeverities.MINOR;
          break;
        case 3:
          CumulocitySeverities.CRITICAL;
          break;
        default:
          CumulocitySeverities.MAJOR;
          break;
    };
];

create expression String getFaultRegisterAlarmText(position)[
    switch(position) {
        case 0:
          "The machine temperature reached a critical status";
          break;
        case 1:
          "There was an error trying to process data";
          break;
        case 2:
          "Door was opened";
          break;
        case 3:
          "There was a critical system failure";
          break;
        default:
          "An undefined alarm was reported on position " || position || " in the binary fault register";
          break;
    };
];

To analyze the binary measurement value we will interpret it as a string value and loop through each character. The getActiveBits() function will do that and return a list of the bit positions at where the measurement had a “1”. It will not return it as a List but instead as a List where the map structure matches the scheme BitPosition so we can handle it as if it is a stream. This is used as an option to join the stream and trigger an alarm by individual measurement values listed.

create schema BitPosition(
  position int
);

create schema MeasurementWithBinaryFaultRegister(
  measurement Measurement,
  faultRegister String
);

create expression Collection getActiveBits(value) [
	importPackage(java.util);
	var bitOnNumbers = new ArrayList();
        var size = value.length;
	for(var no = 0; no < size; no++) {
	    if(value.charAt(no) == "1") {
		bitOnNumbers.add(Collections.singletonMap('position', size - no - 1));
            }
	}
	bitOnNumbers;
];

@Name("extract_fault_register")
insert into MeasurementWithBinaryFaultRegister
select
  m.measurement as measurement,
  getString(m, "c8y_BinaryFaultRegister.errors.value") as faultRegister
from MeasurementCreated m
where getObject(m, "c8y_BinaryFaultRegister") is not null;

@Name("creating_alarm")
insert into CreateAlarm
select
	m.measurement.source as source,
        getFaultRegisterAlarmSeverity(bit.position) as severity,
        CumulocityAlarmStatuses.ACTIVE as status,
	m.measurement.time as time,
	getFaultRegisterAlarmType(bit.position) as type,
	getFaultRegisterAlarmText(bit.position) as text
from
	MeasurementWithBinaryFaultRegister m unidirectional,
	MeasurementWithBinaryFaultRegister[getActiveBits(faultRegister)@type(BitPosition)] as bit;

Creating a measurement like this

{
	"c8y_BinaryFaultRegister": {
  	"errors": {
    	"value": 10110
    }
  },
  "time":"...",
  "source": {
  	"id":"..."
  },
  "type": "c8y_BinaryFaultRegister"
}

will trigger the last statement three times.

and therefore create three alarms.

Consumption measurements

Assuming we have a sensor which measures the current fill level of something and sends the values in a regular basis to Cumulocity we can easily create additional consumption values. Calculating the absolute difference between two measurements can be useful but it will only give you a clear view if the measurements are send always in the same interval. Therefore we will put the absolute difference in relation to the time difference and calculate as a per hour consumption.

We will compare the value and time difference of two adjacent measurements for a device (we will need a context for that).

create schema FillLevelMeasurement(
  measurement Measurement,
  value double
);

create schema AdjacentFillLevelMeasurements(
	firstValue double,
	lastValue double,
	firstTime Date,
	lastTime Date,
	source String
);

create context ConsumptionMeasurementDeviceContext
      partition measurement.source.value from FillLevelMeasurement;

create expression double calculateConsumption(firstValue, lastValue, firstTime, lastTime) [
  if (lastTime == firstTime) {
    0;
  } else {
    ((firstValue - lastValue) * 3600000) / (lastTime - firstTime);
  }
];

@Name("filter_fill_level_measurements")
insert into FillLevelMeasurement
select
  m.measurement as measurement,
  cast(getNumber(m, "c8y_WaterTankFillLevel.level.value"), double) as value
from MeasurementCreated m
where getObject(m, "c8y_WaterTankFillLevel") is not null;

@Name("combine_two_latest_measurements")
context ConsumptionMeasurementDeviceContext
insert into AdjacentFillLevelMeasurements
select
  first(m.value) as firstValue,
  first(m.measurement.time) as firstTime,
  last(m.value) as lastValue,
  last(m.measurement.time) as lastTime,
  context.key1 as source
from FillLevelMeasurement.win:length(2) m;

@Name("create_consumption_measurement")
insert into CreateMeasurement
select
  m.lastTime as time,
  m.source as source,
  "c8y_HourlyWaterConsumption" as type,
  {
    "c8y_HourlyWaterConsumption.consumption.value", calculateConsumption(m.firstValue, m.lastValue, m.firstTime.toMillisec(), m.lastTime.toMillisec()),
    "c8y_HourlyWaterConsumption.consumption.unit", "l/h"
  } as fragments
from AdjacentFillLevelMeasurements m;

Using Zementis analytic models in Cumulocity

The CEP rule/module below shows how to use Zementis analytic models inside Cumulocity.

We are assuming the input data looks like this:

{
  "c8y_SteamMeasurement": {
    "Temperature": {
      "value": ...,
      "unit": "C"
    }
  },
{
  "c8y_TemperatureMeasurement": {
    "Pressure": {
      "value": ...,
      "unit": "bar"
    }
  },
{
  "c8y_TemperatureMeasurement": {
    "Steamoutput": {
      "value": ...,
      "unit": "%"
    }
  },
  "time":"...",
  "source": {
    "id":"..."
  },
  "type": "c8y_TemperatureMeasurement"
}

First, a predictive model is created and uploaded via Zementis console. Assume, the model becomes available for data scoring on https://myadapa.zementis.com:443/adapars/apply/model_name endpoint.

CEP module:

create constant variable string model_name = "model_name";
create constant variable string model_url = "https://myadapa.zementis.com:443/adapars/apply/";
create constant variable string auth = "Basic ...";
create constant variable string source_device = "12345";

create expression string js:getLabel(stringObj)[
var zemOutputs = JSON.parse(stringObj).outputs;
output = zemOutputs.pop().Predicted_label;
];

@Name("inputData")
insert into inputDataAll
select 
     m.source as source,
     getNumber(m, "c8y_SteamMeasurement.Temperature.value") as `steam.temperature`,
     getNumber(m, "c8y_SteamMeasurement.Pressure.value") as `steam.pressure`,
     getNumber(m, "c8y_SteamMeasurement.Steamoutput.value") as `steam.steamoutput`
from MeasurementCreated m
where 
     measurement.source.value = source_device;

@Name("requestZementis")
insert into SendRequest
select 
     "GET" as method,
     model_url || model_name || "?record=" || toJSON(m.*) as url,
     auth as authorization,
     "application/json" as contentType,
     m.source as source
from inputDataAll m;

@Name("responseZementis")
insert into CreateEvent
select
     "response_received_" || getString(response, "status") as type,
     getLabel(response.body) as text,
     response.creationTime as dateTime,
     getString(response, "source.value") as source
from ResponseReceived response
where 
     getString(response, "source.value") = source_device;

@Name("generateAlarm")
insert into CreateAlarm 
select
     response.creationTime as dateTime,
     getString(response, "source.value") as source,
     "cepFailureAlarm" as type,
     "Zementis Test Failure" as text,
     "ACTIVE" as status,
     "MAJOR" as severity
from ResponseReceived response
where 
     getString(response, "source.value") = source_device
     and getLabel(response.body) = "0";

The Cumulocity CEP module works as follows: