The functionality described in this CEL analytics guide is deprecated. All new Cumulocity IoT installations will use the Apama CEP engine. Software AG will terminate support for using CEL (Esper) in Cumulocity IoT on 31 Dec 2020 following its deprecation in 2018.
For further information on using Apama's Event Processing Language in Cumulocity IoT refer to the Streaming Analytics guide.
To calculate the hourly average we need the following parts in the module:
A context to separate the measurements correctly per device
A time window over one hour
An output that returns only the last average calculation every hour
Everything created as a new measurement
Module:
create context HourlyAvgMeasurementDeviceContext
partition measurement.source.value from MeasurementCreated;
@Name("Creating_hourly_measurement")
context HourlyAvgMeasurementDeviceContext
insert into CreateMeasurement
select
m.measurement.source as source,
current_timestamp().toDate() as time,
"c8y_AverageTemperatureMeasurement" as type,
{
"c8y_AverageTemperatureMeasurement.T.value", avg(cast(getNumber(m, "c8y_TemperatureMeasurement.T.value"), double)),
"c8y_AverageTemperatureMeasurement.T.unit", getString(m, "c8y_TemperatureMeasurement.T.unit")
} as fragments
from MeasurementCreated.win:time(1 hours) m
where getObject(m, "c8y_TemperatureMeasurement") is not null
output last every 1 hours;
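The combined effect of the per-device context and the one-hour window can be illustrated outside of CEL with a small plain JavaScript sketch. It is not part of the module and does not use any Cumulocity API; the function name hourlyAverages and the sample data are hypothetical and only mirror what the statement above computes.

```javascript
// Hypothetical helper (not a CEL function): averages the readings of each
// device that fall inside the one-hour window ending at `now`.
function hourlyAverages(measurements, now) {
  const windowMs = 60 * 60 * 1000;
  const sums = new Map(); // device id -> { total, count }
  for (const m of measurements) {
    // Skip readings outside the one-hour window.
    if (m.time <= now - windowMs || m.time > now) continue;
    const entry = sums.get(m.source) || { total: 0, count: 0 };
    entry.total += m.value;
    entry.count += 1;
    sums.set(m.source, entry);
  }
  const result = {};
  for (const [source, { total, count }] of sums) {
    result[source] = total / count;
  }
  return result;
}

// Assumed sample data: two devices, one reading older than one hour.
const now = Date.parse("2020-01-01T12:00:00Z");
const sample = [
  { source: "device1", time: now - 50 * 60 * 1000, value: 20 },
  { source: "device1", time: now - 10 * 60 * 1000, value: 24 },
  { source: "device2", time: now - 30 * 60 * 1000, value: 18 },
  { source: "device1", time: now - 90 * 60 * 1000, value: 99 },
];
const averages = hourlyAverages(sample, now);
console.log(averages);
```

Grouping by source plays the role of the context partition, and the time check plays the role of win:time(1 hours).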
Creating alarm if the operation was not executed
Operations usually run through a fixed sequence of statuses when handled by the device:
PENDING (after creation)
EXECUTING (once device received the operation and starts the handling)
SUCCESSFUL or FAILED (depending on the execution result)
An operation that does not reach SUCCESSFUL or FAILED within a certain time usually indicates an issue (for example, the device lost its connection or got stuck while handling the operation).
Even if the operation was not handled successfully, the device should update the operation to FAILED.
For this example we will use 10 minutes as an acceptable duration for operation handling.
We will check for the following sequence:
OperationCreated
OperationUpdated for the same operation within 10 minutes that sets the status to either SUCCESSFUL or FAILED
If the second part does not appear we will create a new alarm:
@Name("handle_not_finished_operation")
insert into CreateAlarm
select
o.operation.deviceId as source,
CumulocitySeverities.MAJOR as severity,
CumulocityAlarmStatuses.ACTIVE as status,
"c8y_OperationNotFinishedAlarm" as type,
current_timestamp().toDate() as time,
replaceAllPlaceholders("The device has not finished the operation #{id} within 10 minutes", o.operation) as text
from pattern [
every o = OperationCreated
-> (timer:interval(10 minutes)
and not OperationUpdated(
operation.id.value = o.operation.id.value
and (operation.status in (OperationStatus.SUCCESSFUL, OperationStatus.FAILED))
))
];
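The check performed by the pattern can be replayed over a recorded list of events with a short JavaScript sketch. This is only an illustration under assumed data: the event objects, their kind, id, status and time fields, and the function findUnfinishedOperations are hypothetical and not part of the Cumulocity data model.

```javascript
const TIMEOUT_MS = 10 * 60 * 1000; // 10 minutes, as in the pattern above
const FINAL_STATUSES = new Set(["SUCCESSFUL", "FAILED"]);

// Hypothetical helper: returns the ids of all created operations that did not
// receive a SUCCESSFUL or FAILED update within 10 minutes of their creation.
function findUnfinishedOperations(events) {
  const unfinished = [];
  for (const created of events.filter((e) => e.kind === "OperationCreated")) {
    const finishedInTime = events.some(
      (e) =>
        e.kind === "OperationUpdated" &&
        e.id === created.id &&
        FINAL_STATUSES.has(e.status) &&
        e.time >= created.time &&
        e.time - created.time <= TIMEOUT_MS
    );
    if (!finishedInTime) unfinished.push(created.id);
  }
  return unfinished;
}

// Assumed sample data: times are minutes converted to milliseconds.
const min = 60 * 1000;
const events = [
  { kind: "OperationCreated", id: "1", time: 0 },
  { kind: "OperationUpdated", id: "1", status: "EXECUTING", time: 1 * min },
  { kind: "OperationUpdated", id: "1", status: "SUCCESSFUL", time: 5 * min },
  { kind: "OperationCreated", id: "2", time: 0 },
  { kind: "OperationUpdated", id: "2", status: "EXECUTING", time: 2 * min },
  { kind: "OperationCreated", id: "3", time: 0 },
  { kind: "OperationUpdated", id: "3", status: "FAILED", time: 15 * min },
];
const unfinished = findUnfinishedOperations(events);
console.log(unfinished);
```

Operation 2 never reaches a final status and operation 3 reaches it too late, so both would lead to an alarm, while operation 1 would not.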
Creating alarms from bit measurements
Devices often keep alarm statuses in registers and cannot interpret the meaning of the alarms themselves.
In this example, we assume that a device just sends the entire register as a binary value in a measurement. A rule must identify the bits and create the respective alarm.
We create three expressions to resolve alarm text, type, and severity for each of the bits.
create expression String getFaultRegisterAlarmType(position) [
switch (position) {
case 0:
"c8y_HighTemperatureAlarm";
break;
case 1:
"c8y_ProcessingAlarm";
break;
case 2:
"c8y_DoorOpenAlarm";
break;
case 3:
"c8y_SystemFailureAlarm";
break;
default:
"c8y_FaultRegister" + position + "Alarm";
break;
};
];
create expression CumulocitySeverities getFaultRegisterAlarmSeverity(position) [
importClass(com.cumulocity.model.event.CumulocitySeverities);
switch (position) {
case 0:
CumulocitySeverities.MAJOR;
break;
case 1:
CumulocitySeverities.WARNING;
break;
case 2:
CumulocitySeverities.MINOR;
break;
case 3:
CumulocitySeverities.CRITICAL;
break;
default:
CumulocitySeverities.MAJOR;
break;
};
];
create expression String getFaultRegisterAlarmText(position)[
switch(position) {
case 0:
"The machine temperature reached a critical status";
break;
case 1:
"There was an error trying to process data";
break;
case 2:
"Door was opened";
break;
case 3:
"There was a critical system failure";
break;
default:
"An undefined alarm was reported on position " || position || " in the binary fault register";
break;
};
];
To analyze the binary measurement value, we interpret it as a string and loop through each character.
The getActiveBits() function does that and returns a list of the bit positions at which the measurement had a “1”.
It does not return the positions as a list of plain integers but as a list of maps, each holding a single position entry, so that the result can be typed as BitPosition in the final statement.
create schema BitPosition(
position int
);
create schema MeasurementWithBinaryFaultRegister(
measurement Measurement,
faultRegister String
);
create expression Collection getActiveBits(value) [
importPackage(java.util);
var bitOnNumbers = new ArrayList();
var size = value.length;
for(var no = 0; no < size; no++) {
if(value.charAt(no) == "1") {
bitOnNumbers.add(Collections.singletonMap('position', size - no - 1));
}
}
bitOnNumbers;
];
@Name("extract_fault_register")
insert into MeasurementWithBinaryFaultRegister
select
m.measurement as measurement,
getString(m, "c8y_BinaryFaultRegister.errors.value") as faultRegister
from MeasurementCreated m
where getObject(m, "c8y_BinaryFaultRegister") is not null;
@Name("creating_alarm")
insert into CreateAlarm
select
m.measurement.source as source,
getFaultRegisterAlarmSeverity(bit.position) as severity,
CumulocityAlarmStatuses.ACTIVE as status,
m.measurement.time as time,
getFaultRegisterAlarmType(bit.position) as type,
getFaultRegisterAlarmText(bit.position) as text
from
MeasurementWithBinaryFaultRegister m unidirectional,
MeasurementWithBinaryFaultRegister[getActiveBits(faultRegister)@type(BitPosition)] as bit;
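To see which positions the bit loop produces, the same logic can be run as plain JavaScript outside of CEL. The sketch below replaces the Java ArrayList and singletonMap with a JavaScript array of objects; the register value "1010" is an assumed sample input.

```javascript
// Plain JavaScript version of the bit loop, for illustration only.
// Position 0 is the rightmost character of the register string.
function getActiveBits(value) {
  const positions = [];
  const size = value.length;
  for (let no = 0; no < size; no++) {
    if (value.charAt(no) === "1") {
      positions.push({ position: size - no - 1 });
    }
  }
  return positions;
}

// Assumed sample register value.
const bits = getActiveBits("1010");
console.log(bits);
```

For "1010" the sketch yields positions 3 and 1, which the expressions above map to c8y_SystemFailureAlarm and c8y_ProcessingAlarm.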
Assuming we have a sensor that measures the current fill level of a container (for example a water tank) and sends the values to Cumulocity on a regular basis, we can easily derive additional consumption values.
Calculating the absolute difference between two measurements can be useful, but it only gives a clear picture if the measurements are always sent at the same interval.
Therefore we put the absolute difference in relation to the time difference and express the result as consumption per hour.
We compare the value and time difference of two adjacent measurements for a device (we need a context for that).
create schema FillLevelMeasurement(
measurement Measurement,
value double
);
create schema AdjacentFillLevelMeasurements(
firstValue double,
lastValue double,
firstTime Date,
lastTime Date,
source String
);
create context ConsumptionMeasurementDeviceContext
partition measurement.source.value from FillLevelMeasurement;
create expression double calculateConsumption(firstValue, lastValue, firstTime, lastTime) [
if (lastTime == firstTime) {
0;
} else {
((firstValue - lastValue) * 3600000) / (lastTime - firstTime);
}
];
@Name("filter_fill_level_measurements")
insert into FillLevelMeasurement
select
m.measurement as measurement,
cast(getNumber(m, "c8y_WaterTankFillLevel.level.value"), double) as value
from MeasurementCreated m
where getObject(m, "c8y_WaterTankFillLevel") is not null;
@Name("combine_two_latest_measurements")
context ConsumptionMeasurementDeviceContext
insert into AdjacentFillLevelMeasurements
select
first(m.value) as firstValue,
first(m.measurement.time) as firstTime,
last(m.value) as lastValue,
last(m.measurement.time) as lastTime,
context.key1 as source
from FillLevelMeasurement.win:length(2) m;
@Name("create_consumption_measurement")
insert into CreateMeasurement
select
m.lastTime as time,
m.source as source,
"c8y_HourlyWaterConsumption" as type,
{
"c8y_HourlyWaterConsumption.consumption.value", calculateConsumption(m.firstValue, m.lastValue, m.firstTime.toMillisec(), m.lastTime.toMillisec()),
"c8y_HourlyWaterConsumption.consumption.unit", "l/h"
} as fragments
from AdjacentFillLevelMeasurements m;
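A worked example may help to check the arithmetic of calculateConsumption. The sketch below repeats the expression body as a plain JavaScript function; the fill levels and timestamps are assumed sample values.

```javascript
// Same arithmetic as the calculateConsumption expression: the drop in fill
// level is scaled from the elapsed milliseconds to one hour (3600000 ms).
function calculateConsumption(firstValue, lastValue, firstTime, lastTime) {
  if (lastTime === firstTime) {
    return 0; // avoid a division by zero for identical timestamps
  }
  return ((firstValue - lastValue) * 3600000) / (lastTime - firstTime);
}

// Assumed sample: the level drops from 100 l to 90 l within 30 minutes.
const t0 = Date.parse("2020-01-01T12:00:00Z");
const t1 = t0 + 30 * 60 * 1000;
const perHour = calculateConsumption(100, 90, t0, t1);
console.log(perHour); // 20
```

A drop of 10 l in half an hour corresponds to 20 l/h. A rising fill level, for example after a refill, results in a negative value.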
Using Zementis analytic models in Cumulocity
The CEP rule/module below shows how to use Zementis analytic models inside Cumulocity.
create constant variable string model_name = "model_name";
create constant variable string model_url = "https://myadapa.zementis.com:443/adapars/apply/";
create constant variable string auth = "Basic ...";
create constant variable string source_device = "12345";
create expression string js:getLabel(stringObj)[
var zemOutputs = JSON.parse(stringObj).outputs;
output = zemOutputs.pop().Predicted_label;
];
@Name("inputData")
insert into inputDataAll
select
m.source as source,
getNumber(m, "c8y_SteamMeasurement.Temperature.value") as `steam.temperature`,
getNumber(m, "c8y_SteamMeasurement.Pressure.value") as `steam.pressure`,
getNumber(m, "c8y_SteamMeasurement.Steamoutput.value") as `steam.steamoutput`
from MeasurementCreated m
where
measurement.source.value = source_device;
@Name("requestZementis")
insert into SendRequest
select
"GET" as method,
model_url || model_name || "?record=" || toJSON(m.*) as url,
auth as authorization,
"application/json" as contentType,
m.source as source
from inputDataAll m;
@Name("responseZementis")
insert into CreateEvent
select
"response_received_" || getString(response, "status") as type,
getLabel(response.body) as text,
response.creationTime as dateTime,
getString(response, "source.value") as source
from ResponseReceived response
where
getString(response, "source.value") = source_device;
@Name("generateAlarm")
insert into CreateAlarm
select
response.creationTime as dateTime,
getString(response, "source.value") as source,
"cepFailureAlarm" as type,
"Zementis Test Failure" as text,
"ACTIVE" as status,
"MAJOR" as severity
from ResponseReceived response
where
getString(response, "source.value") = source_device
and getLabel(response.body) = "0";
The Cumulocity CEP module works as follows:
The data from a specific device is filtered. The measurement values that should be passed for analysis are selected.
To apply the analytic model, an outbound HTTP request is sent to the Zementis endpoint configured above. The measurement values to be analyzed are passed as URL parameters of the request.
Depending on the score returned from the model, an alarm is raised.
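The response handling can be tried out in plain JavaScript. The sketch below is only an illustration: the response body is an assumed shape inferred from the getLabel expression (an outputs array whose last entry carries Predicted_label) and is not taken from the Zementis documentation, and shouldRaiseAlarm is a hypothetical helper mirroring the where clause of the generateAlarm statement.

```javascript
// Mirrors the getLabel expression: read the label of the last output entry.
function getLabel(body) {
  const outputs = JSON.parse(body).outputs;
  return outputs[outputs.length - 1].Predicted_label;
}

// Hypothetical helper: the module raises an alarm when the label is "0".
function shouldRaiseAlarm(body) {
  return getLabel(body) === "0";
}

// Assumed response bodies, shaped after what getLabel expects.
const failing = '{"outputs":[{"Predicted_label":"1"},{"Predicted_label":"0"}]}';
const passing = '{"outputs":[{"Predicted_label":"1"}]}';

console.log(getLabel(failing), shouldRaiseAlarm(failing));
console.log(getLabel(passing), shouldRaiseAlarm(passing));
```

Only the last entry of the outputs array is evaluated, so with several records in one response the earlier predictions are ignored.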