Basic functionality
Developing apps
An EPL app is a monitor (*.mon) file. You can develop EPL apps in two different ways:
- You can use the Streaming Analytics application which is available from Cumulocity’s application switcher and develop your EPL apps within Cumulocity.
- Or you can install Apama on your local machine and then develop your EPL apps (as monitor files) in Software AG Designer, that is, in a separate environment.
See also Using the Apama Event Processing Language (EPL).
Developing apps with the Streaming Analytics application
The EPL Apps page of the Streaming Analytics application provides an interface for interactively editing new or existing EPL apps (*.mon files) as well as importing and activating (deploying) EPL apps.
Any user on the tenant wishing to use the EPL Apps page must be a CEP Manager. See Managing permissions.
Step 1 - Invoke the Streaming Analytics application
Open the application switcher and click the icon for the Streaming Analytics application. Then navigate to the EPL Apps page.
When you go to the EPL Apps page, the EPL app manager is shown first, listing any existing EPL apps. Each app is shown as a card. You can add new EPL apps and manage existing EPL apps from here.
Each card that is shown for an app has an actions menu at the top which allows you to edit, download or delete the app.
From this page, you can:
-
Edit existing EPL apps. Either use the Edit command from the actions menu or simply click on the card that is shown for the app.
-
Create new EPL apps. See below.
-
Import EPL apps. If you prefer to develop your apps outside of Cumulocity (for example, using Software AG Designer), click Import EPL in the top menu bar to upload an Apama monitor (*.mon) file as an app into the Streaming Analytics application.
-
Download EPL apps. Use the Download command from the actions menu to download the app as a *.mon file.
-
Deploy existing EPL apps. On the card that is shown for an app, change the mode from Inactive to Active. For more information, see Deploying apps. When activating an app, any syntax errors are reported back immediately. The error state is shown on the card, helping you to ensure your app is in good shape. Click on the error to display information on what went wrong. It is not possible to activate an app if it has syntax errors. The errors are shown on the card until they have been fixed and the app has been activated again.
-
Reload all EPL apps. Click Reload in the top menu bar to refresh the display to show any changes other users have made since the page loaded, including any errors that have been introduced in the meantime.
Step 2 - Create an EPL app
Click New EPL app in the top menu bar. In the resulting Create app dialog box, enter a unique app name. You can also enter a description which will be shown on the card that is created for the new app. Click OK.
The EPL editor appears. The EPL code for the new app already contains the typical basic event definitions and utilities that are required for working with Cumulocity. You can adapt them as required for your app. Consult the documentation and samples for more details.
To help you get started, several samples are available. To see them, click Samples which is shown to the right of the editor. Click on a sample to see a preview of its contents. You can select part of the sample code and copy it over into your own code using the standard key combinations Ctrl+C and Ctrl+V. You can also use the command buttons to copy the entire code to the clipboard and insert it at an appropriate position in your own code, or to replace all of your existing code with the sample code.
Using the buttons in the top menu bar, you can undo/redo your last changes in the current session and you can save your changes.
It is also possible to change the mode from Inactive to Active (or vice versa) in the EPL editor. Again, when there is an error in your EPL code, it is not possible to activate the app. The errors are highlighted within the code.
Click the close icon in the top menu bar to leave the EPL editor and thus to return to the list of EPL apps.
Step 3 - Test the EPL app
Once your app is activated, you should be able to see the results of it running. This may include sending measurements, receiving data, creating alarms, and logging in the Apama-ctrl microservice. For information on how to check the log files of the Apama-ctrl microservice, see Monitoring microservices.
See also Deploying apps.
Developing apps with Software AG Designer
Software AG Designer provides a full development environment and is the tool of choice when you have a complex EPL application. When your EPL app (that is, the monitor file) is ready, you must import it into Cumulocity.
Step 1 - Install Apama
Download the apama-c8y-dev
package of Apama from https://www.apamacommunity.com/downloads/ and extract it to install Apama. This installs the freemium Apama Community Edition with reduced capabilities and several restrictions. To unlock all features you need a license.
If you have a license, copy the license file into the Apama work directory (APAMA_WORK/license).
The apama-c8y-dev
package includes Software AG Designer which is available for both the Apama Community Edition and licensed editions of Apama.
Step 2 - Create a project
Once installed, create an Apama project in Software AG Designer and enable it for Cumulocity connectivity. For instructions on how to create an Apama project, refer to Creating Apama projects in the Apama documentation.
Step 3 - Add Apama bundles to the project
Add the following Apama bundles to the newly created Apama project. These are required by Cumulocity so that it can activate your app. For instructions on how to add bundles to a project, refer to Adding bundles to projects in the Apama documentation.
- Cumulocity > Event Definitions for Cumulocity
Provides event APIs required for sending and receiving data to/from Cumulocity. - Cumulocity > Utilities for Cumulocity
Provides helper utility functions for working with data received from Cumulocity. - Any Extractor
Provides support for extracting values from theany
type. - Time Format
Required to access all the methods of the Time Format plug-in. Useful for formatting and parsing time. - HTTP Client Generic Events
Exposes predefined generic events used by the HTTP client connectivity plug-in. - Automatic onApplicationInitialized
This starts all connectivity plug-ins immediately on start up. - HTTP Client > JSON with generic request/response event definitions
Allows EPL apps to make HTTP calls. - Cumulocity > Cumulocity Client
Exposes the Cumulocity client to EPL apps.
The bundles above are the only ones that are permissible in an EPL app, so be careful not to add any other bundles or your app may not work when activated in Cumulocity.
Step 4 - Create a monitor file
To create a new Apama monitor file, refer to Creating new monitor files for EPL applications in the Apama documentation.
Before you import the newly created monitor file as an EPL app into Cumulocity and activate it there, you might want to test if the monitor file works as expected from within Software AG Designer.
For further information, see The Cumulocity Transport Connectivity Plug-in in the Apama documentation.
Step 5 - Run and test the monitor file
When running the project locally, you must provide your Cumulocity credentials in the project configuration. Configure the credentials in the CumulocityIoT.properties file under the Cumulocity client. For example:
CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.cumulocity.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
CUMULOCITY_APPKEY
.Note that the above description assumes that you are connecting to a tenant where the URL identifies the tenant. If that is not true (for example, if you are connecting by an IP address), you may need to set this in the CumulocityIoT.properties file:
CUMULOCITY_TENANT=my_custom_tenant
If the project needs to run locally in a multi-tenant environment, enable the multi-tenant support and provide the name of the multi-tenant microservice to use by configuring the following properties in the CumulocityIoT.properties file under the Cumulocity client:
# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true
# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms
In addition, make sure that the monitor files are able to work with the multi-tenant microservice. For more information, see Working with multi-tenant deployments in the Apama documentation.
You can now proceed with testing your EPL in Software AG Designer.
Once the EPL app is ready, refer to Deploying apps to find out how to deploy it to Cumulocity.
Deploying apps
You can deploy the following to Cumulocity:
- EPL apps. You can develop or import a single *.mon file with the Streaming Analytics application. This is the simplest mechanism for deploying an EPL app.
- Apama applications. You can upload complex Apama applications (that is, Apama projects developed with Software AG Designer) to Cumulocity and deploy them as custom microservices using the Cumulocity Microservice SDK.
Deploying EPL apps as single *.mon files with the Streaming Analytics application
When an EPL app (that is, a *.mon file) is activated in Cumulocity, the *.mon file is assigned a unique package name. This prevents conflicts when multiple modules are activated. For this reason, you should not specify a package
statement in a *.mon file. If you must share events between different parts of your application, then write the event definitions and monitors that use it in a single *.mon file.
There is a restricted set of utilities and base events available for your EPL app. At the time of writing, these include the Time Format and HTTP Client > JSON with generic request/response event definitions bundles.
When any EPL app signals a runtime error, this will be raised as an alarm. Runtime errors include uncaught exceptions, as well as any explicit logging of warnings and errors that your EPL app wants to do. Health issues that relate to the Apama runtime in general will also be raised as alarms.
For more detailed diagnostics of the Apama runtime and any active EPL apps, you can look at the logs for the Apama-ctrl microservice. See Monitoring microservices for more information on log files. However, some familiarity with Apama is necessary to get the most out of an Apama log file.
Deploying Apama applications as microservices
Using Software AG Designer, you can also develop more complex projects which:
- are spread across multiple *.mon files
- must be isolated from other Apama applications
- use connectivity plug-ins or EPL plug-ins that are not enabled by default
These kinds of applications should be deployed as microservices to Cumulocity.
Required settings in the microservice manifest
The microservice manifest provides the required settings to manage microservice instances and the application deployment in Cumulocity. For detailed information, see Microservice manifest.
Apama can be used in either a single-tenant microservice or a multi-tenant microservice. Therefore, the microservice manifest must set the isolation level to either PER_TENANT or MULTI_TENANT. When Apama is used in a multi-tenant microservice, the Apama application must be written to be multi-tenant aware. For more information, see Working with multi-tenant deployments in the Apama documentation.
The following permissions are required by the microservice in order to start up and use all features in the Cumulocity transport from EPL. These are set with requiredRoles in the microservice manifest.
- ROLE_APPLICATION_MANAGEMENT_READ
- ROLE_INVENTORY_READ
- ROLE_INVENTORY_ADMIN
- ROLE_INVENTORY_CREATE
- ROLE_MEASUREMENT_READ
- ROLE_MEASUREMENT_ADMIN
- ROLE_EVENT_READ
- ROLE_EVENT_ADMIN
- ROLE_ALARM_READ
- ROLE_ALARM_ADMIN
- ROLE_DEVICE_CONTROL_READ
- ROLE_DEVICE_CONTROL_ADMIN
- ROLE_IDENTITY_READ
- ROLE_OPTION_MANAGEMENT_READ
- ROLE_BULK_OPERATION_READ
- ROLE_SMS_ADMIN
To deploy an Apama application as a microservice
-
Develop your application in Software AG Designer in the usual way.
-
You can use Apama’s Docker support to turn the entire project into a microservice. In the Project Explorer view, right-click the project and select Apama > Add Docker Support, which will add a Dockerfile to the root of your project directory. When used for building, it will make use of the Apama images available on Docker Hub. You will need Docker Hub credentials that give you access to the Apama images. Apama Docker images are exclusively Linux-based.
-
Add any custom steps to the Dockerfile that might be necessary, for example, building a custom plug-in, or copying your license file into the image.
-
Use the Cumulocity microservice utility tool for packaging and deploying the project; for detailed information, see Microservice utility tool. When creating the directory structure for the microservice utility tool to build from, copy your entire project directory inside that directory with the name “docker/”. For example:
docker/monitors/
docker/eventdefinitions/
docker/Dockerfile
docker/…
cumulocity.jsonYou must create the microservice manifest manually, but there is no need for anything special in the microservice manifest; no roles or probes are required. However, if you want to configure a liveness or readiness probe, you can configure an
httpGet
probe for the path /ping on port 15903 (Apama’s default port). Enabling auto-scaling is not recommended, as Apama applications are usually stateful and do not automatically partition their input.You can pack, deploy and subscribe from this directory, resulting in your Apama application being turned into a running microservice. The behavior of the application when being run outside of Cumulocity (from Software AG Designer or your test environment) will be near-identical to its behavior inside Cumulocity. When deployed as a microservice doing requests to the Cumulocity API, Apama will automatically pick up the credentials to connect to the tenant you deployed it to, overwriting any other credentials provided to Apama. However, if you wish to receive real-time events, you must have valid credentials specified in the project configuration as you do when connecting to Cumulocity from an external Apama environment.
-
When you are ready to deploy to Cumulocity, upload the application as a microservice. For details, refer to Managing microservices.
Apama 10.15.0 introduces several new container images provided via Docker Hub and some of the existing container images have changed content.
When building images for use as a Cumulocity microservice, this is now different to earlier releases.
You must now use the
softwareag/apama-cumulocity-jre image with the
softwareag/apama-cumulocity-builder image as a builder image.
To do this with the default project Dockerfile created by Software AG Designer in 10.15.0 and previous versions,
you must either change the FROM
lines in the Dockerfile appropriately
(you only need to do this once) or build using the following flags (you must do this every time):
--build-arg APAMA_BUILDER=softwareag/apama-cumulocity-builder:10.15 --build-arg APAMA_IMAGE=softwareag/apama-cumulocity-jre:10.15
Testing apps
You can use the Apama EPL Apps Tools on GitHub to script uploads of your EPL apps and manage them for CI/CD (continuous integration and continuous delivery) use cases. This tooling also provides extensions to the PySys test framework to allow you to simply write tests for your EPL apps and to run them automatically.
Apama EPL Apps Tools is available from https://github.com/Cumulocity-IoT/apama-eplapps-tools. See the EPL Apps Tools documentation for detailed information.
For more information on PySys, see the API Reference for Python that you can access from the Apama documentation.
Supported REST services
EPL apps are designed to listen for REST (Representational State Transfer) services and supports all GET, POST, PUT and DELETE operations. Example requests for the different operations are listed below.
To perform these operations, you must have READ and ADMIN permissions for “CEP management” (see also Managing permissions).
Request headers for all operations
Each request must be authenticated to Cumulocity.
Name | Description |
---|---|
Accept | “application/json”. This is a mandatory parameter. |
Common response codes
The following common error response codes can be expected for all requests:
Code | Description |
---|---|
401 | Unauthorized. |
403 | Forbidden. EPL apps are not available with the Apama-ctrl-starter microservice. |
Any other response codes that can be expected from a specific request are given below.
Common field descriptions
The following common fields are available with the responses, depending on the operation:
Field | Description |
---|---|
contents | The full contents of the EPL file. |
description | A description of the file. |
eplPackageName | The package name of the EPL file. If the name contains special characters (including spaces), these characters are escaped to make them valid EPL identifiers and avoid injection errors. |
errors | A list of all compilation errors in the file, if any, with line numbers and text. |
id | A unique identifier of the file. |
name | The name provided for this bit of EPL. |
state | Whether the EPL is injected into the correlator and running. This can either be active or inactive . |
warnings | A list of all compilation warnings in the file, if any, with line numbers and text. |
GET - Retrieve all available EPL files
Endpoint: /service/cep/eplfiles
Example request
GET /service/cep/eplfiles
Responses
Code | Description |
---|---|
200 | Successful operation. See also the example value below. |
400 | Bad request. Header contents has unexpected value. |
Example value for response code 200:
{
"eplfiles":[
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - Retrieve all available EPL files with their contents
Endpoint: /service/cep/eplfiles
Request parameters
Name | Description |
---|---|
contents | Boolean type. Fetches the EPL files with their contents. This is an optional query parameter. |
Example request
GET /service/cep/eplfiles?contents=true
Responses
Code | Description |
---|---|
200 | Successful operation. See also the example value below. |
Example value for response code 200:
{
"eplfiles":[
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
]
}
GET - Retrieve EPL file by identifier
Endpoint: /service/cep/eplfiles/{id}
Request parameters
Name | Description |
---|---|
id | Identifier of the EPL file to be fetched. This is a mandatory parameter. |
Example request
GET /service/cep/eplfiles/{{id}}
Responses
Code | Description |
---|---|
200 | Successful operation. See also the example value below. |
404 | File with identifier not found. See also the example value for this response code at the end of this section. |
Example value for response code 200:
{
"contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
POST - Create a new EPL application
Endpoint: /service/cep/eplfiles
Example request
POST /service/cep/eplfiles
The following is an example of a request body:
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
Note the following:
- The
name
is used for the package of the file (thus the EPL file must not contain apackage
statement) and must be unique across all EPL files. The name is prefixed and certain characters are escaped. The actual package name used is returned in theeplPackageName
field for convenience (you can search for this in the microservice log file to find log statements). - Make sure to provide safely escaped
contents
. description
is optional and can be empty.
Responses
Code | Description |
---|---|
201 | Successfully created / Created with errors in file / Created with warnings in file. See also the examples below. |
405 | Invalid input. |
Example for response code 201 when successfully created:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
Example for response code 201 when created with warnings or errors:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
PUT - Update EPL file by identifier
Endpoint: /service/cep/eplfiles/{id}
Request parameters
Name | Description |
---|---|
id | Identifier of the EPL file to be updated. The identifier must be included in the path. This is a mandatory parameter. |
Example request
PUT /service/cep/eplfiles/{id}
The following is an example of a request body:
{
"name": "Ordinal1",
"contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
"state": "active",
"description": ""
}
See also the information given for the POST request.
Responses
Code | Description |
---|---|
200 | Successfully updated. See also the example values below. |
404 | File with identifier not found. See also the example value for this response code at the end of this section. |
Example value for response code 200 when successfully updated with no errors:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
],
"id":"39615",
"name":"Ordinal1",
"state":"active",
"warnings":[
]
}
Example value for response code 200 when updated with errors or warnings:
{
"description":"",
"eplPackageName": "eplfiles.Ordinal1",
"errors":[
{
"line":5,
"text":"assigning a float to an integer variable"
}
],
"id":"39651",
"name":"Ordinal1",
"state":"inactive",
"warnings":[
{
"line":10,
"text":"\"assert\" may become a reserved word in future versions of EPL"
}
]
}
DELETE - Delete EPL file by identifier
Endpoint: /service/cep/eplfiles/{id}
Request parameters
Name | Description |
---|---|
id | Identifier of the EPL file to be deleted. The identifier must be included in the path. This is a mandatory parameter. |
Example request
DELETE /service/cep/eplfiles/{{id}}
Responses
Code | Description |
---|---|
200 | Successfully deleted. |
404 | File with identifier not found. See also the example value for this response code at the end of this section. |
Example value for response code 404
The response code 404 indicates that a file with a specific identifier was not found.
{
"error":"Not Found",
"exception":"com.apama.in_c8y.FileNotFoundException",
"message":"File with id 39613 not found",
"path":"/eplfiles/39613",
"status":404,
"timestamp":"2020-01-17T12:21:42.457+0000"
}
where
error
is the error message.exception
specifies the exception that was raised.message
is a description of the exception message.path
is the path that was requested.status
is the status of the application.timestamp
is the timestamp in ISO format.
Events and channels
In Apama EPL, interactions with the rest of the Cumulocity ecosystem are done via events. A number of event definitions is provided for accessing Cumulocity data.
Predefined event types
There are some predefined event types to interact with several Cumulocity APIs. Events are sent to Apama applications automatically when a new measurement, alarm or event is created. For interacting with the Cumulocity backend, you can create an event and send it to the relevant channel. Cumulocity will automatically execute either the database query or create the API calls necessary for sending mails, SMS, or similar.
Look at the data model in the API Reference for EPL (ApamaDoc) to see how the events for each stream are structured.
Sending events to a channel
Sending an event is done by constructing the event, either with new <type>
followed by assignments to the fields, or with a constructor specifying all of the fields. The send
statement is then used to send the event to Cumulocity. The send
statement requires a channel - this is the SEND_CHANNEL
constant on the event type.
Listening to events
You can trigger your EPL by listening to events on channels. You can subscribe to channels with the monitor.subscribe("string name")
method. This can be done in the startup of your monitor, or if you only want to receive events some of the time, called as needed, followed by monitor.unsubscribe("string name")
.
Listen for events using the on
statement, followed by the event type that you are listening to, open and close parentheses, and as <identifier>
to name a variable that will hold the event.
By default, a listener will fire once; to make it repeat for all events, use the all
keyword before the event type.
Filters
Adding filters can be done by specifying one or more fields between the parentheses for a listener. Only top-level fields can be filtered for. Use if
statements for more complex filtering, or for filtering on subproperties of events (for example, in dictionaries).
Standard event types and channels
For the standard Cumulocity events, there are constants that contain the channels for sending and receiving events, for example:
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;
The events listed in the following table are part of the com.apama.cumulocity
package.
Event | Channel for sending | Channel for receiving |
---|---|---|
Operation | Operation.SEND_CHANNEL | Operation.SUBSCRIBE_CHANNEL |
Measurement | Measurement.SEND_CHANNEL | Measurement.SUBSCRIBE_CHANNEL |
Event | Event.SEND_CHANNEL | Event.SUBSCRIBE_CHANNEL |
Alarm | Alarm.SEND_CHANNEL | Alarm.SUBSCRIBE_CHANNEL |
ManagedObject | ManagedObject.SEND_CHANNEL | ManagedObject.SUBSCRIBE_CHANNEL |
MeasurementFragment | MeasurementFragment.SEND_CHANNEL | MeasurementFragment.SUBSCRIBE_CHANNEL |
Measurement fragments
Measurement
and MeasurementFragment
events are always published.
You can generate listeners in EPL that will match on the contents of MeasurementFragment
events rather than Measurement
events. For example:
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}
See also Measurement fragments.
Distinguishing between create and update notifications
When listening for Alarm
, Event
, ManagedObject
or Operation
events from Cumulocity, you may want to to distinguish between create and update operations. Each of these event types have actions named isCreate()
and isUpdate()
for this purpose.
Example for listening for new alarms:
on all Alarm() as alarm {
if alarm.isCreate() {
log "Alarm created: " + alarm.toString() at INFO;
}
// else it's an update
}
And similarly, only for updated alarms:
on all Alarm() as alarm {
if alarm.isUpdate() {
log "Alarm updated: " + alarm.toString() at INFO;
}
// else it's a create
}
For events that have come from Cumulocity, one of isUpdate()
or isCreate()
will always return true. Both actions are provided for choice and readability.
For more information, including examples for the different types of objects, see Receiving update notifications in the Apama documentation.
See also the API Reference for EPL (ApamaDoc) for more information about the isCreate()
and isUpdate()
actions.
Example
This example listens for new measurements using the com.apama.cumulocity.MeasurementFragment
API. It filters incoming measurements to find speed values above a given maximum speed and raises an alarm if the limit is breached.
- Subscribe to the
MeasurementFragment.SUBSCRIBE_CHANNEL
channel. - Listen to the measurement fragment and filter on
type
, which isc8y_SpeedMeasurement
. Ensure thatvalueFragment
has the valuec8y_speed
and thatvaluesSeries
filters onspeedX
only. Also filter onvalue
when it is greater thanSPEED_LIMIT
. - Create the event using the constructor specifying all of the fields.
- Send the event to the correct channel -
Alarm.SEND_CHANNEL
.
The resulting *.mon file can look like this:
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;
monitor TriggerAlarmForSpeedBreach {
constant float SPEED_LIMIT := 30.0;
action onload() {
monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
// Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
"Speed limit breached", "ACTIVE", "CRITICAL", 1,
new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
}
}
Built-in actions
Overview
With Apama EPL, it is possible to utilize functions, called “actions”. Every monitor will have at least one action - the onload
action. This section covers the already built-in actions ready to use.
See also the API Reference for EPL (ApamaDoc) for actions on the built-in types.
Querying Cumulocity data
To interact with your historical data, you can use one of the following request-response event pairs to look up resources.
Example: To look up alarms, you can send a com.apama.cumulocity.FindAlarm
request event with appropriate query parameters to the FindAlarm.SEND_CHANNEL
channel.
In response, you can expect 0 or more com.apama.cumulocity.FindAlarmResponse
events (depending on the number of resources that match the lookup request)
and a com.apama.cumulocity.FindAlarmResponseAck
event on the FindAlarmResponse.SUBSCRIBE_CHANNEL
channel.
Similar functionality is also provided for looking up managed objects, events, measurements and operations.
The events listed in the following table are part of the com.apama.cumulocity
package.
To look up | Request-Response Events | Example |
---|---|---|
ManagedObject | FindManagedObject FindManagedObjectResponse FindManagedObjectResponseAck |
Example |
Alarm | FindAlarm FindAlarmResponse FindAlarmResponseAck |
Example |
Event | FindEvent FindEventResponse FindEventResponseAck |
Example |
Measurement | FindMeasurement FindMeasurementResponse FindMeasurementResponseAck |
Example |
Operation | FindOperation FindOperationResponse FindOperationResponseAck |
Example |
CurrentUser | CurrentUser GetCurrentUser GetCurrentUserResponse |
Example |
TenantOption | TenantOption FindTenantOptions FindTenantOptionsResponse |
Documentation |
Invoking other parts of the Cumulocity REST API
The Cumulocity REST API covers some extra functionality which is not covered with the individual event types. To invoke any other part of the REST API, a generic request-response API is provided which you can use to invoke any part of the Cumulocity API.
You can use the following request-response events:
- com.apama.cumulocity.GenericRequest
- com.apama.cumulocity.GenericResponse
- com.apama.cumulocity.GenericResponseComplete
This includes personal identifiable information, such as username, email address, and so on.
For more information, see REST implementation in the Cumulocity OpenAPI Specification and Invoking other parts of the Cumulocity REST API in the Apama documentation.
Invoking HTTP services
To interact with HTTP services using REST and JSON, create a com.softwareag.connectivity.httpclient.HttpTransport
instance using one of the factory methods:
- HttpTransport.getOrCreate(string host, integer port) returns HttpTransport
- HttpTransport.getOrCreateWithConfiguration(string host, integer port, dictionary <string, string> configurations) returns HttpTransport (the keys in the configurations dictionary are the constants on HttpTransport with the
CONFIG_
prefix)
On the HttpTransport
object, call one of the create methods, passing a path and payload as needed, to produce a Request
object.
On the Request
object, you may set cookies, headers or query parameters as needed, and can then invoke the request with the execute(action<Response> callback)
. Supply the name of an action in your monitor for the callback, and it will be invoked with the Response
when the request has completed (or timed out).
In the callback, the Response
object is supplied with statusCode
and payload
. Fields on the payload are accessible via the com.apama.util.AnyExtractor
object it is supplied in - see the information on access fragments below.
Refer to the API Reference for EPL (ApamaDoc) for further details.
Utility functions
Access fragments
You can access fragments via the params
dictionary of most events. The AnyExtractor
object can be constructed to help you extract data from any objects containing multiple subfragments and access:
-
action getInteger(string path) returns integer
-
action getFloat(string path) returns float
-
action getString(string path) returns string
-
action getBoolean(string path) returns Boolean
-
action getSequence(string path) returns sequence<any>
-
action getDictionary(string path) returns dictionary<any, any>
You can use a JSON path to navigate in the object structure. For example:
string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");
Example “fragment”: “c8y_TemperatureMeasurement”.
Example “sub.fragment.object”: “c8y_TemperatureMeasurement.T.Unit”.
Casting “any” values
Alternatively, use a cast to convert an any
to a particular type:
string s := <string> measurement.params["strfragment"];
Note that a cast operation will throw if the object is of a different type.
currentTime and the TimeFormatter
The read-only variable currentTime
can be used to obtain the current server time. Apama deals with time using seconds since the Unix Epoch (1 Jan 1970 UTC). You can easily transform it to a human-readable form using the TimeFormat
object.
The TimeFormat
object can be used for formatting dates and times, and also for parsing them.
Example:
using com.apama.correlator.timeformat.TimeFormat;
monitor Example {
action onload {
log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
}
}
For more information on TimeFormat
and its functions, see Using the TimeFormat Event Library in the Apama documentation and the API Reference for EPL (ApamaDoc).
inMaintenanceMode
The Util.inMaintenanceMode()
function is a fast way to check if the device is currently in maintenance mode. It takes a managed object as a parameter and returns a Boolean which is true if the device is in maintenance mode.
Example:
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.Util;
monitor ExampleMonitor {
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
integer reqId := integer.getUnique();
send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
if not Util.inMaintenanceMode(d.managedObject) {
send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
}
}
}
}
}
replacePlaceholders
To build strings, you can use concatenation as follows:
string s:= "An event with the text " + evt.text + " has been created.";
If the texts get longer and have more values that are dynamically set from the data, you can use the Util.replacePlaceholders()
function.
In your text string, you mark the placeholders with the field name from the event and surround it by #{}
.
The second parameter to replacePlaceholders
can be any event type.
Utils::replacePlaceholders
looks up the field name specified in the event or in the parameters of the event to generate the text replacement.
You can use field names of type #{X.Y}
to access nested structures in the event.
myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);
If the replacement string is of a form such as #{source.name}
where source.name
is the name of the underlying managed object/device
or #{source.c8y_Hardware.notes}
where c8y_Hardware
is a fragment on the managed object,
then special handling is required to achieve the replacement.
After the initial replacement, you must update the placeholder field name and run Util::replacePlaceholders
again with the source managedObject
.
myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);
Advanced features
Custom fragments
Cumulocity APIs let you structure your data freely. In Apama EPL, this is done by adding entries to params
, which is of the type dictionary<string, any>
. Each Cumulocity event in the com.apama.cumulocity
package (such as Alarm
, Event
, Measurement
or Operation
) has a params
field, which is translated to fragments or optional fields. Thus, when receiving events, your code must look up entries in the params
field. When sending events, this can be done by defining event types, or you can use the dictionary<string, any>
type. When receiving events, the EPL type is dictionary<any, any>
. Note that EPL is strongly typed, so if you are creating an event with no fragments, a new dictionary<string, any>
expression is required. If you are providing entries inline with a dictionary literal, then EPL determines the type based on the type of the first key-value pair - thus, for dictionary<string, any>
, cast the first value to an any
type with the <any>
cast operator:
send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;
The MeasurementValue
type is provided for the measurements in the Measurement
type. MeasurementValue
has value
and unit
fields and params
for other fragments.
Example 1:
send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
"c8y_TemperatureMeasurement":{
"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
}},
new dictionary<string,any>) to Measurement.SEND_CHANNEL;
This will result in the following JSON structure:
{
"type": "c8y_TemperatureMeasurement",
"time": "...",
"source": {
"id": "12345"
},
"c8y_TemperatureMeasurement": {
"T1": {
"value": 1,
"unit": "C"
},
"T2": {
"value": 1,
"unit": "C"
},
"T3": {
"value": 1,
"unit": "C"
},
"T4": {
"value": 1,
"unit": "C"
},
"T5": {
"value": 1,
"unit": "C"
},
}
}
Measurement fragments
A measurement can be broken into individual measurement fragments. This can be done for each fragment and series present in the measurement. See Cumulocity’s domain model for more information on measurement fragments.
Listen for events of type com.apama.cumulocity.MeasurementFragment
when you require filtering based on measurement fragments or series,
instead of listening for com.apama.cumulocity.Measurement
events and looking inside the measurements
dictionary.
For more information, see Using measurement fragments in the Apama documentation.
Listeners
Triggering a statement by an arriving event is not the only possibility. The following sections cover other ways to combine listeners. Refer to Defining Event Listeners in the Apama documentation for full details.
Filters
Filters enable you to trigger by combinations or sequences of other triggers. If you have a trigger like this
on all Event() as e { ... }
it is also possible to add filters in the pattern.
on all Event(type = "c8y_EntranceEvent") as e { ... }
You can listen for more than one event:
on Event() as e and Alarm() as a { ... }
This will trigger on receiving an Event and an Alarm event - the first of each will be captured.
You can also trigger by sequences:
on all (Event() as e -> Alarm() as a) { ... }
This will trigger for every pair “Event followed by Alarm”. On receiving an event, it will stop listening for further events and start listening for alarms instead. Once an alarm is received, it will start listening for events again.
Timers
You can also trigger listeners based on time. You can either trigger in a certain interval, for example, fire every 5 minutes (300 seconds):
on all wait(300.0) { ... }
Or you can have a listener fire at certain times of the day, with similar functionality to Unix’s cron scheduler:
// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59
on all at(*, *, *, *, *) {} // trigger every minute
on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month
You can also combine timer patterns with other patterns. For example, you can check if there was an event within a certain time after another event:
on Event() -> wait(600.0) and not Alarm() { ... }
This will trigger if there is an event and within 10 minutes (600 seconds) there is no alarm. Note the use of not
which terminates the listener if the event occurs.
You can use a tenant option to set the time zone used for on all at
timers. To set the tenant option, specify the microservice.runtime
category and the timezone
key.
For example:
{
"category" : "microservice.runtime",
"key" : "timezone",
"value" : "Europe/Warsaw"
}
See also Timezone variable in this documentation and Supported time zones in the Apama documentation.
Streams - windows
Streams give you the possibility to operate on windows of events. Streams use the from
keyword instead of on
and define a window to operate over, and select what output they want from that window using aggregates. Windows can be restricted by two means:
-
Windows for a certain time - use the
within
keyword.from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements ["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
-
Windows with a certain amount of events - use the
retain
keyword.from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
Streams - outputting periodically
Streams can also control how frequently they evaluate, using the every
specifier.
// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }
// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }
// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
See the Apama documentation for built-in aggregate functions.
Creating own event types
As well as the predefined event types, you can define your own event types. These can be useful to detect patterns of events occurring which trigger other parts of the same module.
event MyEvent {
Measurement m1;
Measurement m2;
}
...
on Measurement() as m1 -> Measurement() as m2 {
route MyEvent(m1, m2);
}
Creating own actions
Typically, you will structure a monitor using actions (much like functions in Java), as shown in the following examples.
Increasing the given severity:
action upgradeSeverity(string old) returns string {
if old = "WARNING" { return "MINOR"; }
if old = "MINOR" { return "MAJOR"; }
if old = "MAJOR" { return "CRITICAL"; }
return old;
}
Calculating the distance between two geo-coordinates:
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
Variables
You can define variables in your modules.
string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];
If you define a monitor-scope variable (that is, inside a monitor but not within any actions on that monitor), then that can be used in a listener if you use a colon (:) instead of as
for the event co-assignment in the listener. Thus, the example below logs the latest event every 10 seconds:
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {}
on all wait(10.0) {
log e.toString();
}
}
}
When a listener starts, it takes a copy of all of the local variables. The example below thus logs each event after a 10 second delay, even if other events come in between.
monitor MyMonitor {
// monitor scope:
Event e;
action onload() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event():e {
on all wait(10.0) {
log e.toString();
}
}
}
}
Spawning monitor instances and contexts
While it is possible to handle multiple devices in a single monitor (for example, using group by
and partition by
in streams, or maintaining a dictionary keyed on the device ID for other state), it is often useful to separate processing of different devices into separate monitor instances.
New monitor instances can be created using the spawn
statement. This takes a copy of the monitor’s monitor scope variables and runs the named action in a new monitor instance. No listeners are copied into the new monitor. It is also possible to specify a context to spawn the new monitor instance in. Different contexts can run concurrently with each other, and also help isolate different monitors from each other. When constructing a context, supply a name to identify the context, and a Boolean to control if the context is public - that is, it receives the Cumulocity events by default (sent to the default channel).
This pattern is often used with the unmatched keyword to identify events that are not matched by any other listeners in that context. By using a separate context for each monitor, the unmatched behavior is scoped to that monitor. For example:
monitor PerDeviceMeasurementTracker {
action onload() {
spawn factory to context("PerDeviceMeasurementTracker", true);
}
action factory() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all unmatched Measurement() as m {
spawn perDevice(m);
}
}
dictionary<string, Measurement> latestMeasurementByType; // measurements for this device
action perDevice(Measurement m) {
processMeasurement(m);
on all Measurement(source = m.source) as m {
processMeasurement(m);
}
}
action processMeasurement(Measurement m) {
latestMeasurementByType[m.type] := m;
}
}
Best practices and guidelines
EPL monitors
Symptom: Your event processing rules are disabled automatically
If a monitor throws an exception from onload
or a listener and the exception is not caught, then the monitor will be terminated. Catch exceptions or avoid the reason for them occurring.
Similarly, if a monitor completes processing an event and has no listeners left active, then it cannot be triggered again, and will automatically remove itself.
Avoid excessive memory usage per monitor
Make sure that your event processing rules do not leak listeners. For example, when doing request-response operations, ensure that no listeners are left active after the response is processed, or if a timeout occurs and there is no response.
Number formats
Cumulocity measurements use the float type. Note that the timestamps are stored as floats (seconds since 1 Jan 1970, 00:00 UTC).
Subscribing to channels and contexts
A context is a parallel processing unit within Apama. Monitor instances can be deployed to multiple contexts using the spawn...to
syntax. When subscribing to a channel, all monitor instances within a context will receive events for that subscription. So it is recommended practice to put different subscriptions in different contexts. The use of contexts can prevent part of the application being overloaded from affecting other parts of the application.
Contexts are created with a user-friendly name, and each individual instance of the context object corresponds to different contexts, even if they have the same name.
For example:
action onload() {
context subContext := context("Worker");
spawn worker() to subContext;
}
action worker() {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement() as m {
...
}
}
Apama limitations in Cumulocity
Using Apama within the Cumulocity environment necessarily has some restrictions to the capabilities available when Apama is used standalone.
There are a number of ways that assets may be deployed to Apama within Cumulocity and the restrictions vary according to those mechanisms:
- EPL apps - the simplest mechanism to deploy Apama assets into a fully managed Apama correlator, see Deploying apps.
- Custom microservice - where more complex Apama projects can be built using Cumulocity’s Microservice SDK, see Microservices.
When designing an Apama solution to be deployed within any form of Cumulocity environment, consider the following points.
General Apama limitations when using EPL apps or a custom microservice
- For scalability, a correlator may move between hosts and therefore does not have access to a persistent file system. It is a standard Cumulocity constraint that all microservices (either provided by the platform, or custom) must be stateless, see Microservices.
The Apama features affected by this include:
- Correlator persistence.
- MemoryStore persistence.
- Non-HTTP/REST connections to an external system or process are mostly impractical. Although if a service is available over the internet, then it can be used (for example, an HTTP client inside Apama could connect to publicly accessible HTTP servers).
The Apama features affected by this include:
- Apama Database Connector (ADBC).
- Correlator-integrated support for the Java Message Service (JMS).
- Distributed memory stores.
- Connections between correlators.
- For security and implementing user access control, Cumulocity does not make the correlator port available to external processes, see Microservices.
The following capabilities require access to the correlator port and hence are not compatible with this access control:
- Command line tools such as engine_connect, engine_management, engine_send, engine_receive.
- Engine Management API, Event Service API, Scenario Service API.
- Connecting to adapters running out-of-process in an IAF.
- Dashboards (provided in-the-box with Apama).
- Debugging from Software AG Designer. Instead, debug your app running in a local correlator.
- Correlator REST interface.
- To reduce both the memory usage of an application during startup and the application’s startup time, ensure the application is completely initialized before injecting monitors that automatically unload, and before running time-consuming queries.
Specific Apama limitations when using EPL apps
- For ease of use, the correlator startup is controlled by Cumulocity. Thus, features that require you to change configuration files or command line options are not accessible.
The Apama features affected by this include:
- Persistence.
- Connectivity plug-ins.
- For security, the file system used by the correlator is not accessible.
The Apama features affected by this include:
- Accessing the input log.
- Using custom plug-ins.
- Using file system assets for enrichment.
- For simplicity, it is only possible to make independent EPL injections. Each monitor is managed independently and so dependencies between different monitors cannot be created.
The Apama features affected by this include:
- A *.mon file must not contain a package statement (to do so is an error).
- It is not possible to share event definitions between separate *.mon files.
- It is not possible to use Apama queries.
- You can only use the bundles listed in Developing apps with Software AG Designer.
All of these restrictions are implemented to ensure the smooth and secure operation of EPL apps within Cumulocity.
Examples
Calculating an hourly average of measurements
We are assuming the input data looks like this:
{
"c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
"time": "...",
"source": {"id":"..."},
"type": "c8y_TemperatureMeasurement"
}
To create the average (mean), we need the following parts in the module:
-
A time window over one hour, grouped by device (source).
-
A
select
that returns the average calculation every hour, the source and the unit (as we must use an aggregate over the window contents, we select the last unit - we assume all measurements are of the same unit). Note theAverageByDevice
event definition to hold these. -
Everything created as a new measurement.
For example:
using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;
monitor HourlyAvgMeasurementDeviceContext {
event AverageByDevice {
string source;
float avgValue;
string unit;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
group by m.source select
AverageByDevice(m.source,
avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
{"c8y_AverageTemperatureMeasurement":
{
"T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
}
}, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
}
}
}
Creating alarms from bit measurements
Devices often keep alarm statuses in registers and cannot interpret the meaning of alarms. In this example, we assume that a device just sends the entire register as a binary value in a measurement. A rule must identify the bits and create the respective alarm.
We create three dictionaries to map alarm text, type and severity for each of the bits, and an action to look up the value. We use -1 to indicate a default value, and replace <position> with the string form of the position.
dictionary<integer, string> positionToAlarmType := {
0 : "c8y_HighTemperatureAlarm",
1 : "c8y_ProcessingAlarm",
2 : "c8y_DoorOpenAlarm",
3 : "c8y_SystemFailureAlarm",
-1 : "c8y_FaultRegister<position>Alaram"
};
dictionary<integer, string> positionToAlarmSeverity := {
0 : "MAJOR",
1 : "WARNING",
2 : "MINOR",
3 : "CRITICAL",
-1 : "MAJOR"
};
dictionary<integer, string> positionToAlarmText := {
0 : "The machine temperature reached a critical status",
1 : "There was an error trying to process data",
2 : "Door was opened",
3 : "There was a critical system failure",
-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};
action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
string template := lookup.getOr(bitPosition, lookup[-1]);
return template.replaceAll("<position>", bitPosition.toString());
}
To analyze the binary measurement value, we will interpret it as a string value and loop through each character. The getActiveBits()
function will do that and return a list of the bit positions at where the measurement had a “1”. We can then use a for
loop to iterate through that:
action getBitPositions(string binaryAsText) returns sequence<integer> {
sequence<integer> bitsSet := new sequence<integer>;
integer i := 0;
while i < binaryAsText.length() {
string character := binaryAsText.substring(i, i+1);
if character = "1" {
bitsSet.append(binaryAsText.length() - i - 1);
}
i:=i+1;
}
return bitsSet;
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Measurement(type = "c8y_BinaryFaultRegister") as m {
string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
integer bitPosition;
for bitPosition in getBitPositions(faultRegister) {
Alarm alarm := new Alarm;
alarm.type := getText(bitPosition, positionToAlarmType);
alarm.severity := getText(bitPosition, positionToAlarmSeverity);
alarm.text := getText(bitPosition, positionToAlarmText);
alarm.source := m.source;
alarm.time := m.time;
alarm.status := "ACTIVE";
send alarm to Alarm.SEND_CHANNEL;
}
}
}
Creating a measurement like this
{
"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
"time": "...",
"source": {"id": "..."},
"type": "c8y_BinaryFaultRegister"
}
will trigger the last statement three times.
- measurement at bit position 1 - c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”
- measurement at bit position 2 - c8y_DoorOpenAlarm, MINOR, “Door was opened”
- measurement at bit position 4 - c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”
and therefore create three alarms.
Consumption measurements
Assuming we have a sensor which measures the current fill level of something and sends the values on a regular basis to Cumulocity, we can easily create additional consumption values. Calculating the absolute difference between two measurements can be useful, but it will only give you a clear view if the measurements are sent always in the same interval. Therefore, we will put the absolute difference in relation to the time difference and calculate as a per hour consumption.
We will compare the value and time difference of two adjacent measurements for a device, using a stream retaining 2 entries, and selecting the first and last timestamp and value.
using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;
monitor FillLevelMeasurements {
event FillLevel {
float firstValue;
float firstTime;
float lastValue;
float lastTime;
string source;
}
action calculateConsumption(FillLevel l) returns float {
if(l.firstTime = l.lastTime) {
return 0.0;
} else {
return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
}
}
action onload() {
// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {
Measurement m := new Measurement;
m.type := "c8y_HourlyWaterConsumption";
m.time := currentTime;
m.source := fill.source;
MeasurementValue mv := new MeasurementValue;
mv.value := calculateConsumption(fill);
mv.unit := "l/h";
m.measurements[m.type] := {"consumption":mv};
send m to Measurement.SEND_CHANNEL;
}
}
}
Miscellaneous sample apps
The EPL editor in the Streaming Analytics application provides several sample apps which demonstrate how to use Apama EPL, for example, to query for Cumulocity objects or to create alarms. You can use these samples to build your own apps.
Study - Circular geofence alarms
Overview
This section gives an in-depth example how you can create more complex rules. It uses multiple of the features explained before in the other sections of this guide.
If you are just starting with Apama EPL, take a look at Examples.
Prerequisites
Goal
We want our tracking devices that are continuously sending location events to automatically generate alarms if they move outside a geofence. This geofence will be a circle and should be configurable for each device separately. The alarm will be created at the moment the device moves outside the geofence. While it is moving outside, it should not create new alarms because the first one will remain active. As soon as the device moves back into the geofence, the alarm will be cleared.
Cumulocity data model
Location event structure (the part we need):
{
"id": "...",
"source": {"id": "..."},
"text": "...",
"time": "...",
"type": "...",
"c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}
We store the geofence configuration in the device (the radius will be configured in meters):
{
"c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}
Additionally, we want to enable/disable the geofence alarms for each device without removing the configuration entirely. We will do that by adding/removing “c8y_Geofence” to c8y_SupportedOperations
in the device:
{
"c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}
Calculation
The device is outside of the geofence if the distance between the current position and the center is bigger than the configured radius of the geofence. What we need is a function that can calculate the difference between two geo-coordinates:
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
The above action will return the distance in meters.
Step 1: Filtering the input
The main input for this module will be events. To discard non-matching events as early as possible, we perform this as the first check in the listener:
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// we have an event
}
}
Step 2: Collecting necessary data
In the next step, we need the configuration of the geofence for the calculation and grab it.
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
}
Step 3: Checking if the device supports c8y_Geofence
With the device available we will now check if there is a geofence configured for the device and if it is activated (contains “c8y_Geofence” in supportedOperations
). To check the c8y_SupportedOperations
array, we can use the indexOf()
function. This function will loop through all elements and return the index of that entry, or a negative number if the value is not present. For the configuration, we will just check if the device contains the fragment “c8y_Geofence”.
Once we have an event and a device, we extract the data from the event’s c8y_Position
and the device’s c8y_Geofence
. These objects are mapped to dictionary<any, any>
entries in the params
. As the params
hold values of type any
, we must cast to a dictionary<any, any>
.
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
}
Step 4: Creating the trigger
As mentioned earlier, the device is outside of the fence if the distance between the current device position and the geofence center is bigger than the configured geofence radius. To trigger the alarm, we need two events so we can check if the device entered or left the geofence within these two events.
In the first step, we calculate the distance with the function mentioned earlier:
float d := distance(centerLat, centerLng, eventLat, eventLng);
Now we re-route this as an event with:
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
...
route LocationEventWithDistance(e.source, d, e, maxDistance);
We place the source in the event so we can easily match it in a listener.
We now set up a listener triggered by the event LocationEventWithDistance
, listening for the next LocationEventWithDistance
- for the same source:
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
}
}
This pair of LocationEventWithDistance
events now holds all data for checking if we should create the alarm or not. Note that we are filtering the secondPos
event to be for the same source as the first - there will be an active listener for every device we have received an event from.
Step 5: Creating the alarm
To create the alarm, we now need two events where the first one has a distance smaller than the radius and the second one has a distance bigger than the radius. This would mean that the device just left the geofence.
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
Step 6: Clearing the alarm
To clear the alarm, we must just switch the condition at the bottom and additionally grab the currently active alarm to get its ID. We do not need to care about whether there is an existing alarm at this point. If there is none, the listener will trigger the and not FindAlarmResponseAck
, terminating the listener:
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
Putting everything together
We can now combine all the parts into one module. The order of the listeners does not matter.
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
monitor MonitorDevicesForCircularGeofence {
event LocationEventWithDistance {
string source;
float distance;
Event e;
float maxDistance;
}
action onload {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
on all Event() as e {
if e.params.hasKey("c8y_Position") {
// we have an event
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
and not FindManagedObjectResponseAck(reqId) {
ManagedObject dev := resp.managedObject;
if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
float eventLat := <float> evtPos["lat"];
float eventLng := <float> evtPos["lng"];
dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
float centerLat := <float> devGeofence["lat"];
float centerLng := <float> devGeofence["lng"];
float maxDistance := <float> devGeofence["radius"];
float d := distance(centerLat, centerLng, eventLat, eventLng);
route LocationEventWithDistance(e.source, d, e, maxDistance);
}
}
}
}
on all LocationEventWithDistance() as firstPos {
on LocationEventWithDistance(source = firstPos.source) as secondPos {
// now have two events with distances
if firstPos.distance <= firstPos.maxDistance and
secondPos.distance > secondPos.maxDistance {
send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
"Device moved out of circular geofence", "ACTIVE",
"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}
if firstPos.distance > firstPos.maxDistance and
secondPos.distance <= secondPos.maxDistance {
integer reqId:= integer.getUnique();
send FindAlarm(reqId, {"source": firstPos.source,
"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
on FindAlarmResponse(reqId=reqId) as alarmResponse
and not FindAlarmResponseAck(reqId=reqId) {
send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
firstPos.source, currentTime, "Device moved back into circular geofence",
"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
}
}
}
}
}
action distance(float lat1, float lon1, float lat2, float lon2) returns float {
float R := 6371000.0;
float toRad := float.PI / 180.0;
float lat1Rad := lat1 * toRad;
float lat2Rad := lat2 * toRad;
float deltaLatRad := (lat2-lat1) * toRad;
float deltaLonRad := (lat2-lat1) * toRad;
float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
return R * c;
}
}
Connecting Apama to other microservices
Overview
Streaming analytics applications using Apama can make use of applications running in other microservices. This section uses the /health
endpoint of an Apama-ctrl microservice, but the steps apply to connecting to any other microservice running inside Cumulocity. This section is going to show you how to create a connection to the Cumulocity platform from within Apama EPL which can be used to invoke other microservices directly. It will then show you how to make a request and decode the result.
We will assume that you are developing an EPL app using the EPL editor that is part of the Streaming Analytics application and demonstrate a request to a microservice. The steps in this guide will also work with any other way you could be creating an Apama application and can be used to interact with any microservice.
We will be making use of the CumulocityRequestInterface
API. For more technical information about this API, see Invoking microservices in the Apama documentation.
Creating an EPL app
Click the Streaming Analytics icon in the application switcher. On the resulting home screen, navigate to the EPL Apps page and then click New EPL app. You will now see an EPL editor window in which to create the app which interacts with another microservice.
Connecting to the Cumulocity platform
To support making these requests, we provide a helper event with actions to automatically connect to the Cumulocity platform and then create requests which can be used to call other microservices. This helper event is called CumulocityRequestInterface
and is within the com.apama.cumulocity
package. This helper event provides a static action which will connect to Cumulocity and return an instance of the event. It can automatically connect either from within a microservice or the Cumulocity platform itself, or from a remote correlator. That instance has an action which will create a request to call a specific microservice.
To create the connection from your own code, simply call the connectToCumulocity
method and store the result:
CumulocityRequestInterface cumulocity := CumulocityRequestInterface.connectToCumulocity();
This will automatically create a connection using the credentials and connection details provided to your microservice, or using the configuration for the Cumulocity transport when connecting from an external Apama instance.
Making microservice requests
The CumulocityRequestInterface
instance has an action on it to create a request:
/**
* Allows creation of a request on a transport that
* has been configured for a Cumulocity connection.
*
* @param method The type of HTTP request, for example "GET".
* @param path A specific path to be appended to the request.
* @param payload A dictionary of elements to be included in the request.
*/
action createRequest(string method, string path, any payload) returns Request
This takes the HTTP method to use (usually GET, PUT or POST), a path including the Cumulocity service prefix (typically something like /service/serviceName/path/on/service) and the payload. The payload will be converted to a JSON document before submitting to the microservice. The action returns a Request
object which is part of the HTTP Client interface, documentation of which can be found in the API Reference for EPL (ApamaDoc).
Requests are executed with a call-back action as an argument which will be invoked when the request is completed with the response as an argument. If you must set any options, query parameters or headers on the request, you can set those on the Request
object before calling it. For example:
action responseCallback(Response resp) {
string objectId := resp.payload.getString("id");
...
}
...
Request req := cumulocity.createRequest("GET", "/service/otherService/data", any());
req.setQueryParameter("type", "object");
req.execute(responseCallback);
The response will also be decoded from JSON and the response payload uses the AnyExtractor
pattern which you can find linked from the Response
event in the HTTP Client transport documentation. The above example will be equivalent to the REST request GET http://cumulocity/service/otherService/data?type=object
.
Example request to a microservice endpoint
The following is a very simple application that shows how to query another microservice. We are using the /health
endpoint of an Apama-ctrl microservice as an example.
We will start with EPL which connects to Cumulocity and calls an action to send the request.
using com.apama.cumulocity.CumulocityRequestInterface;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;
monitor CallAnotherMicroservice {
CumulocityRequestInterface requestIface;
action onload() {
requestIface := CumulocityRequestInterface.connectToCumulocity();
sendHealthRequest();
}
Sending the request
First, we create the Request
with: the request type, the request path, and the payload any()
because in this example we do not need to put anything in the payload.
We then use execute
to send the request and provide an action to be called with the response.
action sendHealthRequest()
{
Request healthRequest:=
requestIface.createRequest("GET", "/service/cep/health", any());
healthRequest.execute(responseHandler);
}
We use an Apama-ctrl microservice for this example, which has the context path of /cep
. To modify this for another microservice, substitute /cep
with the context path as defined in the manifest for your microservice.
The /health
endpoint completes the request path for this example, but could be replaced with any valid endpoint of the microservice.
Receiving the response
Here is the defined action that we used when sending the request. This action is called in response to the sent request and is provided with the Response
object.
For this example, we simply log the status code and the body of the response.
action responseHandler(Response healthResponse)
{
integer statusCode := healthResponse.statusCode;
string payload := healthResponse.payload.data.toString();
log "Health response status code = " + statusCode.toString() +
", response body = " + payload at INFO;
}
Other microservices
This section was demonstrating talking to an Apama-ctrl microservice. However, you can also access any other microservice through Cumulocity as long as it uses standard REST requests with JSON payloads. You must simply construct the appropriate /service
URL using the name of your microservice followed by the path of the request within your microservice.