EPL Apps

Info
This documentation assumes basic familiarity with Apama application development. Refer to the Apama documentation for further details.

Basic functionality

Developing apps

An EPL app is a monitor (*.mon) file. You can develop EPL apps in two different ways:

  • You can use the Streaming Analytics application which is available from Cumulocity’s application switcher and develop your EPL apps within Cumulocity.
  • Or you can install Apama on your local machine and then develop your EPL apps (as monitor files) in Apama Plugin for Eclipse, that is, in a separate environment.

See also Using the Apama Event Processing Language (EPL).

Info
To be able to develop and deploy EPL apps with the Streaming Analytics application and/or to import monitor files from Apama Plugin for Eclipse into Cumulocity, your tenant must be subscribed to the Apama-ctrl microservice that supports EPL apps. If you do not see the EPL Apps page in the Streaming Analytics application and you wish to use EPL apps, contact product support.
Caution
An EPL app has the ability to make nearly arbitrary changes to the objects in a tenant, whether that’s inventory, alarms or many other sorts of object. A user who has ADMIN permission for “CEP management” is able to create and activate EPL apps and thus also has almost full control over the current tenant. Therefore, you should be careful about which users on the tenant have this permission.

Developing apps with the Streaming Analytics application

The EPL Apps page of the Streaming Analytics application provides an interface for interactively editing new or existing EPL apps (*.mon files) as well as importing and activating (deploying) EPL apps.

Any user on the tenant wishing to use the EPL Apps page must be a CEP Manager. See Managing permissions.

Step 1 - Invoke the Streaming Analytics application

Open the application switcher and click the icon for the Streaming Analytics application. Then navigate to the EPL Apps page.

When you go to the EPL Apps page, the EPL app manager is shown first, listing any existing EPL apps. Each app is shown as a card. You can add new EPL apps and manage existing EPL apps from here.

EPL Apps

Each card that is shown for an app has an actions menu at the top which allows you to edit, download or delete the app.

From this page, you can:

  • Edit existing EPL apps. Either use the Edit command from the actions menu or simply click on the card that is shown for the app.

  • Create new EPL apps. See below.

  • Import EPL apps. If you prefer to develop your apps outside of Cumulocity (for example, using Apama Plugin for Eclipse), click Import EPL in the top menu bar to upload an Apama monitor (*.mon) file as an app into the Streaming Analytics application.

  • Download EPL apps. Use the Download command from the actions menu to download the app as a *.mon file.

  • Deploy existing EPL apps. On the card that is shown for an app, change the mode from Inactive to Active. For more information, see Deploying apps. When activating an app, any syntax errors are reported back immediately. The error state is shown on the card, helping you to ensure your app is in good shape. Click on the error to display information on what went wrong. It is not possible to activate an app if it has syntax errors. The errors are shown on the card until they have been fixed and the app has been activated again.

  • Reload all EPL apps. Click Reload in the top menu bar to refresh the display to show any changes other users have made since the page loaded, including any errors that have been introduced in the meantime.

Step 2 - Create an EPL app

Click New EPL app in the top menu bar. In the resulting Create app dialog box, enter a unique app name. You can also enter a description which will be shown on the card that is created for the new app. Click OK.

The EPL editor appears. The EPL code for the new app already contains the typical basic event definitions and utilities that are required for working with Cumulocity. You can adapt them as required for your app. Consult the documentation and samples for more details.

Info
When you click Cancel without specifying an app name, the EPL editor also appears and the default name “New” is then shown in the breadcrumb. You can edit the EPL code, but as long as you do not specify an app name, you will not be able to save the app. Click App settings and specify an app name in the resulting dialog box.

EPL editor

To help you get started, several samples are available. To see them, click Samples which is shown to the right of the editor. Click on a sample to see a preview of its contents. You can select part of the sample code and copy it over into your own code using the standard key combinations Ctrl+C and Ctrl+V. You can also use the command buttons to copy the entire code to the clipboard and insert it at an appropriate position in your own code, or to replace all of your existing code with the sample code.

Using the buttons in the top menu bar, you can undo/redo your last changes in the current session and you can save your changes.

It is also possible to change the mode from Inactive to Active (or vice versa) in the EPL editor. Again, when there is an error in your EPL code, it is not possible to activate the app. The errors are highlighted within the code.

Info
Be aware that the EPL editor makes use of a standard web component. It provides many generic developer functions, some of which are not relevant to EPL, including but not limited to Quick Fix and Show Hover.

Click the close icon in the top menu bar to leave the EPL editor and thus to return to the list of EPL apps.

Caution
All unsaved changes are lost when you navigate to a different URL or close the browser window.
Step 3 - Test the EPL app

Once your app is activated, you should be able to see the results of it running. This may include sending measurements, receiving data, creating alarms, and logging in the Apama-ctrl microservice. For information on how to check the log files of the Apama-ctrl microservice, see Monitoring microservices.

See also Deploying apps.

Developing apps with Apama Plugin for Eclipse

Apama Plugin for Eclipse provides a full development environment and is the tool of choice when you have a complex EPL application. When your EPL app (that is, the monitor file) is ready, you must import it into Cumulocity.

Step 1 - Install Apama

Download the apama-c8y-dev package of Apama from https://download.cumulocity.com/Apama/ and extract it to install Apama. This installs the freemium Apama Community Edition with reduced capabilities and several restrictions. To unlock all features you need a license.

If you have a license, copy the license file into the Apama work directory (APAMA_WORK/license).

The apama-c8y-dev package includes Apama Plugin for Eclipse which is available for both the Apama Community Edition and licensed editions of Apama.

Step 2 - Create a project

Once installed, create an Apama project in Apama Plugin for Eclipse and enable it for Cumulocity connectivity. For instructions on how to create an Apama project, refer to Creating Apama projects in the Apama documentation.

Step 3 - Add Apama bundles to the project

Add the following Apama bundles to the newly created Apama project. These are required by Cumulocity so that it can activate your app. For instructions on how to add bundles to a project, refer to Adding bundles to projects in the Apama documentation.

  • Cumulocity > Event Definitions for Cumulocity
    Provides event APIs required for sending and receiving data to/from Cumulocity.
  • Cumulocity > Utilities for Cumulocity
    Provides helper utility functions for working with data received from Cumulocity.
  • Any Extractor
    Provides support for extracting values from the any type.
  • Time Format
    Required to access all the methods of the Time Format plug-in. Useful for formatting and parsing time.
  • HTTP Client Generic Events
    Exposes predefined generic events used by the HTTP client connectivity plug-in.
  • Automatic onApplicationInitialized
    This starts all connectivity plug-ins immediately on start up.
  • HTTP Client > JSON with generic request/response event definitions
    Allows EPL apps to make HTTP calls.
  • Cumulocity IoT > Cumulocity Notifications 2.0
    Exposes the Cumulocity client to EPL apps using the Notifications 2.0 mechanism. For general information on how to receive Cumulocity update notifications, see Receiving update notifications in the Apama documentation.
Info
The Cumulocity Notifications 2.0 connectivity bundle has been added for receiving notifications from Cumulocity using the Notifications 2.0 mechanism. The existing Cumulocity Client connectivity bundle that uses the legacy long-polling mechanism is deprecated in favor of this new bundle. In addition, it is now possible to add the Cumulocity REST Support connectivity bundle if you do not receive any notifications. You must only add one of these three bundles to your project. For information on how to migrate existing Apama projects to Notifications 2.0, see Migrating from Cumulocity Client to Cumulocity Notifications 2.0 in the Apama documentation.

The bundles above are the only ones that are permissible in an EPL app, so be careful not to add any other bundles or your app may not work when activated in Cumulocity.

Step 4 - Create a monitor file

To create a new Apama monitor file, refer to Creating new monitor files for EPL applications in the Apama documentation.

Before you import the newly created monitor file as an EPL app into Cumulocity and activate it there, you might want to test if the monitor file works as expected from within Apama Plugin for Eclipse.

For further information, see The Cumulocity Transport Connectivity Plug-in in the Apama documentation.

Step 5 - Run and test the monitor file

When running the project locally, you must provide your Cumulocity credentials in the project configuration. Configure the credentials in the CumulocityIoT.properties file under the Cumulocity client. For example:

CUMULOCITY_USERNAME=user@example.com
CUMULOCITY_SERVER_URL=http://exampleTenant.cumulocity.com
CUMULOCITY_PASSWORD=examplePassword
CUMULOCITY_APPKEY=apamaAppKey
Info
You must create an application in Cumulocity to get a value for CUMULOCITY_APPKEY.

Note that the above description assumes that you are connecting to a tenant where the URL identifies the tenant. If that is not true (for example, if you are connecting by an IP address), you may need to set this in the CumulocityIoT.properties file:

CUMULOCITY_TENANT=my_custom_tenant

If the project needs to run locally in a multi-tenant environment, enable the multi-tenant support and provide the name of the multi-tenant microservice to use by configuring the following properties in the CumulocityIoT.properties file under the Cumulocity client:

# Enable multi-tenant support
CUMULOCITY_MULTI_TENANT_APPLICATION=true

# The name of the multi-tenant microservice to use.
# If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or
# create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want
# to run the project.
CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME=example-multi-tenant-ms

In addition, make sure that the monitor files are able to work with the multi-tenant microservice. For more information, see Working with multi-tenant deployments in the Apama documentation.

You can now proceed with testing your EPL in Apama Plugin for Eclipse.

Once the EPL app is ready, refer to Deploying apps to find out how to deploy it to Cumulocity.

Deploying apps

You can deploy the following to Cumulocity:

Info
In the Streaming Analytics application, the term “activate” is used for deploying an app.

Deploying EPL apps as single *.mon files with the Streaming Analytics application

When an EPL app (that is, a *.mon file) is activated in Cumulocity, the *.mon file is assigned a unique package name. This prevents conflicts when multiple modules are activated. For this reason, you should not specify a package statement in a *.mon file. If you must share events between different parts of your application, then write the event definitions and monitors that use it in a single *.mon file.

There is a restricted set of utilities and base events available for your EPL app. At the time of writing, these include the Time Format and HTTP Client > JSON with generic request/response event definitions bundles.

When any EPL app signals a runtime error, this will be raised as an alarm. Runtime errors include uncaught exceptions, as well as any explicit logging of warnings and errors that your EPL app wants to do. Health issues that relate to the Apama runtime in general will also be raised as alarms.

For more detailed diagnostics of the Apama runtime and any active EPL apps, you can look at the logs for the Apama-ctrl microservice. See Monitoring microservices for more information on log files. However, some familiarity with Apama is necessary to get the most out of an Apama log file.

Deploying Apama applications as microservices

Using Apama Plugin for Eclipse, you can also develop more complex projects which:

  • are spread across multiple *.mon files
  • must be isolated from other Apama applications
  • use connectivity plug-ins or EPL plug-ins that are not enabled by default

These kinds of applications should be deployed as microservices to Cumulocity.

Required settings in the microservice manifest

The microservice manifest provides the required settings to manage microservice instances and the application deployment in Cumulocity. For detailed information, see Microservice manifest.

Apama can be used in either a single-tenant microservice or a multi-tenant microservice. Therefore, the microservice manifest must set the isolation level to either PER_TENANT or MULTI_TENANT. When Apama is used in a multi-tenant microservice, the Apama application must be written to be multi-tenant aware. For more information, see Working with multi-tenant deployments in the Apama documentation.

The following permissions are required by the microservice in order to start up and use all features in the Cumulocity transport from EPL. These are set with requiredRoles in the microservice manifest.

  • ROLE_APPLICATION_MANAGEMENT_READ
  • ROLE_INVENTORY_READ
  • ROLE_INVENTORY_ADMIN
  • ROLE_INVENTORY_CREATE
  • ROLE_MEASUREMENT_READ
  • ROLE_MEASUREMENT_ADMIN
  • ROLE_EVENT_READ
  • ROLE_EVENT_ADMIN
  • ROLE_ALARM_READ
  • ROLE_ALARM_ADMIN
  • ROLE_DEVICE_CONTROL_READ
  • ROLE_DEVICE_CONTROL_ADMIN
  • ROLE_IDENTITY_READ
  • ROLE_OPTION_MANAGEMENT_READ
  • ROLE_BULK_OPERATION_READ
  • ROLE_SMS_ADMIN
Info

To take advantage of the Cumulocity Notifications 2.0 reliable data forwarding capability to receive notifications, you must also add the following permission to the manifest of the custom microservice and contact product support to set the the notification2.streaming-analytics feature flag.

  • ROLE_NOTIFICATION_2_ADMIN
Info
The above is the minimum list of permissions that a custom Apama microservice needs. If you are developing a custom microservice, you may add more permissions to the microservice manifest.
To deploy an Apama application as a microservice
  1. Develop your application in Apama Plugin for Eclipse in the usual way.

  2. You can use Apama’s Docker support to turn the entire project into a microservice. In the Project Explorer view, right-click the project and select Apama > Add Docker Support, which will add a Dockerfile to the root of your project directory. When used for building, it will make use of the Apama images available on Docker Hub. You will need Docker Hub credentials that give you access to the Apama images. Apama Docker images are exclusively Linux-based.

  3. Add any custom steps to the Dockerfile that might be necessary, for example, building a custom plug-in, or copying your license file into the image.

  4. Use the Cumulocity microservice utility tool for packaging and deploying the project; for detailed information, see Microservice utility tool. When creating the directory structure for the microservice utility tool to build from, copy your entire project directory inside that directory with the name “docker/”. For example:

    docker/monitors/
    docker/eventdefinitions/
    docker/Dockerfile
    docker/…
    cumulocity.json

    You must create the microservice manifest manually, but there is no need for anything special in the microservice manifest; no roles or probes are required. However, if you want to configure a liveness or readiness probe, you can configure an httpGet probe for the path /ping on port 15903 (Apama’s default port). Enabling auto-scaling is not recommended, as Apama applications are usually stateful and do not automatically partition their input.

    You can pack, deploy and subscribe from this directory, resulting in your Apama application being turned into a running microservice. The behavior of the application when being run outside of Cumulocity (from Apama Plugin for Eclipse or your test environment) will be near-identical to its behavior inside Cumulocity. When deployed as a microservice doing requests to the Cumulocity API, Apama will automatically pick up the credentials to connect to the tenant you deployed it to, overwriting any other credentials provided to Apama. However, if you wish to receive real-time events, you must have valid credentials specified in the project configuration as you do when connecting to Cumulocity from an external Apama environment.

  5. When you are ready to deploy to Cumulocity, upload the application as a microservice. For details, refer to Managing microservices.

Info
After December 2024, the location of the Docker images has changed for all supported release trains. They are now available at Amazon ECR Public Gallery instead of at Docker Hub. If you still use the images from the previous location, you must migrate them.
Important

Apama 10.15.0 introduces several new container images, and some of the existing container images have changed content. As of December 2024, these images are provided via Amazon ECR Public Gallery. When building images for use as a Cumulocity microservice, this is now different to earlier releases. You must now use the public.ecr.aws/apama/apama-cumulocity-jre image with the public.ecr.aws/apama/apama-cumulocity-builder image as a builder image. To do this with the default project Dockerfile created by Apama Plugin for Eclipse in 10.15.0 and previous versions, you must either change the FROM lines in the Dockerfile appropriately (you only need to do this once) or build using the following flags (you must do this every time):

--build-arg APAMA_BUILDER=public.ecr.aws/apama/apama-cumulocity-builder:10.15 --build-arg APAMA_IMAGE=public.ecr.aws/apama/apama-cumulocity-jre:10.15

Testing apps

You can use the Apama EPL Apps Tools on GitHub to script uploads of your EPL apps and manage them for CI/CD (continuous integration and continuous delivery) use cases. This tooling also provides extensions to the PySys test framework to allow you to simply write tests for your EPL apps and to run them automatically.

Apama EPL Apps Tools is available from https://github.com/Cumulocity-IoT/apama-eplapps-tools. See the EPL Apps Tools documentation for detailed information.

For more information on PySys, see the API Reference for Python that you can access from the Apama documentation.

Supported REST services

EPL apps are designed to listen for REST (Representational State Transfer) services and supports all GET, POST, PUT and DELETE operations. Example requests for the different operations are listed below.

To perform these operations, you must have READ and ADMIN permissions for “CEP management” (see also Managing permissions).

Request headers for all operations

Each request must be authenticated to Cumulocity.

Name Description
Accept “application/json”. This is a mandatory parameter.

Common response codes

The following common error response codes can be expected for all requests:

Code Description
401 Unauthorized.
403 Forbidden. EPL apps are not available with the Apama-ctrl-starter microservice.

Any other response codes that can be expected from a specific request are given below.

Common field descriptions

The following common fields are available with the responses, depending on the operation:

Field Description
contents The full contents of the EPL file.
description A description of the file.
eplPackageName The package name of the EPL file. If the name contains special characters (including spaces), these characters are escaped to make them valid EPL identifiers and avoid injection errors.
errors A list of all compilation errors in the file, if any, with line numbers and text.
id A unique identifier of the file.
name The name provided for this bit of EPL.
state Whether the EPL is injected into the correlator and running. This can either be active or inactive.
warnings A list of all compilation warnings in the file, if any, with line numbers and text.

GET - Retrieve all available EPL files

Endpoint: /service/cep/eplfiles

Example request

GET /service/cep/eplfiles

Responses
Code Description
200 Successful operation. See also the example value below.
400 Bad request. Header contents has unexpected value.

Example value for response code 200:

{
  "eplfiles":[
    {
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[

      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[

      ]
    }
  ]
}

GET - Retrieve all available EPL files with their contents

Endpoint: /service/cep/eplfiles

Request parameters
Name Description
contents Boolean type. Fetches the EPL files with their contents. This is an optional query parameter.
Example request

GET /service/cep/eplfiles?contents=true

Responses
Code Description
200 Successful operation. See also the example value below.

Example value for response code 200:

{
  "eplfiles":[
    {
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
    }
  ]
}

GET - Retrieve EPL file by identifier

Endpoint: /service/cep/eplfiles/{id}

Request parameters
Name Description
id Identifier of the EPL file to be fetched. This is a mandatory parameter.
Example request

GET /service/cep/eplfiles/{{id}}

Responses
Code Description
200 Successful operation. See also the example value below.
404 File with identifier not found. See also the example value for this response code at the end of this section.

Example value for response code 200:

{
      "contents":"monitor M0 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
      "description":"",
      "eplPackageName": "eplfiles.Ordinal1",
      "errors":[
      ],
      "id":"39615",
      "name":"Ordinal1",
      "state":"active",
      "warnings":[
      ]
}

POST - Create a new EPL application

Endpoint: /service/cep/eplfiles

Example request

POST /service/cep/eplfiles

The following is an example of a request body:

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

Note the following:

  • The name is used for the package of the file (thus the EPL file must not contain a package statement) and must be unique across all EPL files. The name is prefixed and certain characters are escaped. The actual package name used is returned in the eplPackageName field for convenience (you can search for this in the microservice log file to find log statements).
  • Make sure to provide safely escaped contents.
  • description is optional and can be empty.
Responses
Code Description
201 Successfully created / Created with errors in file / Created with warnings in file. See also the examples below.
405 Invalid input.

Example for response code 201 when successfully created:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[

  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[

  ]
}

Example for response code 201 when created with warnings or errors:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

PUT - Update EPL file by identifier

Endpoint: /service/cep/eplfiles/{id}

Request parameters
Name Description
id Identifier of the EPL file to be updated. The identifier must be included in the path. This is a mandatory parameter.
Example request

PUT /service/cep/eplfiles/{id}

The following is an example of a request body:

{
  "name": "Ordinal1",
  "contents": "monitor M1 { action onload() { on wait(1.0) { log \"Hello\" at INFO; }}}",
  "state": "active",
  "description": ""
}

See also the information given for the POST request.

Responses
Code Description
200 Successfully updated. See also the example values below.
404 File with identifier not found. See also the example value for this response code at the end of this section.

Example value for response code 200 when successfully updated with no errors:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[

  ],
  "id":"39615",
  "name":"Ordinal1",
  "state":"active",
  "warnings":[

  ]
}

Example value for response code 200 when updated with errors or warnings:

{
  "description":"",
  "eplPackageName": "eplfiles.Ordinal1",
  "errors":[
    {
      "line":5,
      "text":"assigning a float to an integer variable"
    }
  ],
  "id":"39651",
  "name":"Ordinal1",
  "state":"inactive",
  "warnings":[
    {
      "line":10,
      "text":"\"assert\" may become a reserved word in future versions of EPL"
    }
  ]
}

DELETE - Delete EPL file by identifier

Endpoint: /service/cep/eplfiles/{id}

Request parameters
Name Description
id Identifier of the EPL file to be deleted. The identifier must be included in the path. This is a mandatory parameter.
Example request

DELETE /service/cep/eplfiles/{{id}}

Responses
Code Description
200 Successfully deleted.
404 File with identifier not found. See also the example value for this response code at the end of this section.

Example value for response code 404

The response code 404 indicates that a file with a specific identifier was not found.

{
  "error":"Not Found",
  "exception":"com.apama.in_c8y.FileNotFoundException",
  "message":"File with id 39613 not found",
  "path":"/eplfiles/39613",
  "status":404,
  "timestamp":"2020-01-17T12:21:42.457+0000"
}

where

  • error is the error message.
  • exception specifies the exception that was raised.
  • message is a description of the exception message.
  • path is the path that was requested.
  • status is the status of the application.
  • timestamp is the timestamp in ISO format.

Events and channels

In Apama EPL, interactions with the rest of the Cumulocity ecosystem are done via events. A number of event definitions is provided for accessing Cumulocity data.

Info
Apama and Cumulocity use different “event” concepts. Apama events are used for all interactions with Cumulocity, such as listening for and creating device measurements, alarms and (Cumulocity) events. For more information on Apama events, see Defining event types in the Apama documentation. For more information on Cumulocity events, see Events in the Cumulocity OpenAPI Specification.

Predefined event types

There are some predefined event types to interact with several Cumulocity APIs. Events are sent to Apama applications automatically when a new measurement, alarm or event is created. For interacting with the Cumulocity backend, you can create an event and send it to the relevant channel. Cumulocity will automatically execute either the database query or create the API calls necessary for sending mails, SMS, or similar.

Look at the data model in the API Reference for EPL (ApamaDoc) to see how the events for each stream are structured.

Sending events to a channel

Sending an event is done by constructing the event, either with new <type> followed by assignments to the fields, or with a constructor specifying all of the fields. The send statement is then used to send the event to Cumulocity. The send statement requires a channel - this is the SEND_CHANNEL constant on the event type.

Listening to events

You can trigger your EPL by listening to events on channels. You can subscribe to channels with the monitor.subscribe("string name") method. This can be done in the startup of your monitor, or if you only want to receive events some of the time, called as needed, followed by monitor.unsubscribe("string name").

Listen for events using the on statement, followed by the event type that you are listening to, open and close parentheses, and as <identifier> to name a variable that will hold the event.

By default, a listener will fire once; to make it repeat for all events, use the all keyword before the event type.

Filters

Adding filters can be done by specifying one or more fields between the parentheses for a listener. Only top-level fields can be filtered for. Use if statements for more complex filtering, or for filtering on subproperties of events (for example, in dictionaries).

Standard event types and channels

For the standard Cumulocity events, there are constants that contain the channels for sending and receiving events, for example:

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
send msmnt to Measurement.SEND_CHANNEL;

The events listed in the following table are part of the com.apama.cumulocity package.

Event Channel for sending Channel for receiving
Operation Operation.SEND_CHANNEL Operation.SUBSCRIBE_CHANNEL
Measurement Measurement.SEND_CHANNEL Measurement.SUBSCRIBE_CHANNEL
Event Event.SEND_CHANNEL Event.SUBSCRIBE_CHANNEL
Alarm Alarm.SEND_CHANNEL Alarm.SUBSCRIBE_CHANNEL
ManagedObject ManagedObject.SEND_CHANNEL ManagedObject.SUBSCRIBE_CHANNEL
MeasurementFragment MeasurementFragment.SEND_CHANNEL MeasurementFragment.SUBSCRIBE_CHANNEL

Measurement fragments

Measurement and MeasurementFragment events are always published.

You can generate listeners in EPL that will match on the contents of MeasurementFragment events rather than Measurement events. For example:

on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
}

See also Measurement fragments.

Distinguishing between create and update notifications

When listening for Alarm, Event, ManagedObject or Operation events from Cumulocity, you may want to to distinguish between create and update operations. Each of these event types have actions named isCreate() and isUpdate() for this purpose.

Example for listening for new alarms:

on all Alarm() as alarm {
    if alarm.isCreate() {
        log "Alarm created: " + alarm.toString() at INFO;
    }
    // else it's an update
}

And similarly, only for updated alarms:

on all Alarm() as alarm {
    if alarm.isUpdate() {
        log "Alarm updated: " + alarm.toString() at INFO;
    }
    // else it's a create
}

For events that have come from Cumulocity, one of isUpdate() or isCreate() will always return true. Both actions are provided for choice and readability.

For more information, including examples for the different types of objects, see Receiving update notifications in the Apama documentation.

See also the API Reference for EPL (ApamaDoc) for more information about the isCreate() and isUpdate() actions.

Example

This example listens for new measurements using the com.apama.cumulocity.MeasurementFragment API. It filters incoming measurements to find speed values above a given maximum speed and raises an alarm if the limit is breached.

  1. Subscribe to the MeasurementFragment.SUBSCRIBE_CHANNEL channel.
  2. Listen to the measurement fragment and filter on type, which is c8y_SpeedMeasurement. Ensure that valueFragment has the value c8y_speed and that valuesSeries filters on speedX only. Also filter on value when it is greater than SPEED_LIMIT.
  3. Create the event using the constructor specifying all of the fields.
  4. Send the event to the correct channel - Alarm.SEND_CHANNEL.

The resulting *.mon file can look like this:

using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.MeasurementFragment;

monitor TriggerAlarmForSpeedBreach {
    constant float SPEED_LIMIT := 30.0;
    action onload() {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
        // Everytime a measurement fragment with the specific details of the match criteria is triggered then we should raise an alarm
        on all MeasurementFragment(type="c8y_SpeedMeasurement", valueFragment = "c8y_speed", valueSeries = "speedX", value > SPEED_LIMIT) as mf {
            send Alarm("", "c8y_SpeedAlarm", mf.source, currentTime,
                        "Speed limit breached", "ACTIVE", "CRITICAL", 1,
                        new dictionary<string,any>) to Alarm.SEND_CHANNEL;
        }
    }
}

Built-in actions

Overview

With Apama EPL, it is possible to utilize functions, called “actions”. Every monitor will have at least one action - the onload action. This section covers the already built-in actions ready to use.

See also the API Reference for EPL (ApamaDoc) for actions on the built-in types.

Querying Cumulocity data

To interact with your historical data, you can use one of the following request-response event pairs to look up resources.

Example: To look up alarms, you can send a com.apama.cumulocity.FindAlarm request event with appropriate query parameters to the FindAlarm.SEND_CHANNEL channel. In response, you can expect 0 or more com.apama.cumulocity.FindAlarmResponse events (depending on the number of resources that match the lookup request) and a com.apama.cumulocity.FindAlarmResponseAck event on the FindAlarmResponse.SUBSCRIBE_CHANNEL channel. Similar functionality is also provided for looking up managed objects, events, measurements and operations.

The events listed in the following table are part of the com.apama.cumulocity package.

To look up Request-Response Events Example
ManagedObject FindManagedObject
FindManagedObjectResponse
FindManagedObjectResponseAck
Example
Alarm FindAlarm
FindAlarmResponse
FindAlarmResponseAck
Example
Event FindEvent
FindEventResponse
FindEventResponseAck
Example
Measurement FindMeasurement
FindMeasurementResponse
FindMeasurementResponseAck
Example
Operation FindOperation
FindOperationResponse
FindOperationResponseAck
Example
CurrentUser CurrentUser
GetCurrentUser
GetCurrentUserResponse
Example
TenantOption TenantOption
FindTenantOptions
FindTenantOptionsResponse
Documentation

Invoking other parts of the Cumulocity REST API

The Cumulocity REST API covers some extra functionality which is not covered with the individual event types. To invoke any other part of the REST API, a generic request-response API is provided which you can use to invoke any part of the Cumulocity API.

You can use the following request-response events:

  • com.apama.cumulocity.GenericRequest
  • com.apama.cumulocity.GenericResponse
  • com.apama.cumulocity.GenericResponseComplete
Info
The Apama-ctrl microservice, and thus all EPL apps code within it, runs with a number of permissions which permit the EPL to access all objects in the inventory and also read user details.
This includes personal identifiable information, such as username, email address, and so on.

For more information, see REST implementation in the Cumulocity OpenAPI Specification and Invoking other parts of the Cumulocity REST API in the Apama documentation.

Invoking HTTP services

Info
The information below is for interacting with external HTTP services. For making requests to other parts of the Cumulocity REST API, see Invoking other parts of the Cumulocity REST API. For making requests to anything else on the platform, including other microservices, see Connecting Apama to other microservices.

To interact with HTTP services using REST and JSON, create a com.softwareag.connectivity.httpclient.HttpTransport instance using one of the factory methods:

  • HttpTransport.getOrCreate(string host, integer port) returns HttpTransport
  • HttpTransport.getOrCreateWithConfiguration(string host, integer port, dictionary <string, string> configurations) returns HttpTransport (the keys in the configurations dictionary are the constants on HttpTransport with the CONFIG_ prefix)

On the HttpTransport object, call one of the create methods, passing a path and payload as needed, to produce a Request object.

On the Request object, you may set cookies, headers or query parameters as needed, and can then invoke the request with the execute(action<Response> callback). Supply the name of an action in your monitor for the callback, and it will be invoked with the Response when the request has completed (or timed out).

In the callback, the Response object is supplied with statusCode and payload. Fields on the payload are accessible via the com.apama.util.AnyExtractor object it is supplied in - see the information on access fragments below.

Refer to the API Reference for EPL (ApamaDoc) for further details.

Utility functions

Access fragments

You can access fragments via the params dictionary of most events. The AnyExtractor object can be constructed to help you extract data from any objects containing multiple subfragments and access:

  • action getInteger(string path) returns integer

  • action getFloat(string path) returns float

  • action getString(string path) returns string

  • action getBoolean(string path) returns Boolean

  • action getSequence(string path) returns sequence<any>

  • action getDictionary(string path) returns dictionary<any, any>

You can use a JSON path to navigate in the object structure. For example:

string s := AnyExtractor(measurement.params["fragment"]).getString("sub.fragment.object");

Example “fragment”: “c8y_TemperatureMeasurement”.
Example “sub.fragment.object”: “c8y_TemperatureMeasurement.T.Unit”.

Casting “any” values

Alternatively, use a cast to convert an any to a particular type:

string s := <string> measurement.params["strfragment"];

Note that a cast operation will throw if the object is of a different type.

currentTime and the TimeFormatter

The read-only variable currentTime can be used to obtain the current server time. Apama deals with time using seconds since the Unix Epoch (1 Jan 1970 UTC). You can easily transform it to a human-readable form using the TimeFormat object. The TimeFormat object can be used for formatting dates and times, and also for parsing them.

Example:

using com.apama.correlator.timeformat.TimeFormat;

monitor Example {
    action onload {
        log TimeFormat.format(currentTime, "yyyy.MM.dd 'at' HH:mm:ss") at INFO;
    }
}

For more information on TimeFormat and its functions, see Using the TimeFormat Event Library in the Apama documentation and the API Reference for EPL (ApamaDoc).

inMaintenanceMode

The Util.inMaintenanceMode() function is a fast way to check if the device is currently in maintenance mode. It takes a managed object as a parameter and returns a Boolean which is true if the device is in maintenance mode.

Example:

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

using com.apama.cumulocity.Util;

monitor ExampleMonitor {
  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
    monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
    on all Measurement() as m {
      integer reqId := integer.getUnique();
      send FindManagedObject(reqId, m.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
      on FindManagedObjectResponse(reqId = reqId, id = m.source) as d and not FindManagedObjectResponseAck(reqId = reqId) {
        if not Util.inMaintenanceMode(d.managedObject) {
          send Event("", "c8y_Event", m.source, currentTime, "Received measurement from active device", new dictionary<string,any>) to Event.SEND_CHANNEL;
        }
      }
    }
  }
}

replacePlaceholders

To build strings, you can use concatenation as follows:

string s:= "An event with the text " + evt.text + " has been created.";

If the texts get longer and have more values that are dynamically set from the data, you can use the Util.replacePlaceholders() function. In your text string, you mark the placeholders with the field name from the event and surround it by #{}. The second parameter to replacePlaceholders can be any event type.

Utils::replacePlaceholders looks up the field name specified in the event or in the parameters of the event to generate the text replacement. You can use field names of type #{X.Y} to access nested structures in the event.

myMailText := Util.replacePlaceholders("The device #{source} created an event with the text #{text} at #{time}", alarm);

If the replacement string is of a form such as #{source.name} where source.name is the name of the underlying managed object/device or #{source.c8y_Hardware.notes} where c8y_Hardware is a fragment on the managed object, then special handling is required to achieve the replacement. After the initial replacement, you must update the placeholder field name and run Util::replacePlaceholders again with the source managedObject.

myMailText := Util.replacePlaceholders("The device #{source} with the serial number #{source.c8y_Hardware.serialNumber} created an event with the text #{text} at #{time}. The device is located at #{source.c8y_Address.street} in #{source.c8y_Address.city}.", alarm);
myMailText := myMailText.replaceAll("#{source.", "#{");
myMailText := Util.replacePlaceholders(myMailText, managedObject);

Advanced features

Custom fragments

Cumulocity APIs let you structure your data freely. In Apama EPL, this is done by adding entries to params, which is of the type dictionary<string, any>. Each Cumulocity event in the com.apama.cumulocity package (such as Alarm, Event, Measurement or Operation) has a params field, which is translated to fragments or optional fields. Thus, when receiving events, your code must look up entries in the params field. When sending events, this can be done by defining event types, or you can use the dictionary<string, any> type. When receiving events, the EPL type is dictionary<any, any>. Note that EPL is strongly typed, so if you are creating an event with no fragments, a new dictionary<string, any> expression is required. If you are providing entries inline with a dictionary literal, then EPL determines the type based on the type of the first key-value pair - thus, for dictionary<string, any>, cast the first value to an any type with the <any> cast operator:

send Event(..., new dictionary<string,any>) to Event.SEND_CHANNEL;
send Event(..., {"fragment":<any>"value"}) to Event.SEND_CHANNEL;

The MeasurementValue type is provided for the measurements in the Measurement type. MeasurementValue has value and unit fields and params for other fragments.

Example 1:

send Measurement("", "c8y_TemperatureMeasurement", "12345", currentTime, {
	"c8y_TemperatureMeasurement":{
		"T1":MeasurementValue(1.0, "C", new dictionary<string,any>),
		"T2":MeasurementValue(2.0, "C", new dictionary<string,any>),
		"T3":MeasurementValue(3.0, "C", new dictionary<string,any>),
		"T4":MeasurementValue(4.0, "C", new dictionary<string,any>),
		"T5":MeasurementValue(5.0, "C", new dictionary<string,any>)
	}},
	new dictionary<string,any>) to Measurement.SEND_CHANNEL;

This will result in the following JSON structure:

{
	"type": "c8y_TemperatureMeasurement",
	"time": "...",
	"source": {
		"id": "12345"
	},
	"c8y_TemperatureMeasurement": {
		"T1": {
			"value": 1,
			"unit": "C"
		},
		"T2": {
			"value": 1,
			"unit": "C"
		},
		"T3": {
			"value": 1,
			"unit": "C"
		},
		"T4": {
			"value": 1,
			"unit": "C"
		},
		"T5": {
			"value": 1,
			"unit": "C"
		},
	}
}

Measurement fragments

A measurement can be broken into individual measurement fragments. This can be done for each fragment and series present in the measurement. See Cumulocity’s domain model for more information on measurement fragments.

Listen for events of type com.apama.cumulocity.MeasurementFragment when you require filtering based on measurement fragments or series, instead of listening for com.apama.cumulocity.Measurement events and looking inside the measurements dictionary. For more information, see Using measurement fragments in the Apama documentation.

Listeners

Triggering a statement by an arriving event is not the only possibility. The following sections cover other ways to combine listeners. Refer to Defining Event Listeners in the Apama documentation for full details.

Filters

Filters enable you to trigger by combinations or sequences of other triggers. If you have a trigger like this

on all Event() as e { ... }

it is also possible to add filters in the pattern.

on all Event(type = "c8y_EntranceEvent") as e { ... }

You can listen for more than one event:

on Event() as e and Alarm() as a { ... }

This will trigger on receiving an Event and an Alarm event - the first of each will be captured.

You can also trigger by sequences:

on all (Event() as e -> Alarm() as a) { ... }

This will trigger for every pair “Event followed by Alarm”. On receiving an event, it will stop listening for further events and start listening for alarms instead. Once an alarm is received, it will start listening for events again.

Timers

You can also trigger listeners based on time. You can either trigger in a certain interval, for example, fire every 5 minutes (300 seconds):

on all wait(300.0) { ... }

Or you can have a listener fire at certain times of the day, with similar functionality to Unix’s cron scheduler:

// timer:at(minutes, hours, daysOfMonth, month, daysOfWeek, (optional) seconds)
// minutes: 0-59
// hours: 0-23
// daysOfMonth: 1-31
// month: 1-12
// daysOfWeek: 0 (Sunday) - 6 (Saturday)
// seconds: 0-59

on all at(*, *, *, *, *) {} // trigger every minute

on all at(*/10, *, *, *, *) {} // trigger every 10 minutes
on all at(0, 1, *, *, [1,3,5]) {} // trigger at 1am every monday, wednesday and friday
on all at(0, */2, 1:7, *, *) {} // trigger every 2 hours on every day in the first week of every month

You can also combine timer patterns with other patterns. For example, you can check if there was an event within a certain time after another event:

on Event() -> wait(600.0) and not Alarm() { ... }

This will trigger if there is an event and within 10 minutes (600 seconds) there is no alarm. Note the use of not which terminates the listener if the event occurs.

You can use a tenant option to set the time zone used for on all at timers. To set the tenant option, specify the microservice.runtime category and the timezone key. For example:

{
    "category" : "microservice.runtime",
    "key" : "timezone",
    "value" : "Europe/Warsaw"
}

See also Timezone variable in this documentation and Supported time zones in the Apama documentation.

Info
This tenant option is only read when the microservice starts. If the tenant option is changed, the microservice only picks this up on the next microservice subscription.

Streams - windows

Streams give you the possibility to operate on windows of events. Streams use the from keyword instead of on and define a window to operate over, and select what output they want from that window using aggregates. Windows can be restricted by two means:

  1. Windows for a certain time - use the within keyword.

    from m in all Measurement(type="c8y_TemperatureMeasurement") within 3600.0 select avg(m.measurements	["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    
  2. Windows with a certain amount of events - use the retain keyword.

    from m in all Measurement(type="c8y_TemperatureMeasurement") retain 100 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }
    

Streams - outputting periodically

Streams can also control how frequently they evaluate, using the every specifier.

// will output the last measurement arrived every 1 minute
from m in all Measurement(type="c8y_TemperatureMeasurement") within 60.0 every 60.0 select last(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as lastValue { }

// will output the first of every 20 measurements arriving
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select first(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as firstValue { }

// will output the average of all 20 measurements after the 20th arrived
from m in all Measurement(type="c8y_TemperatureMeasurement") retain 20 every 20 select avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value) as avgValue { }

See the Apama documentation for built-in aggregate functions.

Creating own event types

As well as the predefined event types, you can define your own event types. These can be useful to detect patterns of events occurring which trigger other parts of the same module.

event MyEvent {
	Measurement m1;
	Measurement m2;
}

...

on Measurement() as m1 -> Measurement() as m2 {
	route MyEvent(m1, m2);
}
Info
Cumulocity deploys each module into its own namespace, so event definitions from one module cannot be used in other modules. This prevents dependencies between modules.

Creating own actions

Typically, you will structure a monitor using actions (much like functions in Java), as shown in the following examples.

Increasing the given severity:

action upgradeSeverity(string old) returns string {
	if old = "WARNING" { return "MINOR"; }
	if old = "MINOR"   { return "MAJOR"; }
	if old = "MAJOR"   { return "CRITICAL"; }
	return old;
}

Calculating the distance between two geo-coordinates:

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

Variables

You can define variables in your modules.

string myEmailText := "Hello World";
sequence<string> supportedOperationsList := ["c8y_Restart", "c8y_Relay"];

If you define a monitor-scope variable (that is, inside a monitor but not within any actions on that monitor), then that can be used in a listener if you use a colon (:) instead of as for the event co-assignment in the listener. Thus, the example below logs the latest event every 10 seconds:

monitor MyMonitor {
    // monitor scope:
    Event e;
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        on all Event():e {}
        on all wait(10.0) {
            log e.toString();
        }
    }
}

When a listener starts, it takes a copy of all of the local variables. The example below thus logs each event after a 10 second delay, even if other events come in between.

monitor MyMonitor {
    // monitor scope:
    Event e;
    action onload() {
      monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
      on all Event():e {
            on all wait(10.0) {
                log e.toString();
            }
        }
    }
}

Spawning monitor instances and contexts

While it is possible to handle multiple devices in a single monitor (for example, using group by and partition by in streams, or maintaining a dictionary keyed on the device ID for other state), it is often useful to separate processing of different devices into separate monitor instances.

New monitor instances can be created using the spawn statement. This takes a copy of the monitor’s monitor scope variables and runs the named action in a new monitor instance. No listeners are copied into the new monitor. It is also possible to specify a context to spawn the new monitor instance in. Different contexts can run concurrently with each other, and also help isolate different monitors from each other. When constructing a context, supply a name to identify the context, and a Boolean to control if the context is public - that is, it receives the Cumulocity events by default (sent to the default channel).

This pattern is often used with the unmatched keyword to identify events that are not matched by any other listeners in that context. By using a separate context for each monitor, the unmatched behavior is scoped to that monitor. For example:

monitor PerDeviceMeasurementTracker {
	action onload() {
		spawn factory to context("PerDeviceMeasurementTracker", true);
	}
	action factory() {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		on all unmatched Measurement() as m {
			spawn perDevice(m);
		}
	}

	dictionary<string, Measurement> latestMeasurementByType; // measurements for this device

	action perDevice(Measurement m) {
		processMeasurement(m);
		on all Measurement(source = m.source) as m {
			processMeasurement(m);
		}
	}
	action processMeasurement(Measurement m) {
		latestMeasurementByType[m.type] := m;
	}
}

Best practices and guidelines

EPL monitors

Symptom: Your event processing rules are disabled automatically

If a monitor throws an exception from onload or a listener and the exception is not caught, then the monitor will be terminated. Catch exceptions or avoid the reason for them occurring.

Similarly, if a monitor completes processing an event and has no listeners left active, then it cannot be triggered again, and will automatically remove itself.

Avoid excessive memory usage per monitor

Make sure that your event processing rules do not leak listeners. For example, when doing request-response operations, ensure that no listeners are left active after the response is processed, or if a timeout occurs and there is no response.

Number formats

Cumulocity measurements use the float type. Note that the timestamps are stored as floats (seconds since 1 Jan 1970, 00:00 UTC).

Subscribing to channels and contexts

A context is a parallel processing unit within Apama. Monitor instances can be deployed to multiple contexts using the spawn...to syntax. When subscribing to a channel, all monitor instances within a context will receive events for that subscription. So it is recommended practice to put different subscriptions in different contexts. The use of contexts can prevent part of the application being overloaded from affecting other parts of the application.

Contexts are created with a user-friendly name, and each individual instance of the context object corresponds to different contexts, even if they have the same name.

For example:

action onload() {
   context subContext := context("Worker");
   spawn worker() to subContext;
}
action worker() {
   monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
   on all Measurement() as m {
      ...
   }
}  

Apama limitations in Cumulocity

Using Apama within the Cumulocity environment necessarily has some restrictions to the capabilities available when Apama is used standalone.

There are a number of ways that assets may be deployed to Apama within Cumulocity and the restrictions vary according to those mechanisms:

  • EPL apps - the simplest mechanism to deploy Apama assets into a fully managed Apama correlator, see Deploying apps.
  • Custom microservice - where more complex Apama projects can be built using Cumulocity’s Microservice SDK, see Microservices.

When designing an Apama solution to be deployed within any form of Cumulocity environment, consider the following points.

General Apama limitations when using EPL apps or a custom microservice

  • For scalability, a correlator may move between hosts and therefore does not have access to a persistent file system. It is a standard Cumulocity constraint that all microservices (either provided by the platform, or custom) must be stateless, see Microservices. The Apama features affected by this include:
    • Correlator persistence.
    • MemoryStore persistence.
  • Non-HTTP/REST connections to an external system or process are mostly impractical. Although if a service is available over the internet, then it can be used (for example, an HTTP client inside Apama could connect to publicly accessible HTTP servers). The Apama features affected by this include:
    • Apama Database Connector (ADBC).
    • Correlator-integrated support for the Java Message Service (JMS).
    • Distributed memory stores.
    • Connections between correlators.
  • For security and implementing user access control, Cumulocity does not make the correlator port available to external processes, see Microservices. The following capabilities require access to the correlator port and hence are not compatible with this access control:
    • Command line tools such as engine_connect, engine_management, engine_send, engine_receive.
    • Engine Management API, Event Service API, Scenario Service API.
    • Connecting to adapters running out-of-process in an IAF.
    • Dashboards (provided in-the-box with Apama).
    • Debugging from Apama Plugin for Eclipse. Instead, debug your app running in a local correlator.
    • Correlator REST interface.
  • To reduce both the memory usage of an application during startup and the application’s startup time, ensure the application is completely initialized before injecting monitors that automatically unload, and before running time-consuming queries.

Specific Apama limitations when using EPL apps

  • For ease of use, the correlator startup is controlled by Cumulocity. Thus, features that require you to change configuration files or command line options are not accessible. The Apama features affected by this include:
    • Persistence.
    • Connectivity plug-ins.
  • For security, the file system used by the correlator is not accessible. The Apama features affected by this include:
    • Accessing the input log.
    • Using custom plug-ins.
    • Using file system assets for enrichment.
  • For simplicity, it is only possible to make independent EPL injections. Each monitor is managed independently and so dependencies between different monitors cannot be created. The Apama features affected by this include:
    • A *.mon file must not contain a package statement (to do so is an error).
    • It is not possible to share event definitions between separate *.mon files.
    • It is not possible to use Apama queries.
    • You can only use the bundles listed in Developing apps with Apama Plugin for Eclipse.

All of these restrictions are implemented to ensure the smooth and secure operation of EPL apps within Cumulocity.

Examples

Calculating an hourly average of measurements

We are assuming the input data looks like this:

{
  "c8y_TemperatureMeasurement": {"T": {"value": ..., "unit": "C"}},
  "time": "...",
  "source": {"id":"..."},
  "type": "c8y_TemperatureMeasurement"
}

To create the average (mean), we need the following parts in the module:

  • A time window over one hour, grouped by device (source).

  • A select that returns the average calculation every hour, the source and the unit (as we must use an aggregate over the window contents, we select the last unit - we assume all measurements are of the same unit). Note the AverageByDevice event definition to hold these.

  • Everything created as a new measurement.

For example:

using com.apama.aggregates.avg;
using com.apama.aggregates.last;
using com.apama.cumulocity.Measurement;

monitor HourlyAvgMeasurementDeviceContext {

  event AverageByDevice {
    string source;
    float avgValue;
    string unit;
  }

  action onload() {
    // Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

    from m in all Measurement(type="c8y_TemperatureMeasurement") within (3600.0)
      group by m.source select
        AverageByDevice(m.source,
          avg(m.measurements["c8y_TemperatureMeasurement"]["T"].value),
          last(m.measurements["c8y_TemperatureMeasurement"]["T"].unit)) as avgdata {
            send Measurement("", "c8y_AverageTemperatureMeasurement", avgdata.source, currentTime,
              {"c8y_AverageTemperatureMeasurement":
                {
                  "T": MeasurementValue(avgdata.avgValue, avgdata.unit, new dictionary<string,any>)
                }
              }, new dictionary<string,any>) to Measurement.SEND_CHANNEL;
          }
  }
}

Creating alarms from bit measurements

Devices often keep alarm statuses in registers and cannot interpret the meaning of alarms. In this example, we assume that a device just sends the entire register as a binary value in a measurement. A rule must identify the bits and create the respective alarm.

We create three dictionaries to map alarm text, type and severity for each of the bits, and an action to look up the value. We use -1 to indicate a default value, and replace <position> with the string form of the position.

dictionary<integer, string> positionToAlarmType := {
	0  : "c8y_HighTemperatureAlarm",
	1  : "c8y_ProcessingAlarm",
	2  : "c8y_DoorOpenAlarm",
	3  : "c8y_SystemFailureAlarm",
	-1 : "c8y_FaultRegister<position>Alaram"
};

dictionary<integer, string> positionToAlarmSeverity := {
	0  : "MAJOR",
	1  : "WARNING",
	2  : "MINOR",
	3  : "CRITICAL",
	-1 : "MAJOR"
};

dictionary<integer, string> positionToAlarmText := {
	0  : "The machine temperature reached a critical status",
	1  : "There was an error trying to process data",
	2  : "Door was opened",
	3  : "There was a critical system failure",
	-1 : "An undefined alarm was reported on position <position> in the binary fault register"
};

action getText(integer bitPosition, dictionary<integer, string> lookup) returns string {
	string template := lookup.getOr(bitPosition, lookup[-1]);
	return template.replaceAll("<position>", bitPosition.toString());
}

To analyze the binary measurement value, we will interpret it as a string value and loop through each character. The getActiveBits() function will do that and return a list of the bit positions at where the measurement had a “1”. We can then use a for loop to iterate through that:

action getBitPositions(string binaryAsText) returns sequence<integer> {
	sequence<integer> bitsSet := new sequence<integer>;
	integer i := 0;
	while i < binaryAsText.length() {
		string character := binaryAsText.substring(i, i+1);
		if character = "1" {
			bitsSet.append(binaryAsText.length() - i - 1);
		}
		i:=i+1;
	}
	return bitsSet;
}

action onload() {
	// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
	monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
	on all Measurement(type = "c8y_BinaryFaultRegister") as m {
		string faultRegister := m.measurements.getOrDefault("c8y_BinaryFaultRegister").getOrDefault("errors").value.toString();
		integer bitPosition;
		for bitPosition in getBitPositions(faultRegister) {
			Alarm alarm    := new Alarm;
			alarm.type     := getText(bitPosition, positionToAlarmType);
			alarm.severity := getText(bitPosition, positionToAlarmSeverity);
			alarm.text     := getText(bitPosition, positionToAlarmText);
			alarm.source   := m.source;
			alarm.time     := m.time;
			alarm.status   := "ACTIVE";
			send alarm to Alarm.SEND_CHANNEL;
		}
	}
}

Creating a measurement like this

{
	"c8y_BinaryFaultRegister": {"errors": {"value": 10110}},
	"time": "...",
	"source": {"id": "..."},
	"type": "c8y_BinaryFaultRegister"
}

will trigger the last statement three times.

  • measurement at bit position 1 - c8y_ProcessingAlarm, WARNING, “There was an error trying to process data”
  • measurement at bit position 2 - c8y_DoorOpenAlarm, MINOR, “Door was opened”
  • measurement at bit position 4 - c8y_FaultRegister4Alarm, MAJOR, “An undefined alarm was reported on position 4 in the binary fault register”

and therefore create three alarms.

Consumption measurements

Assuming we have a sensor which measures the current fill level of something and sends the values on a regular basis to Cumulocity, we can easily create additional consumption values. Calculating the absolute difference between two measurements can be useful, but it will only give you a clear view if the measurements are sent always in the same interval. Therefore, we will put the absolute difference in relation to the time difference and calculate as a per hour consumption.

We will compare the value and time difference of two adjacent measurements for a device, using a stream retaining 2 entries, and selecting the first and last timestamp and value.

using com.apama.aggregates.last;
using com.apama.aggregates.first;
using com.apama.aggregates.count;

monitor FillLevelMeasurements {

	event FillLevel {
		float firstValue;
		float firstTime;
		float lastValue;
		float lastTime;
		string source;
	}

	action calculateConsumption(FillLevel l) returns float {
		if(l.firstTime = l.lastTime) {
			return 0.0;
		} else {
			return ((l.lastValue - l.firstValue) * 3600.0) / (l.lastTime - l.firstTime);
		}
	}

	action onload() {
		// Subscribe to Measurement.SUBSCRIBE_CHANNEL to receive all measurements
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		from m in all Measurement(type = "c8y_WaterTankFillLevel") partition by m.source retain 2 group by m.source having count() = 2
			select FillLevel(first(m.measurements["c8y_WaterTankFillLevel"]["level"].value), first(m.time),
							last(m.measurements["c8y_WaterTankFillLevel"]["level"].value), last(m.time), m.source) as fill {

			Measurement m := new Measurement;
			m.type := "c8y_HourlyWaterConsumption";
			m.time := currentTime;
			m.source := fill.source;
			MeasurementValue mv := new MeasurementValue;
			mv.value := calculateConsumption(fill);
			mv.unit := "l/h";
			m.measurements[m.type] := {"consumption":mv};
			send m to Measurement.SEND_CHANNEL;
		}
	}
}

Miscellaneous sample apps

The EPL editor in the Streaming Analytics application provides several sample apps which demonstrate how to use Apama EPL, for example, to query for Cumulocity objects or to create alarms. You can use these samples to build your own apps.

Study - Circular geofence alarms

Overview

This section gives an in-depth example how you can create more complex rules. It uses multiple of the features explained before in the other sections of this guide.

If you are just starting with Apama EPL, take a look at Examples.

Prerequisites

Goal

We want our tracking devices that are continuously sending location events to automatically generate alarms if they move outside a geofence. This geofence will be a circle and should be configurable for each device separately. The alarm will be created at the moment the device moves outside the geofence. While it is moving outside, it should not create new alarms because the first one will remain active. As soon as the device moves back into the geofence, the alarm will be cleared.

Cumulocity data model

Location event structure (the part we need):

{
  "id": "...",
  "source": {"id": "..."},
  "text": "...",
  "time": "...",
  "type": "...",
  "c8y_Position": {"alt": ..., "lng": ..., "lat": ...}
}

We store the geofence configuration in the device (the radius will be configured in meters):

{
  "c8y_Geofence": {"lat": ..., "lng": ..., "radius": ...}
}

Additionally, we want to enable/disable the geofence alarms for each device without removing the configuration entirely. We will do that by adding/removing “c8y_Geofence” to c8y_SupportedOperations in the device:

{
  "c8y_SupportedOperations": [..., "c8y_Geofence", ...]
}

Calculation

The device is outside of the geofence if the distance between the current position and the center is bigger than the configured radius of the geofence. What we need is a function that can calculate the difference between two geo-coordinates:

action distance(float lat1, float lon1, float lat2, float lon2) returns float {
	float R := 6371000.0;
	float toRad := float.PI / 180.0;
	float lat1Rad := lat1 * toRad;
	float lat2Rad := lat2 * toRad;
	float deltaLatRad := (lat2-lat1) * toRad;
	float deltaLonRad := (lat2-lat1) * toRad;
	float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
	float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
	return R * c;
}

The above action will return the distance in meters.

Step 1: Filtering the input

The main input for this module will be events. To discard non-matching events as early as possible, we perform this as the first check in the listener:

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
on all Event() as e {
	if e.params.hasKey("c8y_Position") {
		// we have an event
	}
}

Step 2: Collecting necessary data

In the next step, we need the configuration of the geofence for the calculation and grab it.

monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
...
integer reqId := integer.getUnique();
send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
on FindManagedObjectResponse(reqId = reqId) as resp
   and not FindManagedObjectResponseAck(reqId) {
	  ManagedObject dev := resp.managedObject;
   }

Step 3: Checking if the device supports c8y_Geofence

With the device available we will now check if there is a geofence configured for the device and if it is activated (contains “c8y_Geofence” in supportedOperations). To check the c8y_SupportedOperations array, we can use the indexOf() function. This function will loop through all elements and return the index of that entry, or a negative number if the value is not present. For the configuration, we will just check if the device contains the fragment “c8y_Geofence”.

Once we have an event and a device, we extract the data from the event’s c8y_Position and the device’s c8y_Geofence. These objects are mapped to dictionary<any, any> entries in the params. As the params hold values of type any, we must cast to a dictionary<any, any>.

if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {
	dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
	float eventLat := <float> evtPos["lat"];
	float eventLng := <float> evtPos["lng"];

	dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
	float centerLat := <float> devGeofence["lat"];
	float centerLng := <float> devGeofence["lng"];
	float maxDistance := <float> devGeofence["radius"];
}

Step 4: Creating the trigger

As mentioned earlier, the device is outside of the fence if the distance between the current device position and the geofence center is bigger than the configured geofence radius. To trigger the alarm, we need two events so we can check if the device entered or left the geofence within these two events.

In the first step, we calculate the distance with the function mentioned earlier:

float d := distance(centerLat, centerLng, eventLat, eventLng);

Now we re-route this as an event with:

event LocationEventWithDistance {
	string source;
	float distance;
	Event e;
	float maxDistance;
}

...

route LocationEventWithDistance(e.source, d, e, maxDistance);

We place the source in the event so we can easily match it in a listener.

We now set up a listener triggered by the event LocationEventWithDistance, listening for the next LocationEventWithDistance - for the same source:

on all LocationEventWithDistance() as firstPos {
	on LocationEventWithDistance(source = firstPos.source) as secondPos {
		// now have two events with distances
	}
}

This pair of LocationEventWithDistance events now holds all data for checking if we should create the alarm or not. Note that we are filtering the secondPos event to be for the same source as the first - there will be an active listener for every device we have received an event from.

Step 5: Creating the alarm

To create the alarm, we now need two events where the first one has a distance smaller than the radius and the second one has a distance bigger than the radius. This would mean that the device just left the geofence.

if firstPos.distance <= firstPos.maxDistance and
	secondPos.distance > secondPos.maxDistance {
	send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
					"Device moved out of circular geofence", "ACTIVE",
					"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
}

Step 6: Clearing the alarm

To clear the alarm, we must just switch the condition at the bottom and additionally grab the currently active alarm to get its ID. We do not need to care about whether there is an existing alarm at this point. If there is none, the listener will trigger the and not FindAlarmResponseAck, terminating the listener:

monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
...
if firstPos.distance > firstPos.maxDistance and
	secondPos.distance <= secondPos.maxDistance {
	integer reqId:= integer.getUnique();
	send FindAlarm(reqId, {"source": firstPos.source,
		"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
	on FindAlarmResponse(reqId=reqId) as alarmResponse
	   and not FindAlarmResponseAck(reqId=reqId) {
		send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
					firstPos.source, currentTime, "Device moved back into circular geofence",
					"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
	}
}

Putting everything together

We can now combine all the parts into one module. The order of the listeners does not matter.

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;

monitor MonitorDevicesForCircularGeofence {

	event LocationEventWithDistance {
		string source;
		float distance;
		Event e;
		float maxDistance;
	}

	action onload {
		monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
		monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
		on all Event() as e {
			if e.params.hasKey("c8y_Position") {
				// we have an event
				integer reqId := integer.getUnique();
				send FindManagedObject(reqId, e.source, new dictionary<string,string>) to FindManagedObject.SEND_CHANNEL;
				on FindManagedObjectResponse(reqId = reqId) as resp
				and not FindManagedObjectResponseAck(reqId) {
				ManagedObject dev := resp.managedObject;

				if(dev.params.hasKey("c8y_Geofence") and dev.supportedOperations.indexOf("c8y_Geofence") >= 0) {

						dictionary<any, any> evtPos := <dictionary<any, any> > e.params["c8y_Position"];
						float eventLat := <float> evtPos["lat"];
						float eventLng := <float> evtPos["lng"];

						dictionary<any,any> devGeofence := <dictionary<any,any> > dev.params["c8y_Geofence"];
						float centerLat := <float> devGeofence["lat"];
						float centerLng := <float> devGeofence["lng"];
						float maxDistance := <float> devGeofence["radius"];

						float d := distance(centerLat, centerLng, eventLat, eventLng);

						route LocationEventWithDistance(e.source, d, e, maxDistance);
					}
				}
			}
		}

		on all LocationEventWithDistance() as firstPos {
			on LocationEventWithDistance(source = firstPos.source) as secondPos {
				// now have two events with distances
				if firstPos.distance <= firstPos.maxDistance and
					secondPos.distance > secondPos.maxDistance {
					send Alarm("", "c8y_GeofenceAlarm", firstPos.source, currentTime,
							"Device moved out of circular geofence", "ACTIVE",
							"MAJOR", 1, new dictionary<string,any>) to Alarm.SEND_CHANNEL;
				}

				if firstPos.distance > firstPos.maxDistance and
					secondPos.distance <= secondPos.maxDistance {
					integer reqId:= integer.getUnique();
					send FindAlarm(reqId, {"source": firstPos.source,
						"status": "ACTIVE", "type": "c8y_GeofenceAlarm"}) to FindAlarm.SEND_CHANNEL;
					on FindAlarmResponse(reqId=reqId) as alarmResponse
					and not FindAlarmResponseAck(reqId=reqId) {
						send Alarm(alarmResponse.id, "c8y_GeofenceAlarm",
								firstPos.source, currentTime, "Device moved back into circular geofence",
								"CLEARED", alarmResponse.alarm.severity, 1, new dictionary<string, any>) to Alarm.SEND_CHANNEL;
					}
				}
			}
		}
	}

	action distance(float lat1, float lon1, float lat2, float lon2) returns float {
		float R := 6371000.0;
		float toRad := float.PI / 180.0;
		float lat1Rad := lat1 * toRad;
		float lat2Rad := lat2 * toRad;
		float deltaLatRad := (lat2-lat1) * toRad;
		float deltaLonRad := (lat2-lat1) * toRad;
		float a := (deltaLatRad/2.0).sin().pow(2.0) * lat1Rad.cos() * lat2Rad.cos() * (deltaLonRad/2.0).sin().pow(2.0);
		float c := 2.0 * a.sqrt().atan2((1.0-a).sqrt());
		return R * c;
	}
}

Connecting Apama to other microservices

Overview

Streaming analytics applications using Apama can make use of applications running in other microservices. This section uses the /health endpoint of an Apama-ctrl microservice, but the steps apply to connecting to any other microservice running inside Cumulocity. This section is going to show you how to create a connection to the Cumulocity platform from within Apama EPL which can be used to invoke other microservices directly. It will then show you how to make a request and decode the result.

We will assume that you are developing an EPL app using the EPL editor that is part of the Streaming Analytics application and demonstrate a request to a microservice. The steps in this guide will also work with any other way you could be creating an Apama application and can be used to interact with any microservice.

We will be making use of the CumulocityRequestInterface API. For more technical information about this API, see Invoking microservices in the Apama documentation.

Creating an EPL app

Click the Streaming Analytics icon in the application switcher. On the resulting home screen, navigate to the EPL Apps page and then click New EPL app. You will now see an EPL editor window in which to create the app which interacts with another microservice.

Connecting to the Cumulocity platform

To support making these requests, we provide a helper event with actions to automatically connect to the Cumulocity platform and then create requests which can be used to call other microservices. This helper event is called CumulocityRequestInterface and is within the com.apama.cumulocity package. This helper event provides a static action which will connect to Cumulocity and return an instance of the event. It can automatically connect either from within a microservice or the Cumulocity platform itself, or from a remote correlator. That instance has an action which will create a request to call a specific microservice.

To create the connection from your own code, simply call the connectToCumulocity method and store the result:

CumulocityRequestInterface cumulocity := CumulocityRequestInterface.connectToCumulocity();

This will automatically create a connection using the credentials and connection details provided to your microservice, or using the configuration for the Cumulocity transport when connecting from an external Apama instance.

Making microservice requests

The CumulocityRequestInterface instance has an action on it to create a request:

/**
* Allows creation of a request on a transport that
* has been configured for a Cumulocity connection.
*
* @param method The type of HTTP request, for example "GET".
* @param path A specific path to be appended to the request.
* @param payload A dictionary of elements to be included in the request.
*/
action createRequest(string method, string path, any payload) returns Request

This takes the HTTP method to use (usually GET, PUT or POST), a path including the Cumulocity service prefix (typically something like /service/serviceName/path/on/service) and the payload. The payload will be converted to a JSON document before submitting to the microservice. The action returns a Request object which is part of the HTTP Client interface, documentation of which can be found in the API Reference for EPL (ApamaDoc).

Requests are executed with a call-back action as an argument which will be invoked when the request is completed with the response as an argument. If you must set any options, query parameters or headers on the request, you can set those on the Request object before calling it. For example:

action responseCallback(Response resp) {
    string objectId := resp.payload.getString("id");
    ...
}
...
Request req := cumulocity.createRequest("GET", "/service/otherService/data", any());
req.setQueryParameter("type", "object");
req.execute(responseCallback);

The response will also be decoded from JSON and the response payload uses the AnyExtractor pattern which you can find linked from the Response event in the HTTP Client transport documentation. The above example will be equivalent to the REST request GET http://cumulocity/service/otherService/data?type=object.

Example request to a microservice endpoint

The following is a very simple application that shows how to query another microservice. We are using the /health endpoint of an Apama-ctrl microservice as an example.

We will start with EPL which connects to Cumulocity and calls an action to send the request.

using com.apama.cumulocity.CumulocityRequestInterface;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;

monitor CallAnotherMicroservice {

	CumulocityRequestInterface requestIface;

	action onload() {
		requestIface := CumulocityRequestInterface.connectToCumulocity();
		sendHealthRequest();
	}

Sending the request

First, we create the Request with: the request type, the request path, and the payload any() because in this example we do not need to put anything in the payload.

We then use execute to send the request and provide an action to be called with the response.

action sendHealthRequest()
{
	Request healthRequest:=
		requestIface.createRequest("GET", "/service/cep/health", any());
	healthRequest.execute(responseHandler);
}

We use an Apama-ctrl microservice for this example, which has the context path of /cep. To modify this for another microservice, substitute /cep with the context path as defined in the manifest for your microservice. The /health endpoint completes the request path for this example, but could be replaced with any valid endpoint of the microservice.

Receiving the response

Here is the defined action that we used when sending the request. This action is called in response to the sent request and is provided with the Response object.

For this example, we simply log the status code and the body of the response.

action responseHandler(Response healthResponse)
{
	integer statusCode := healthResponse.statusCode;
	string payload := healthResponse.payload.data.toString();
	log "Health response status code = " + statusCode.toString() +
		", response body = " + payload at INFO;
}

Other microservices

This section was demonstrating talking to an Apama-ctrl microservice. However, you can also access any other microservice through Cumulocity as long as it uses standard REST requests with JSON payloads. You must simply construct the appropriate /service URL using the name of your microservice followed by the path of the request within your microservice.