Custom fragments
Cumulocity IoT APIs let you structure your data freely. In Apama EPL, this is done by adding entries to params, which is of the type dictionary<string, any>. Each Cumulocity IoT event in the com.apama.cumulocity package (such as Alarm, Event, Measurement or Operation) has a params field, which is translated to fragments or optional fields. Thus, when receiving events, your code must look up entries in the params field. When sending events, this can be done by defining event types, or you can use the dictionary<string, any> type. When receiving events, the EPL type is dictionary<any, any>. Note that EPL is strongly typed, so if you are creating an event with no fragments, a new dictionary<string, any> expression is required. If you are providing entries inline with a dictionary literal, then EPL determines the type based on the type of the first key-value pair. Thus, for dictionary<string, any>, cast the first value to an any type with the <any> cast operator:
send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;
The MeasurementValue type is provided for the measurements in the Measurement type. MeasurementValue has value and unit fields and params for other fragments.
Example 1:
send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
"c8y_TemperatureMeasurement":{
"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
}},
new dictionary<string,any>) to Measurement.SEND_CHANNEL;
This will result in the following JSON structure:
{
"type": "c8y_TemperatureMeasurement",
"time": "...",
"source": {
"id": "12345"
},
"c8y_TemperatureMeasurement": {
"T1": {
"value": 1,
"unit": "C"
},
"T2": {
"value": 2,
"unit": "C"
},
"T3": {
"value": 3,
"unit": "C"
},
"T4": {
"value": 4,
"unit": "C"
},
"T5": {
"value": 5,
"unit": "C"
}
}
}
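The way fragments and series nest in the JSON document can also be sketched outside EPL. The following Python snippet is an illustration only: the helper name measurement_json and the timestamp are assumptions made for this example and are not part of any Cumulocity IoT or Apama API. It builds the nested fragment and series structure from plain values.

```python
import json

def measurement_json(mtype, source_id, time, fragment, series, unit):
    """Build a measurement body with one fragment whose series share a unit.

    Hypothetical helper for illustration; `series` maps series name -> value.
    """
    return {
        "type": mtype,
        "time": time,
        "source": {"id": source_id},
        # One entry per series, each with its own value and unit.
        fragment: {name: {"value": value, "unit": unit}
                   for name, value in series.items()},
    }

# The timestamp below is a placeholder chosen for the example.
doc = measurement_json(
    "c8y_TemperatureMeasurement", "12345", "2024-01-01T00:00:00Z",
    "c8y_TemperatureMeasurement",
    {"T1": 1.0, "T2": 2.0, "T3": 3.0, "T4": 4.0, "T5": 5.0}, "C")
print(json.dumps(doc, indent=2))
```

Each series keeps its own value, so T1 to T5 appear as five separate objects under the fragment name.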
Measurement fragments
A measurement can be broken into individual measurement fragments. This can be done for each fragment and series present in the measurement. See Cumulocity IoT’s domain model in the Concepts guide for more information on measurement fragments.
Listen for events of type com.apama.cumulocity.MeasurementFragment when you require filtering based on measurement fragments or series, instead of listening for com.apama.cumulocity.Measurement events and looking inside the measurements dictionary.
For more information, see Using measurement fragments in the Apama documentation.
Listeners
Triggering a statement by an arriving event is not the only possibility. The following sections cover other ways to combine listeners. Refer to Defining Event Listeners in the Apama documentation for full details.
Filters
Filters enable you to trigger by combinations or sequences of other triggers. If you have a trigger like this
on all Event() as e { ... }
it is also possible to add filters in the pattern.
on all Event(type = "c8y_EntranceEvent") as e { ... }
You can listen for more than one event:
on Event() as e and Alarm() as a { ... }
This will trigger once both an Event and an Alarm have been received. The first of each will be captured.
You can also trigger by sequences:
on all (Event() as e -> Alarm() as a) { ... }
This will trigger for every pair “Event followed by Alarm”. On receiving an event, it will stop listening for further events and start listening for alarms instead. Once an alarm is received, it will start listening for events again.
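The pairing behavior described for the sequence listener can be modeled as a small state machine. The Python sketch below is only a model of the description above, not Apama's implementation; the function name and the tuple representation of events are assumptions made for the example.

```python
def pair_events_with_alarms(stream):
    """Pair each Event with the next Alarm, as described for
    on all (Event() as e -> Alarm() as a).

    `stream` is an iterable of (kind, payload) tuples where kind is
    "Event" or "Alarm". Illustrative model only.
    """
    pairs = []
    pending_event = None              # set while waiting for an Alarm
    for kind, payload in stream:
        if pending_event is None:
            if kind == "Event":
                pending_event = payload   # stop listening for Events
        elif kind == "Alarm":
            pairs.append((pending_event, payload))
            pending_event = None          # listen for Events again
    return pairs

stream = [("Event", "e1"), ("Event", "e2"), ("Alarm", "a1"),
          ("Alarm", "a2"), ("Event", "e3"), ("Alarm", "a3")]
print(pair_events_with_alarms(stream))
```

In this model, e2 is ignored because the listener is already waiting for an alarm, and a2 is ignored because no event is pending.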
Timers
You can also trigger listeners based on time. You can either trigger at a certain interval, for example, fire every 5 minutes (300 seconds):
on all wait(300.0) { ... }
Or you can have a listener fire at certain times of the day, with similar functionality to Unix’s cron scheduler:
// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59
on all at(*, *, *, *, *) {} // trigger every minute
on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every Monday, Wednesday and Friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month
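To make the field patterns used above more concrete, the following Python sketch checks whether a given point in time is selected by a five-field pattern. It is a simplified model written for this explanation, under the assumption that `*/n` selects values divisible by n and `a:b` is an inclusive range; the function names are hypothetical, the optional seconds field is omitted, and the Apama documentation remains the reference for the exact semantics.

```python
from datetime import datetime

def field_matches(spec, value):
    """Return True if `value` is selected by one timer field pattern.

    Supports a plain integer, a list, "*", "*/n" and "a:b".
    """
    if isinstance(spec, (list, tuple)):       # for example [1,3,5]
        return value in spec
    if isinstance(spec, int):                 # for example 0
        return value == spec
    if spec == "*":                           # every value
        return True
    if spec.startswith("*/"):                 # every n-th value
        return value % int(spec[2:]) == 0
    low, high = (int(part) for part in spec.split(":"))   # inclusive range
    return low <= value <= high

def at_matches(pattern, when):
    """Check (minutes, hours, daysOfMonth, month, daysOfWeek) against `when`."""
    # Python counts Monday=0..Sunday=6; the timer counts Sunday=0..Saturday=6.
    day_of_week = (when.weekday() + 1) % 7
    actual = (when.minute, when.hour, when.day, when.month, day_of_week)
    return all(field_matches(s, v) for s, v in zip(pattern, actual))

# 1 January 2024 was a Monday.
print(at_matches((0, 1, "*", "*", [1, 3, 5]), datetime(2024, 1, 1, 1, 0)))
```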
You can also combine timer patterns with other patterns. For example, you can check if there was an event within a certain time after another event:
on Event() -> wait(600.0) and not Alarm() { ... }
This will trigger if an event arrives and no alarm follows within 10 minutes (600 seconds). Note the use of not, which terminates the listener if the alarm occurs.
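The timeout pattern can be reasoned about with a small model. The Python sketch below is an illustration only, with a hypothetical function name; timestamps are plain seconds, and the handling of an alarm that arrives exactly at the deadline is an assumption of this model rather than a statement about Apama.

```python
def timeout_fires(event_time, alarm_times, timeout=600.0):
    """Return the time at which the listener would fire, or None if an
    Alarm arriving after the Event and before the deadline cancels it.

    Illustrative model of: on Event() -> wait(600.0) and not Alarm()
    """
    deadline = event_time + timeout
    for alarm_time in alarm_times:
        # Only alarms after the event and before the deadline matter.
        if event_time < alarm_time < deadline:
            return None
    return deadline

print(timeout_fires(0.0, [700.0]))   # alarm arrives too late to cancel
print(timeout_fires(0.0, [300.0]))   # alarm cancels the listener
```

Alarms received before the event do not affect the result, because the wait and the not only become active once the event has arrived.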
You can use a tenant option to set the time zone used for on all at timers. To set the tenant option, specify the microservice.runtime category and the timezone key.
For example:
{
"category" : "microservice.runtime",
"key" : "timezone",
"value" : "Europe/Warsaw"
}
See also Timezone variable in the Microservice SDK guide and Supported time zones in the Apama documentation.
Streams - windows
Streams give you the possibility to operate on windows of events. Streams use the from keyword instead of on and define a window to operate over, and select what output they want from that window using aggregates. Windows can be restricted by two means:
- Windows for a certain time: use the within keyword.
from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
- Windows with a certain amount of events: use the retain keyword.
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
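The difference between the two window types can be illustrated with a short Python model. This is a sketch written for this explanation, not how the correlator is implemented: the class names are hypothetical, the average is only recomputed when a new value arrives (a real time-based window also changes as items expire), and treating an item that is exactly the window length old as expired is an assumption.

```python
from collections import deque

class RetainWindow:
    """Keeps the most recent `size` values, like a retain window."""
    def __init__(self, size):
        self.items = deque(maxlen=size)   # oldest value drops out automatically

    def add(self, value):
        self.items.append(value)
        return sum(self.items) / len(self.items)

class WithinWindow:
    """Keeps values newer than `seconds`, like a within window."""
    def __init__(self, seconds):
        self.seconds = seconds
        self.items = deque()              # (timestamp, value) pairs

    def add(self, timestamp, value):
        self.items.append((timestamp, value))
        # Drop everything that is too old relative to the newest item.
        while self.items[0][0] <= timestamp - self.seconds:
            self.items.popleft()
        return sum(v for _, v in self.items) / len(self.items)

by_count = RetainWindow(3)
print([by_count.add(v) for v in (1.0, 2.0, 3.0, 4.0)])
```

A count-based window always averages over at most the configured number of values, while a time-based window may hold any number of values depending on the arrival rate.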
Streams - outputting periodically
Streams can also control how frequently they evaluate, using the every specifier.
// will output the most recently arrived measurement value once every minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }
// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }
// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
See the Apama documentation for built-in aggregate functions.
Creating own event types
As well as the predefined event types, you can define your own event types. These can be useful to detect patterns of events occurring which trigger other parts of the same module.
event MyEvent {
Measurement m1;
Measurement m2;
}
...
on Measurement() as m1 -> Measurement() as m2 {
route MyEvent(m1, m2);
}
Creating own actions
Typically, you will structure a monitor using actions (much like functions in Java), as shown in the following examples.
Increasing the given severity:
action upgradeSeverity(string old) returns string {
if old = "WARNING" { return "MINOR"; }
if old = "MINOR" { return "MAJOR"; }
if old = "MAJOR" { return "CRITICAL"; }
return old;
}
Calculating the distance between two geo-coordinates:
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lon2-lon1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) + lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
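The distance action is based on the haversine formula. As a cross-check for values computed in EPL, the same formula can be written in a few lines of Python. This is a reference sketch only; it uses the same mean Earth radius of 6371000 meters and returns meters.

```python
import math

def distance(lat1, lon1, lat2, lon2):
    """Great-circle distance in meters between two points given in degrees."""
    R = 6371000.0                            # mean Earth radius in meters
    lat1_rad = math.radians(lat1)
    lat2_rad = math.radians(lat2)
    d_lat = math.radians(lat2 - lat1)
    d_lon = math.radians(lon2 - lon1)
    # Haversine term: latitude part plus longitude part scaled by the cosines.
    a = (math.sin(d_lat / 2.0) ** 2
         + math.cos(lat1_rad) * math.cos(lat2_rad) * math.sin(d_lon / 2.0) ** 2)
    c = 2.0 * math.atan2(math.sqrt(a), math.sqrt(1.0 - a))
    return R * c

# One degree of longitude along the equator.
print(distance(0.0, 0.0, 0.0, 1.0))
```

One degree along the equator corresponds to R * pi / 180, which is roughly 111.2 km, so this makes a convenient sanity check.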
Variables
You can define variables in your modules.
string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];
If you define a monitor-scope variable (that is, inside a monitor but not within any actions on that monitor), then that can be used in a listener if you use a colon (:) instead of as for the event co-assignment in the listener. Thus, the example below logs the latest event every 10 seconds:
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
on all Event():e {}
on all wait(10.0) {
log e.toString();
}
}
}
When a listener starts, it takes a copy of all of the local variables. The example below thus logs each event after a 10-second delay, even if other events come in between.
monitor MyMonitor {
action onload() {
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
// "as e" declares e as a local variable, so each inner listener keeps its own copy
on all Event() as e {
on wait(10.0) {
log e.toString();
}
}
}
}
Spawning monitor instances and contexts
While it is possible to handle multiple devices in a single monitor (for example, using group by and partition by in streams, or maintaining a dictionary keyed on the device ID for other state), it is often useful to separate processing of different devices into separate monitor instances.
New monitor instances can be created using the spawn statement. This takes a copy of the monitor’s monitor scope variables and runs the named action in a new monitor instance. No listeners are copied into the new monitor. It is also possible to specify a context to spawn the new monitor instance in. Different contexts can run concurrently with each other, and also help isolate different monitors from each other. When constructing a context, supply a name to identify the context, and a Boolean to control if the context is public - that is, it receives the Cumulocity IoT events by default (sent to the default channel).
This pattern is often used with the unmatched keyword to identify events that are not matched by any other listeners in that context. By using a separate context for each monitor, the unmatched behavior is scoped to that monitor. For example:
monitor PerDeviceMeasurementTracker {
action onload() {
spawn factory() to context("PerDeviceMeasurementTracker", true);
}
action factory() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all unmatched Measurement() as m {
spawn perDevice(m);
}
}
dictionary<string, Measurement> latestMeasurementByType; // measurements for this device
action perDevice(Measurement m) {
processMeasurement(m);
on all Measurement(source = m.source) as m {
processMeasurement(m);
}
}
action processMeasurement(Measurement m) {
latestMeasurementByType[m.type] := m;
}
}
Optimizing requests with concurrent connections
In order to provide better performance for requests to the Cumulocity IoT platform, Streaming Analytics uses multiple client connections to perform requests concurrently. This can provide improved performance, but may also change the ordering in which requests are executed and responses are returned. By default, the Cumulocity IoT transport tries to use multiple connections and restricts ordering to avoid races that may affect your EPL application.
An attempt is made to ensure order is maintained when required. For example, all updates to a single managed object are performed serially in the order they were sent to the transport. For more details see Optimizing requests to Cumulocity IoT with concurrent connections in the Apama documentation.
You can adjust the default number of client connections with the client.numClients tenant option in the streaminganalytics category. For example:
{
"category": "streaminganalytics",
"key": "client.numClients",
"value": "5"
}
If you require a fully serial transport, set the value of client.numClients to 1.
Controlling access to the Streaming Analytics application
By default, the Streaming Analytics application gives you access to the Analytics Builder and EPL Apps pages. Administrators may wish to control which of these are shown on different tenants or for different users, or modify the wording of the cards on the home screen (see also Customizing the home screen of the Streaming Analytics application).
Which pages are available also depends on the variant of the Apama-ctrl microservice that is running.
- If the microservice is not running, an error message is shown indicating that the microservice cannot be accessed, and only a card with information about smart rules is shown.
- If the Apama-ctrl-starter microservice is running, the EPL Apps card is not shown (and cannot be enabled) as the EPL apps functionality is not available in Apama-ctrl-starter.
- If the Apama-ctrl-smartrules or Apama-ctrl-smartrulesmt microservice is running, neither the EPL Apps card nor the Analytics Builder card is shown (and cannot be enabled). In this case, only the card with information about the smart rules is shown.
- For other variants of the Apama-ctrl microservice, both the Analytics Builder and EPL Apps cards are shown by default.
For an entire tenant, if a “feature application” named feature-disable-analyticsbuilder and/or feature-disable-eplapps is available within the tenant, then the relevant part is disabled. The application can be created either within the tenant itself or in an Enterprise tenant or Management tenant (see also Enterprise tenant > Managing tenants in the User guide) and then subscribed to subtenants (the subtenant administrators are then not able to unsubscribe this application if the parent tenant wishes to restrict access to the functionality). To create such a “feature application” within a tenant, send a POST request to /application/applications (as an administrator with the permission to create applications). For example, to disable Analytics Builder:
{
"name":"feature-disable-analyticsbuilder",
"contextPath": "feature-disable-analyticsbuilder",
"type":"HOSTED",
"resourcesUrl":"/",
"manifest": {
"noAppSwitcher": true
},
"key":"feature-disable-analyticsbuilder-key"
}
Or to disable EPL Apps:
{
"name":"feature-disable-eplapps",
"contextPath": "feature-disable-eplapps",
"type":"HOSTED",
"resourcesUrl":"/",
"manifest": {
"noAppSwitcher": true
},
"key":"feature-disable-eplapps-key"
}
You can also send the POST request using a curl command, for example, to disable Analytics Builder:
curl --user username -X POST -H 'Content-Type: application/json' -d '{"name":"feature-disable-analyticsbuilder", "contextPath": "feature-disable-analyticsbuilder", "type":"HOSTED", "resourcesUrl":"/","manifest": {"noAppSwitcher": true},"key":"feature-disable-analyticsbuilder-key"}' -k https://{{hostname}}/application/applications/
Or to disable EPL Apps:
curl --user username -X POST -H 'Content-Type: application/json' -d '{"name":"feature-disable-eplapps", "contextPath": "feature-disable-eplapps", "type":"HOSTED", "resourcesUrl":"/", "manifest": {"noAppSwitcher": true},"key":"feature-disable-eplapps-key"}' -k https://{{hostname}}/application/applications/
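The two request bodies above differ only in the feature name. If you generate them from a script, a small helper avoids copy-and-paste mistakes. The Python sketch below is an illustration; the function name is hypothetical, and it only builds the body, leaving the actual POST request to the tool of your choice.

```python
import json

FEATURES = ("analyticsbuilder", "eplapps")

def feature_disable_payload(feature):
    """Build the body for a "feature application" that disables one page.

    Hypothetical helper; `feature` must be "analyticsbuilder" or "eplapps".
    """
    if feature not in FEATURES:
        raise ValueError("unknown feature: " + feature)
    name = "feature-disable-" + feature
    return {
        "name": name,
        "contextPath": name,
        "type": "HOSTED",
        "resourcesUrl": "/",
        "manifest": {"noAppSwitcher": True},
        "key": name + "-key",
    }

print(json.dumps(feature_disable_payload("eplapps")))
```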
By default, all users can see the same set of pages (according to the limitations above).
You can also restrict the visibility of the pages to only users who have the permission ROLE_ANALYTICSBUILDER_READ or ROLE_EPLAPPS_READ, which can be assigned directly to users or via groups (see also Administration > Managing permissions in the User guide).
To enable this, set the category of the tenant option to streaminganalytics and the applicationAccess key to the value “role” (also see the Tenant API in the Cumulocity IoT OpenAPI Specification) or use a curl command as given in the example below:
curl --user username -X POST -H 'Content-Type: application/json' -d '{"category": "streaminganalytics", "key": "applicationAccess", "value": "role"}' -k https://mytenant/tenant/options
where you must replace the username with the name of a user who has ADMIN permission for “Option management”.
Note that this only affects the visibility of the cards and pages in the Streaming Analytics application. The supported REST services only require READ and ADMIN permissions for “CEP management”.
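If you prefer a script over curl, the tenant option request can be prepared with the Python standard library. The sketch below is an illustration under stated assumptions: the function name is hypothetical, basic authentication is assumed, and the host name, user name and password are placeholders. It only constructs the request; sending it is left to the caller.

```python
import base64
import json
import urllib.request

def tenant_option_request(base_url, username, password, category, key, value):
    """Prepare a POST to /tenant/options that sets one tenant option."""
    body = json.dumps({"category": category, "key": key, "value": value})
    credentials = f"{username}:{password}".encode("utf-8")
    token = base64.b64encode(credentials).decode("ascii")
    return urllib.request.Request(
        base_url.rstrip("/") + "/tenant/options",
        data=body.encode("utf-8"),
        headers={"Content-Type": "application/json",
                 "Authorization": "Basic " + token},
        method="POST",
    )

# Placeholder host and credentials; replace them with your own.
req = tenant_option_request("https://mytenant", "username", "secret",
                            "streaminganalytics", "applicationAccess", "role")
# To send the request: urllib.request.urlopen(req)
```

The same helper can prepare the other tenant options mentioned in this section, for example the client.numClients option in the streaminganalytics category.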
Customizing the home screen of the Streaming Analytics application
The cards that are shown on the home screen of the Streaming Analytics application contain text and links which you can customize on a per-tenant and per-language basis. To do this, download the documentation.json file for the language you wish to customize from the URL /service/cep/apamacorrelator/EN/documentation.json (this must be authenticated for a user in the Cumulocity IoT tenant). Replace the “EN” within the URL with the language code for the file you want to download.
The documentation.json file includes the URLs for the documentation links across the Streaming Analytics application and the text that is shown on the home screen. You can modify this to your requirements.
After you have made all required changes, package the modified copies into a ZIP file containing the following files:
- files/support/cumulocity/EN/documentation.json
- cumulocity.json
where the cumulocity.json file contains the following:
{
"contextPath": "streaminganalytics-customization",
"availability": "MARKET",
"type": "HOSTED",
"name": "streaminganalytics-customization",
"key": "streaminganalytics-customization-key",
"noAppSwitcher": true
}
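Packaging the files by hand is error-prone because the paths inside the ZIP file must match exactly. The following Python sketch builds the archive with the layout listed above. It is an illustration only: the function name is hypothetical and the documentation.json content shown is a placeholder, since the real content is the file you downloaded and modified.

```python
import io
import json
import zipfile

MANIFEST = {
    "contextPath": "streaminganalytics-customization",
    "availability": "MARKET",
    "type": "HOSTED",
    "name": "streaminganalytics-customization",
    "key": "streaminganalytics-customization-key",
    "noAppSwitcher": True,
}

def build_customization_zip(docs_by_language):
    """Return ZIP bytes containing cumulocity.json and one
    documentation.json per language code (for example "EN")."""
    buffer = io.BytesIO()
    with zipfile.ZipFile(buffer, "w", zipfile.ZIP_DEFLATED) as archive:
        archive.writestr("cumulocity.json", json.dumps(MANIFEST, indent=2))
        for language, doc in docs_by_language.items():
            path = f"files/support/cumulocity/{language}/documentation.json"
            archive.writestr(path, json.dumps(doc, indent=2))
    return buffer.getvalue()

# Placeholder content; use your modified documentation.json here.
zip_bytes = build_customization_zip({"EN": {"placeholder": "modified content"}})
```

Write the returned bytes to a file with a .zip extension before uploading it.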
Then upload the ZIP file using the Administration application: go to Ecosystem > Applications, click Add application, and select the method Upload web application. See Administration > Managing applications in the User guide for detailed information.
You may need to clear your browser cache for the changes to take effect.
You can include multiple languages in a single ZIP file as needed. You can subscribe this to subtenants as needed from an Enterprise tenant.
On new releases of the platform, it is recommended that you review the source documentation.json file for any changes. New entries in this file will be picked up with their default values.
Modifying microservice permissions and resource usage
The resource usage and permissions that the Apama-ctrl microservice operates with are defined in the manifest file of the Apama-ctrl microservice. See Microservice manifest in the Microservice SDK guide for more information.
If you have access to the microservice image (typically available only to operations), then you are able to extract the microservice image, modify the manifest, rebuild the microservice, and reupload the microservice to Cumulocity IoT as an application in the Administration application.
The manifest specifies CPU and memory resource usage. In some circumstances, these must be changed (different sizes of the microservice image are provided with different configurations).
The manifest also specifies the permissions with which the microservice runs. This is the set of permissions that every request from EPL (or any other code running in the Apama-ctrl microservice) runs with. The Apama-ctrl microservice itself requires the following permissions:
- ROLE_APPLICATION_MANAGEMENT_READ
- ROLE_INVENTORY_READ
- ROLE_INVENTORY_ADMIN
- ROLE_INVENTORY_CREATE
- ROLE_MEASUREMENT_READ
- ROLE_MEASUREMENT_ADMIN
- ROLE_EVENT_READ
- ROLE_EVENT_ADMIN
- ROLE_ALARM_READ
- ROLE_ALARM_ADMIN
- ROLE_DEVICE_CONTROL_READ
- ROLE_DEVICE_CONTROL_ADMIN
- ROLE_IDENTITY_READ
- ROLE_IDENTITY_ADMIN
- ROLE_CEP_MANAGEMENT_READ
- ROLE_CEP_MANAGEMENT_ADMIN
- ROLE_OPTION_MANAGEMENT_READ
- ROLE_SMS_ADMIN
- ROLE_AUDIT_ADMIN
- ROLE_AUDIT_READ
- ROLE_USER_MANAGEMENT_READ
- ROLE_USER_MANAGEMENT_OWN_READ
- ROLE_TENANT_MANAGEMENT_READ
- ROLE_BULK_OPERATION_ADMIN
- ROLE_BULK_OPERATION_READ
- ROLE_MACHINE_LEARNING_READ
You can add other roles to this list (or remove them from it) to grant (or remove) permissions to EPL code.
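Editing the role list can be scripted once the manifest has been extracted. The Python sketch below is an illustration that assumes the roles are stored in the manifest's requiredRoles array, as described in the Microservice SDK guide; the function name is hypothetical and the role that is added is just an example.

```python
def update_required_roles(manifest, add=(), remove=()):
    """Return a copy of the manifest with roles added to or removed from
    its requiredRoles list. The input manifest is not modified."""
    roles = list(manifest.get("requiredRoles", []))
    for role in add:
        if role not in roles:          # avoid duplicate entries
            roles.append(role)
    to_remove = set(remove)
    roles = [role for role in roles if role not in to_remove]
    updated = dict(manifest)
    updated["requiredRoles"] = roles
    return updated

# Shortened example manifest; a real manifest contains more fields and roles.
manifest = {"requiredRoles": ["ROLE_INVENTORY_READ", "ROLE_SMS_ADMIN"]}
updated = update_required_roles(
    manifest,
    add=["ROLE_TENANT_STATISTICS_READ"],   # example role to grant
    remove=["ROLE_SMS_ADMIN"],
)
print(updated["requiredRoles"])
```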