The Cumulocity Transport Connectivity Plug-in

About the Cumulocity transport

Cumulocity is used for communication with connected IoT devices. See https://cumulocity.com/ for detailed information.

Info
For details on the support lifetime of the connectivity to Cumulocity, refer to the Release availability document.

Apama provides several connectivity bundles which allow you to communicate with the IoT devices connected to Cumulocity. For example, you can receive events from the devices, send operations to the devices, and query the state stored in the platform.

The following bundles are under the Cumulocity IoT group of connectivity bundles (see Adding bundles to projects for more information on connectivity bundles):

  • Cumulocity Client (includes REST support). This bundle is for the legacy real-time notifications feature and has been superseded by the Cumulocity Notifications 2.0 bundle.

    Info
    Use of the Cumulocity Client bundle is deprecated.
  • Cumulocity Notifications 2.0 (includes REST support). This is the recommended way to receive events and updates as they are published from devices as well as query and publish data to the platform. This bundle replaces the Cumulocity Client bundle.

    Info
    The Cumulocity Notifications 2.0 feature is currently in private preview. If you would like to have it enabled for your tenant, please contact Cumulocity Operations.
  • Cumulocity REST Support. This bundle is for applications that do not need to receive real-time updates and only need to publish data or query the platform. You must not add it to your project if you are adding one of the other bundles.

Info
When using Cumulocity Notifications 2.0, Apama creates subscriptions in Notifications 2.0 that persist after Apama has exited to allow processing of the event stream to be resumed later. This consumes the resources of the server. If a particular subscription name is no longer used, you must manually delete the subscription using the Notifications 2.0 REST API.

You configure the Cumulocity connectivity bundles by editing the .properties files that come with the respective connectivity bundle. All bundles provide a .properties file for configuring the REST API connection. The Cumulocity Client and Cumulocity Notifications 2.0 bundles also provide a second .properties file with properties specific to that connection. See Adding the Cumulocity connectivity plug-in to a project and Configuring the Cumulocity transport for further information.

In addition to the above connectivity bundles, the following EPL bundles are also available (see also Adding EPL bundles to projects). These bundles are included automatically if you add any of the connectivity bundles.

  • Event Definitions for Cumulocity. This EPL bundle defines all events that can be used for interacting with Cumulocity. This includes definitions for events that you receive from Cumulocity, events that you can send to Cumulocity, and event APIs that you can use for requesting data from Cumulocity. For more information, see the com.apama.cumulocity package in the API reference for EPL (ApamaDoc).
  • Utilities for Cumulocity. This EPL bundle contains useful utilities for EPL code that is interacting with Cumulocity. It also contains a geofence helper utility for determining whether a location is part of a geofence or not. For more information, see the com.apama.cumulocity.Util and the com.apama.cumulocity.GeoFenceContainer events in the API reference for EPL (ApamaDoc).
Info
In addition to using Apama Plugin for Eclipse to add the above mentioned connectivity and EPL bundles, you can also do this using the apama_project command-line tool. See Creating and managing an Apama project from the command line for more information.

The samples/cumulocity directory of your Apama installation includes samples which show how to use the Cumulocity bundles. For more information, see the README.txt file in the corresponding samples folder.

Info
The Cumulocity Client and Cumulocity REST Support bundles do not support reliable messaging.

As with other connectivity plug-ins, the EPL application should call com.softwareag.connectivity.ConnectivityPlugins.onApplicationInitialized(). For more information, see Sending and receiving events with connectivity plug-ins. Per-tenant EPL applications automatically start to receive Notifications 2.0 events after this call. Multi-tenant applications need to additionally send a com.apama.cumulocity.notifications2.SubscribeTenantNotifications event for each tenant to SubscribeTenantNotifications.SEND_CHANNEL to start receiving notifications from that tenant. For an example, see Working with multi-tenant deployments.

Configuring the Cumulocity transport

When you add one of the Cumulocity connectivity bundles in Apama Plugin for Eclipse, one or two .properties configuration files are created. You have to provide all required information in these files in order to connect to Cumulocity.

Configuration for all Cumulocity connectivity bundles

All bundles contain the properties to connect to the REST API in your tenant. The following is an example of a typical CumulocityIoTREST.properties configuration file:

##### Username and password must be provided for authentication
CUMULOCITY_USERNAME=MYNAME
CUMULOCITY_PASSWORD=MYPW

##### Application key and the URL of the application
CUMULOCITY_APPKEY=MYAPP
CUMULOCITY_SERVER_URL=https://myserver.cumulocity.com

##### TLS certificate authority file
CUMULOCITY_AUTHORITY_FILE=

##### Allow connection to Cumulocity instance with unknown certificate
CUMULOCITY_ALLOW_UNAUTHORIZED_SERVER=false

##### Set this to the tenant ID if you don't have a per-tenant hostname
CUMULOCITY_TENANT=

##### Proxy server host and port to start using HTTP proxy
CUMULOCITY_PROXY_HOST=proxy_host
CUMULOCITY_PROXY_PORT=

##### Proxy username and password must be provided for basic authentication
CUMULOCITY_PROXY_USERNAME=ProxyUser
CUMULOCITY_PROXY_PASSWORD=ProxyPW

##### Number of concurrent REST connections to use
CUMULOCITY_NUM_CLIENTS=3

In order to connect to Cumulocity, it is required that you set the following properties.

Property

Description

CUMULOCITY_USERNAME

Username for authentication. This can be specified either as a username alone or in the form of tenantID/username. In recent versions of Cumulocity, the tenant ID is visible in the web applications in the user menu in the top right. Required when not running in a microservice.

When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle.

Type: string.

CUMULOCITY_PASSWORD

Password for authentication. Required when not running in a microservice.

When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle.

Type: string.

CUMULOCITY_APPKEY

Unique key for the application defined on the Cumulocity instance. Required when not running in a microservice.

The application key is defined in Cumulocity. Log in to your account in Cumulocity, and use the Administration application to add an external application. You can then specify the application key and the URL of the application. See the Cumulocity documentation at https://cumulocity.com/docs/ for more information.

When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle.

Type: string.

CUMULOCITY_SERVER_URL

URL of the Cumulocity tenant. Required when not running in a microservice.

Type: string.

Under normal conditions in the cloud, the above properties are all you need to set. The properties listed below may be useful for custom on-premises installations of Cumulocity or for Cumulocity Edge.

Property

Description

CUMULOCITY_AUTHORITY_FILE

The TLS certificate authority (CA) file. If you are connecting to a Cumulocity platform whose certificate is not signed by a trusted CA, provide the certificate of your signing authority here. Type: string.

CUMULOCITY_ALLOW_UNAUTHORIZED_SERVER

Set this to true when connecting to a Cumulocity platform whose certificate is not signed by a trusted CA. This generally happens in the Cumulocity Edge instance where the installation is using a self-signed certificate. Default: false.

CUMULOCITY_TENANT

Unique name of the application tenant. This configuration option is useful in the case of Cumulocity Edge. Type: string.

CUMULOCITY_PROXY_HOST

The name of the proxy server to connect to. Type: string.

CUMULOCITY_PROXY_PORT

The port number of the proxy server to connect to. Both host and port are required to enable an HTTP proxy.

Type: integer.

CUMULOCITY_PROXY_USERNAME

Optional proxy user name for HTTP basic authentication. Type: string.

CUMULOCITY_PROXY_PASSWORD

Optional proxy password for HTTP basic authentication. Provide both user name and password if the proxy server has basic authentication enabled.

Type: string.

CUMULOCITY_NUM_CLIENTS

The number of simultaneous client connections to use for handling queries to the platform.Type: integer.

Default: 3.

CUMULOCITY_MULTI_TENANT_APPLICATION

Set this to true when developing an application that is deployed using MULTI_TENANT isolation in Cumulocity. It is used only when running as an external project and connecting to remote Cumulocity. See also Working with multi-tenant deployments.Type: boolean.

Default: false.

CUMULOCITY_MULTI_TENANT_MICROSERVICE_NAME

The name of the multi-tenant microservice to use. It is used only when running as an external multi-tenant project and connecting to remote Cumulocity. It is required when developing a MULTI_TENANT microservice application and is ignored if the CUMULOCITY_MULTI_TENANT_APPLICATION property is not already set.If a multi-tenant microservice does not already exist, either upload a multi-tenant microservice or create a microservice with a valid manifest. Subscribe the microservice to tenants for which you want to run the project.

See also Working with multi-tenant deployments.

Type: string.

The following properties are not provided by default in the `.properties` configuration file. If you add them, they will be used.

Property

Description

CUMULOCITY_INITIAL_DELAY_SECS

The initial delay (in seconds) that can be set for querying tenant subscriptions.Type: float.

Default: 0 seconds.

CUMULOCITY_MAX_BATCH_SIZE

The maximum number of Apama events that can be batched as a single request before sending to Cumulocity. The only event type that supports batching is com.apama.cumulocity.Measurement. Type: integer.

Default: 1000.

CUMULOCITY_LATENCY_SLOW_THRESHOLD_SECS

Update the mostRecentSlowRequestDetails status (see Monitoring status for Cumulocity) if the time for fetching one page of response multiplied by the number of total pages is greater than this threshold. Set this to 0 to disable updates.

Type: integer.

Default: 1 second.

CUMULOCITY_LATENCY_LOG_THRESHOLD_SECS

Log a warning if a single-paged or multi-paged request takes more time to complete than defined by this threshold. Set this to 0 to disable logging.

Type: integer.

Default: 10 seconds.

CUMULOCITY_LATENCY_BATCH_THRESHOLD_SECS

Log a warning if a batch of requests takes more time to complete than defined by this threshold. If a warning for an individual request of the batch has already been logged with CUMULOCITY_LATENCY_LOG_THRESHOLD_SECS, then a warning for this batch is not logged. Set this to 0 to disable logging.

Type: integer.

Default: 50 seconds.

CUMULOCITY_SMS_SENDER_NAME

The sender name to be used as the default if it is not specified in the SendSMS event and not configured in the messaging/sms.senderName tenant option of Cumulocity. The tenant option is given preference over the value of CUMULOCITY_SMS_SENDER_NAME. The tenant option is checked for every SendSMS event. If the check does not find it in Cumulocity, then only the value of CUMULOCITY_SMS_SENDER_NAME is used as the default sender name.

Type: string.

Default: Apama.

CUMULOCITY_SMS_SENDER_ADDRESS

The sender address to be used as the default if it is not specified in the SendSMS event and not configured in the messaging/sms.senderAddress tenant option of Cumulocity. The tenant option is given preference over the value of CUMULOCITY_SMS_SENDER_ADDRESS. The tenant option is checked for every SendSMS event. If the check does not find it in Cumulocity, then only the value of CUMULOCITY_SMS_SENDER_ADDRESS is used as the default sender address.

You can provide a sender address in the following formats: PROTOCOL:NUMBER or just NUMBER. Valid protocols include tel, SHORT, ICCID and ACR. If the protocol is missing or invalid, tel is used as the default protocol.

Type: string.

Default: apama.

CUMULOCITY_CONCURRENCY_MODE

If set to auto (default), the transport tries to order requests across multiple clients to avoid races between multiple updates relating to the same managed object. For more information, see Optimizing requests to Cumulocity with concurrent connections. If set to always, no attempt to preserve order is made.Type: string.

Default: auto.

Configuration for Cumulocity Notifications 2.0

If you are using the Cumulocity Notifications 2.0 bundle, you also get a .properties file to configure this connection. The following is an example of a typical CumulocityNotifications2.properties configuration file:

##### A unique name to identify the subscriber.
##### Disconnecting and reconnecting or restarting with the same subscriber name
##### will resume the message stream from the point it left off.
##### Required if not running in a microservice.
##### Will be inferred from the microservice contextPath when running as a microservice
CUMULOCITY_NOTIFICATIONS_SUBSCRIBER_NAME=mySubscriberName

##### Name of the subscription to use or create.
##### All correlators which use the same subscription name
##### will consume the same set of messages.
CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_NAME=mySubscriptionName

##### An Exclusive subscription can only have one connection for each subscriber name,
##### subsequent connections will fail. A Shared or KeyShared subscription can have
##### multiple connections with the same subscriber name, in which case messages will
##### be delivered only to one of the set of connections.
##### In Shared mode any message can be delivered to any connection.
##### In KeyShared mode messages relating to a given device will all be received by the same connection.
CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_TYPE=Exclusive

##### Create a number of parallel connections for receiving notifications on.
##### Requires Shared or KeyShared subscription type.
##### Increase this number above 1 if your application isn't consuming notifications fast enough.
##### For best performance the number of clients should be a power of 2.
CUMULOCITY_NOTIFICATIONS_NUMBER_CLIENTS=1

##### The limit of messages which can be buffered within the receiving chain before being
##### delivered to subscribed contexts.
CUMULOCITY_NOTIFICATIONS_MAX_BUFFERSIZE=1000

##### The largest batch of messages that can be received from notifications at one time.
CUMULOCITY_NOTIFICATIONS_MAX_BATCHSIZE=1000

When you add the Cumulocity Notifications 2.0 bundle, you must set the following required properties:

Property

Description

CUMULOCITY_NOTIFICATIONS_SUBSCRIBER_NAME

A unique name to identify the subscriber. Disconnecting and reconnecting or restarting with the same subscriber name resumes the message stream from the point it left off.

Required if not running in a microservice.

If running in a microservice, this is inferred from the microservice contextPath.

Type: string.

CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_NAME

Name of the subscription to use or create. All correlators which use the same subscription name consume the same set of messages.

Subscriptions are created with this name in Notifications 2.0. They persist after the correlator exits, waiting for reconnection. If a subscription is no longer used, you must manually delete it using the Notifications 2.0 API.

Type: string.

CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_TYPE

An Exclusive subscription can only have one connection for each subscriber name, subsequent connections will fail. A Shared or KeyShared subscription can have multiple connections with the same subscriber name, in which case messages are delivered only to one of the set of connections. In Shared mode, any message can be delivered to any connection. In KeyShared mode, messages relating to a given device are all received by the same connection.Type: string.

Values: Exclusive, Shared or KeyShared.

You can also add the following optional properties to the `.properties` configuration file:

Property

Description

CUMULOCITY_NOTIFICATIONS_NUMBER_CLIENTS

Create a number of parallel connections for receiving notifications. Requires the subscription type Shared or KeyShared. Increase this number above 1 if your application is not consuming notifications fast enough. For best performance, the number of clients should be a power of 2.Type: integer.

Default: 1.

CUMULOCITY_NOTIFICATIONS_MAX_BUFFERSIZE

The limit of messages that can be buffered within the receiving chain before being delivered to subscribed contexts.Type: integer.

Default: 1000.

CUMULOCITY_NOTIFICATIONS_MAX_BATCHSIZE

The largest batch of messages that can be received from Notifications 2.0 at one time.Type: integer.

Default: 1000.

CUMULOCITY_NOTIFICATIONS_AUTO_START

Set to false if you do not want notifications to connect at startup. Connections must be explicitly requested from EPL later.Type: boolean.

Default: true.

CUMULOCITY_NOTIFICATIONS_RECEIVER_QUEUESIZE

The number of unacknowledged items available to a consumer. Should not be smaller than CUMULOCITY_NOTIFICATIONS_MAX_BATCHSIZE.Type: integer.

Default: 1000.

CUMULOCITY_NOTIFICATIONS_SERVICE_URL

The service URL for connecting to Notifications 2.0. In most situations, you do not need to set this. It is derived from the CUMULOCITY_SERVER_URL.

Configuration for Cumulocity Client

When you add the deprecated legacy Cumulocity Client bundle, you can add the following optional properties to the CumulocityIoT.properties configuration file:

Property

Description

CUMULOCITY_MEASUREMENT_FORMAT

The measurement format mode used by the tenant. Two modes are available: MEASUREMENT_ONLY and BOTH. For more information, see Turning measurement fragments on/off. Type: string.

Default: BOTH.

CUMULOCITY_FORCE_INITIAL_HOST

If set to false, the endpoint details returned by the Cumulocity platform are used. If set to true, the Cumulocity SDK always uses the URL provided during session initialization instead of the endpoint details. This is helpful in deployment scenarios where the Cumulocity instance is reachable only with an IP address. Type: boolean.

Default: true.

CUMULOCITY_REQUEST_ALL_DEVICES

Deprecated. Request all assets at startup. Type: boolean.

Default: false.

Note:

You should explicitly request for all available devices on startup using the com.apama.cumulocity.FindManagedObject API. For more information, see Sample EPL.

CUMULOCITY_SUBSCRIBE_OPERATIONS

Subscribe to all device operations. Type: boolean.

Default: true.

CUMULOCITY_SUBSCRIBE_ALL_MEASUREMENTS

Subscribe to measurements, events and alarms of all devices during startup. Type: boolean.

Default: true.

CUMULOCITY_SUBSCRIBE_DEVICES

Subscribe to all device-related updates. Type: boolean.

Default: true.

Using managed objects

During application initialization (onApplicationInitialized), if the requestAllDevices configuration option is enabled, the adapter sends all device/asset related information using the com.apama.cumulocity.ManagedObject event on the com.apama.cumulocity.ManagedObject.SUBSCRIBE_CHANNEL (same as cumulocity.devices) channel. After all devices/assets have been sent, the adapter sends a com.apama.cumulocity.RequestAllDevicesComplete(-1) event.

Info
Use of the above-mentioned requestAllDevices configuration option is deprecated. Instead you should use the com.apama.cumulocity.FindManagedObject API to cause the adapter to send the device events when the application is ready. This will also work for applications deployed in Cumulocity directly.

Example of a device event:

com.apama.cumulocity.ManagedObject("44578836","","Device_1",
  ["c8y_Restart","c8y_Meassage","c8y_Relay"],
  ["c8y_TemperatureMeasurement","c8y_LightMeasurement"],
  [],[],[],[],{},
  {"c8y_IsDevice":any(dictionary<any,any>,new dictionary<any,any>),
  "owner":any(string,"Cumulocity_User")})

To fetch a list of all existing managed objects, use the FindManagedObjects API. For more information, see Querying for managed objects.

Example

The following is a simple example of how to receive, update and send managed objects:

// Subscribe to receive all the devices from Cumulocity
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);

// Consume all the devices from Cumulocity
on all ManagedObject() as mo {
  log mo.toString() at INFO;

  // Update a managed object

  mo.params.add("CustomMetadata", {"metadata": "Adding custom data"});
  send mo to ManagedObject.SEND_CHANNEL;

}

Updating a managed object

To enable use cases where information related to a managed object can be persisted, you can update any metadata information (such as the state) as properties of a managed object.

managedObject.params.add("CUSTOM_PROPERTY", PROPERTY_VALUE);
send managedObject to com.apama.cumulocity.ManagedObject.SEND_CHANNEL

Where

  • CUSTOM_PROPERTY is the property that is to be added.
  • PROPERTY_VALUE is the value for the newly added property.
Sending managed objects requesting response and setting headers

When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the ManagedObject.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the ManagedObject.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

Example of creating a managed object:

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test_ManagedObjects {

    action onload {
        monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
        ManagedObject mo := new ManagedObject;
        mo.params.add("c8y_IsDevice", new dictionary<any, any>);
        mo.name := "MyManagedObject";

        mo.type := "DeviceType";
        integer reqId := com.apama.cumulocity.Util.generateReqId();

        // Create a new ManagedObject in Cumulocity, ensuring that a
        // response is returned.
        send mo.withChannelResponse(reqId, new dictionary<string, string>) to
          ManagedObject.SEND_CHANNEL;

        // If the ManagedObject creation succeeded do something with the
        // returned object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New managed object successfully created " + c.toString()
              at INFO;
        }

        // If the ManagedObject creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
          ObjectCommitted(reqId=reqId){
            log "Creating a new event failed " + cfail.toString() at ERROR;
        }
    }
}
Info

The following ManagedObject reference fields cannot be set using ManagedObject events and are useful for read-only purposes in these events: childDeviceIds, childAssetIds, deviceParentIds, and assetParentIds. However, this can be done using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and Child operations in the Cumulocity OpenAPI documentation.

The position field can be used to clear individual position elements, but it cannot be used to delete the entire position fragment c8y_Position. Similarly, the params dictionary cannot be manipulated to delete fragments in the ManagedObject. However, both of these can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.

Querying for managed objects

To search for a managed object or a collection of managed objects, send the com.apama.cumulocity.FindManagedObject event to Cumulocity, with a unique reqId to the com.apama.cumulocity.FindManagedObject.SEND_CHANNEL channel.

The transport will then respond with zero or more com.apama.cumulocity.FindManagedObjectResponse events and then one com.apama.cumulocity.FindManagedObjectResponseAck event on the com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL channel.

Example:

integer reqId := com.apama.cumulocity.Util.generateReqId();

com.apama.cumulocity.FindManagedObject request :=
  new com.apama.cumulocity.FindManagedObject;
request.reqId := reqId;

// Optionally provide the 'id' of the managed object
//request.deviceId := "<DEVICE_ID>";

// Filter based on fragmentType
request.params.add("fragmentType", "c8y_IsDevice");

// Subscribe to com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL to
// listen for responses
monitor.subscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);

// Listen for responses based on reqId
on all com.apama.cumulocity.FindManagedObjectResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
{
    log "Received ManagedObject " + response.toString() at INFO;
}

// Listen for com.apama.cumulocity.FindManagedObjectResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
  as requestCompleted
{
    log "Received all ManagedObject(s) for request "
      + requestCompleted.reqId.toString() at INFO;

    // Request is completed and we can unsubscribe from this channel
    monitor.unsubscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}

// Send request to find available managed objects
send request to com.apama.cumulocity.FindManagedObject.SEND_CHANNEL;
Query parameters

Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all managed objects in the Cumulocity OpenAPI Specification.

Parameter Description
deviceId Search for a managed object based on deviceId. When deviceId is populated in a FindManagedObject request, all the query parameters listed below are ignored.
fragmentType Search for managed objects based on the fragment type.
type Search for managed objects based on the type.
owner Search for managed objects based on the owner.
childAssetId Search for managed objects based on the asset identifier of the child.
childDeviceId Search for managed objects based on the device identifier of the child.
ids Search for managed objects based a comma-separated list of device identifiers.
pageSize Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the managed objects, but each request is larger. By default, 1000 managed objects are in each page and there is an upper limit of 2000.
currentPage Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
Query result paging

Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.

Caching managed objects
ManagedObjectCache

The com.apama.cumulocity.ManagedObjectCache event covers the common use case of fetching a ManagedObject with a known id, such as when you want to fetch the ManagedObject associated with an event source or a rule held within a ManagedObject. The cache eliminates the need to implement the FindManageObject code for this scenario and reduces traffic across the REST API.

The ManagedObjectCache event can be used by any monitor to cache the managed objects it is interested in.

You can limit the cache in certain ways to improve performance and reduce memory usage. By default, the entire ManagedObject is cached for each requested id, but in many situations this is probably unnecessary. Therefore, you can restrict the cache by one or more of the following:

  • Maximum number of cached managed objects. The cache size is limited to this number. When an uncached id is requested, the cached ManagedObject that was accessed longest ago is removed and the new ManagedObject is added.
  • Expiration of unaccessed managed objects. The cache removes all cached managed objects that have not been accessed within a specified period of time.
  • The params that are cached. The cache stores only a defined set of the params fragments for the ManagedObject.

The cache is lazy loaded, that is, a ManagedObject is only cached the first time you request it. Once cached, it is kept up to date using broadcast update notifications. Thus, unless it is removed from the cache, by the cache management restrictions above or directly by the remove() action, all subsequent requests are supplied from the cache itself.

In some situations, you may want more information than has been cached. The cache provides query functionality that bypasses its internal store and always fetches the ManagedObject from the inventory. This functionality always returns the full ManagedObject, regardless of any params restrictions you may have specified for the cache’s own store.

The cache stores only those properties that are received both when the inventory is directly queried and when update notifications are received. Therefore, deviceParentIds and assetParentIds are not cached because they are not provided within notification updates. You can fetch the parent hierarchy using the query functionality mentioned above.

Similarly, the .apama_notificationType and tenantId params are not cached by default because they are only present in notification updates. The absence of the first parameter means that isCreate() and isUpdate() always return false when called on a ManagedObject provided by the cache. The cache only caches updates that pass the isUpdate() condition, so the check on cache-provided managed objects is unnecessary.

Because the cache may need to fetch the ManagedObject directly from the inventory for any given request, the process of requesting a ManagedObject is asynchronous. The cache provides two methods for waiting for the response:

  • Listening for the ManagedObjectCacheResponse and ManagedObjectCacheError events routed by the cache.
  • Using response callbacks that you provide to the cache.
SharedManagedObjectCache

The ManagedObjectCache is useful when only one EPL app is interested in the ManagedObject set that is likely to be cached. However, if two or more are interested in the same set of managed objects, this would lead to duplicate caches and waste memory. To allow more than one EPL app to share the same cached managed objects, there is the SharedManagedObjectCache, which is a wrapper monitor around the ManagedObjectCache. When setting up this cache, make sure that it caches all parameters that are required by its clients.

The SharedManagedObjectCache is a monitor that runs in its own context and communicates through events that are sent. You must explicitly activate the cache using the StartSharedMOCache event, which specifies the maximum size, expiration period, params to be cached if desired, and tenant details if any.

You make requests to the cache using the FindCacheManagedObject event, similar to the FindManagedObject event used for the Cumulocity transport API. The request is asynchronous and the response is either a set of FindCacheManagedObjectResponse events or a FindCacheManagedObjectBatchResponse event, depending on whether you specified a batch response. This is followed by a FindCacheManagedObjectResponseAck event.

Using alarms

using-alarms

The com.apama.cumulocity.Alarm is sent on an alarm from a device. This event is sent to the com.apama.cumulocity.Alarm.SUBSCRIBE_CHANNEL (same as cumulocity.alarms) channel. This event contains the identifier of the source, a timestamp (same form as currentTime), message text, and optional parameters.

Example of an alarm:

com.apama.cumulocity.Alarm("44578840","c8y_UnavailabilityAlarm","44578839",
  1529496204.346,"No data received from device within required interval",
  Alarm.STATUS_ACTIVE,Alarm.SEVERITY_MAJOR,1,{"creationTime":any(float,1529496204.067)})
Example

The following is a simple example of how to receive, update, create and send alarms:

// Subscribe to receive all the alarms published from Cumulocity
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);

// Consume all the alarms from Cumulocity
on all Alarm() as alarm {
  log alarm.toString() at INFO;

  // Example for updating an alarm

  // Set alarm severity to MAJOR
  alarm.severity := Alarm.SEVERITY_MAJOR;
  send alarm to Alarm.SEND_CHANNEL;
}
// Create a new alarm
Alarm alarm := new Alarm;
alarm.source := "<MANAGED_OBJECT_ID>";
alarm.type := "TestAlarm";
alarm.severity := Alarm.SEVERITY_MINOR;
alarm.status := Alarm.STATUS_ACTIVE;
alarm.time := currentTime;
alarm.text := "This is a sample alarm";
send alarm to Alarm.SEND_CHANNEL;

Creating a new alarm

send Alarm("","c8y_SampleAlarm","<SOURCE>",<TIME>,
  "Alarm text", Alarm.STATUS_<STATUS>,Alarm.SEVERITY_<SEVERITY>,1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;

Where

  • <SOURCE> is the source of the alarm (same as the ManagedObject identifier).
  • <TIME> is the time at which the alarm was generated.
  • <STATUS> is the status of the alarm. This can be ACTIVE, ACKNOWLEDGED or CLEARED.
  • <SEVERITY> is the severity of the alarm. This can be CRITICAL, MAJOR, MINOR or WARNING.
Sending alarms requesting response and setting headers

When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the Alarm.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the Alarm.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

Example of creating an alarm:

using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor TestCreatingAlarm {
    string deviceId;  // Where this is populated from the actual device Id.
    string timestamp; // Where this is populated from the timestamp of
                      // the device.

    action onload {
        monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
        Alarm al := new Alarm;
        string name := "MyTestAlarm";
        al.status := Alarm.STATUS_ACTIVE;
        al.severity := Alarm.SEVERITY_CRITICAL;
        al.source := deviceId;
        al.type := "c8y_TestAlarm";
        al.text := "test alarm";
        al.time := currentTime;

        integer reqId := com.apama.cumulocity.Util.generateReqId();

        // Create a new Alarm in Cumulocity, ensuring that a response is
        // returned
        // and the processing mode, indicating how to process the request,
        // sent to Cumulocity is Transient.
        send al.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
          "Transient" }) to Alarm.SEND_CHANNEL;

        // If the Alarm creation succeeded do something with the returned
        // object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New alarm successfully created " + c.toString() at INFO;
        }

        // If the Alarm creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
          ObjectCommitted(reqId=reqId){
            log "Creating a new event failed " + cfail.toString() at ERROR;
        }
    }
}
Alarm de-duplication

If an active or acknowledged alarm (does not work for the CLEARED status) with the same source and type exists, no new alarm is created. Instead, the existing alarm is updated by incrementing the count property, and the time property is also updated. Any other changes are ignored, and the alarm history is not updated. The first occurrence of the alarm is recorded in firstOccurenceTime.

Updating an existing alarm

You can update the text, status and severity fields.

send Alarm("<ALARM_ID>","c8y_SampleAlarm","<SOURCE>", <TIME>,
  "Alarm Updated", Alarm.STATUS_<STATUS>,Alarm.SEVERITY_<SEVERITY>,1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;

Where

  • <ALARM_ID> is the identifier of the previously created alarm. The presence of <ALARM_ID> indicates that the request is for updating an existing alarm.
Info
The params dictionary cannot be manipulated to delete fragments in the Alarm. However, the fragments in the Alarm can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.

Querying for alarms

To search for an alarm or a collection of alarms, send the com.apama.cumulocity.FindAlarm event to Cumulocity, with a unique reqId to the com.apama.cumulocity.FindAlarm.SEND_CHANNEL channel.

The transport will then respond with zero or more com.apama.cumulocity.FindAlarmResponse events and then one com.apama.cumulocity.FindAlarmResponseAck event on the com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL channel.

Example:

integer reqId := com.apama.cumulocity.Util.generateReqId();

com.apama.cumulocity.FindAlarm request := new com.apama.cumulocity.FindAlarm;
request.reqId := reqId;

// Filter based on alarms type
request.params.add("type", "c8y_UnavailabilityAlarm");

// Subscribe to com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL to listen
// for responses
monitor.subscribe(com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL);

// Listen for responses based on reqId
on all com.apama.cumulocity.FindAlarmResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindAlarmResponseAck(reqId=reqId)
{
    log "Received Alarm " + response.toString() at INFO;
}

// Listen for com.apama.cumulocity.FindAlarmResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindAlarmResponseAck(reqId=reqId) as requestCompleted
{
    log "Received all Alarm(s) for request " +
      requestCompleted.reqId.toString() at INFO;

    // Request is completed and we can unsubscribe from this channel
    monitor.unsubscribe(com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL);
}

// Send request to find available alarms
send request to com.apama.cumulocity.FindAlarm.SEND_CHANNEL;
Query parameters

Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all alarms in the Cumulocity OpenAPI Specification.

Parameter Description
id Search for an alarm based on alarmId. When searching for an alarm based on Id, all the query parameters listed below are ignored.
source Search for alarms based on the device identifier or asset identifier of the source.
status Search for alarms based on the status. The status can be any of ACTIVE, ACKNOWLEDGED or CLEARED.
severity Search for alarms based on the severity. The severity can be any of CRITICAL, MAJOR, MINOR or WARNING.
type Search for alarms based on the type.
dateFrom Search for alarms from a start date. The date and time is provided as seconds since the epoch.
dateTo Search for alarms within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch.
resolved A boolean parameter used for filtering, based on the resolved state.
pageSize Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the alarms, but each request is larger. By default, 1000 alarms are in each page and there is an upper limit of 2000.
currentPage Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
Query result paging

Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.

Using events

The com.apama.cumulocity.Event is sent on an event from a device. This event is sent to the com.apama.cumulocity.Event.SUBSCRIBE_CHANNEL (same as cumulocity.events) channel. This event contains the identifier of the source, a timestamp (same form as currentTime), message text, and optional parameters.

Example of an event:

com.apama.cumulocity.Event("48073557","c8y_EntranceEvent",
  "12346082",1519838833.6,
  "Entrance event triggered.",
  {"creationTime":any(float,1519838834.706)})
Example

The following is a simple example of how to receive, update, create and send events:

// Subscribe to receive all the events published from Cumulocity
 monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

 // Consume all the events from Cumulocity
 on all Event() as e {
   log e.toString() at INFO;

   // Example for updating an event

   // Update text
   e.text := "This is an updated text";
   send e to Event.SEND_CHANNEL;
 }

 // Create a new event
 Event evt := new Event;
 evt.source := "<MANAGED_OBJECT_ID>";
 evt.type := "TestEvent";
 evt.time := currentTime;
 evt.text := "This is a sample event";
 send evt to Event.SEND_CHANNEL;

Creating a new event

send Event("","c8y_SampleEvent","SOURCE", TIME,
   "Event text",new dictionary<string,any>) to Event.SEND_CHANNEL;

Where

  • SOURCE is the source of the event (same as the ManagedObject identifier).
  • TIME is the time at which the event was generated.
Sending events requesting response and setting headers

When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the Event.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the Event.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

Example of creating an event:

using com.apama.cumulocity.Event;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test_CumulocityEvents {
    string timestamp; // Where this is populated from the timestamp of the
                      // device.

    action onload {
        monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
        Event ev := new Event;
        string name := "MyEvent";
        ev.type := "DoorSensor";
        ev.source := "7104838";
        ev.text := "Door sensor was triggered";
        ev.time := currentTime;

        integer reqId := com.apama.cumulocity.Util.generateReqId();
        // Create a new Event in Cumulocity, ensuring that a response is
        // returned
        // and the processing mode, indicating how to process the request, sent
        // to Cumulocity is Transient.
        send ev.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
          "Transient" }) to Event.SEND_CHANNEL;

        // If the Event creation succeeded do something with the returned
        // object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New event successfully created " + c.toString() at INFO;
        }

        // If the Event creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
          ObjectCommitted(reqId=reqId){
            log "Creating a new event failed " + cfail.toString() at ERROR;
        }
    }
}

Updating an existing event

You can update the text field.

send Event("EVENT_ID","c8y_SampleEvent","SOURCE",TIME,
  "Event Updated",new dictionary<string,any>) to Event.SEND_CHANNEL;

Where

  • EVENT_ID is the identifier of the previously created event. The presence of EVENT_ID indicates that the request is for updating an existing event.
Info
The params dictionary cannot be manipulated to delete fragments in the Event. However, the fragments in the Event can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.

Querying for events

To search for an event or a collection of events, send the com.apama.cumulocity.FindEvent event to Cumulocity, with a unique reqId to the com.apama.cumulocity.FindEvent.SEND_CHANNEL channel.

The transport will then respond with zero or more com.apama.cumulocity.FindEventResponse events and then one com.apama.cumulocity.FindEventResponseAck event on the com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL channel.

Info
Cumulocity returns the oldest events first. However, in case of a range query (that is, when the query includes at least one of the dateFrom or dateTo parameters), the latest events are returned first. This is different from measurements and operations where the oldest items are always returned first, regardless of whether the query is a range query or not. If you want to reverse the order, see the description of the revert parameter below.

Example:

integer reqId := com.apama.cumulocity.Util.generateReqId();

com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
request.reqId := reqId;

// Filter based on event type
request.params.add("type", "c8y_DoorOpenedEvent");

// Subscribe to com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL to listen
// for responses
monitor.subscribe(com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL);

// Listen for responses based on reqId
on all com.apama.cumulocity.FindEventResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindEventResponseAck(reqId=reqId)
{
    log "Received Event " + response.toString() at INFO;
}

// Listen for com.apama.cumulocity.FindEventResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindEventResponseAck(reqId=reqId) as requestCompleted
{
    log "Received all Event(s) for request " +
      requestCompleted.reqId.toString() at INFO;

    // Request is completed and we can unsubscribe from this channel
    monitor.unsubscribe(com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL);
}

// Send request to find available events
send request to com.apama.cumulocity.FindEvent.SEND_CHANNEL;
Query parameters

Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all events in the Cumulocity OpenAPI Specification.

Parameter Description
id Search for an event based on eventId. When searching for an event based on Id, all the query parameters listed below are ignored.
source Search for events based on the device identifier or asset identifier of the source.
type Search for events based on the type.
dateFrom Search for events from a start date. The date and time is provided as seconds since the epoch.
dateTo Search for events within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch.
fromCreationDate Similar to dateFrom, but fetches the events based on the creation date. The date and time is provided as seconds since the epoch.
toCreationDate Search for events that have been created within a date range. This is to be used in combination with fromCreationDate. The date and time is provided as seconds since the epoch.
pageSize Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the events, but each request is larger. By default, 1000 events are in each page and there is an upper limit of 2000.
currentPage Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
revert Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), Cumulocity, by default, returns the latest events first. You can reverse the order in which the matching events are returned by adding the query parameter revert=true. This returns the oldest events first. (Cumulocity returns the oldest events first by default when neither the dateFrom nor the dateTo parameter is supplied.)
Query result paging

Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.

Using measurements

Measurement events contain the identifier of the source of the measurement, the type of measurement, timestamp, and a dictionary of values which contain the numeric value, units and optional type, quantity and state.

Examples of measurement events:

Measurement("1001","c8y_LightMeasurement","12346081",1464359004.89,
  {"c8y_LightMeasurement": {"e":com.apama.cumulocity.MeasurementValue(108.1,
   "lux", new dictionary<string, any>)}},new dictionary<string, any>)

Measurement("1002","c8y_DistanceMeasurement","12346082",1464359005.396,
  {"c8y_DistanceMeasurement": {"distance":com.apama.cumulocity.MeasurementValue
   (344,"mm","","","", dictionary<string, any>)}}, dictionary<string, any>)
Example

The following is a simple example of how to receive, create and send measurements:

// Subscribe to receive all the measurements published from Cumulocity
 monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

 // Consume all the measurements from Cumulocity
 on all Measurement() as m {
   log  m.toString() at INFO;
 }

 // Create a new measurement

 Measurement m := new Measurement;
 m.source := "<MANAGED_OBJECT_ID>";
 m.time := currentTime;
 m.type := "TemperatureMeasurement";
 MeasurementValue mv := new MeasurementValue;
 mv.value := 100.0;
 dictionary<string, MeasurementValue> fragment
   := new dictionary<string, MeasurementValue>;
 fragment.add("temperature", mv);
 m.measurements.add("TemperatureMeasurement", fragment);
 send m to Measurement.SEND_CHANNEL;

Creating a new measurement

Measurement m := new Measurement;
m.type := *&lt;MEASUREMENT\_TYPE&gt;*;
m.source := *&lt;SOURCE&gt;*;
m.time := currentTime;
MeasurementValue mv := new MeasurementValue;
mv.value := 1.0;
mv.unit := "V";
dictionary<string, MeasurementValue>  dict := {"voltage": mv};
m.measurements.add(m.type, dict);
send m to Measurement.SEND_CHANNEL;

Where

  • SOURCE is the source of the measurement (same as the ManagedObject identifier).
  • MEASUREMENT_TYPE is the type of the measurement. For example, c8y_VoltageMeasurement.
Sending measurements requesting response and setting headers

When creating a new object, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the Measurement.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the Measurement.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

It is worth noting that when using withChannelResponse for measurements, it is not able to achieve the same throughput as sending them without a response. As they are not batched into a single HTTP request, there are just individual create/update requests sent to Cumulocity.

Example of creating a measurement:

using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;

using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test {
    string deviceId;  // Where this is populated from the actual device Id.
    float timestamp;  // Where this is populated from the timestamp of the
                      // device.

    action onload {

        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
        Measurement mo := new Measurement;
        mo.type := "test_measurement";
        mo.source := deviceId;
        mo.time := timestamp;

        //Create a Measurement with two Measurement fragments.
        MeasurementValue mv1 := new MeasurementValue;
        mv1.value := 10.2;
        mv1.unit := "km/hr";

        MeasurementValue mv2 := new MeasurementValue;
        mv2.value := 11.7;
        mv2.unit := "km/hr";

        dictionary<string, MeasurementValue> dict :=
          {"speedX": mv1, "speedY": mv2};
        mo.measurements.add("c8y_speed", dict);

        integer reqId := com.apama.cumulocity.Util.generateReqId();
        // Create a new Measurement in Cumulocity, ensuring that a response is
        // returned
        // and the processing mode, indicating how to process the request,
        // sent to Cumulocity is Transient.
        send mo.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
          "Transient" }) to Measurement.SEND_CHANNEL;

        // If the Measurement creation succeeded do something with the returned
        // object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New measurement successfully created " + c.toString() at INFO;
        }

        // If the Measurement creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
        ObjectCommitted(reqId=reqId){
            log "Creating a new event failed " + cfail.toString() at ERROR;
        }
    }
}

Querying for measurements

To search for a measurement or a collection of measurements, send the com.apama.cumulocity.FindMeasurement event to Cumulocity, with a unique reqId to the com.apama.cumulocity.FindMeasurement.SEND_CHANNEL channel.

The transport will then respond with zero or more com.apama.cumulocity.FindMeasurementResponse events and then one com.apama.cumulocity.FindMeasurementResponseAck event on the com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL channel.

Example:

integer reqId := com.apama.cumulocity.Util.generateReqId();

com.apama.cumulocity.FindMeasurement request :=
  new com.apama.cumulocity.FindMeasurement;
request.reqId := reqId;

// Filter based on measurement fragment type and series
request.params.add("valueFragmentType", "c8y_MotionMeasurement");
request.params.add("valueFragmentSeries", "motionDetected");

// Subscribe to com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL
// to listen for responses
monitor.subscribe(com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL);

// Listen for responses based on reqId
on all com.apama.cumulocity.FindMeasurementResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindMeasurementResponseAck(reqId=reqId)
{
    log "Received Measurement " + response.toString() at INFO;
}

// Listen for com.apama.cumulocity.FindMeasurementResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindMeasurementResponseAck(reqId=reqId)
  as requestCompleted
{
    log "Received all Measurement(s) for request "
      + requestCompleted.reqId.toString() at INFO;

    // Request is completed and we can unsubscribe from this channel
    monitor.unsubscribe(com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL);
}

// Send request to find available measurements
send request to com.apama.cumulocity.FindMeasurement.SEND_CHANNEL;
Query parameters

Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all measurements in the Cumulocity OpenAPI Specification.

Parameter Description
id Search for a measurement based on measurementId. When searching for a measurement based on Id, all the query parameters listed below are ignored.
source Search for measurements based on the device identifier or asset identifier of the source.
type Search for measurements based on the type.
valueFragmentType Search for measurements based on fragment type (should be used with valueFragmentSeries).
valueFragmentSeries Search for measurements based on fragment series (should be used with valueFragmentType).
dateFrom Search for measurements from a start date. The date and time is provided as seconds since the epoch.
dateTo Search for measurements within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch.
pageSize Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the measurements, but each request is larger. By default, 1000 measurements are in each page and there is an upper limit of 2000.
currentPage Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
revert Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), you can reverse the order in which the matching measurements are returned by adding the query parameter revert=true. This returns the latest measurements first. By default, Cumulocity returns the oldest measurements first.
Query result paging

Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.

Using measurement fragments

A MeasurementFragment event represents a single fragment/series on a measurement.

Creating measurement fragments

You can send a single fragment to Cumulocity to create a single-fragment measurement.

Example of creating a measurement fragment:

using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test {
    string deviceId;  // Where this is populated from the actual device Id.
    float timestamp;  // Where this is populated from the timestamp of the
                      // device.

    action onload {
        MeasurementFragment mf := new MeasurementFragment;
        mf.type := "test_measurement";
        mf.source := deviceId;
        mf.time := timestamp;

        mf.valueFragment := "c8y_speed";
        mf.valueSeries := "speedX";
        mf.value := 12.0;
        mf.unit := "km/hr";

        integer reqId := com.apama.cumulocity.Util.generateReqId();

        send mf to MeasurementFragment.SEND_CHANNEL;
    }
}

Where

  • source is the source of the measurement.
  • time is the time at which the measurement was taken.
  • type is the type of the measurement.
  • valueFragment is the name of the fragment of the measurement fragment.
  • valueSeries is the name of the series of the measurement fragment.
  • value is the value from the sensor.
  • unit is the units the reading is in, for example, mm, lux, km/hr.
Sending measurement fragments requesting a response and setting headers

When creating a new object, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the MeasurementFragment.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the MeasurementFragment.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

Example of creating a measurement fragment:

using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test {
    string deviceId;  // Where this is populated from the actual device Id.
    string timestamp; // Where this is populated from the timestamp of the
                      // device.

    action onload {
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
        MeasurementFragment mf := new MeasurementFragment;
        mf.type := "test_measurement";
        mf.source := deviceId;
        mf.time := timestamp;

        mf.valueFragment := "c8y_speed";
        mf.valueSeries := "speedX";
        mf.value := 12.0;
        mf.unit := "km/hr";

        integer reqId := com.apama.cumulocity.Util.generateReqId();

        // Create a new Measurement in Cumulocity from a single
        // MeasurementFragment, ensuring that a response is returned
        // and the processing mode, indicating how to process the request, sent
        // to Cumulocity is Transient.
        send mf.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
          "Transient" }) to MeasurementFragment.SEND_CHANNEL;

        // If the Measurement creation succeeded do something with the returned
        // object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New measurement successfully created " + c.toString() at INFO;
        }

        // If the Measurement creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
          ObjectCommitted(reqId=reqId){
            log "Creating a new measurement failed " + cfail.toString()
              at ERROR;
        }
    }
}

Listening for measurement fragments

The Apama mapping codec can turn measurements into measurement fragments, and listeners in EPL can match on the elements of measurement fragments rather than measurements.

Example - matching on measurement fragments:

on all MeasurementFragment(valueFragment = 'c8y_speed', valueSeries = 'speedX',
   value > SPEED_LIMIT) as mf {
}

Turning measurement fragments on/off

To be able to match based on measurement fragments, you must ensure they are returned by setting the correct measurement format. There are two modes available, MEASUREMENT_ONLY and BOTH. The default, if it is not set or set incorrectly, is MEASUREMENT_ONLY. Set the mode to BOTH if you require filtering based on fragments or series.

If you are deploying a custom microservice, connecting to Cumulocity from an external correlator, or using Apama Plugin for Eclipse, you can set the mode in the CumulocityIoT.properties file (see also Configuring the Cumulocity IoT transport) or directly on the command line to start the correlator by setting the CUMULOCITY_MEASUREMENT_FORMAT value.

The recommended approach is to set the mode from the .properties file. For example, to turn measurement fragments on:

CUMULOCITY_MEASUREMENT_FORMAT=BOTH

Alternatively, you can set the mode using the command line. For example, to turn measurement fragments off:

-DCUMULOCITY_MEASUREMENT_FORMAT=MEASUREMENT_ONLY
Info
As of Apama 10.5, new Apama projects in Apama Plugin for Eclipse have the default set to BOTH, but existing projects will retain their previous configuration. If you want to enable fragments in an existing project, you may need to remove and re-add the bundle.

When you deploy (activate) an EPL app directly in Cumulocity using the Streaming Analytics application, both measurements and measurement fragments are always available (this is always BOTH). See the Streaming Analytics guide at https://cumulocity.com/docs/streaming-analytics/overview-analytics/ for more information.

Handling measurement fragments

It is possible to separate the individual fragments from the contents of a Measurement into MeasurementFragment objects, which can allow better performance matching in searches. You achieve this by using the sequence<MeasurementFragment> getFragments() action on the Measurement event. This returns a sequence of MeasurementFragment objects.

You can generate a Measurement event based on MeasurementFragment objects. You can achieve this by using the static Measurement createFromFragments(sequence<MeasurementFragment> fragments) action on the Measurement event, where fragments is the sequence of MeasurementFragment objects to create it from, and it returns the created Measurement.

Using operations

The com.apama.cumulocity.Operation event represents a device operation. This event is sent to the com.apama.cumulocity.Operation.SUBSCRIBE_CHANNEL (same as cumulocity.operations) channel. This event contains the unique identifier of the operation (id), the identifier of the source (deviceId), a status, and optional parameters.

Example of an operation:

Operation("12345", "deviceId", Operation.STATUS_EXECUTING, params)

where params is a dictionary of string keys and any values (dictionary<string, any>).

Make sure to set the deviceId field to the identifier of a managed object which has the com_cumulocity_model_Agent fragment. The com_cumulocity_model_Agent marks devices running a Cumulocity agent. Such devices receive all operations targeted to themselves and their children for routing (see also Device integration using REST in the Cumulocity documentation.

When creating a new operation, do not supply the id field (that is, supply an empty string for the operation identifier).

It is not possible to set the params field of an operation to an empty dictionary.

Example

The following is a simple example of how to receive, create and send operations:

// Subscribe to receive all the operations published from Cumulocity

 monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);

 on all Operation() as o {
    log o.toString() at INFO;

   // Update an operation
   o.status := Operation.STATUS_EXECUTING;
   send o to Operation.SEND_CHANNEL;
 }

 // Create an operation
 Operation operation := new Operation;
 operation.source := "<MANAGED_OBJECT_ID>";
 operation.status := Operation.STATUS_PENDING;
 operation.params.add("c8y_Message", {"text": "Device Operation"});
 send operation to Operation.SEND_CHANNEL;

Creating a new operation

send com.apama.cumulocity.Operation("","<SOURCE>",Operation.STATUS_<STATUS>,
{"c8y_Message":<any> {<any>"text":<any>"Hello Cumulocity device"}} )
to com.apama.cumulocity.Operation.SEND_CHANNEL;

Where

  • SOURCE is the source of the operation (same as the ManagedObject identifier).
  • STATUS is the status of the operation. This can be PENDING.
Sending operations requesting response and setting headers

When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the Operation.SUBSCRIBE_CHANNEL channel. You will need to subscribe to the Operation.SUBSCRIBE_CHANNEL channel first. The response can be one of two possibilities:

  • ObjectCommitted

    This includes the reqId which is the identifier of the original request, the Id which is the identifier of the newly created or updated object, and the actual object in JSON form.

  • ObjectCommitFailed

    This includes the reqId which is the identifier of the original request, the statusCode which is the HTTP status code of the failure, and the body which is the content of the response from the API (this might be in HTML format).

When using withChannelResponse, it allows the ability to set headers. This can be used, for example, to determine what processing mode Cumulocity will use as shown in the example below.

Example of creating an operation:

using com.apama.cumulocity.Operation;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;

monitor Test_Operations {

    action onload {
        monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
        Operation op := new Operation;
        string name := "CreateOperation";
        op.source := "7104835";
        op.status := Operation.STATUS_PENDING;
        op.params :=
            {"c8y_Meassage":any(dictionary<any,any>,
                {any(string,"text"):
                    any(string,"Hello Cumulocity device")
                })
            };

        integer reqId := com.apama.cumulocity.Util.generateReqId();
        // Create a new Operation in Cumulocity, ensuring that a response is
        // returned.
        send op.withChannelResponse(reqId, new dictionary<string, string>) to
          Operation.SEND_CHANNEL;

        // If the Operation creation succeeded do something with the returned
        // object or id.
        on ObjectCommitted(reqId=reqId) as c and not
          ObjectCommitFailed(reqId=reqId){
            log "New operation successfully created " + c.toString() at INFO;
        }

        // If the Operation creation failed, log an error.
        on ObjectCommitFailed(reqId=reqId) as cfail and not
          ObjectCommitted(reqId=reqId){
            log "Creating a new event failed " + cfail.toString() at ERROR;
        }
    }
}

Updating an existing operation

You can update the status field.

send com.apama.cumulocity.Operation("<OPERATION_ID>","<SOURCE>",Operation.STATUS_<STATUS>,
{"c8y_Message":<any> {<any>"text":<any>"Updated Cumulocity device"}} )
to com.apama.cumulocity.Operation.SEND_CHANNEL;

Where

  • OPERATION_ID is the identifier of the previously created operation. The presence of OPERATION_ID indicates that the request is for updating an existing operation.
  • SOURCE is the source of the operation (same as the ManagedObject identifier).
  • STATUS is the status of the operation. This can be PENDING, EXECUTING, SUCCESSFUL or FAILED.
Info
The params dictionary cannot be manipulated to delete fragments in the Operation. However, the fragments in the Operation can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.

Querying for operations

To search for an operation or a collection of operations, send the com.apama.cumulocity.FindOperation event to Cumulocity, with a unique reqId to the com.apama.cumulocity.FindOperation.SEND_CHANNEL channel.

The transport will then respond with zero or more com.apama.cumulocity.FindOperationResponse events and then one com.apama.cumulocity.FindOperationResponseAck event on the com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL channel.

Example:

integer reqId := com.apama.cumulocity.Util.generateReqId();

com.apama.cumulocity.FindOperation request :=
  new com.apama.cumulocity.FindOperation;
request.reqId := reqId;

// Filter based on operation status
request.params.add("status", Operation.STATUS_PENDING);

// Subscribe to com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL
// to listen for responses
monitor.subscribe(com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL);

// Listen for responses based on reqId
on all com.apama.cumulocity.FindOperationResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindOperationResponseAck(reqId=reqId)
{
    log "Received Operation " + response.toString() at INFO;
}

// Listen for com.apama.cumulocity.FindOperationResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindOperationResponseAck(reqId=reqId)
  as requestCompleted
{
    log "Received all Operation(s) for request "
      + requestCompleted.reqId.toString() at INFO;

    // Request is completed and we can unsubscribe from this channel
    monitor.unsubscribe(com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL);
}

// Send request to find available operations
send request to com.apama.cumulocity.FindOperation.SEND_CHANNEL;
Query parameters

Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve a list of operations in the Cumulocity OpenAPI Specification.

Parameter Description
id Search for an operation based on operationId. When searching for an operation based on Id, all the query parameters listed below are ignored.
source Search for operations based on the device identifier or asset identifier of the source.
status Search for operations based on the status. The status can be any of SUCCESSFUL, FAILED, EXECUTING or PENDING.
agent Search for operations based on the agent identifier.
fragmentType Search for operations based on the fragment type.
pageSize Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the operations, but each request is larger. By default, 1000 operations are in each page and there is an upper limit of 2000.
currentPage Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
dateFrom Search for operations from a start date. The date and time is provided as seconds since the epoch.
dateTo Search for operations within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch.
revert Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), you can reverse the order in which the matching operations are returned by adding the query parameter revert=true. This returns the latest operations first. By default, Cumulocity returns the oldest operations first.
Query result paging

Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.

Receiving update notifications

The Cumulocity transport can receive update notifications on new measurements, events, alarms, managed objects and operations that are processed by the Cumulocity platform. To receive these notifications, you must add either the Cumulocity Notifications 2.0 bundle (recommended) or the Cumulocity Client bundle (deprecated). For details on how to add the bundles, see Loading the Cumulocity transport.

Info
The Cumulocity Client bundle makes use of the long-polling real-time notifications feature of Cumulocity. This is not recommended for high-throughput use cases, and it will not receive messages if they are sent in the QUIESCENT or CEP processing modes. It only receives messages if they are sent in the PERSISTENT or TRANSIENT processing modes. We recommend using the Notifications 2.0 bundle instead, which does not have these limitations.

When a notification about a managed object, operation, alarm or event is sent, the params dictionary member will contain a property which signals whether the notification is a new object or an update to an existing object. The property name is in the constant PARAM_NOTIFICATION and has the value corresponding to the value of the constants NOTIFICATION_CREATED or NOTIFICATION_UPDATED. The recommended way to distinguish between create and update events is to use the isCreate() and isUpdate() actions which are available on these events, as shown in the example below.

Measurements are not modifiable in Cumulocity, so all measurement notifications are newly-created objects.

Example:

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.ManagedObjectDeleted;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementDeleted;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.EventDeleted;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Operation;

monitor NotificationListener {
  action onload {
    // Subscribe for notification for managed objects
    monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
    // Subscribe for notification for measurements
    monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
    // Subscribe for notification for events
    monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
    // Subscribe for notification for alarms
    monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
    // Subscribe for notification for operations
    monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);

    // Listen for notifications for managed objects
    on all ManagedObject() as managedObject {
      if managedObject.isCreate() {
        log "ManagedObject created" at INFO;
      }
      else if managedObject.isUpdate() {
        log "ManagedObject updated" at INFO;
      }
    }

    // Listen for notifications on deleted managed objects
    on all ManagedObjectDeleted() as managedObjectDeleted {
      log "ManagedObject deleted" at INFO;
    }

    // Listen for notifications for measurements
    on all Measurement() as measurement {
      // Measurements can only be created
      log "Measurement created" at INFO;
    }

    // Listen for notifications on deleted measurements
    on all MeasurementDeleted() as measurementDeleted {
      log "Measurement deleted" at INFO;
    }

    // Listen for notifications for events
    on all Event() as evt {
      if evt.isCreate() {
        log "Event created" at INFO;
      }
      else if evt.isUpdate() {
        log "Event updated" at INFO;
      }
    }

    // Listen for notifications on deleted events
    on all EventDeleted() as eventDeleted {
      log "Event deleted" at INFO;
    }

    // Listen for notifications for alarms
    on all Alarm() as alarm {
      if alarm.isCreate() {
        log "Alarm created" at INFO;
      }
      else if alarm.isUpdate() {
        log "Alarm updated" at INFO;
      }
    }

    // Listen for notifications for operations
    on all Operation() as operation {
      if operation.isCreate() {
        log "Operation created" at INFO;
      }
      else if operation.isUpdate() {
        log "Operation updated" at INFO;
      }
    }
  }
}

Paging Cumulocity queries

Queries support paging when requesting multiple responses from Cumulocity. The number of objects requested by queries are controlled using the following query parameters:

  • pageSize represents a batching parameter for getting multiple responses from Cumulocity. A larger pageSize does fewer requests to Cumulocity to retrieve all the objects, but each request is larger. By default, pageSize is set to 1000. There is an upper limit of 2000.

  • currentPage can be set to retrieve a specific page of results for a given pageSize. If you set currentPage, then only a single page is requested. If currentPage is not set (default), all the pages are requested.

    Info
    It is not recommended to set a small `pageSize` unless you are requesting a single page. To do this, you must set `currentPage`. Set `currentPage` to 1 to retrieve the first `pageSize` results. A warning is logged if `pageSize` is below 50 and `currentPage` is not set.
    
  • withTotalPages is optional because it defaults to true, which means that a query includes full page statistics. If you set withTotalPages to false, only a single page is requested.

    Info
    If you do not want to receive all pages, you can set `withTotalPages` to `false`, which can improve performance by ensuring that only a single page is requested. With this setting, it is not required to also set `currentPage` on a request.
    

For more details on query result paging and the above query parameters, see the information on the REST implementation in the Cumulocity - API Specifications at https://cumulocity.com/api/.

Examples

The following example shows a FindEvent query where the first 50 events are requested:

// Example 1: A FindEvent query where the first 50 responses are requested.
com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
//... adding other query params...
request.params.add("pageSize", "50");
request.params.add("currentPage", "1");

send request to com.apama.cumulocity.FindEvent.CHANNEL;

The next two examples demonstrate how to request a range of events where we are not just interested in the first page of results.

In the second example, pageSize is also set to 50, but currentPage is set to 3, thus requesting the 101st to 150th events:

// Example 2: A FindEvent query where the 101st-150th responses are requested
com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
//... adding other query params...
request.params.add("pageSize", "50");
request.params.add("currentPage", "3");

send request to com.apama.cumulocity.FindEvent.CHANNEL;

As a general rule, if currentPage is greater than 1 and the number of objects you require is not a factor of the start of the range (or if the number of responses required is greater than the upper limit of pageSize), multiple requests are needed to retrieve the objects of interest. As the second example above retrieves 50 events starting after the 100th response (and as 50 is a factor of 100), only 1 request is required.

The third example illustrates a situation where multiple queries are required. 40 events are to be retrieved, starting after the 60th response. As 40 is not a factor of 60, you should set pageSize to 20 (the largest common factor of 60 and 40) and send two requests: one where currentPage is set to 4 (this retrieves the 61st-80th events), and another where currentPage is set to 5 (this retrieves the 81st-100th events).

// Example 3: Two FindEvent queries retrieving the 61st-100th events

// First request retrieves 61st-80th events
com.apama.cumulocity.FindEvent request1 := new com.apama.cumulocity.FindEvent;
//... adding other query params....
request1.params.add("pageSize", "20");
request1.params.add("currentPage", "4");

send request1 to com.apama.cumulocity.FindEvent.CHANNEL;

// Second request retrieves 81st-100th events
com.apama.cumulocity.FindEvent request2 := new com.apama.cumulocity.FindEvent;
//... adding other query params....
request2.params.add("pageSize", "20");
request2.params.add("currentPage", "5");

send request2 to com.apama.cumulocity.FindEvent.CHANNEL;

Invoking other parts of the Cumulocity REST API

For individual event types, Apama provides event-specific interfaces for performing actions. See Using managed objects, Using alarms, Using events, Using measurements, or Using operations for more information. This topic covers how to interact with the remaining Cumulocity REST API.

Typically, interacting with the REST API or making HTTP requests through the Cumulocity platform requires manual setup, including configuring tenant information and authentication methods. The generic request/response interface described below does this for you. This is also the case for users creating EPL apps with the Streaming Analytics application in Cumulocity.

Info
While the GenericRequest/GenericResponse interface can be used for any request to the platform, it makes several assumptions about the request and response format that the Cumulocity REST API guarantees. The CumulocityRequestInterface event described in Invoking microservices is more appropriate for low-level requests, such as making requests to other microservices.

Create a new com.apama.cumulocity.GenericRequest using new GenericRequest and then individually set whichever fields are needed by name. You must always set the reqId (which is used to tie requests and responses together) to a unique identifier generated by the com.apama.cumulocity.Util.generateReqId() action. You will also need to set the HTTP method (also known as the verb) and path. For some APIs, you will also need the queryParams (which populates the query string), body (typically a sequence or dictionary that will be converted to JSON by the plug-in) and/or additional HTTP headers. The GenericRequest event should be sent to the channel specified by the GenericRequest.SEND_CHANNEL constant.

To receive responses, you must subscribe to the channel given in the GenericResponse.SUBSCRIBE_CHANNEL constant. The response events will contain the reqId identifier specified in the request, as well as a body in a dictionary<any,any> where the AnyExtractor can be used to extract the expected content. This dictionary contains a structure which is equivalent to the JSON payload returned by Cumulocity. For the cases where no body is expected in the response (for example, for a DELETE request), only a GenericResponseComplete event will be received for the request identifier.

When using an API which returns a collection, the results are automatically split into multiple GenericResponse events, followed by a GenericResponseComplete, all with the reqId identifier provided in the request.

Here is a simple example of using the API:

GenericRequest request := new GenericRequest;
request.reqId := com.apama.cumulocity.Util.generateReqId();
request.method := "GET";
request.isPaging := true;
request.path := "/measurement/measurements";

monitor.subscribe(GenericResponse.SUBSCRIBE_CHANNEL);
on all GenericResponse(reqId=request.reqId) as response
   and not GenericResponseComplete(reqId=request.reqId)
{
  AnyExtractor dict := AnyExtractor(response.getBody());
  AnyExtractor source := AnyExtractor(dict.getDictionary("source"));

  try{
    AnyExtractor speed :=
      AnyExtractor(dict.getDictionary("c8y_SpeedMeasurement")["speed"]);
    log "Found measurement of type: c8y_SpeedMeasurement with id : " +
      dict.getString("id") + " and source id :" + source.getString("id") +
      " and speed "+speed.getFloat("value").toString()+
      " "+speed.getString("unit")
      at INFO;
  }
  catch(Exception e){
    log "Failed to parse unexpected measurement : " +
      dict.toString() at WARN;
  }
}

on GenericResponseComplete(reqId=request.reqId)
{
  monitor.unsubscribe(GenericResponse.SUBSCRIBE_CHANNEL);
}

send request to GenericRequest.SEND_CHANNEL;

Invoking microservices

The Cumulocity transport has a CumulocityRequestInterface event that allows you to invoke microservices (or other HTTP requests) within Cumulocity. This interface automatically handles authentication. It can be used from an Apama instance outside of Cumulocity and from within Cumulocity, either as an EPL app within the Streaming Analytics application or in a custom microservice.

Info
While the CumulocityRequestInterface can be used to interact with the REST API, we recommend that you use the GenericRequest/GenericResponse interface described in Invoking other parts of the Cumulocity REST API.

Before you can create an HTTP request, you need to call a static connectToCumulocity() action in order to connect (as shown in the later example). The following is the format of the action on the helper class that you call to create a request:

action createRequest(string method, string path, any payload) returns Request

Pass the following:

  • The specific type of HTTP request that is to be created, such as GET or PUT.
  • A specific path that you want to append to your request. For example, the path for a microservice that is running on your desired tenant: /service/myMicroService/path/under/microservice.
  • The payload will be encoded as JSON. For example, a dictionary will be converted to a JSON object.

This action will return an instance of a Request from the generic HTTP API (see also Using predefined generic event definitions to invoke HTTP services with JSON and string payloads) with configuration set up on the request. You can later call execute on this request, passing in a handler to deal with any response.

The following example shows how to make use of this class, that is, how to make an HTTP request in order to retrieve information from a running microservice:

monitor CumulocityTestMonitor {
    action onload() {
        try{
            CumulocityRequestInterface cInterface :=
                CumulocityRequestInterface.connectToCumulocity();
            Request req := cInterface.createRequest("GET",
                "/service/myMicroService/path/under/microservice",
                {"request":"data"});
            req.execute(getHandler);
        }
        catch (com.apama.exceptions.Exception e) {
            log "Error thrown trying to create a Cumulocity Request " +
                e.toString() at ERROR;
        }
    }

    action getHandler(Response resp) {
        AnyExtractor d := resp.payload;
        log "Response output: " + d.toString() at INFO;
        log "Test Done";
    }
}

The CumulocityRequestInterface will automatically detect if it is running inside or outside of Cumulocity and will automatically connect. If running remotely, it will rely on properties being set, which will be the connection details provided for the transport. You can do this by creating a .properties file in your project and specifying it with the --config option when starting a correlator (see Starting the correlator).

Optionally, you can set the following if you are connecting to a Cumulocity server with a self-signed or private certificate. Set this to the path to the certificate authority file by which the server’s certificate was signed:

CUMULOCITY_TLS_CERT_AUTH_FILE

The helper class is included in the Utilities for Cumulocity bundle in Apama Plugin for Eclipse. You can also find it in the monitor/cumulocity directory of your Apama installation.

Monitoring status for Cumulocity

There are two sets of status values provided via the user status mechanism. Requests to the platform provide the metrics listed in the table below, where prefix is:

  • user-{chainId=CumulocityIoTGenericChain}.cumulocityCodec. in the correlator status, and
  • sag_apama_correlator_user_cumulocityCodec_ as a Prometheus metric, with the label chainId=CumulocityIoTGenericChain, and the additional label tenantId in a multi-tenant microservice.

event in the table below stands for one of Alarm, Measurement, Event, Operation or ManagedObject.

Key

Description

prefix.maxLatencyInLastHourMillis

Maximum request latency observed during the last hour, in milliseconds.

prefix.maxLatencyInLastHourDetails

Details of the maximum latency request. Consists of a tab-separated string containing the following: - ISO format timestamp in UTC,

  • method, path and parameters truncated to 100 characters (in URL format), and
  • an optional count of the number of objects if this is a batched request (only com.apama.cumulocity.Measurement requests can be batched).

prefix.mostRecentSlowRequestDetails

Details of the most recent slow request. A request is slow if the request-response multiplied by the number of pages (or 1) is above CUMULOCITY_LATENCY_SLOW_THRESHOLD_SECS (see Configuring the Cumulocity transport). Consists of a tab-separated string containing the following: - ISO format timestamp in UTC,

  • method, path and parameters truncated to 100 characters (in URL format), and
  • an optional count of the number of objects if this is a batched request (only com.apama.cumulocity.Measurement requests can be batched).

prefix.requestLatencyEWMAShortMillis

A quickly-evolving exponentially-weighted moving average of request latencies, in milliseconds. Uses 0.5 as the weight to calculate this. This puts more importance on recent latencies than requestLatencyEWMALongMillis.

prefix.requestLatencyEWMALongMillis

A longer-term exponentially-weighted moving average of request latencies, in milliseconds. Uses 0.1 as the weight to calculate this.

prefix.receivedeventEventsParsedCount

Count of the event received following a FindAlarm, FindMeasurement, FindEvent, FindOperation or FindManagedObject.

prefix.senteventEventsParsedCount

Count of the event sent to Cumulocity from EPL.

prefix.receivedeventEventsParsedSizeEWMABytes

An exponentially-weighted moving average of the size of the event received following a FindAlarm, FindMeasurement, FindEvent, FindOperation or FindManagedObject. Uses 0.3 as the weight to calculate this.

prefix.senteventEventsParsedSizeEWMABytes

An exponentially-weighted moving average of the size of the event sent to Cumulocity from EPL. Uses 0.3 as the weight to calculate this.

You can find additional status information relating to the Cumulocity transport in the status elements from the HTTP client (see also Monitoring status for the HTTP client). These start with the following prefix:

  • user-{chainId=CumulocityIoTGenericChain}.httpClient. in the correlator status, or
  • sag_apama_correlator_user_httpClient_ as a Prometheus metric, with the label chainId=CumulocityIoTGenericChain.

When using the Cumulocity Notifications 2.0 bundle, the metrics in the table below relating to receiving notifications are provided with the following prefix:

  • user-notifications{tenantId=TENANT,subscriber=SUBSCRIBER,subscription=SUBSCRIPTION,client=N}. in the correlator status, or
  • sag_apama_correlator_user_notifications_ as Prometheus metrics, with labels for tenantId, subscriber, subscription, and client.
Key Description
prefix.batchAccumulatorCodec.queueSize Size of the queue in the receiving chain.
prefix.cumulocityNotifications2Codec.alarmsReceived Count of alarms received.
prefix.cumulocityNotifications2Codec.eventsReceived Count of events received.
prefix.cumulocityNotifications2Codec.managedObjectsReceived Count of managed objects received.
prefix.cumulocityNotifications2Codec.measurementsReceived Count of measurements received.
prefix.cumulocityNotifications2Codec.operationsReceived Count of operations received.
prefix.pulsarTransport.messagesAcknowleged Count of messages acknowledged.
prefix.pulsarTransport.messagesReceived Count of messages received.
prefix.pulsarTransport.messagesRedelivered Count of redelivered messages received.
prefix.pulsarTransport.receivedSizeEWMABytes Exponentially weighted moving average (EWMA) of size of received messages, in bytes.
prefix.numClients Configured number of parallel clients.

For more information about monitor status information published by the correlator, see Managing and monitoring over REST and Watching correlator runtime status.

Finding tenant options

In order to find the available tenant options on a tenant, you can send the event com.apama.cumulocity.FindTenantOptions to FindTenantOptions.SEND_CHANNEL. This results in events of type com.apama.cumulocity.FindTenantOptionsResponse being returned on com.apama.cumulocity.FindTenantOptionsResponse.SUBSCRIBE_CHANNEL.

The returned event includes a sequence of com.apama.cumulocity.TenantOption, which individually include the key/value combinations that represent the available tenant option.

In order to filter the tenant options returned, you can specify values in the category and key fields of the find request. If only the key is specified, then Cumulocity returns all the available tenant options and the correlator filters them by the key.

Getting user details

You can get the details of the current user by sending the event com.apama.cumulocity.GetCurrentUser to com.apama.cumulocity.GetCurrentUser.SEND_CHANNEL. This results in events of type com.apama.cumulocity.GetCurrentUserResponse being returned on com.apama.cumulocity.GetCurrentUserResponse.SUBSCRIBE_CHANNEL.

The com.apama.cumulocity.GetCurrentUserResponse returned contains a com.apama.cumulocity.CurrentUser event, which in turn contains the id of the user, the userName, a sequence of effectiveRoles for the user and a dictionary of userOptions.

By default, this is the user which the Apama application is running as. This is either the user configured in the Cumulocity connection if it is not running within Cumulocity or the service user of the microservice if it is running in Cumulocity.

The ability to request details of permissions for another user can be done by overriding the authorization or cookies headers in the com.apama.cumulocity.GetCurrentUser event. This would normally be used if you are taking the authentication details from a request to your application and using them to determine the roles that user has.

Example - checking a user based on information in a received request:

/** Event containing extracted information retrieved from a
* http request where we want to check the validity of the user */
event ActionRequest {
    string authorization;
    string actionToTake;
    string requestId;
    string channel;
}
/** Response for the HTTP request */
event ActionResponse {
    string requestId;
    string actionResult;
}
/** Response if authorization failed */
event ActionNotAllowed {
    string requestId;
}
...
monitor.subscribe(GetCurrentUserResponse.SUBSCRIBE_CHANNEL);

// Listen for incoming HTTP requests

on all ActionRequest() as ar {
    integer reqId := com.apama.cumulocity.Util.generateReqId();
    // Send a request to check the user from the incoming request
    GetCurrentUser checkUser := new GetCurrentUser;
    checkUser.reqId := reqId;
    checkUser.authorization := ar.authorization;
    send checkUser to GetCurrentUser.SEND_CHANNEL;

    // if authentication passed, check authorization
    on GetCurrentUserResponse(reqId=reqId) as res and not
      GetCurrentUserResponseFailed(reqId=reqId) {
        if checkHasRoles("ActionAllowed", res.user.effectiveRoles) {
            send ActionResponse(ar.requestId, performAction(ar.actionToTake))
              to ar.channel;
        } else {
            send ActionNotAllowed(ar.requestId) to ar.channel;
        }
    }

    // if authentication failed, return an error
    on GetCurrentUserResponseFailed(reqId=reqId) as err and not
      GetCurrentUserResponse(reqId=reqId) {
        send ActionNotAllowed(ar.requestId) to ar.channel;
    }
}
action performAction(string actiontoTake) returns string{
        // do some action
           return "";
}

action checkHasRoles(string role,sequence<Role> effectiveRoles) returns boolean {
       Role r;
       for r in effectiveRoles {
               if r.id = role{
                       return true;
               }
        }
        return false;
  }

}

You can override the current user in one of the following ways:

  • By setting the authorization header of the other user. This would be used for basic authentication and returns details of that other user.

    If this is invalid, then GetCurrentUserResponseFailed is returned.

  • By setting both authCookie and xsrfToken to valid values for another user. This returns details of that other user.

    If either authCookie or xsrfToken are incorrect or not set, then GetCurrentUserResponseFailed is returned.

  • Not setting all of authorization, authCookie or xsrfToken returns details of the current user.

Optimizing requests to Cumulocity with concurrent connections

In order to provide better performance in requests to the Cumulocity platform, you can configure the transport to use multiple client connections to perform requests concurrently. This can provide improved performance, but may also change the ordering in which requests are executed and responses are returned. By default, the Cumulocity transport tries to use multiple connections and restricts ordering to avoid races that may affect your EPL application. However, this may be either insufficiently concurrent or insufficiently ordered for your specific use case. In that case, there are several options on how to control the concurrency used. These are described below.

Default behavior

By default, three concurrent connections are created for handling requests to the Cumulocity platform for a given correlator process (a given tenant, for applications running inside the Cumulocity platform).

To avoid obvious races, the following rules apply for concurrency:

  • All read (GET) requests can be performed concurrently with each other.
  • All update (PUT/POST) requests relating to different managed objects (for example, a ManagedObject update, a new Measurement for a second ManagedObject and an Alarm with a third ManagedObject as a source) can be performed concurrently with each other.
  • All updates to a single managed object (based on id, or source as appropriate) are performed serially in the order they were sent to the transport.
  • Measurement updates sent without the withChannelResponse option that may be batched can be processed concurrently with any other request.
  • Any read request waits for all outstanding update requests before starting.
  • Any update request waits for all outstanding read requests before starting.
  • GenericRequest update (PUT/POST/DELETE) operations cannot be automatically tied to a ManagedObject, so they are never processed concurrently with any other request.
Changing the default behavior

There are two options to change the default behavior through the CumulocityIoT.properties file. These changes apply to the entire correlator. When running as an EPL application in the Cumulocity platform, this setting can be changed for your whole tenant as a tenant option.

  • You can set the CUMULOCITY_NUM_CLIENTS property to control the number of clients used. If it is set to 1, then all requests are sent serially using a single connection (disables concurrency). If it is set to another number, then that number of concurrent connections to the platform is created.
  • You can add the CUMULOCITY_CONCURRENCY_MODE property to the properties file. By default, this has the value of auto which has the behavior described above to limit the concurrency to preserve some ordering. You can set the value to always instead, in which case requests of all types may be processed concurrently and any ordering consequences must be handled in EPL instead.

See also Configuring the Cumulocity transport.

Creating a new connection with specific behavior

The following table lists the event types from the com.apama.cumulocity package that you can use to either create new connections for sending events independently of the default transport or use the default shared connection. In this way, multiple different configurations can be used at the same time. You can use these event types to separate the requests for different EPL applications, or different types of requests within the system. Each of these event types corresponds to a particular combination of settings that can be configured for the default transport. These events can be created using either the create or the createForTenant actions. The create action is used when an application needs to work only with per-tenant deployment. The create action may take an argument to configure the number of clients to use for the connection depending on the connection type. The createForTenant action should be used when an application needs to work with multi-tenant deployment or both multi-tenant and per-tenant deployment. The createForTenant action takes a TenantDetails event as its first argument and may also take an argument to configure the number of clients to use for the connection depending on the connection type. The TenantDetails event provides details of the subscribed tenant to the connection object so that it can provide information that is specific to the provided tenant. The TenantDetails events can be obtained by using tenant subscription events. See Working with multi-tenant deployments for more details.

Event type

Description

FullyConcurrentConnection

A connection with multiple clients and no restriction on ordering.The additional argument to the create() or createForTenant() method on that event type is:

integer numClients

Configuration equivalents:

CUMULOCITY_NUM_CLIENTS=numClients
CUMULOCITY_CONCURRENCY_MODE=always

SerialConnection

A completely serial connection with no concurrency.There is no argument to the create() or createForTenant() method on that event type.

Configuration equivalent:

CUMULOCITY_NUM_CLIENTS=1

AutoConcurrentConnection

A connection with multiple clients but with the standard ordering restrictions.The additional argument to the create() or createForTenant() method on that event type is:

integer numClients

Configuration equivalents:

CUMULOCITY_NUM_CLIENTS=numClients
CUMULOCITY_CONCURRENCY_MODE=auto

SharedConnection

A connection to Cumulocity which is configured to handle all requests using a shared default client.

All of the above event types have the same API to use them, which looks like this:
FullyConcurrentConnection conn := FullyConcurrentConnection.create(5);

monitor.subscribe(conn.getChannel(FindManagedObjectResponse.SUBSCRIBE_CHANNEL));
FindManagedObject fmo := // create a request
send fmo to conn.getChannel(FindManagedObject.SEND_CHANNEL));

on FindManagedObjectResponse(reqId=fmo.reqId) {
   // do something
   monitor.unsubscribe(conn.getChannel(FindManagedObjectResponse.SUBSCRIBE_CHANNEL));
   conn.destroy(); // can't use it after this
}

The connection object provides a getChannel method which can be used to convert the standard channels used into the correct channels for events to go to this connection instead of the default transport.

For more details, see the com.apama.cumulocity package in the API reference for EPL (ApamaDoc).

Writing your EPL to avoid races

If there are ordering issues due to concurrency, particularly if you want to use a fully-concurrent connection, then you may need to write your EPL to explicitly cater for the possible reordering. This is normally done by listening for the response events that each request has to ensure that it is fully completed before doing any requests which depend on the first one having completed.

For sending a creation or update event to an object that means using the withChannelResponse API and listening for the com.apama.cumulocity.ObjectCommitted or com.apama.cumulocity.ObjectCommitFailed response. For example:

monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
Measurement m := // create measurement
integer reqId := Util.generateReqId();
send m.withChannelResponse(reqId) to Measurement.SEND_CHANNEL;
on ObjectCommitted(reqId=reqId) and not ObjectCommitFailed(reqId=reqId) {
   // do whatever must wait until the Measurement request has been fully completed
}
on ObjectCommitFailed(reqId=reqId) and not ObjectCommitted(reqId=reqId) {
   // handle the fact that this failed
}

Working with multi-tenant deployments

When developing an application which can work in multi-tenant deployments, the application needs to handle subscription/unsubscription of tenants and sending/receiving events to/from subscribed tenants.

The preferred approach is to use the com.apama.cumulocity.TenantSubscriptionNotifier event, which provides a callback-based API for notifications about tenant subscriptions and unsubscriptions. It automatically handles spawning new monitor instances for each subscribed tenant and terminating the instance when the tenant is unsubscribed. The monitor instances are spawned in a private context for each tenant. The callback actions are called for all currently subscribed tenants and for any future tenant subscriptions and unsubscriptions from the monitor instances spawned for those tenants. This keeps the state and processing for each tenant separate from other tenants and cleans up the monitor instances and listeners when tenants are unsubscribed.

The subscription callback provides a com.apama.cumulocity.TenantDetails event, which is used to create connection objects for that tenant. The connection objects provide getChannel actions to get tenant-specific channels to send events to and receive events from that tenant. See the documentation of the FullyConcurrentConnection, SerialConnection, AutoConcurrentConnection, and SharedConnection events in thecom.apama.cumulocity package for more details.

Alternatively, the application can use the com.apama.cumulocity.GetAllTenantsSubscribedToApplication event to get all tenants that are currently subscribed to the current application and listen for the com.apama.cumulocity.ApplicationSubscribedForTenant and the com.apama.cumulocity.ApplicationUnsubscribedForTenant events to get future tenant subscription and unsubscription notifications. The ApplicationSubscribedForTenant event provides a com.apama.cumulocity.TenantDetails event, which is used to create connection objects for that tenant. The ApplicationSubscribedForTenant event also provides a context that is a private context only for the tenant and can be used for spawning new monitor instances. When using this approach, the application needs to manually take care of spawning new monitor instances for each subscribed tenant and terminate them when tenants are unsubscribed.

The application developed to handle multi-tenant deployments using the above-mentioned events works in per-tenant deployments as well.

The example below shows how to implement a simple threshold rule which raises an alarm when the value of the measurement is greater than the threshold.

using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;

/**
 * This application monitors measurements and raises alarms if value is greater
 * than the threshold.
 *
 */
monitor ThresholdAlarmRule {
    /** The measurement threshold value. */
    constant float THRESHOLD := 10.0;

    action onload() {

        // Listen for measurements and raise an alarm if measurement exceeds a threshold.
        monitorMeasurements();
    }

    /** Listen for measurements and raise an alarm if measurement exceeds a threshold. */
    action monitorMeasurements() {

        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // Listen for all MeasurementFragment of interested types.
        on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
            // Raise alarm if value is greater than the threshold.
            if m.value > THRESHOLD {
                Alarm alarm := Alarm("","my_alarm",m.source,currentTime,"Threshold exceeded",
                                     Alarm.STATUS_ACTIVE,Alarm.SEVERITY_MINOR,1,new dictionary<string,any>);
                // Get tenant-specific channel to which alarm must be sent.
                send alarm to Alarm.SEND_CHANNEL;
            }
        }
    }
}

The example below shows how to convert an existing per-tenant EPL application to also work in multi-tenant mode.

using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.SharedConnection;
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;

using com.apama.cumulocity.notifications2.SubscribeTenantNotifications;

/**
* This application monitors measurements and raises alarms if value is greater
* than the threshold.
*
* The application monitors tenants subscriptions and unsubscriptions. It starts
* listening for measurements when a tenant is subscribed and stops it when tenant
* is unsubscribed.
*
* The application works correctly both in per-tenant and multi-tenant deployment.
*/
monitor ThresholdAlarmRule {
    /** The measurement threshold value. */
    constant float THRESHOLD := 10.0;

    /** Connection object for the tenant. It is used to get correct channels for sending and receiving events.*/
    SharedConnection connection;

    action onload() {
        // Create a new instance of TenantSubscriptionNotifier and register callbacks for
        // subscriptions and unsubscriptions
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().
        onSubscription(tenantSubscribed).
        onUnsubscription(tenantUnsubscribed);
    }

    /** Action which is called back when a tenant is subscribed. */
    action tenantSubscribed(TenantDetails tenant) {
        // Create a connection object for this tenant so that we can send events to
        // this tenant and receive events from it.
        // For most cases, SharedConnection connection is sufficient, but also see
        // FullyConcurrentConnection, SerialConnection and AutoConcurrentConnection events
        // to create connections with different semantics.
        connection := SharedConnection.createForTenant(tenant);

        // Now start to listen for measurements and raise an alarm if measurement exceeds a threshold.
        monitorMeasurements();

        // Subscribe this tenant to Notifications 2.0 events to receive the measurements.
        // You can choose to listen for the SubscribeTenantNotificationsComplete event to wait for the connection to succeed.
        send SubscribeTenantNotifications(tenant, com.apama.cumulocity.Util.generateReqId()) to SubscribeTenantNotifications.SEND_CHANNEL;
    }

    /** Listen for measurements and raise an alarm if measurement exceeds a threshold. */
    action monitorMeasurements() {
        monitor.subscribe(connection.getChannel(MeasurementFragment.SUBSCRIBE_CHANNEL));

        // Listen for all MeasurementFragment of interested types.
        on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
            // Raise alarm if value is greater than the threshold.
            if m.value > THRESHOLD {
                Alarm alarm := Alarm("","my_alarm",m.source,currentTime,"Threshold exceeded",
                        "ACTIVE","MINOR",1,new dictionary<string,any>);
                // Get tenant-specific channel to which alarm must be sent.
                send alarm to connection.getChannel(Alarm.SEND_CHANNEL);
            }
        }
    }

    /** Action which is called back when a tenant is unsubscribed. */
    action tenantUnsubscribed(string tenantId) {
        // Destroy the connection object.
        connection.destroy();
        // Do not need to explicitly terminate this monitor instance as it
        // is automatically done.
    }

    action ondie {
        // Destroy the connection object.
        connection.destroy();
    }
}

Building custom Cumulocity IoT microservices using Apama

You can use Apama to create custom microservices which can be deployed to Cumulocity. Unlike the built-in EPL apps functionality of the Streaming Analytics application, which supports deploying individual EPL files using the standard platform components deployed there, a custom microservice allows you to deploy a complete Apama project as you would run outside of the platform.

To create a custom microservice, you need to have an Apama project, either by usingApama Plugin for Eclipse or by using the apama_project tool on the command line. Your project must include one of the Cumulocity connectivity bundles as described in Adding the Cumulocity connectivity plug-in to a project. When you configure these bundles via the .properties files, the following properties, which are normally required, are automatically detected by the microservice:

  • CUMULOCITY_USERNAME - uses a service user provided for the microservice instead.
  • CUMULOCITY_PASSWORD - uses a service user provided for the microservice instead.
  • CUMULOCITY_APPKEY - uses the microservice’s own application in the platform.
  • CUMULOCITY_SERVER_URL - automatically detected within the platform.

When using the Cumulocity Notifications 2.0 bundle:

  • CUMULOCITY_NOTIFICATIONS_SUBSCRIBER_NAME - uses the contextPath for the microservice instead.

Otherwise, the project should be the same as you can run in an Apama that is deployed outside of the platform.

To actually build the microservice, you should first build a Docker image from your project. In Apama Plugin for Eclipse, use the Add Docker Support option from the context menu to create a Dockerfile from which you can build the microservice. For more information, see Building Apama projects during the Docker build.

You can use the Cumulocity microservice utility tool to build and deploy this project as a microservice (see Microservice utility tool in the Cumulocity documentation). Put your project inside a subdirectory for your microservice called docker. You also need to create a cumulocity.json file that contains a manifest for the microservice. For details, see Microservice manifest in the Cumulocity documentation. To use Apama, you have to grant your microservice the following roles using the requiredRoles property:

  • ROLE_APPLICATION_MANAGEMENT_READ
  • ROLE_INVENTORY_READ
  • ROLE_INVENTORY_ADMIN
  • ROLE_INVENTORY_CREATE
  • ROLE_MEASUREMENT_READ
  • ROLE_MEASUREMENT_ADMIN
  • ROLE_EVENT_READ
  • ROLE_EVENT_ADMIN
  • ROLE_ALARM_READ
  • ROLE_ALARM_ADMIN
  • ROLE_DEVICE_CONTROL_READ
  • ROLE_DEVICE_CONTROL_ADMIN
  • ROLE_IDENTITY_READ
  • ROLE_OPTION_MANAGEMENT_READ
  • ROLE_BULK_OPERATION_READ
  • ROLE_SMS_ADMIN

If you are using the Cumulocity Notifications 2.0 bundle, then you also need the following role:

  • ROLE_NOTIFICATION_2_ADMIN

For more details on deploying Apama as a microservice, including how to build multi-tenant microservices, see Deploying Apama applications as microservices in the Cumulocity documentation.

Sample EPL

The sample EPL below describes how to subscribe and receive device measurements, device events and device information.

package com.apama.sample;

using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Operation;

using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;

monitor CumulocityApplication {

  action onload() {

     fetchManagedObjects();

     listenForMeasurements();

     listenForAlarms();

     listenForEvents();

     listenForOperations();
  }

  action fetchManagedObjects() {

    // Subscribe to receive all the devices from Cumulocity
    monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);

    // Consume all the devices from Cumulocity
    on all ManagedObject() as mo {
      log mo.toString() at INFO;

      // Update a managed object
      /*
      mo.params.add("CustomMetadata", {"metadata": "Adding custom data"});
      send mo to ManagedObject.SEND_CHANNEL;
      */
    }

    monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);

    // Fetch a list of all available devices
    integer reqId := com.apama.cumulocity.Util.generateReqId();
    on all FindManagedObjectResponse(reqId=reqId) as response
    and not FindManagedObjectResponseAck(reqId=reqId) {
      log "Received managedObject " + response.managedObject.toString() at INFO;
    }

    on FindManagedObjectResponseAck(reqId=reqId) {
      log "Find Managed Objects request completed" at INFO;
      monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
    }

    // Retrieve list of all available devices
    send FindManagedObject(reqId, "", {"fragmentType": "c8y_IsDevice"})
                                          to FindManagedObject.SEND_CHANNEL;
  }

  action listenForMeasurements() {

     // Subscribe to receive all the measurements published from
     // Cumulocity
     monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

     // Consume all the measurements from Cumulocity
     on all Measurement() as m {
       log  m.toString() at INFO;
     }

     // Create a new measurement
     /*
     Measurement m := new Measurement;
     m.source := "<MANAGED_OBJECT_ID>";
     m.time := currentTime;
     m.type := "TemperatureMeasurement";
     MeasurementValue mv := new MeasurementValue;
     mv.value := 100.0;
     dictionary<string, MeasurementValue> fragment :=
																															new dictionary<string, MeasurementValue>;
     fragment.add("temperature", mv);
     m.measurements.add("TemperatureMeasurement", fragment);
     send m to Measurement.SEND_CHANNEL;
     */
   }

  action listenForEvents() {

     // Subscribe to receive all the events published from
     // Cumulocity
     monitor.subscribe(Event.SUBSCRIBE_CHANNEL);

     // Consume all the events from Cumulocity
     on all Event() as e {
       log e.toString() at INFO;

       // Example for updating an event
       /*
       // update text
       e.text := "This is an updated text";
       send e to Event.SEND_CHANNEL;
       */
     }

     // Create a new event
     /*
     Event evt := new Event;
     evt.source := "<MANAGED_OBJECT_ID>";
     evt.type := "TestEvent";
     evt.time := currentTime;
     evt.text := "This is a sample event";
     send evt to Event.SEND_CHANNEL;
     */
  }

  action listenForAlarms() {

     // Subscribe to receive all the alarms published from
     // Cumulocity
     monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);

     // Consume all the alarms from Cumulocity
     on all Alarm() as alarm {
       log alarm.toString() at INFO;

       // Example for updating an alarm
       /*
       // set alarm severity to MAJOR
       alarm.severity := Alarm.SEVERITY_MAJOR;
       send alarm to Alarm.SEND_CHANNEL;
       */
     }

     // Create a new alarm
     /*
     Alarm alarm := new Alarm;
     alarm.source := "<MANAGED_OBJECT_ID>";
     alarm.type := "TestAlarm";
     alarm.severity := Alarm.SEVERITY_MINOR;
     alarm.status := Alarm.STATUS_ACTIVE;
     alarm.time := currentTime;
     alarm.text := "This is a sample alarm";
     send alarm to Alarm.SEND_CHANNEL;
     */
  }

  action listenForOperations() {
     // Subscribe to receive all the operations published from
     // Cumulocity

     // Note: When using the Cumulocity transport, ensure that the
     // subscribeToOperations transport property is set to true
     monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);

     on all Operation() as o {
  	    log o.toString() at INFO;

       // Update an operation
       /*
       o.status := Operation.STATUS_EXECUTING;
       send o to Operation.SEND_CHANNEL;
       */
     }

     // Create an operation
     /*
     Operation operation := new Operation;
     operation.source := "<MANAGED_OBJECT_ID>";
     operation.status := Operation.STATUS_PENDING;
     operation.params.add("c8y_Message", {"text": "Device Operation"});
     send operation to Operation.SEND_CHANNEL;
     */
  }
}