About the Cumulocity transport
Cumulocity is used for communication with connected IoT devices. See https://cumulocity.com/ for detailed information.
Apama provides several connectivity bundles which allow you to communicate with the IoT devices connected to Cumulocity. For example, you can receive events from the devices, send operations to the devices, and query the state stored in the platform.
The following bundles are under the Cumulocity IoT group of connectivity bundles (see Adding bundles to projects for more information on connectivity bundles):
- Cumulocity Client (includes REST support). This bundle is for the legacy real-time notifications feature and has been superseded by the Cumulocity Notifications 2.0 bundle.
  Info: Use of the Cumulocity Client bundle is deprecated.
- Cumulocity Notifications 2.0 (includes REST support). This is the recommended way to receive events and updates as they are published from devices, as well as to query and publish data to the platform. This bundle replaces the Cumulocity Client bundle.
  Info: The Cumulocity Notifications 2.0 feature is currently in private preview. If you would like to have it enabled for your tenant, please contact Cumulocity Operations.
- Cumulocity REST Support. This bundle is for applications that do not need to receive real-time updates and only need to publish data or query the platform. You must not add it to your project if you are adding one of the other bundles.
You configure the Cumulocity connectivity bundles by editing the .properties files that come with the respective connectivity bundle. All bundles provide a .properties file for configuring the REST API connection. The Cumulocity Client and Cumulocity Notifications 2.0 bundles also provide a second .properties file with properties specific to that connection. See Adding the Cumulocity connectivity plug-in to a project and Configuring the Cumulocity transport for further information.
In addition to the above connectivity bundles, the following EPL bundles are also available (see also Adding EPL bundles to projects). These bundles are included automatically if you add any of the connectivity bundles.
- Event Definitions for Cumulocity. This EPL bundle defines all events that can be used for interacting with Cumulocity. This includes definitions for events that you receive from Cumulocity, events that you can send to Cumulocity, and event APIs that you can use for requesting data from Cumulocity. For more information, see the com.apama.cumulocity package in the API reference for EPL (ApamaDoc).
- Utilities for Cumulocity. This EPL bundle contains useful utilities for EPL code that is interacting with Cumulocity. It also contains a geofence helper utility for determining whether a location is part of a geofence or not. For more information, see the com.apama.cumulocity.Util and the com.apama.cumulocity.GeoFenceContainer events in the API reference for EPL (ApamaDoc).
You can also add these bundles with the apama_project command-line tool. See Creating and managing an Apama project from the command line for more information.
The samples/cumulocity directory of your Apama installation includes samples which show how to use the Cumulocity bundles. For more information, see the README.txt file in the corresponding samples folder.
As with other connectivity plug-ins, the EPL application should call com.softwareag.connectivity.ConnectivityPlugins.onApplicationInitialized(). For more information, see Sending and receiving events with connectivity plug-ins. Per-tenant EPL applications automatically start to receive Notifications 2.0 events after this call. Multi-tenant applications additionally need to send a com.apama.cumulocity.notifications2.SubscribeTenantNotifications event for each tenant to SubscribeTenantNotifications.SEND_CHANNEL to start receiving notifications from that tenant. For an example, see Working with multi-tenant deployments.
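The initialization call described above can be sketched as follows for a per-tenant application. This is a minimal illustration, not a definitive implementation: the monitor name ReceiveFromCumulocity is hypothetical, and the choice of alarms as the event type to listen for is only an example.

```epl
using com.apama.cumulocity.Alarm;

// Hypothetical monitor name, used for illustration only
monitor ReceiveFromCumulocity {
    action onload() {
        // Subscribe and set up listeners first, so that they are in place
        // before events start to arrive
        monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
        on all Alarm() as alarm {
            log "Received " + alarm.toString() at INFO;
        }

        // Signal that the application is ready to receive events
        com.softwareag.connectivity.ConnectivityPlugins.onApplicationInitialized();
    }
}
```

Setting up the listeners before the call means they are already in place when events start to arrive.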
Configuring the Cumulocity transport
When you add one of the Cumulocity connectivity bundles in Apama Plugin for Eclipse, one or two .properties configuration files are created. You have to provide all required information in these files in order to connect to Cumulocity.
Configuration for all Cumulocity connectivity bundles
All bundles contain the properties to connect to the REST API in your tenant. The following is an example of a typical CumulocityIoTREST.properties configuration file:
##### Username and password must be provided for authentication
CUMULOCITY_USERNAME=MYNAME
CUMULOCITY_PASSWORD=MYPW
##### Application key and the URL of the application
CUMULOCITY_APPKEY=MYAPP
CUMULOCITY_SERVER_URL=https://myserver.cumulocity.com
##### TLS certificate authority file
CUMULOCITY_AUTHORITY_FILE=
##### Allow connection to Cumulocity instance with unknown certificate
CUMULOCITY_ALLOW_UNAUTHORIZED_SERVER=false
##### Set this to the tenant ID if you don't have a per-tenant hostname
CUMULOCITY_TENANT=
##### Proxy server host and port to start using HTTP proxy
CUMULOCITY_PROXY_HOST=proxy_host
CUMULOCITY_PROXY_PORT=
##### Proxy username and password must be provided for basic authentication
CUMULOCITY_PROXY_USERNAME=ProxyUser
CUMULOCITY_PROXY_PASSWORD=ProxyPW
##### Number of concurrent REST connections to use
CUMULOCITY_NUM_CLIENTS=3
To connect to Cumulocity, you must set the following properties.
Property |
Description |
---|---|
|
Username for authentication. This can be specified either as a username alone or in the form of When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle. Type: |
|
Password for authentication. Required when not running in a microservice. When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle. Type: |
|
Unique key for the application defined on the Cumulocity instance. Required when not running in a microservice. The application key is defined in Cumulocity. Log in to your account in Cumulocity, and use the Administration application to add an external application. You can then specify the application key and the URL of the application. See the Cumulocity documentation at https://cumulocity.com/docs/ for more information. When running in a microservice, you do not need to set this, unless you have added the Cumulocity Client bundle. Type: |
|
URL of the Cumulocity tenant. Required when not running in a microservice. Type: |
Property |
Description |
---|---|
|
The TLS certificate authority (CA) file. If you are connecting to a Cumulocity platform whose certificate is not signed by a trusted CA, provide the certificate of your signing authority here. Type: |
|
Set this to |
|
Unique name of the application tenant. This configuration option is useful in the case of Cumulocity Edge. Type: |
|
The name of the proxy server to connect to. Type: |
|
The port number of the proxy server to connect to. Both host and port are required to enable an HTTP proxy. Type: |
|
Optional proxy user name for HTTP basic authentication. Type: |
|
Optional proxy password for HTTP basic authentication. Provide both user name and password if the proxy server has basic authentication enabled. Type: |
|
The number of simultaneous client connections to use for handling queries to the platform.Type: Default: 3. |
|
Set this to Default: |
|
The name of the multi-tenant microservice to use. It is used only when running as an external multi-tenant project and connecting to remote Cumulocity. It is required when developing a See also Working with multi-tenant deployments. Type: |
Property |
Description |
---|---|
|
The initial delay (in seconds) that can be set for querying tenant subscriptions.Type: Default: 0 seconds. |
|
The maximum number of Apama events that can be batched as a single request before sending to Cumulocity. The only event type that supports batching is Default: 1000. |
|
Update the Type: Default: 1 second. |
|
Log a warning if a single-paged or multi-paged request takes more time to complete than defined by this threshold. Set this to 0 to disable logging. Type: Default: 10 seconds. |
|
Log a warning if a batch of requests takes more time to complete than defined by this threshold. If a warning for an individual request of the batch has already been logged with Type: Default: 50 seconds. |
|
The sender name to be used as the default if it is not specified in the Type: Default: |
|
The sender address to be used as the default if it is not specified in the You can provide a sender address in the following formats: Type: Default: |
|
If set to Default: |
Configuration for Cumulocity Notifications 2.0
If you are using the Cumulocity Notifications 2.0 bundle, you also get a .properties file to configure this connection. The following is an example of a typical CumulocityNotifications2.properties configuration file:
##### A unique name to identify the subscriber.
##### Disconnecting and reconnecting or restarting with the same subscriber name
##### will resume the message stream from the point it left off.
##### Required if not running in a microservice.
##### Will be inferred from the microservice contextPath when running as a microservice
CUMULOCITY_NOTIFICATIONS_SUBSCRIBER_NAME=mySubscriberName
##### Name of the subscription to use or create.
##### All correlators which use the same subscription name
##### will consume the same set of messages.
CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_NAME=mySubscriptionName
##### An Exclusive subscription can only have one connection for each subscriber name,
##### subsequent connections will fail. A Shared or KeyShared subscription can have
##### multiple connections with the same subscriber name, in which case messages will
##### be delivered only to one of the set of connections.
##### In Shared mode any message can be delivered to any connection.
##### In KeyShared mode messages relating to a given device will all be received by the same connection.
CUMULOCITY_NOTIFICATIONS_SUBSCRIPTION_TYPE=Exclusive
##### Create a number of parallel connections for receiving notifications on.
##### Requires Shared or KeyShared subscription type.
##### Increase this number above 1 if your application isn't consuming notifications fast enough.
##### For best performance the number of clients should be a power of 2.
CUMULOCITY_NOTIFICATIONS_NUMBER_CLIENTS=1
##### The limit of messages which can be buffered within the receiving chain before being
##### delivered to subscribed contexts.
CUMULOCITY_NOTIFICATIONS_MAX_BUFFERSIZE=1000
##### The largest batch of messages that can be received from notifications at one time.
CUMULOCITY_NOTIFICATIONS_MAX_BATCHSIZE=1000
When you add the Cumulocity Notifications 2.0 bundle, you must set the following required properties:
Property |
Description |
---|---|
|
A unique name to identify the subscriber. Disconnecting and reconnecting or restarting with the same subscriber name resumes the message stream from the point it left off. Required if not running in a microservice. If running in a microservice, this is inferred from the microservice Type: |
|
Name of the subscription to use or create. All correlators which use the same subscription name consume the same set of messages. Subscriptions are created with this name in Notifications 2.0. They persist after the correlator exits, waiting for reconnection. If a subscription is no longer used, you must manually delete it using the Notifications 2.0 API. Type: |
|
An Values: |
Property |
Description |
---|---|
|
Create a number of parallel connections for receiving notifications. Requires the subscription type Default: 1. |
|
The limit of messages that can be buffered within the receiving chain before being delivered to subscribed contexts.Type: Default: 1000. |
|
The largest batch of messages that can be received from Notifications 2.0 at one time.Type: Default: 1000. |
|
Set to Default: |
|
The number of unacknowledged items available to a consumer. Should not be smaller than Default: 1000. |
|
The service URL for connecting to Notifications 2.0. In most situations, you do not need to set this. It is derived from the |
Configuration for Cumulocity Client
When you add the deprecated legacy Cumulocity Client bundle, you can add the following optional properties to the CumulocityIoT.properties configuration file:
Property |
Description |
---|---|
|
The measurement format mode used by the tenant. Two modes are available: Default: |
|
If set to Default: |
|
Deprecated. Request all assets at startup. Type: Default: Note: You should explicitly request for all available devices on startup using the |
|
Subscribe to all device operations. Type: Default: |
|
Subscribe to measurements, events and alarms of all devices during startup. Type: Default: |
|
Subscribe to all device-related updates. Type: Default: |
Loading the Cumulocity transport
You can load the Cumulocity transport by adding one of the Cumulocity connectivity bundles to your project in Apama Plugin for Eclipse (see Adding the Cumulocity connectivity plug-in to a project).
Using managed objects
During application initialization (onApplicationInitialized), if the requestAllDevices configuration option is enabled, the adapter sends all device/asset related information using the com.apama.cumulocity.ManagedObject event on the com.apama.cumulocity.ManagedObject.SUBSCRIBE_CHANNEL (same as cumulocity.devices) channel. After all devices/assets have been sent, the adapter sends a com.apama.cumulocity.RequestAllDevicesComplete(-1) event.
Info: The requestAllDevices configuration option is deprecated. Instead, use the com.apama.cumulocity.FindManagedObject API to cause the adapter to send the device events when the application is ready. This also works for applications deployed in Cumulocity directly.
Example of a device event:
com.apama.cumulocity.ManagedObject("44578836","","Device_1",
["c8y_Restart","c8y_Message","c8y_Relay"],
["c8y_TemperatureMeasurement","c8y_LightMeasurement"],
[],[],[],[],{},
{"c8y_IsDevice":any(dictionary<any,any>,new dictionary<any,any>),
"owner":any(string,"Cumulocity_User")})
To fetch a list of all existing managed objects, use the FindManagedObject API. For more information, see Querying for managed objects.
Example
The following is a simple example of how to receive, update and send managed objects:
// Subscribe to receive all the devices from Cumulocity
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
// Consume all the devices from Cumulocity
on all ManagedObject() as mo {
log mo.toString() at INFO;
// Update a managed object
mo.params.add("CustomMetadata", {"metadata": "Adding custom data"});
send mo to ManagedObject.SEND_CHANNEL;
}
Updating a managed object
To enable use cases where information related to a managed object can be persisted, you can update any metadata information (such as the state) as properties of a managed object.
managedObject.params.add("CUSTOM_PROPERTY", PROPERTY_VALUE);
send managedObject to com.apama.cumulocity.ManagedObject.SEND_CHANNEL;
Where
- CUSTOM_PROPERTY is the property that is to be added.
- PROPERTY_VALUE is the value for the newly added property.
Sending managed objects requesting response and setting headers
When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the ManagedObject.SUBSCRIBE_CHANNEL channel. You need to subscribe to the ManagedObject.SUBSCRIBE_CHANNEL channel first. The response is one of the following:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers through its second argument. This can be used, for example, to determine what processing mode Cumulocity will use. The example below passes an empty headers dictionary.
Example of creating a managed object:
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test_ManagedObjects {
action onload {
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
ManagedObject mo := new ManagedObject;
mo.params.add("c8y_IsDevice", new dictionary<any, any>);
mo.name := "MyManagedObject";
mo.type := "DeviceType";
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new ManagedObject in Cumulocity, ensuring that a
// response is returned.
send mo.withChannelResponse(reqId, new dictionary<string, string>) to
ManagedObject.SEND_CHANNEL;
// If the ManagedObject creation succeeded do something with the
// returned object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New managed object successfully created " + c.toString()
at INFO;
}
// If the ManagedObject creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new managed object failed " + cfail.toString() at ERROR;
}
}
}
The following ManagedObject reference fields cannot be set using ManagedObject events and are useful for read-only purposes in these events: childDeviceIds, childAssetIds, deviceParentIds, and assetParentIds. However, this can be done using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and Child operations in the Cumulocity OpenAPI documentation.
The position field can be used to clear individual position elements, but it cannot be used to delete the entire position fragment c8y_Position. Similarly, the params dictionary cannot be manipulated to delete fragments in the ManagedObject. However, both of these can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.
Querying for managed objects
To search for a managed object or a collection of managed objects, send a com.apama.cumulocity.FindManagedObject event with a unique reqId to the com.apama.cumulocity.FindManagedObject.SEND_CHANNEL channel.
The transport then responds with zero or more com.apama.cumulocity.FindManagedObjectResponse events, followed by one com.apama.cumulocity.FindManagedObjectResponseAck event, on the com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL channel.
Example:
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindManagedObject request :=
new com.apama.cumulocity.FindManagedObject;
request.reqId := reqId;
// Optionally provide the 'id' of the managed object
//request.deviceId := "<DEVICE_ID>";
// Filter based on fragmentType
request.params.add("fragmentType", "c8y_IsDevice");
// Subscribe to com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL to
// listen for responses
monitor.subscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
// Listen for responses based on reqId
on all com.apama.cumulocity.FindManagedObjectResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
{
log "Received ManagedObject " + response.toString() at INFO;
}
// Listen for com.apama.cumulocity.FindManagedObjectResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
as requestCompleted
{
log "Received all ManagedObject(s) for request "
+ requestCompleted.reqId.toString() at INFO;
// Request is completed and we can unsubscribe from this channel
monitor.unsubscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}
// Send request to find available managed objects
send request to com.apama.cumulocity.FindManagedObject.SEND_CHANNEL;
Query parameters
Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all managed objects in the Cumulocity OpenAPI Specification.
Parameter | Description |
---|---|
deviceId | Search for a managed object based on deviceId. When deviceId is populated in a FindManagedObject request, all the query parameters listed below are ignored. |
fragmentType | Search for managed objects based on the fragment type. |
type | Search for managed objects based on the type. |
owner | Search for managed objects based on the owner. |
childAssetId | Search for managed objects based on the asset identifier of the child. |
childDeviceId | Search for managed objects based on the device identifier of the child. |
ids | Search for managed objects based on a comma-separated list of device identifiers. |
pageSize | Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the managed objects, but each request is larger. By default, 1000 managed objects are in each page and there is an upper limit of 2000. |
currentPage | Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested. |
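The pageSize and currentPage parameters from the table can be combined to fetch a single page of results. The following is a minimal sketch under some assumptions: the parameter values are passed as strings (following the fragmentType example above), the first page is assumed to be numbered 1, and the type value DeviceType is a placeholder.

```epl
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindManagedObject request :=
    new com.apama.cumulocity.FindManagedObject;
request.reqId := reqId;
// Placeholder type filter
request.params.add("type", "DeviceType");
// Request pages of 100 entries and only the first page
// (assumption: pages are numbered starting from 1)
request.params.add("pageSize", "100");
request.params.add("currentPage", "1");

monitor.subscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);

// Log each managed object in the requested page until the request completes
on all com.apama.cumulocity.FindManagedObjectResponse(reqId=reqId) as response
    and not com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
{
    log "Received ManagedObject " + response.toString() at INFO;
}

// The Ack marks the end of the requested page
on com.apama.cumulocity.FindManagedObjectResponseAck(reqId=reqId)
{
    monitor.unsubscribe(com.apama.cumulocity.FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}

send request to com.apama.cumulocity.FindManagedObject.SEND_CHANNEL;
```

Because currentPage is set, only that one page is requested. Omitting it causes all pages to be requested, each holding up to pageSize entries.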
Query result paging
Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.
Caching managed objects
ManagedObjectCache
The com.apama.cumulocity.ManagedObjectCache event covers the common use case of fetching a ManagedObject with a known id, such as when you want to fetch the ManagedObject associated with an event source or a rule held within a ManagedObject. The cache eliminates the need to implement the FindManagedObject code for this scenario and reduces traffic across the REST API.
The ManagedObjectCache event can be used by any monitor to cache the managed objects it is interested in.
You can limit the cache in certain ways to improve performance and reduce memory usage. By default, the entire ManagedObject is cached for each requested id, but in many situations this is unnecessary. Therefore, you can restrict the cache by one or more of the following:
- Maximum number of cached managed objects. The cache size is limited to this number. When an uncached id is requested, the cached ManagedObject that was accessed longest ago is removed and the new ManagedObject is added.
- Expiration of unaccessed managed objects. The cache removes all cached managed objects that have not been accessed within a specified period of time.
- The params that are cached. The cache stores only a defined set of the params fragments for the ManagedObject.
The cache is lazy loaded, that is, a ManagedObject is only cached the first time you request it. Once cached, it is kept up to date using broadcast update notifications. Thus, unless it is removed from the cache, either by the cache management restrictions above or directly by the remove() action, all subsequent requests are supplied from the cache itself.
In some situations, you may want more information than has been cached. The cache provides query functionality that bypasses its internal store and always fetches the ManagedObject from the inventory. This functionality always returns the full ManagedObject, regardless of any params restrictions you may have specified for the cache's own store.
The cache stores only those properties that are received both when the inventory is directly queried and when update notifications are received. Therefore, deviceParentIds and assetParentIds are not cached because they are not provided within notification updates. You can fetch the parent hierarchy using the query functionality mentioned above.
Similarly, the .apama_notificationType and tenantId params are not cached by default because they are only present in notification updates. The absence of the first parameter means that isCreate() and isUpdate() always return false when called on a ManagedObject provided by the cache. The cache only caches updates that pass the isUpdate() condition, so the check on cache-provided managed objects is unnecessary.
Because the cache may need to fetch the ManagedObject directly from the inventory for any given request, the process of requesting a ManagedObject is asynchronous. The cache provides two methods for waiting for the response:
- Listening for the ManagedObjectCacheResponse and ManagedObjectCacheError events routed by the cache.
- Using response callbacks that you provide to the cache.
SharedManagedObjectCache
The ManagedObjectCache is useful when only one EPL app is interested in the ManagedObject set that is likely to be cached. However, if two or more EPL apps are interested in the same set of managed objects, this would lead to duplicate caches and waste memory. To allow more than one EPL app to share the same cached managed objects, there is the SharedManagedObjectCache, which is a wrapper monitor around the ManagedObjectCache. When setting up this cache, make sure that it caches all parameters that are required by its clients.
The SharedManagedObjectCache is a monitor that runs in its own context and communicates through events. You must explicitly activate the cache using the StartSharedMOCache event, which specifies the maximum size, expiration period, params to be cached if desired, and tenant details if any.
You make requests to the cache using the FindCacheManagedObject event, similar to the FindManagedObject event used for the Cumulocity transport API. The request is asynchronous and the response is either a set of FindCacheManagedObjectResponse events or a FindCacheManagedObjectBatchResponse event, depending on whether you specified a batch response. This is followed by a FindCacheManagedObjectResponseAck event.
Using alarms
The com.apama.cumulocity.Alarm event is sent when a device raises an alarm. This event is sent to the com.apama.cumulocity.Alarm.SUBSCRIBE_CHANNEL (same as cumulocity.alarms) channel. This event contains the identifier of the source, a timestamp (same form as currentTime), message text, and optional parameters.
Example of an alarm:
com.apama.cumulocity.Alarm("44578840","c8y_UnavailabilityAlarm","44578839",
1529496204.346,"No data received from device within required interval",
Alarm.STATUS_ACTIVE,Alarm.SEVERITY_MAJOR,1,{"creationTime":any(float,1529496204.067)})
Example
The following is a simple example of how to receive, update, create and send alarms:
// Subscribe to receive all the alarms published from Cumulocity
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
// Consume all the alarms from Cumulocity
on all Alarm() as alarm {
log alarm.toString() at INFO;
// Example for updating an alarm
// Set alarm severity to MAJOR
alarm.severity := Alarm.SEVERITY_MAJOR;
send alarm to Alarm.SEND_CHANNEL;
}
// Create a new alarm
Alarm alarm := new Alarm;
alarm.source := "<MANAGED_OBJECT_ID>";
alarm.type := "TestAlarm";
alarm.severity := Alarm.SEVERITY_MINOR;
alarm.status := Alarm.STATUS_ACTIVE;
alarm.time := currentTime;
alarm.text := "This is a sample alarm";
send alarm to Alarm.SEND_CHANNEL;
Creating a new alarm
send Alarm("","c8y_SampleAlarm","<SOURCE>",<TIME>,
"Alarm text", Alarm.STATUS_<STATUS>,Alarm.SEVERITY_<SEVERITY>,1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
Where
- <SOURCE> is the source of the alarm (same as the ManagedObject identifier).
- <TIME> is the time at which the alarm was generated.
- <STATUS> is the status of the alarm. This can be ACTIVE, ACKNOWLEDGED or CLEARED.
- <SEVERITY> is the severity of the alarm. This can be CRITICAL, MAJOR, MINOR or WARNING.
Sending alarms requesting response and setting headers
When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse action. This allows your application to receive a response on completion on the Alarm.SUBSCRIBE_CHANNEL channel. You need to subscribe to the Alarm.SUBSCRIBE_CHANNEL channel first. The response is one of the following:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers. This can be used, for example, to determine what processing mode Cumulocity will use, as shown in the example below.
Example of creating an alarm:
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor TestCreatingAlarm {
string deviceId; // Where this is populated from the actual device Id.
string timestamp; // Where this is populated from the timestamp of
// the device.
action onload {
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
Alarm al := new Alarm;
string name := "MyTestAlarm";
al.status := Alarm.STATUS_ACTIVE;
al.severity := Alarm.SEVERITY_CRITICAL;
al.source := deviceId;
al.type := "c8y_TestAlarm";
al.text := "test alarm";
al.time := currentTime;
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new Alarm in Cumulocity, ensuring that a response is
// returned
// and the processing mode, indicating how to process the request,
// sent to Cumulocity is Transient.
send al.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
"Transient" }) to Alarm.SEND_CHANNEL;
// If the Alarm creation succeeded do something with the returned
// object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New alarm successfully created " + c.toString() at INFO;
}
// If the Alarm creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new alarm failed " + cfail.toString() at ERROR;
}
}
}
Alarm de-duplication
If an active or acknowledged alarm with the same source and type exists, no new alarm is created (this does not apply to alarms in the CLEARED status). Instead, the existing alarm is updated by incrementing the count property, and the time property is also updated. Any other changes are ignored, and the alarm history is not updated. The first occurrence of the alarm is recorded in firstOccurrenceTime.
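The de-duplication behavior can be illustrated with a short sketch that sends two alarms with the same source and type. The monitor name AlarmDeduplicationSketch and the alarm type c8y_SampleAlarm are hypothetical, and <MANAGED_OBJECT_ID> is a placeholder for a real device identifier. The outcome described in the comments follows from the rules above and is not verified output.

```epl
using com.apama.cumulocity.Alarm;

// Hypothetical monitor name, used for illustration only
monitor AlarmDeduplicationSketch {
    action onload() {
        // First alarm: no id is set, so this is a create request
        Alarm first := new Alarm;
        first.source := "<MANAGED_OBJECT_ID>";
        first.type := "c8y_SampleAlarm";
        first.severity := Alarm.SEVERITY_MINOR;
        first.status := Alarm.STATUS_ACTIVE;
        first.time := currentTime;
        first.text := "First occurrence";
        send first to Alarm.SEND_CHANNEL;

        // Second alarm with the same source and type while the first is
        // still ACTIVE. According to the de-duplication rules, no new alarm
        // is created: the count and time of the existing alarm are updated,
        // and the changed text is ignored.
        Alarm repeat := first.clone();
        repeat.time := currentTime;
        repeat.text := "Second occurrence";
        send repeat to Alarm.SEND_CHANNEL;
    }
}
```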
Updating an existing alarm
You can update the text, status and severity fields.
send Alarm("<ALARM_ID>","c8y_SampleAlarm","<SOURCE>", <TIME>,
"Alarm Updated", Alarm.STATUS_<STATUS>,Alarm.SEVERITY_<SEVERITY>,1,new dictionary<string,any>) to Alarm.SEND_CHANNEL;
Where <ALARM_ID> is the identifier of the previously created alarm. The presence of <ALARM_ID> indicates that the request is for updating an existing alarm.
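As an illustration of updating an alarm that was received from Cumulocity, the following sketch acknowledges active alarms of one type. The monitor name AcknowledgeAlarms and the alarm type c8y_SampleAlarm are hypothetical. Because a received alarm already carries its identifier, sending it back is treated as an update.

```epl
using com.apama.cumulocity.Alarm;

// Hypothetical monitor name, used for illustration only
monitor AcknowledgeAlarms {
    action onload() {
        monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);

        on all Alarm() as alarm {
            // Only act on active alarms of the type of interest, so that the
            // acknowledged update does not trigger a further update
            if alarm.type = "c8y_SampleAlarm" and alarm.status = Alarm.STATUS_ACTIVE {
                alarm.status := Alarm.STATUS_ACKNOWLEDGED;
                alarm.text := "Acknowledged by EPL";
                send alarm to Alarm.SEND_CHANNEL;
            }
        }
    }
}
```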
Info: The params dictionary cannot be manipulated to delete fragments in the Alarm. However, the fragments in the Alarm can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.
Querying for alarms
To search for an alarm or a collection of alarms, send a com.apama.cumulocity.FindAlarm event with a unique reqId to the com.apama.cumulocity.FindAlarm.SEND_CHANNEL channel.
The transport then responds with zero or more com.apama.cumulocity.FindAlarmResponse events, followed by one com.apama.cumulocity.FindAlarmResponseAck event, on the com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL channel.
Example:
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindAlarm request := new com.apama.cumulocity.FindAlarm;
request.reqId := reqId;
// Filter based on alarm type
request.params.add("type", "c8y_UnavailabilityAlarm");
// Subscribe to com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL to listen
// for responses
monitor.subscribe(com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL);
// Listen for responses based on reqId
on all com.apama.cumulocity.FindAlarmResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindAlarmResponseAck(reqId=reqId)
{
log "Received Alarm " + response.toString() at INFO;
}
// Listen for com.apama.cumulocity.FindAlarmResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindAlarmResponseAck(reqId=reqId) as requestCompleted
{
log "Received all Alarm(s) for request " +
requestCompleted.reqId.toString() at INFO;
// Request is completed and we can unsubscribe from this channel
monitor.unsubscribe(com.apama.cumulocity.FindAlarmResponse.SUBSCRIBE_CHANNEL);
}
// Send request to find available alarms
send request to com.apama.cumulocity.FindAlarm.SEND_CHANNEL;
Query parameters
Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all alarms in the Cumulocity OpenAPI Specification.
Parameter | Description |
---|---|
id | Search for an alarm based on alarmId. When searching for an alarm based on id, all the query parameters listed below are ignored. |
source | Search for alarms based on the device identifier or asset identifier of the source. |
status | Search for alarms based on the status. The status can be any of ACTIVE, ACKNOWLEDGED or CLEARED. |
severity | Search for alarms based on the severity. The severity can be any of CRITICAL, MAJOR, MINOR or WARNING. |
type | Search for alarms based on the type. |
dateFrom | Search for alarms from a start date. The date and time is provided as seconds since the epoch. |
dateTo | Search for alarms within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch. |
resolved | A boolean parameter used for filtering, based on the resolved state. |
pageSize | Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the alarms, but each request is larger. By default, each page contains 1000 alarms, and the upper limit is 2000. |
currentPage | Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested. |
Query result paging
Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.
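The filter and paging parameters listed above can be combined in a single request. The following is a minimal sketch that asks for the first page of active, critical alarms raised during the last hour. It assumes that all parameter values are passed as strings (as in the type filter in the earlier example), that the float value of currentTime converted to a string is accepted for dateFrom and dateTo, and that pages are numbered from 1. The monitor name is illustrative.

```epl
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.Util;

monitor FindRecentCriticalAlarms {
    action onload() {
        integer reqId := Util.generateReqId();
        FindAlarm request := new FindAlarm;
        request.reqId := reqId;

        // Only active, critical alarms from the last hour (3600 seconds).
        request.params.add("status", "ACTIVE");
        request.params.add("severity", "CRITICAL");
        request.params.add("dateFrom", (currentTime - 3600.0).toString());
        request.params.add("dateTo", currentTime.toString());

        // Request a single page of at most 50 alarms.
        // Page numbering starting at 1 is an assumption.
        request.params.add("pageSize", "50");
        request.params.add("currentPage", "1");

        monitor.subscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);

        // Log each matching alarm until the request completes.
        on all FindAlarmResponse(reqId=reqId) as response
            and not FindAlarmResponseAck(reqId=reqId) {
            log "Matching alarm: " + response.toString() at INFO;
        }

        // All responses received: stop listening on the channel.
        on FindAlarmResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindAlarmResponse.SUBSCRIBE_CHANNEL);
        }

        send request to FindAlarm.SEND_CHANNEL;
    }
}
```

Because currentPage is set, only that one page is requested. Omitting it would cause all pages to be requested in batches of pageSize.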
Using events
A com.apama.cumulocity.Event is sent for each event raised by a device. It is sent to the com.apama.cumulocity.Event.SUBSCRIBE_CHANNEL (same as cumulocity.events) channel. The event contains the identifier of the source, a timestamp (same form as currentTime), message text, and optional parameters.
Example of an event:
com.apama.cumulocity.Event("48073557","c8y_EntranceEvent",
"12346082",1519838833.6,
"Entrance event triggered.",
{"creationTime":any(float,1519838834.706)})
Example
The following is a simple example of how to receive, update, create and send events:
// Subscribe to receive all the events published from Cumulocity
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
// Consume all the events from Cumulocity
on all Event() as e {
log e.toString() at INFO;
// Example for updating an event
// Update text
e.text := "This is an updated text";
send e to Event.SEND_CHANNEL;
}
// Create a new event
Event evt := new Event;
evt.source := "<MANAGED_OBJECT_ID>";
evt.type := "TestEvent";
evt.time := currentTime;
evt.text := "This is a sample event";
send evt to Event.SEND_CHANNEL;
Creating a new event
send Event("","c8y_SampleEvent","SOURCE", TIME,
"Event text",new dictionary<string,any>) to Event.SEND_CHANNEL;
Where:
- SOURCE is the source of the event (same as the ManagedObject identifier).
- TIME is the time at which the event was generated.
Sending events requesting response and setting headers
When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse
action. This allows your application to receive a response on completion on the Event.SUBSCRIBE_CHANNEL
channel. You will need to subscribe to the Event.SUBSCRIBE_CHANNEL
channel first. The response can be one of two possibilities:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers. This can be used, for example, to determine what processing mode Cumulocity will use, as shown in the example below.
Example of creating an event:
using com.apama.cumulocity.Event;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test_CumulocityEvents {
string timestamp; // Where this is populated from the timestamp of the
// device.
action onload {
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
Event ev := new Event;
string name := "MyEvent";
ev.type := "DoorSensor";
ev.source := "7104838";
ev.text := "Door sensor was triggered";
ev.time := currentTime;
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new Event in Cumulocity, ensuring that a response is
// returned
// and the processing mode, indicating how to process the request, sent
// to Cumulocity is Transient.
send ev.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
"Transient" }) to Event.SEND_CHANNEL;
// If the Event creation succeeded do something with the returned
// object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New event successfully created " + c.toString() at INFO;
}
// If the Event creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new event failed " + cfail.toString() at ERROR;
}
}
}
Updating an existing event
You can update the text field.
send Event("EVENT_ID","c8y_SampleEvent","SOURCE",TIME,
"Event Updated",new dictionary<string,any>) to Event.SEND_CHANNEL;
Where:
- EVENT_ID is the identifier of the previously created event. The presence of EVENT_ID indicates that the request is for updating an existing event.
The params dictionary cannot be manipulated to delete fragments in the Event. However, the fragments in the Event can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.
Querying for events
To search for an event or a collection of events, send the com.apama.cumulocity.FindEvent
event to Cumulocity, with a unique reqId
to the com.apama.cumulocity.FindEvent.SEND_CHANNEL
channel.
The transport will then respond with zero or more com.apama.cumulocity.FindEventResponse
events and then one com.apama.cumulocity.FindEventResponseAck
event on the com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL
channel.
If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), the latest events are returned first. This is different from measurements and operations where the oldest items are always returned first, regardless of whether the query is a range query or not. If you want to reverse the order, see the description of the revert parameter below.
Example:
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
request.reqId := reqId;
// Filter based on event type
request.params.add("type", "c8y_DoorOpenedEvent");
// Subscribe to com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL to listen
// for responses
monitor.subscribe(com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL);
// Listen for responses based on reqId
on all com.apama.cumulocity.FindEventResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindEventResponseAck(reqId=reqId)
{
log "Received Event " + response.toString() at INFO;
}
// Listen for com.apama.cumulocity.FindEventResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindEventResponseAck(reqId=reqId) as requestCompleted
{
log "Received all Event(s) for request " +
requestCompleted.reqId.toString() at INFO;
// Request is completed and we can unsubscribe from this channel
monitor.unsubscribe(com.apama.cumulocity.FindEventResponse.SUBSCRIBE_CHANNEL);
}
// Send request to find available events
send request to com.apama.cumulocity.FindEvent.SEND_CHANNEL;
Query parameters
Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all events in the Cumulocity OpenAPI Specification.
Parameter | Description |
---|---|
id | Search for an event based on eventId. When searching for an event based on id, all the query parameters listed below are ignored. |
source | Search for events based on the device identifier or asset identifier of the source. |
type | Search for events based on the type. |
dateFrom | Search for events from a start date. The date and time is provided as seconds since the epoch. |
dateTo | Search for events within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch. |
fromCreationDate | Similar to dateFrom, but fetches the events based on the creation date. The date and time is provided as seconds since the epoch. |
toCreationDate | Search for events that have been created within a date range. This is to be used in combination with fromCreationDate. The date and time is provided as seconds since the epoch. |
pageSize | Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the events, but each request is larger. By default, each page contains 1000 events, and the upper limit is 2000. |
currentPage | Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested. |
revert | Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), Cumulocity, by default, returns the latest events first. You can reverse the order in which the matching events are returned by adding the query parameter revert=true. This returns the oldest events first. (Cumulocity returns the oldest events first by default when neither the dateFrom nor the dateTo parameter is supplied.) |
Query result paging
Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.
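As an illustration of the revert parameter described above, the following sketch runs a range query over the last 24 hours and asks for the oldest events first. It assumes that parameter values are passed as strings and that the string form of currentTime is accepted for dateFrom and dateTo. The monitor name is illustrative.

```epl
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;
using com.apama.cumulocity.Util;

monitor FindEventsOldestFirst {
    action onload() {
        integer reqId := Util.generateReqId();
        FindEvent request := new FindEvent;
        request.reqId := reqId;

        // Range query: one event type, last 24 hours (86400 seconds).
        request.params.add("type", "c8y_DoorOpenedEvent");
        request.params.add("dateFrom", (currentTime - 86400.0).toString());
        request.params.add("dateTo", currentTime.toString());

        // A range query returns the latest events first by default;
        // revert=true asks for the oldest events first instead.
        request.params.add("revert", "true");

        monitor.subscribe(FindEventResponse.SUBSCRIBE_CHANNEL);

        on all FindEventResponse(reqId=reqId) as response
            and not FindEventResponseAck(reqId=reqId) {
            log "Event in chronological order: " + response.toString() at INFO;
        }

        on FindEventResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindEventResponse.SUBSCRIBE_CHANNEL);
        }

        send request to FindEvent.SEND_CHANNEL;
    }
}
```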
Using measurements
Measurement events contain the identifier of the source of the measurement, the type of measurement, timestamp, and a dictionary of values which contain the numeric value, units and optional type, quantity and state.
Examples of measurement events:
Measurement("1001","c8y_LightMeasurement","12346081",1464359004.89,
{"c8y_LightMeasurement": {"e":com.apama.cumulocity.MeasurementValue(108.1,
"lux", new dictionary<string, any>)}},new dictionary<string, any>)
Measurement("1002","c8y_DistanceMeasurement","12346082",1464359005.396,
{"c8y_DistanceMeasurement": {"distance":com.apama.cumulocity.MeasurementValue(344.0,
"mm", new dictionary<string, any>)}},new dictionary<string, any>)
Example
The following is a simple example of how to receive, create and send measurements:
// Subscribe to receive all the measurements published from Cumulocity
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
// Consume all the measurements from Cumulocity
on all Measurement() as m {
log m.toString() at INFO;
}
// Create a new measurement
Measurement m := new Measurement;
m.source := "<MANAGED_OBJECT_ID>";
m.time := currentTime;
m.type := "TemperatureMeasurement";
MeasurementValue mv := new MeasurementValue;
mv.value := 100.0;
dictionary<string, MeasurementValue> fragment
:= new dictionary<string, MeasurementValue>;
fragment.add("temperature", mv);
m.measurements.add("TemperatureMeasurement", fragment);
send m to Measurement.SEND_CHANNEL;
Creating a new measurement
Measurement m := new Measurement;
m.type := "<MEASUREMENT_TYPE>";
m.source := "<SOURCE>";
m.time := currentTime;
MeasurementValue mv := new MeasurementValue;
mv.value := 1.0;
mv.unit := "V";
dictionary<string, MeasurementValue> dict := {"voltage": mv};
m.measurements.add(m.type, dict);
send m to Measurement.SEND_CHANNEL;
Where:
- SOURCE is the source of the measurement (same as the ManagedObject identifier).
- MEASUREMENT_TYPE is the type of the measurement. For example, c8y_VoltageMeasurement.
Sending measurements requesting response and setting headers
When creating a new object, it is recommended that you use the withChannelResponse
action. This allows your application to receive a response on completion on the Measurement.SUBSCRIBE_CHANNEL
channel. You will need to subscribe to the Measurement.SUBSCRIBE_CHANNEL
channel first. The response can be one of two possibilities:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers. This can be used, for example, to determine what processing mode Cumulocity will use, as shown in the example below.
Note that measurements sent using withChannelResponse cannot achieve the same throughput as measurements sent without requesting a response. This is because they are not batched into a single HTTP request; each one is sent to Cumulocity as an individual create/update request.
Example of creating a measurement:
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test {
string deviceId; // Where this is populated from the actual device Id.
float timestamp; // Where this is populated from the timestamp of the
// device.
action onload {
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
Measurement mo := new Measurement;
mo.type := "test_measurement";
mo.source := deviceId;
mo.time := timestamp;
//Create a Measurement with two Measurement fragments.
MeasurementValue mv1 := new MeasurementValue;
mv1.value := 10.2;
mv1.unit := "km/hr";
MeasurementValue mv2 := new MeasurementValue;
mv2.value := 11.7;
mv2.unit := "km/hr";
dictionary<string, MeasurementValue> dict :=
{"speedX": mv1, "speedY": mv2};
mo.measurements.add("c8y_speed", dict);
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new Measurement in Cumulocity, ensuring that a response is
// returned
// and the processing mode, indicating how to process the request,
// sent to Cumulocity is Transient.
send mo.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
"Transient" }) to Measurement.SEND_CHANNEL;
// If the Measurement creation succeeded do something with the returned
// object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New measurement successfully created " + c.toString() at INFO;
}
// If the Measurement creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new measurement failed " + cfail.toString() at ERROR;
}
}
}
Querying for measurements
To search for a measurement or a collection of measurements, send the com.apama.cumulocity.FindMeasurement
event to Cumulocity, with a unique reqId
to the com.apama.cumulocity.FindMeasurement.SEND_CHANNEL
channel.
The transport will then respond with zero or more com.apama.cumulocity.FindMeasurementResponse
events and then one com.apama.cumulocity.FindMeasurementResponseAck
event on the com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL
channel.
Example:
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindMeasurement request :=
new com.apama.cumulocity.FindMeasurement;
request.reqId := reqId;
// Filter based on measurement fragment type and series
request.params.add("valueFragmentType", "c8y_MotionMeasurement");
request.params.add("valueFragmentSeries", "motionDetected");
// Subscribe to com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL
// to listen for responses
monitor.subscribe(com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL);
// Listen for responses based on reqId
on all com.apama.cumulocity.FindMeasurementResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindMeasurementResponseAck(reqId=reqId)
{
log "Received Measurement " + response.toString() at INFO;
}
// Listen for com.apama.cumulocity.FindMeasurementResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindMeasurementResponseAck(reqId=reqId)
as requestCompleted
{
log "Received all Measurement(s) for request "
+ requestCompleted.reqId.toString() at INFO;
// Request is completed and we can unsubscribe from this channel
monitor.unsubscribe(com.apama.cumulocity.FindMeasurementResponse.SUBSCRIBE_CHANNEL);
}
// Send request to find available measurements
send request to com.apama.cumulocity.FindMeasurement.SEND_CHANNEL;
Query parameters
Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve all measurements in the Cumulocity OpenAPI Specification.
Parameter | Description |
---|---|
id | Search for a measurement based on measurementId. When searching for a measurement based on id, all the query parameters listed below are ignored. |
source | Search for measurements based on the device identifier or asset identifier of the source. |
type | Search for measurements based on the type. |
valueFragmentType | Search for measurements based on fragment type (should be used with valueFragmentSeries). |
valueFragmentSeries | Search for measurements based on fragment series (should be used with valueFragmentType). |
dateFrom | Search for measurements from a start date. The date and time is provided as seconds since the epoch. |
dateTo | Search for measurements within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch. |
pageSize | Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the measurements, but each request is larger. By default, each page contains 1000 measurements, and the upper limit is 2000. |
currentPage | Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested. |
revert | Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), you can reverse the order in which the matching measurements are returned by adding the query parameter revert=true. This returns the latest measurements first. By default, Cumulocity returns the oldest measurements first. |
Query result paging
Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.
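One way to combine revert with the paging parameters is to fetch only the most recent measurement of a series. The following is a rough sketch under several assumptions: parameter values are passed as strings, the string form of currentTime is accepted for dateFrom and dateTo, pages are numbered from 1, and a matching measurement exists within the last hour. The monitor name and the <MANAGED_OBJECT_ID> placeholder are illustrative.

```epl
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;
using com.apama.cumulocity.Util;

monitor FindLatestMeasurement {
    action onload() {
        integer reqId := Util.generateReqId();
        FindMeasurement request := new FindMeasurement;
        request.reqId := reqId;

        // Restrict the query to one device and one fragment/series pair.
        request.params.add("source", "<MANAGED_OBJECT_ID>"); // placeholder
        request.params.add("valueFragmentType", "c8y_MotionMeasurement");
        request.params.add("valueFragmentSeries", "motionDetected");

        // revert only applies to range queries, so supply a date range.
        request.params.add("dateFrom", (currentTime - 3600.0).toString());
        request.params.add("dateTo", currentTime.toString());
        request.params.add("revert", "true"); // latest measurements first

        // A single page with a single entry: the newest match only.
        request.params.add("pageSize", "1");
        request.params.add("currentPage", "1"); // 1-based paging is assumed

        monitor.subscribe(FindMeasurementResponse.SUBSCRIBE_CHANNEL);

        on all FindMeasurementResponse(reqId=reqId) as response
            and not FindMeasurementResponseAck(reqId=reqId) {
            log "Latest measurement: " + response.toString() at INFO;
        }

        on FindMeasurementResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindMeasurementResponse.SUBSCRIBE_CHANNEL);
        }

        send request to FindMeasurement.SEND_CHANNEL;
    }
}
```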
Using measurement fragments
A MeasurementFragment
event represents a single fragment/series on a measurement.
Creating measurement fragments
You can send a single fragment to Cumulocity to create a single-fragment measurement.
Example of creating a measurement fragment:
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test {
string deviceId; // Where this is populated from the actual device Id.
float timestamp; // Where this is populated from the timestamp of the
// device.
action onload {
MeasurementFragment mf := new MeasurementFragment;
mf.type := "test_measurement";
mf.source := deviceId;
mf.time := timestamp;
mf.valueFragment := "c8y_speed";
mf.valueSeries := "speedX";
mf.value := 12.0;
mf.unit := "km/hr";
integer reqId := com.apama.cumulocity.Util.generateReqId();
send mf to MeasurementFragment.SEND_CHANNEL;
}
}
Where:
- source is the source of the measurement.
- time is the time at which the measurement was taken.
- type is the type of the measurement.
- valueFragment is the name of the fragment of the measurement fragment.
- valueSeries is the name of the series of the measurement fragment.
- value is the value from the sensor.
- unit is the unit of the reading, for example, mm, lux, km/hr.
Sending measurement fragments requesting a response and setting headers
When creating a new object, it is recommended that you use the withChannelResponse
action. This allows your application to receive a response on completion on the MeasurementFragment.SUBSCRIBE_CHANNEL
channel. You will need to subscribe to the MeasurementFragment.SUBSCRIBE_CHANNEL
channel first. The response can be one of two possibilities:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers. This can be used, for example, to determine what processing mode Cumulocity will use, as shown in the example below.
Example of creating a measurement fragment:
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test {
string deviceId; // Where this is populated from the actual device Id.
float timestamp; // Where this is populated from the timestamp of the
// device.
action onload {
monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
MeasurementFragment mf := new MeasurementFragment;
mf.type := "test_measurement";
mf.source := deviceId;
mf.time := timestamp;
mf.valueFragment := "c8y_speed";
mf.valueSeries := "speedX";
mf.value := 12.0;
mf.unit := "km/hr";
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new Measurement in Cumulocity from a single
// MeasurementFragment, ensuring that a response is returned
// and the processing mode, indicating how to process the request, sent
// to Cumulocity is Transient.
send mf.withChannelResponse(reqId, { "X-Cumulocity-Processing-Mode":
"Transient" }) to MeasurementFragment.SEND_CHANNEL;
// If the Measurement creation succeeded do something with the returned
// object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New measurement successfully created " + c.toString() at INFO;
}
// If the Measurement creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new measurement failed " + cfail.toString()
at ERROR;
}
}
}
Listening for measurement fragments
The Apama mapping codec can turn measurements into measurement fragments, and listeners in EPL can match on the elements of measurement fragments rather than measurements.
Example - matching on measurement fragments:
on all MeasurementFragment(valueFragment = "c8y_speed", valueSeries = "speedX",
value > SPEED_LIMIT) as mf {
}
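The listener above can be placed in a complete monitor along the following lines. This is a minimal sketch: the monitor name and the SPEED_LIMIT value of 100.0 are hypothetical, and it assumes that measurement fragments are enabled (measurement format BOTH) so that MeasurementFragment events are delivered on MeasurementFragment.SUBSCRIBE_CHANNEL.

```epl
using com.apama.cumulocity.MeasurementFragment;

monitor SpeedLimitWatcher {
    // Hypothetical threshold, in the same unit as the speedX series.
    constant float SPEED_LIMIT := 100.0;

    action onload() {
        // Fragments are only delivered if the measurement format is BOTH.
        monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);

        // Match only speedX readings of the c8y_speed fragment that
        // exceed the limit; other fragments never reach this block.
        on all MeasurementFragment(valueFragment = "c8y_speed",
                                   valueSeries = "speedX",
                                   value > SPEED_LIMIT) as mf {
            log "Speed limit exceeded on " + mf.source + ": "
                + mf.value.toString() + " " + mf.unit at WARN;
        }
    }
}
```

Putting the filter in the event template rather than in an if statement inside the listener body lets the correlator discard non-matching fragments before the listener fires.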
Turning measurement fragments on/off
To be able to match based on measurement fragments, you must ensure they are returned by setting the correct measurement format. There are two modes available, MEASUREMENT_ONLY
and BOTH
. The default, if it is not set or set incorrectly, is MEASUREMENT_ONLY
. Set the mode to BOTH
if you require filtering based on fragments or series.
If you are deploying a custom microservice, connecting to Cumulocity from an external correlator, or using Apama Plugin for Eclipse, you can set the mode with the CUMULOCITY_MEASUREMENT_FORMAT property, either in the CumulocityIoT.properties file (see also Configuring the Cumulocity IoT transport) or directly on the command line when starting the correlator.
The recommended approach is to set the mode from the .properties
file. For example, to turn measurement fragments on:
CUMULOCITY_MEASUREMENT_FORMAT=BOTH
Alternatively, you can set the mode using the command line. For example, to turn measurement fragments off:
-DCUMULOCITY_MEASUREMENT_FORMAT=MEASUREMENT_ONLY
In new projects, the mode is set to BOTH, but existing projects will retain their previous configuration. If you want to enable fragments in an existing project, you may need to remove and re-add the bundle.
When you deploy (activate) an EPL app directly in Cumulocity using the Streaming Analytics application, both measurements and measurement fragments are always available (the mode is always BOTH). See the Streaming Analytics guide at https://cumulocity.com/docs/streaming-analytics/overview-analytics/ for more information.
Handling measurement fragments
You can separate the individual fragments in a Measurement into MeasurementFragment objects, which can give better matching performance. To do this, use the sequence<MeasurementFragment> getFragments() action on the Measurement event. This returns a sequence of MeasurementFragment objects.
You can generate a Measurement
event based on MeasurementFragment
objects. You can achieve this by using the static Measurement createFromFragments(sequence<MeasurementFragment> fragments)
action on the Measurement
event, where fragments
is the sequence of MeasurementFragment
objects to create it from, and it returns the created Measurement
.
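The two actions described above can be used together as in the following sketch, which splits each incoming Measurement into fragments, logs them, and then rebuilds a Measurement from the same fragments. Only getFragments() and createFromFragments() are taken from this section; the monitor name is illustrative, and the guard against an empty sequence is a precaution, not documented behavior.

```epl
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementFragment;

monitor FragmentRoundTrip {
    action onload() {
        monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);

        on all Measurement() as m {
            // Split the measurement into one MeasurementFragment per series.
            sequence<MeasurementFragment> fragments := m.getFragments();

            MeasurementFragment mf;
            for mf in fragments {
                log mf.valueFragment + "." + mf.valueSeries + " = "
                    + mf.value.toString() + " " + mf.unit at INFO;
            }

            // Rebuild a single Measurement from the fragments.
            if fragments.size() > 0 {
                Measurement rebuilt := Measurement.createFromFragments(fragments);
                log "Rebuilt measurement: " + rebuilt.toString() at INFO;
            }
        }
    }
}
```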
Using operations
The com.apama.cumulocity.Operation
event represents a device operation. This event is sent to the com.apama.cumulocity.Operation.SUBSCRIBE_CHANNEL
(same as cumulocity.operations
) channel. This event contains the unique identifier of the operation (id
), the identifier of the source (deviceId
), a status, and optional parameters.
Example of an operation:
Operation("12345", "deviceId", Operation.STATUS_EXECUTING, params)
where params
is a dictionary of string keys and any
values (dictionary<string, any>
).
Make sure to set the deviceId
field to the identifier of a managed object which has the com_cumulocity_model_Agent
fragment. The com_cumulocity_model_Agent
marks devices running a Cumulocity agent. Such devices receive all operations targeted to themselves and their children for routing (see also Device integration using REST in the Cumulocity documentation).
When creating a new operation, do not supply the id
field (that is, supply an empty string for the operation identifier).
It is not possible to set the params
field of an operation to an empty dictionary.
Example
The following is a simple example of how to receive, create and send operations:
// Subscribe to receive all the operations published from Cumulocity
monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
on all Operation() as o {
log o.toString() at INFO;
// Update an operation
o.status := Operation.STATUS_EXECUTING;
send o to Operation.SEND_CHANNEL;
}
// Create an operation
Operation operation := new Operation;
operation.source := "<MANAGED_OBJECT_ID>";
operation.status := Operation.STATUS_PENDING;
operation.params.add("c8y_Message", {"text": "Device Operation"});
send operation to Operation.SEND_CHANNEL;
Creating a new operation
send com.apama.cumulocity.Operation("","<SOURCE>",Operation.STATUS_<STATUS>,
{"c8y_Message":<any> {<any>"text":<any>"Hello Cumulocity device"}} )
to com.apama.cumulocity.Operation.SEND_CHANNEL;
Where:
- SOURCE is the source of the operation (same as the ManagedObject identifier).
- STATUS is the status of the operation. This can be PENDING.
Sending operations requesting response and setting headers
When creating a new object or updating an existing one, it is recommended that you use the withChannelResponse
action. This allows your application to receive a response on completion on the Operation.SUBSCRIBE_CHANNEL
channel. You will need to subscribe to the Operation.SUBSCRIBE_CHANNEL
channel first. The response can be one of two possibilities:
- ObjectCommitted
  This includes the reqId, which is the identifier of the original request, the Id, which is the identifier of the newly created or updated object, and the actual object in JSON form.
- ObjectCommitFailed
  This includes the reqId, which is the identifier of the original request, the statusCode, which is the HTTP status code of the failure, and the body, which is the content of the response from the API (this might be in HTML format).
The withChannelResponse action also lets you set headers. This can be used, for example, to determine what processing mode Cumulocity will use, as shown in the example below.
Example of creating an operation:
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.ObjectCommitFailed;
using com.apama.cumulocity.ObjectCommitted;
monitor Test_Operations {
action onload {
monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
Operation op := new Operation;
string name := "CreateOperation";
op.source := "7104835";
op.status := Operation.STATUS_PENDING;
op.params :=
{"c8y_Message":any(dictionary<any,any>,
{any(string,"text"):
any(string,"Hello Cumulocity device")
})
};
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Create a new Operation in Cumulocity, ensuring that a response is
// returned.
send op.withChannelResponse(reqId, new dictionary<string, string>) to
Operation.SEND_CHANNEL;
// If the Operation creation succeeded do something with the returned
// object or id.
on ObjectCommitted(reqId=reqId) as c and not
ObjectCommitFailed(reqId=reqId){
log "New operation successfully created " + c.toString() at INFO;
}
// If the Operation creation failed, log an error.
on ObjectCommitFailed(reqId=reqId) as cfail and not
ObjectCommitted(reqId=reqId){
log "Creating a new operation failed " + cfail.toString() at ERROR;
}
}
}
Updating an existing operation
You can update the status field.
send com.apama.cumulocity.Operation("<OPERATION_ID>","<SOURCE>",Operation.STATUS_<STATUS>,
{"c8y_Message":<any> {<any>"text":<any>"Updated Cumulocity device"}} )
to com.apama.cumulocity.Operation.SEND_CHANNEL;
Where:
- OPERATION_ID is the identifier of the previously created operation. The presence of OPERATION_ID indicates that the request is for updating an existing operation.
- SOURCE is the source of the operation (same as the ManagedObject identifier).
- STATUS is the status of the operation. This can be PENDING, EXECUTING, SUCCESSFUL or FAILED.
The params dictionary cannot be manipulated to delete fragments in the Operation. However, the fragments in the Operation can be deleted using the Cumulocity REST API, which can be invoked in EPL by using GenericRequest events. For more information, see Invoking other parts of the Cumulocity REST API in this documentation and REST usage in the Cumulocity OpenAPI documentation.
Querying for operations
To search for an operation or a collection of operations, send a com.apama.cumulocity.FindOperation event with a unique reqId to the com.apama.cumulocity.FindOperation.SEND_CHANNEL channel.
The transport will then respond with zero or more com.apama.cumulocity.FindOperationResponse
events and then one com.apama.cumulocity.FindOperationResponseAck
event on the com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL
channel.
Example:
integer reqId := com.apama.cumulocity.Util.generateReqId();
com.apama.cumulocity.FindOperation request :=
new com.apama.cumulocity.FindOperation;
request.reqId := reqId;
// Filter based on operation status
request.params.add("status", Operation.STATUS_PENDING);
// Subscribe to com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL
// to listen for responses
monitor.subscribe(com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL);
// Listen for responses based on reqId
on all com.apama.cumulocity.FindOperationResponse(reqId=reqId) as response
// Avoid listener leaks by terminating the listener on completion of the request
and not com.apama.cumulocity.FindOperationResponseAck(reqId=reqId)
{
log "Received Operation " + response.toString() at INFO;
}
// Listen for com.apama.cumulocity.FindOperationResponseAck,
// this indicates that all responses have been received
on com.apama.cumulocity.FindOperationResponseAck(reqId=reqId)
as requestCompleted
{
log "Received all Operation(s) for request "
+ requestCompleted.reqId.toString() at INFO;
// Request is completed and we can unsubscribe from this channel
monitor.unsubscribe(com.apama.cumulocity.FindOperationResponse.SUBSCRIBE_CHANNEL);
}
// Send request to find available operations
send request to com.apama.cumulocity.FindOperation.SEND_CHANNEL;
Query parameters
Some common query parameters that you can include in the request are listed in the table below. For a complete list of allowed query parameters, see Retrieve a list of operations in the Cumulocity OpenAPI Specification.
Parameter | Description |
---|---|
id | Search for an operation based on operationId. When searching for an operation based on Id, all the query parameters listed below are ignored. |
source | Search for operations based on the device identifier or asset identifier of the source. |
status | Search for operations based on the status. The status can be any of SUCCESSFUL, FAILED, EXECUTING or PENDING. |
agent | Search for operations based on the agent identifier. |
fragmentType | Search for operations based on the fragment type. |
pageSize | Indicates how many entries of the collection are to be retrieved from Cumulocity. This is a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the operations, but each request is larger. By default, 1000 operations are in each page and there is an upper limit of 2000. |
currentPage | Retrieve a specific page of results for the given pageSize. If currentPage is set, then only a single page is requested. If currentPage is not set (default), all the pages are requested. |
dateFrom | Search for operations from a start date. The date and time is provided as seconds since the epoch. |
dateTo | Search for operations within a time range. This is to be used in combination with dateFrom. The date and time is provided as seconds since the epoch. |
revert | Boolean parameter. If a range query is used (that is, the query includes at least one of the dateFrom or dateTo parameters), you can reverse the order in which the matching operations are returned by adding the query parameter revert=true. This returns the latest operations first. By default, Cumulocity returns the oldest operations first. |
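The dateFrom and dateTo parameters are described above as seconds since the epoch. As a minimal illustration, written in Python rather than EPL, the following sketch converts UTC timestamps into such values. The helper name epoch_seconds is hypothetical, and returning a string assumes that query parameter values are passed as strings, as in the request.params.add calls shown earlier.

```python
from datetime import datetime, timezone


def epoch_seconds(moment: datetime) -> str:
    """Return whole seconds since the Unix epoch as a string (hypothetical helper)."""
    if moment.tzinfo is None:
        # Assumption: a datetime without a time zone is treated as UTC
        moment = moment.replace(tzinfo=timezone.utc)
    return str(int(moment.timestamp()))


# A one-day range query: values suitable for dateFrom and dateTo
date_from = epoch_seconds(datetime(2024, 1, 1, tzinfo=timezone.utc))
date_to = epoch_seconds(datetime(2024, 1, 2, tzinfo=timezone.utc))
```

Whether fractional seconds are accepted is not stated in this section, so the sketch truncates to whole seconds.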
Query result paging
Cumulocity queries support paging of data for requesting a specific range of the responses received. For more information, see Paging Cumulocity queries.
Receiving update notifications
The Cumulocity transport can receive update notifications on new measurements, events, alarms, managed objects and operations that are processed by the Cumulocity platform. To receive these notifications, you must add either the Cumulocity Notifications 2.0 bundle (recommended) or the Cumulocity Client bundle (deprecated). For details on how to add the bundles, see Loading the Cumulocity transport.
The Cumulocity Client bundle does not receive messages that are sent in the QUIESCENT or CEP processing modes. It only receives messages if they are sent in the PERSISTENT or TRANSIENT processing modes. We recommend using the Notifications 2.0 bundle instead, which does not have these limitations.
When a notification about a managed object, operation, alarm or event is sent, the params dictionary member contains a property which signals whether the notification is for a new object or for an update to an existing object. The property name is given by the constant PARAM_NOTIFICATION, and its value is that of either the NOTIFICATION_CREATED or the NOTIFICATION_UPDATED constant. The recommended way to distinguish between create and update events is to use the isCreate() and isUpdate() actions which are available on these events, as shown in the example below.
Measurements are not modifiable in Cumulocity, so all measurement notifications are newly-created objects.
Example:
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.ManagedObjectDeleted;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementDeleted;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.EventDeleted;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Operation;
monitor NotificationListener {
action onload {
// Subscribe for notification for managed objects
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
// Subscribe for notification for measurements
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
// Subscribe for notification for events
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
// Subscribe for notification for alarms
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
// Subscribe for notification for operations
monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
// Listen for notifications for managed objects
on all ManagedObject() as managedObject {
if managedObject.isCreate() {
log "ManagedObject created" at INFO;
}
else if managedObject.isUpdate() {
log "ManagedObject updated" at INFO;
}
}
// Listen for notifications on deleted managed objects
on all ManagedObjectDeleted() as managedObjectDeleted {
log "ManagedObject deleted" at INFO;
}
// Listen for notifications for measurements
on all Measurement() as measurement {
// Measurements can only be created
log "Measurement created" at INFO;
}
// Listen for notifications on deleted measurements
on all MeasurementDeleted() as measurementDeleted {
log "Measurement deleted" at INFO;
}
// Listen for notifications for events
on all Event() as evt {
if evt.isCreate() {
log "Event created" at INFO;
}
else if evt.isUpdate() {
log "Event updated" at INFO;
}
}
// Listen for notifications on deleted events
on all EventDeleted() as eventDeleted {
log "Event deleted" at INFO;
}
// Listen for notifications for alarms
on all Alarm() as alarm {
if alarm.isCreate() {
log "Alarm created" at INFO;
}
else if alarm.isUpdate() {
log "Alarm updated" at INFO;
}
}
// Listen for notifications for operations
on all Operation() as operation {
if operation.isCreate() {
log "Operation created" at INFO;
}
else if operation.isUpdate() {
log "Operation updated" at INFO;
}
}
}
}
Paging Cumulocity queries
Queries support paging when requesting multiple responses from Cumulocity. The number of objects requested by queries is controlled using the following query parameters:
- pageSize represents a batching parameter for getting multiple responses from Cumulocity. A larger pageSize results in fewer requests to Cumulocity to retrieve all the objects, but each request is larger. By default, pageSize is set to 1000. There is an upper limit of 2000.
- currentPage can be set to retrieve a specific page of results for a given pageSize. If you set currentPage, then only a single page is requested. If currentPage is not set (default), all the pages are requested.
Info: It is not recommended to set a small `pageSize` unless you are requesting a single page. To do this, you must set `currentPage`. Set `currentPage` to 1 to retrieve the first `pageSize` results. A warning is logged if `pageSize` is below 50 and `currentPage` is not set.
- withTotalPages is optional because it defaults to true, which means that a query includes full page statistics. If you set withTotalPages to false, only a single page is requested.
Info: If you do not want to receive all pages, you can set `withTotalPages` to `false`, which can improve performance by ensuring that only a single page is requested. With this setting, it is not required to also set `currentPage` on a request.
For more details on query result paging and the above query parameters, see the information on the REST implementation in the Cumulocity - API Specifications at https://cumulocity.com/api/.
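To make the pageSize trade-off concrete, the following Python sketch (an illustration only, not part of the Apama API) estimates how many page requests are needed to retrieve a given number of objects. It assumes one request per page of results; the function name requests_needed is hypothetical.

```python
DEFAULT_PAGE_SIZE = 1000  # default stated above
MAX_PAGE_SIZE = 2000      # upper limit stated above


def requests_needed(total_objects: int, page_size: int = DEFAULT_PAGE_SIZE) -> int:
    """Number of pages (assumed: one request each) that hold total_objects results."""
    if total_objects < 1:
        raise ValueError("total_objects must be at least 1")
    if not 1 <= page_size <= MAX_PAGE_SIZE:
        raise ValueError("page_size must be between 1 and 2000")
    # Integer ceiling division: a partly filled last page still needs a request
    return (total_objects + page_size - 1) // page_size
```

For 10,000 matching objects, this gives 10 requests at the default pageSize, 5 at the upper limit of 2000, and 200 at a pageSize of 50.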
Examples
The following example shows a FindEvent
query where the first 50 events are requested:
// Example 1: A FindEvent query where the first 50 responses are requested.
com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
//... adding other query params...
request.params.add("pageSize", "50");
request.params.add("currentPage", "1");
send request to com.apama.cumulocity.FindEvent.CHANNEL;
The next two examples demonstrate how to request a range of events where we are not just interested in the first page of results.
In the second example, pageSize
is also set to 50, but currentPage
is set to 3, thus requesting the 101st to 150th events:
// Example 2: A FindEvent query where the 101st-150th responses are requested
com.apama.cumulocity.FindEvent request := new com.apama.cumulocity.FindEvent;
//... adding other query params...
request.params.add("pageSize", "50");
request.params.add("currentPage", "3");
send request to com.apama.cumulocity.FindEvent.CHANNEL;
As a general rule, if currentPage
is greater than 1 and the number of objects you require is not a factor of the start of the range (or if the number of responses required is greater than the upper limit of pageSize
), multiple requests are needed to retrieve the objects of interest. As the second example above retrieves 50 events starting after the 100th response (and as 50 is a factor of 100), only 1 request is required.
The third example illustrates a situation where multiple queries are required. 40 events are to be retrieved, starting after the 60th response. As 40 is not a factor of 60, you should set pageSize
to 20 (the largest common factor of 60 and 40) and send two requests: one where currentPage
is set to 4 (this retrieves the 61st-80th events), and another where currentPage
is set to 5 (this retrieves the 81st-100th events).
// Example 3: Two FindEvent queries retrieving the 61st-100th events
// First request retrieves 61st-80th events
com.apama.cumulocity.FindEvent request1 := new com.apama.cumulocity.FindEvent;
//... adding other query params....
request1.params.add("pageSize", "20");
request1.params.add("currentPage", "4");
send request1 to com.apama.cumulocity.FindEvent.CHANNEL;
// Second request retrieves 81st-100th events
com.apama.cumulocity.FindEvent request2 := new com.apama.cumulocity.FindEvent;
//... adding other query params....
request2.params.add("pageSize", "20");
request2.params.add("currentPage", "5");
send request2 to com.apama.cumulocity.FindEvent.CHANNEL;
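The arithmetic behind the three examples can be sketched as follows. This is a minimal Python illustration, not EPL and not part of the Apama API: the function name plan_pages is hypothetical, and the handling of the 2000 limit is an assumption derived from the upper limit of pageSize mentioned above. It picks the largest common factor of the number of objects to skip and the number of objects wanted, then lists the currentPage values to request.

```python
from math import gcd

MAX_PAGE_SIZE = 2000  # upper limit of pageSize stated in this topic


def plan_pages(skip: int, count: int) -> tuple[int, list[int]]:
    """Return (pageSize, [currentPage, ...]) covering `count` objects after `skip`."""
    if skip < 0 or count < 1:
        raise ValueError("skip must be >= 0 and count must be >= 1")
    common = gcd(skip, count)  # gcd(0, count) == count, so skip=0 needs one page
    page_size = common
    if page_size > MAX_PAGE_SIZE:
        # Fall back to the largest divisor of the common factor within the limit
        page_size = max(d for d in range(1, MAX_PAGE_SIZE + 1) if common % d == 0)
    first_page = skip // page_size + 1
    pages = list(range(first_page, first_page + count // page_size))
    return page_size, pages
```

With this sketch, plan_pages(0, 50) corresponds to the first example (pageSize 50, currentPage 1), plan_pages(100, 50) to the second (pageSize 50, currentPage 3), and plan_pages(60, 40) to the third (pageSize 20, currentPage 4 and 5).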
Invoking other parts of the Cumulocity REST API
For individual event types, Apama provides event-specific interfaces for performing actions. See Using managed objects, Using alarms, Using events, Using measurements, or Using operations for more information. This topic covers how to interact with the remaining Cumulocity REST API.
Typically, interacting with the REST API or making HTTP requests through the Cumulocity platform requires manual setup, including configuring tenant information and authentication methods. The generic request/response interface described below does this for you. This is also the case for users creating EPL apps with the Streaming Analytics application in Cumulocity.
Although the GenericRequest/GenericResponse interface can be used for any request to the platform, it makes several assumptions about the request and response format that the Cumulocity REST API guarantees. The CumulocityRequestInterface event described in Invoking microservices is more appropriate for low-level requests, such as making requests to other microservices.
Create a new com.apama.cumulocity.GenericRequest using new GenericRequest and then individually set whichever fields are needed by name. You must always set the reqId (which is used to tie requests and responses together) to a unique identifier generated by the com.apama.cumulocity.Util.generateReqId() action. You also need to set the HTTP method (also known as the verb) and the path. For some APIs, you also need to set the queryParams (which populate the query string), the body (typically a sequence or dictionary that will be converted to JSON by the plug-in) and/or additional HTTP headers. Send the GenericRequest event to the channel specified by the GenericRequest.SEND_CHANNEL constant.
To receive responses, you must subscribe to the channel given in the GenericResponse.SUBSCRIBE_CHANNEL
constant. The response events will contain the reqId
identifier specified in the request, as well as a body
in a dictionary<any,any>
where the AnyExtractor
can be used to extract the expected content. This dictionary contains a structure which is equivalent to the JSON payload returned by Cumulocity. For the cases where no body is expected in the response (for example, for a DELETE
request), only a GenericResponseComplete
event will be received for the request identifier.
When using an API which returns a collection, the results are automatically split into multiple GenericResponse
events, followed by a GenericResponseComplete
, all with the reqId
identifier provided in the request.
Here is a simple example of using the API:
GenericRequest request := new GenericRequest;
request.reqId := com.apama.cumulocity.Util.generateReqId();
request.method := "GET";
request.isPaging := true;
request.path := "/measurement/measurements";
monitor.subscribe(GenericResponse.SUBSCRIBE_CHANNEL);
on all GenericResponse(reqId=request.reqId) as response
and not GenericResponseComplete(reqId=request.reqId)
{
AnyExtractor dict := AnyExtractor(response.getBody());
AnyExtractor source := AnyExtractor(dict.getDictionary("source"));
try{
AnyExtractor speed :=
AnyExtractor(dict.getDictionary("c8y_SpeedMeasurement")["speed"]);
log "Found measurement of type: c8y_SpeedMeasurement with id : " +
dict.getString("id") + " and source id :" + source.getString("id") +
" and speed "+speed.getFloat("value").toString()+
" "+speed.getString("unit")
at INFO;
}
catch(Exception e){
log "Failed to parse unexpected measurement : " +
dict.toString() at WARN;
}
}
on GenericResponseComplete(reqId=request.reqId)
{
monitor.unsubscribe(GenericResponse.SUBSCRIBE_CHANNEL);
}
send request to GenericRequest.SEND_CHANNEL;
Invoking microservices
The Cumulocity transport has a CumulocityRequestInterface
event that allows you to invoke microservices (or other HTTP requests) within Cumulocity. This interface automatically handles authentication. It can be used from an Apama instance outside of Cumulocity and from within Cumulocity, either as an EPL app within the Streaming Analytics application or in a custom microservice.
Although the CumulocityRequestInterface can be used to interact with the REST API, we recommend that you use the GenericRequest/GenericResponse interface described in Invoking other parts of the Cumulocity REST API.
Before you can create an HTTP request, you need to call the static connectToCumulocity() action in order to connect (as shown in the later example). The following is the format of the action on the helper class that you call to create a request:
action createRequest(string method, string path, any payload) returns Request
Pass the following:
- The specific type of HTTP request that is to be created, such as GET or PUT.
- A specific path that you want to append to your request. For example, the path for a microservice that is running on your desired tenant: /service/myMicroService/path/under/microservice.
- The payload, which will be encoded as JSON. For example, a dictionary will be converted to a JSON object.
This action will return an instance of a Request
from the generic HTTP API (see also Using predefined generic event definitions to invoke HTTP services with JSON and string payloads) with configuration set up on the request. You can later call execute
on this request, passing in a handler to deal with any response.
The following example shows how to make use of this class, that is, how to make an HTTP request in order to retrieve information from a running microservice:
monitor CumulocityTestMonitor {
action onload() {
try{
CumulocityRequestInterface cInterface :=
CumulocityRequestInterface.connectToCumulocity();
Request req := cInterface.createRequest("GET",
"/service/myMicroService/path/under/microservice",
{"request":"data"});
req.execute(getHandler);
}
catch (com.apama.exceptions.Exception e) {
log "Error thrown trying to create a Cumulocity Request " +
e.toString() at ERROR;
}
}
action getHandler(Response resp) {
AnyExtractor d := resp.payload;
log "Response output: " + d.toString() at INFO;
log "Test Done";
}
}
The CumulocityRequestInterface
will automatically detect if it is running inside or outside of Cumulocity and will automatically connect. If running remotely, it will rely on properties being set, which will be the connection details provided for the transport. You can do this by creating a .properties
file in your project and specifying it with the --config
option when starting a correlator (see Starting the correlator).
Optionally, you can set the following if you are connecting to a Cumulocity server with a self-signed or private certificate. Set this to the path to the certificate authority file by which the server’s certificate was signed:
CUMULOCITY_TLS_CERT_AUTH_FILE
The helper class is included in the Utilities for Cumulocity bundle in Apama Plugin for Eclipse. You can also find it in the monitor/cumulocity
directory of your Apama installation.
Monitoring status for Cumulocity
There are two sets of status values provided via the user status mechanism. Requests to the platform provide the metrics listed in the table below, where prefix is:
- user-{chainId=CumulocityIoTGenericChain}.cumulocityCodec. in the correlator status, and
- sag_apama_correlator_user_cumulocityCodec_ as a Prometheus metric, with the label chainId=CumulocityIoTGenericChain, and the additional label tenantId in a multi-tenant microservice.
event in the table below stands for one of Alarm, Measurement, Event, Operation or ManagedObject.
Key |
Description |
---|---|
|
Maximum request latency observed during the last hour, in milliseconds. |
|
Details of the maximum latency request. Consists of a tab-separated string containing the following: - ISO format timestamp in UTC,
|
|
Details of the most recent slow request. A request is slow if the request-response multiplied by the number of pages (or 1) is above
|
|
A quickly-evolving exponentially-weighted moving average of request latencies, in milliseconds. Uses 0.5 as the weight to calculate this. This puts more importance on recent latencies than |
|
A longer-term exponentially-weighted moving average of request latencies, in milliseconds. Uses 0.1 as the weight to calculate this. |
|
Count of the |
|
Count of the |
|
An exponentially-weighted moving average of the size of the |
|
An exponentially-weighted moving average of the size of the |
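The difference between the two latency averages, which use weights of 0.5 and 0.1, can be illustrated with a small Python sketch. It assumes the common update rule average = weight * sample + (1 - weight) * previous average, seeded with the first sample; the exact formula used by the transport is not stated in this topic, and the function name ewma is hypothetical.

```python
def ewma(samples: list[float], weight: float) -> list[float]:
    """Running exponentially-weighted moving average (assumed update rule)."""
    averages: list[float] = []
    current = None
    for sample in samples:
        if current is None:
            current = sample  # Assumption: the first sample seeds the average
        else:
            current = weight * sample + (1 - weight) * current
        averages.append(current)
    return averages


# Steady 100 ms latency with a single 500 ms spike
latencies_ms = [100.0, 100.0, 100.0, 500.0, 100.0]
fast = ewma(latencies_ms, 0.5)  # reacts quickly to the spike
slow = ewma(latencies_ms, 0.1)  # smooths the spike over a longer period
```

Under these assumptions, the spike moves the weight-0.5 average to 300 ms and then back to 200 ms, whereas the weight-0.1 average only rises to 140 ms and then falls to 136 ms.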
You can find additional status information relating to the Cumulocity transport in the status elements from the HTTP client (see also Monitoring status for the HTTP client). These start with the following prefix:
- user-{chainId=CumulocityIoTGenericChain}.httpClient. in the correlator status, or
- sag_apama_correlator_user_httpClient_ as a Prometheus metric, with the label chainId=CumulocityIoTGenericChain.
When using the Cumulocity Notifications 2.0 bundle, the metrics in the table below relating to receiving notifications are provided with the following prefix:
- user-notifications{tenantId=TENANT,subscriber=SUBSCRIBER,subscription=SUBSCRIPTION,client=N}. in the correlator status, or
- sag_apama_correlator_user_notifications_ as Prometheus metrics, with labels for tenantId, subscriber, subscription, and client.
Key | Description |
---|---|
prefix.batchAccumulatorCodec.queueSize | Size of the queue in the receiving chain. |
prefix.cumulocityNotifications2Codec.alarmsReceived | Count of alarms received. |
prefix.cumulocityNotifications2Codec.eventsReceived | Count of events received. |
prefix.cumulocityNotifications2Codec.managedObjectsReceived | Count of managed objects received. |
prefix.cumulocityNotifications2Codec.measurementsReceived | Count of measurements received. |
prefix.cumulocityNotifications2Codec.operationsReceived | Count of operations received. |
prefix.pulsarTransport.messagesAcknowleged | Count of messages acknowledged. |
prefix.pulsarTransport.messagesReceived | Count of messages received. |
prefix.pulsarTransport.messagesRedelivered | Count of redelivered messages received. |
prefix.pulsarTransport.receivedSizeEWMABytes | Exponentially weighted moving average (EWMA) of size of received messages, in bytes. |
prefix.numClients | Configured number of parallel clients. |
For more information about monitor status information published by the correlator, see Managing and monitoring over REST and Watching correlator runtime status.
Finding tenant options
In order to find the available tenant options on a tenant, you can send the event com.apama.cumulocity.FindTenantOptions
to FindTenantOptions.SEND_CHANNEL
. This results in events of type com.apama.cumulocity.FindTenantOptionsResponse
being returned on com.apama.cumulocity.FindTenantOptionsResponse.SUBSCRIBE_CHANNEL
.
The returned event includes a sequence of com.apama.cumulocity.TenantOption
, which individually include the key/value combinations that represent the available tenant option.
In order to filter the tenant options returned, you can specify values in the category
and key
fields of the find request. If only the key is specified, then Cumulocity returns all the available tenant options and the correlator filters them by the key.
Getting user details
You can get the details of the current user by sending the event com.apama.cumulocity.GetCurrentUser
to com.apama.cumulocity.GetCurrentUser.SEND_CHANNEL
. This results in events of type com.apama.cumulocity.GetCurrentUserResponse
being returned on com.apama.cumulocity.GetCurrentUserResponse.SUBSCRIBE_CHANNEL
.
The com.apama.cumulocity.GetCurrentUserResponse
returned contains a com.apama.cumulocity.CurrentUser
event, which in turn contains the id
of the user, the userName
, a sequence of effectiveRoles
for the user and a dictionary of userOptions
.
By default, this is the user which the Apama application is running as. This is either the user configured in the Cumulocity connection if it is not running within Cumulocity or the service user of the microservice if it is running in Cumulocity.
You can request the details of another user by overriding the authorization or cookie headers in the com.apama.cumulocity.GetCurrentUser
event. This is normally used when you take the authentication details from a request to your application and use them to determine the roles that the user has.
Example - checking a user based on information in a received request:
/** Event containing information extracted from an
* HTTP request where we want to check the validity of the user */
event ActionRequest {
string authorization;
string actionToTake;
string requestId;
string channel;
}
/** Response for the HTTP request */
event ActionResponse {
string requestId;
string actionResult;
}
/** Response if authorization failed */
event ActionNotAllowed {
string requestId;
}
...
monitor.subscribe(GetCurrentUserResponse.SUBSCRIBE_CHANNEL);
// Listen for incoming HTTP requests
on all ActionRequest() as ar {
integer reqId := com.apama.cumulocity.Util.generateReqId();
// Send a request to check the user from the incoming request
GetCurrentUser checkUser := new GetCurrentUser;
checkUser.reqId := reqId;
checkUser.authorization := ar.authorization;
send checkUser to GetCurrentUser.SEND_CHANNEL;
// if authentication passed, check authorization
on GetCurrentUserResponse(reqId=reqId) as res and not
GetCurrentUserResponseFailed(reqId=reqId) {
if checkHasRoles("ActionAllowed", res.user.effectiveRoles) {
send ActionResponse(ar.requestId, performAction(ar.actionToTake))
to ar.channel;
} else {
send ActionNotAllowed(ar.requestId) to ar.channel;
}
}
// if authentication failed, return an error
on GetCurrentUserResponseFailed(reqId=reqId) as err and not
GetCurrentUserResponse(reqId=reqId) {
send ActionNotAllowed(ar.requestId) to ar.channel;
}
}
action performAction(string actiontoTake) returns string{
// do some action
return "";
}
action checkHasRoles(string role,sequence<Role> effectiveRoles) returns boolean {
Role r;
for r in effectiveRoles {
if r.id = role{
return true;
}
}
return false;
}
}
You can override the current user in one of the following ways:
- By setting the authorization header of the other user. This would be used for basic authentication and returns details of that other user. If this is invalid, then GetCurrentUserResponseFailed is returned.
- By setting both authCookie and xsrfToken to valid values for another user. This returns details of that other user. If either authCookie or xsrfToken is incorrect or not set, then GetCurrentUserResponseFailed is returned.
If none of authorization, authCookie or xsrfToken is set, details of the current user are returned.
Optimizing requests to Cumulocity with concurrent connections
In order to provide better performance in requests to the Cumulocity platform, you can configure the transport to use multiple client connections to perform requests concurrently. This can provide improved performance, but may also change the ordering in which requests are executed and responses are returned. By default, the Cumulocity transport tries to use multiple connections and restricts ordering to avoid races that may affect your EPL application. However, this may be either insufficiently concurrent or insufficiently ordered for your specific use case. In that case, there are several options on how to control the concurrency used. These are described below.
Default behavior
By default, three concurrent connections are created for handling requests to the Cumulocity platform for a given correlator process (a given tenant, for applications running inside the Cumulocity platform).
To avoid obvious races, the following rules apply for concurrency:
- All read (GET) requests can be performed concurrently with each other.
- All update (PUT/POST) requests relating to different managed objects (for example, a ManagedObject update, a new Measurement for a second ManagedObject and an Alarm with a third ManagedObject as a source) can be performed concurrently with each other.
- All updates to a single managed object (based on id, or source as appropriate) are performed serially in the order they were sent to the transport.
- Measurement updates sent without the withChannelResponse option that may be batched can be processed concurrently with any other request.
- Any read request waits for all outstanding update requests before starting.
- Any update request waits for all outstanding read requests before starting.
- GenericRequest update (PUT/POST/DELETE) operations cannot be automatically tied to a ManagedObject, so they are never processed concurrently with any other request.
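The rules above can be read as a pairwise check on whether two requests may be in flight at the same time. The following Python sketch models that reading; it is an illustration only and not the transport's implementation. The Request type, the kind names and the function may_run_concurrently are all hypothetical, and giving the GenericRequest rule precedence over the batched-measurement rule is an assumption, because the text does not say which of the two applies when they meet.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class Request:
    # One of "read", "update", "batched_measurement" or "generic_update"
    kind: str
    # Managed object id (or source) that an "update" relates to
    target: Optional[str] = None


def may_run_concurrently(a: Request, b: Request) -> bool:
    """Pairwise reading of the default concurrency rules (illustrative model)."""
    kinds = {a.kind, b.kind}
    if "generic_update" in kinds:
        # Assumption: this rule takes precedence over all others
        return False
    if "batched_measurement" in kinds:
        return True   # batched measurements run alongside any other request
    if kinds == {"read"}:
        return True   # reads never block each other
    if kinds == {"update"}:
        # Updates are serialized only when they relate to the same managed object
        return a.target != b.target
    return False      # a read and an update wait for each other
```

For example, two updates with different targets may overlap, while two updates to the same target, or a read and an update, may not.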
Changing the default behavior
There are two options to change the default behavior through the CumulocityIoT.properties file. These changes apply to the entire correlator. When running as an EPL application in the Cumulocity platform, this setting can be changed for your whole tenant as a tenant option.
- You can set the CUMULOCITY_NUM_CLIENTS property to control the number of clients used. If it is set to 1, then all requests are sent serially using a single connection (this disables concurrency). If it is set to another number, then that number of concurrent connections to the platform is created.
- You can add the CUMULOCITY_CONCURRENCY_MODE property to the properties file. By default, this has the value auto, which gives the behavior described above, limiting the concurrency to preserve some ordering. You can set the value to always instead, in which case requests of all types may be processed concurrently and any ordering consequences must be handled in EPL.
See also Configuring the Cumulocity transport.
Creating a new connection with specific behavior
The following table lists the event types from the com.apama.cumulocity package that you can use to either create new connections for sending events independently of the default transport or use the default shared connection. In this way, multiple different configurations can be used at the same time. You can use these event types to separate the requests for different EPL applications, or different types of requests within the system. Each of these event types corresponds to a particular combination of settings that can be configured for the default transport.
These events can be created using either the create or the createForTenant action:
- Use the create action when an application needs to work only with per-tenant deployment. Depending on the connection type, the create action may take an argument to configure the number of clients to use for the connection.
- Use the createForTenant action when an application needs to work with multi-tenant deployment or with both multi-tenant and per-tenant deployment. The createForTenant action takes a TenantDetails event as its first argument and, depending on the connection type, may also take an argument to configure the number of clients to use for the connection. The TenantDetails event provides details of the subscribed tenant to the connection object so that it can provide information that is specific to that tenant. You can obtain TenantDetails events by using tenant subscription events. See Working with multi-tenant deployments for more details.
Event type |
Description |
---|---|
|
A connection with multiple clients and no restriction on ordering.The additional argument to the
Configuration equivalents:
|
|
A completely serial connection with no concurrency.There is no argument to the Configuration equivalent:
|
|
A connection with multiple clients but with the standard ordering restrictions.The additional argument to the
Configuration equivalents:
|
|
A connection to Cumulocity which is configured to handle all requests using a shared default client. |
FullyConcurrentConnection conn := FullyConcurrentConnection.create(5);
monitor.subscribe(conn.getChannel(FindManagedObjectResponse.SUBSCRIBE_CHANNEL));
FindManagedObject fmo := // create a request
send fmo to conn.getChannel(FindManagedObject.SEND_CHANNEL);
on FindManagedObjectResponse(reqId=fmo.reqId) {
// do something
monitor.unsubscribe(conn.getChannel(FindManagedObjectResponse.SUBSCRIBE_CHANNEL));
conn.destroy(); // can't use it after this
}
The connection object provides a getChannel
action which converts a standard channel name into the corresponding channel for this connection, so that events go to this connection instead of the default transport.
For more details, see the com.apama.cumulocity
package in the API reference for EPL (ApamaDoc).
Writing your EPL to avoid races
If concurrency causes ordering issues, particularly when you use a fully-concurrent connection, you may need to write your EPL to explicitly cater for the possible reordering. This is normally done by listening for the response event of each request, to ensure that the request has fully completed before sending any request that depends on it.
When sending a creation or update event for an object, this means using the withChannelResponse
API and listening for the com.apama.cumulocity.ObjectCommitted
or com.apama.cumulocity.ObjectCommitFailed
response. For example:
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
Measurement m := // create measurement
integer reqId := Util.generateReqId();
send m.withChannelResponse(reqId) to Measurement.SEND_CHANNEL;
on ObjectCommitted(reqId=reqId) and not ObjectCommitFailed(reqId=reqId) {
// do whatever must wait until the Measurement request has been fully completed
}
on ObjectCommitFailed(reqId=reqId) and not ObjectCommitted(reqId=reqId) {
// handle the fact that this failed
}
Working with multi-tenant deployments
When developing an application which can work in multi-tenant deployments, the application needs to handle subscription/unsubscription of tenants and sending/receiving events to/from subscribed tenants.
The preferred approach is to use the com.apama.cumulocity.TenantSubscriptionNotifier
event, which provides a callback-based API for notifications about tenant subscriptions and unsubscriptions. It automatically handles spawning new monitor instances for each subscribed tenant and terminating the instance when the tenant is unsubscribed. The monitor instances are spawned in a private context for each tenant. The callback actions are called for all currently subscribed tenants and for any future tenant subscriptions and unsubscriptions from the monitor instances spawned for those tenants. This keeps the state and processing for each tenant separate from other tenants and cleans up the monitor instances and listeners when tenants are unsubscribed.
The subscription callback provides a com.apama.cumulocity.TenantDetails
event, which is used to create connection objects for that tenant. The connection objects provide getChannel
actions to get tenant-specific channels to send events to and receive events from that tenant. See the documentation of the FullyConcurrentConnection
, SerialConnection
, AutoConcurrentConnection
, and SharedConnection
events in the com.apama.cumulocity
package for more details.
Alternatively, the application can use the com.apama.cumulocity.GetAllTenantsSubscribedToApplication
event to get all tenants that are currently subscribed to the current application and listen for the com.apama.cumulocity.ApplicationSubscribedForTenant
and the com.apama.cumulocity.ApplicationUnsubscribedForTenant
events to get future tenant subscription and unsubscription notifications. The ApplicationSubscribedForTenant
event provides a com.apama.cumulocity.TenantDetails
event, which is used to create connection objects for that tenant. The ApplicationSubscribedForTenant
event also provides a context that is a private context only for the tenant and can be used for spawning new monitor instances. When using this approach, the application needs to manually take care of spawning new monitor instances for each subscribed tenant and terminate them when tenants are unsubscribed.
An application that is developed to handle multi-tenant deployments using the above-mentioned events also works in per-tenant deployments.
The example below shows how to implement a simple threshold rule which raises an alarm when the value of the measurement is greater than the threshold.
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;
/**
* This application monitors measurements and raises alarms if value is greater
* than the threshold.
*
*/
monitor ThresholdAlarmRule {
/** The measurement threshold value. */
constant float THRESHOLD := 10.0;
action onload() {
// Listen for measurements and raise an alarm if measurement exceeds a threshold.
monitorMeasurements();
}
/** Listen for measurements and raise an alarm if measurement exceeds a threshold. */
action monitorMeasurements() {
monitor.subscribe(MeasurementFragment.SUBSCRIBE_CHANNEL);
// Listen for all MeasurementFragment events of the type of interest.
on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
// Raise alarm if value is greater than the threshold.
if m.value > THRESHOLD {
Alarm alarm := Alarm("","my_alarm",m.source,currentTime,"Threshold exceeded",
Alarm.STATUS_ACTIVE,Alarm.SEVERITY_MINOR,1,new dictionary<string,any>);
// Send the alarm to the default channel (per-tenant deployment).
send alarm to Alarm.SEND_CHANNEL;
}
}
}
}
The example below shows how to convert an existing per-tenant EPL application to also work in multi-tenant mode.
using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.SharedConnection;
using com.apama.cumulocity.MeasurementFragment;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.notifications2.SubscribeTenantNotifications;
/**
* This application monitors measurements and raises alarms if value is greater
* than the threshold.
*
* The application monitors tenants subscriptions and unsubscriptions. It starts
* listening for measurements when a tenant is subscribed and stops it when tenant
* is unsubscribed.
*
* The application works correctly both in per-tenant and multi-tenant deployment.
*/
monitor ThresholdAlarmRule {
/** The measurement threshold value. */
constant float THRESHOLD := 10.0;
/** Connection object for the tenant. It is used to get correct channels for sending and receiving events.*/
SharedConnection connection;
action onload() {
// Create a new instance of TenantSubscriptionNotifier and register callbacks for
// subscriptions and unsubscriptions
TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().
onSubscription(tenantSubscribed).
onUnsubscription(tenantUnsubscribed);
}
/** Action which is called back when a tenant is subscribed. */
action tenantSubscribed(TenantDetails tenant) {
// Create a connection object for this tenant so that we can send events to
// this tenant and receive events from it.
// For most cases, SharedConnection connection is sufficient, but also see
// FullyConcurrentConnection, SerialConnection and AutoConcurrentConnection events
// to create connections with different semantics.
connection := SharedConnection.createForTenant(tenant);
// Now start to listen for measurements and raise an alarm if measurement exceeds a threshold.
monitorMeasurements();
// Subscribe this tenant to Notifications 2.0 events to receive the measurements.
// You can choose to listen for the SubscribeTenantNotificationsComplete event to wait for the connection to succeed.
send SubscribeTenantNotifications(tenant, com.apama.cumulocity.Util.generateReqId()) to SubscribeTenantNotifications.SEND_CHANNEL;
}
/** Listen for measurements and raise an alarm if measurement exceeds a threshold. */
action monitorMeasurements() {
monitor.subscribe(connection.getChannel(MeasurementFragment.SUBSCRIBE_CHANNEL));
// Listen for all MeasurementFragment events of the type of interest.
on all MeasurementFragment(type="my_measurement", valueFragment="my_frag", valueSeries="my_series") as m {
// Raise alarm if value is greater than the threshold.
if m.value > THRESHOLD {
Alarm alarm := Alarm("","my_alarm",m.source,currentTime,"Threshold exceeded",
Alarm.STATUS_ACTIVE,Alarm.SEVERITY_MINOR,1,new dictionary<string,any>);
// Get tenant-specific channel to which alarm must be sent.
send alarm to connection.getChannel(Alarm.SEND_CHANNEL);
}
}
}
/** Action which is called back when a tenant is unsubscribed. */
action tenantUnsubscribed(string tenantId) {
// Destroy the connection object.
connection.destroy();
// There is no need to explicitly terminate this monitor instance;
// it is terminated automatically.
}
action ondie {
// Destroy the connection object.
connection.destroy();
}
}
Building custom Cumulocity IoT microservices using Apama
You can use Apama to create custom microservices which can be deployed to Cumulocity. Unlike the built-in EPL apps functionality of the Streaming Analytics application, which supports deploying individual EPL files using the standard platform components deployed there, a custom microservice allows you to deploy a complete Apama project as you would run outside of the platform.
To create a custom microservice, you need an Apama project, created either with Apama Plugin for Eclipse or with the apama_project
tool on the command line. Your project must include one of the Cumulocity connectivity bundles as described in Adding the Cumulocity connectivity plug-in to a project. When you configure these bundles via the .properties
files, the following properties, which are normally required, are automatically detected by the microservice:
CUMULOCITY_USERNAME - uses a service user provided for the microservice instead.
CUMULOCITY_PASSWORD - uses a service user provided for the microservice instead.
CUMULOCITY_APPKEY - uses the microservice’s own application in the platform.
CUMULOCITY_SERVER_URL - automatically detected within the platform.
When using the Cumulocity Notifications 2.0 bundle:
CUMULOCITY_NOTIFICATIONS_SUBSCRIBER_NAME - uses the contextPath for the microservice instead.
Otherwise, the project should be the same as one that you would run in an Apama installation deployed outside of the platform.
To actually build the microservice, you should first build a Docker image from your project. In Apama Plugin for Eclipse, use the Add Docker Support option from the context menu to create a Dockerfile from which you can build the microservice. For more information, see Building Apama projects during the Docker build.
You can use the Cumulocity microservice utility tool to build and deploy this project as a microservice (see Microservice utility tool in the Cumulocity documentation). Put your project inside a subdirectory for your microservice called docker
. You also need to create a cumulocity.json
file that contains a manifest for the microservice. For details, see Microservice manifest in the Cumulocity documentation. To use Apama, you have to grant your microservice the following roles using the requiredRoles
property:
ROLE_APPLICATION_MANAGEMENT_READ
ROLE_INVENTORY_READ
ROLE_INVENTORY_ADMIN
ROLE_INVENTORY_CREATE
ROLE_MEASUREMENT_READ
ROLE_MEASUREMENT_ADMIN
ROLE_EVENT_READ
ROLE_EVENT_ADMIN
ROLE_ALARM_READ
ROLE_ALARM_ADMIN
ROLE_DEVICE_CONTROL_READ
ROLE_DEVICE_CONTROL_ADMIN
ROLE_IDENTITY_READ
ROLE_OPTION_MANAGEMENT_READ
ROLE_BULK_OPERATION_READ
ROLE_SMS_ADMIN
If you are using the Cumulocity Notifications 2.0 bundle, then you also need the following role:
ROLE_NOTIFICATION_2_ADMIN
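As an illustration of how the roles listed above fit into the manifest, the following is a minimal sketch of a cumulocity.json file. Only the requiredRoles entries are taken from this section. The other values (apiVersion, version, the provider name and the PER_TENANT isolation) are placeholder assumptions that you should check against Microservice manifest in the Cumulocity documentation. ROLE_NOTIFICATION_2_ADMIN is included on the assumption that the project uses the Cumulocity Notifications 2.0 bundle; leave it out otherwise.

```json
{
  "apiVersion": "1",
  "version": "1.0.0",
  "provider": {
    "name": "Example Provider"
  },
  "isolation": "PER_TENANT",
  "requiredRoles": [
    "ROLE_APPLICATION_MANAGEMENT_READ",
    "ROLE_INVENTORY_READ",
    "ROLE_INVENTORY_ADMIN",
    "ROLE_INVENTORY_CREATE",
    "ROLE_MEASUREMENT_READ",
    "ROLE_MEASUREMENT_ADMIN",
    "ROLE_EVENT_READ",
    "ROLE_EVENT_ADMIN",
    "ROLE_ALARM_READ",
    "ROLE_ALARM_ADMIN",
    "ROLE_DEVICE_CONTROL_READ",
    "ROLE_DEVICE_CONTROL_ADMIN",
    "ROLE_IDENTITY_READ",
    "ROLE_OPTION_MANAGEMENT_READ",
    "ROLE_BULK_OPERATION_READ",
    "ROLE_SMS_ADMIN",
    "ROLE_NOTIFICATION_2_ADMIN"
  ],
  "roles": []
}
```

With the layout described above, this file would sit in the microservice directory next to the docker subdirectory that contains the Apama project.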
For more details on deploying Apama as a microservice, including how to build multi-tenant microservices, see Deploying Apama applications as microservices in the Cumulocity documentation.
Sample EPL
The sample EPL below shows how to subscribe to and receive device measurements, events, alarms, operations and device information.
package com.apama.sample;
using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.Event;
using com.apama.cumulocity.Operation;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
monitor CumulocityApplication {
action onload() {
fetchManagedObjects();
listenForMeasurements();
listenForAlarms();
listenForEvents();
listenForOperations();
}
action fetchManagedObjects() {
// Subscribe to receive all the devices from Cumulocity
monitor.subscribe(ManagedObject.SUBSCRIBE_CHANNEL);
// Consume all the devices from Cumulocity
on all ManagedObject() as mo {
log mo.toString() at INFO;
// Update a managed object
/*
mo.params.add("CustomMetadata", {"metadata": "Adding custom data"});
send mo to ManagedObject.SEND_CHANNEL;
*/
}
monitor.subscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
// Fetch a list of all available devices
integer reqId := com.apama.cumulocity.Util.generateReqId();
on all FindManagedObjectResponse(reqId=reqId) as response
and not FindManagedObjectResponseAck(reqId=reqId) {
log "Received managedObject " + response.managedObject.toString() at INFO;
}
on FindManagedObjectResponseAck(reqId=reqId) {
log "Find Managed Objects request completed" at INFO;
monitor.unsubscribe(FindManagedObjectResponse.SUBSCRIBE_CHANNEL);
}
// Retrieve list of all available devices
send FindManagedObject(reqId, "", {"fragmentType": "c8y_IsDevice"})
to FindManagedObject.SEND_CHANNEL;
}
action listenForMeasurements() {
// Subscribe to receive all the measurements published from
// Cumulocity
monitor.subscribe(Measurement.SUBSCRIBE_CHANNEL);
// Consume all the measurements from Cumulocity
on all Measurement() as m {
log m.toString() at INFO;
}
// Create a new measurement
/*
Measurement m := new Measurement;
m.source := "<MANAGED_OBJECT_ID>";
m.time := currentTime;
m.type := "TemperatureMeasurement";
MeasurementValue mv := new MeasurementValue;
mv.value := 100.0;
dictionary<string, MeasurementValue> fragment :=
new dictionary<string, MeasurementValue>;
fragment.add("temperature", mv);
m.measurements.add("TemperatureMeasurement", fragment);
send m to Measurement.SEND_CHANNEL;
*/
}
action listenForEvents() {
// Subscribe to receive all the events published from
// Cumulocity
monitor.subscribe(Event.SUBSCRIBE_CHANNEL);
// Consume all the events from Cumulocity
on all Event() as e {
log e.toString() at INFO;
// Example for updating an event
/*
// update text
e.text := "This is an updated text";
send e to Event.SEND_CHANNEL;
*/
}
// Create a new event
/*
Event evt := new Event;
evt.source := "<MANAGED_OBJECT_ID>";
evt.type := "TestEvent";
evt.time := currentTime;
evt.text := "This is a sample event";
send evt to Event.SEND_CHANNEL;
*/
}
action listenForAlarms() {
// Subscribe to receive all the alarms published from
// Cumulocity
monitor.subscribe(Alarm.SUBSCRIBE_CHANNEL);
// Consume all the alarms from Cumulocity
on all Alarm() as alarm {
log alarm.toString() at INFO;
// Example for updating an alarm
/*
// set alarm severity to MAJOR
alarm.severity := Alarm.SEVERITY_MAJOR;
send alarm to Alarm.SEND_CHANNEL;
*/
}
// Create a new alarm
/*
Alarm alarm := new Alarm;
alarm.source := "<MANAGED_OBJECT_ID>";
alarm.type := "TestAlarm";
alarm.severity := Alarm.SEVERITY_MINOR;
alarm.status := Alarm.STATUS_ACTIVE;
alarm.time := currentTime;
alarm.text := "This is a sample alarm";
send alarm to Alarm.SEND_CHANNEL;
*/
}
action listenForOperations() {
// Subscribe to receive all the operations published from
// Cumulocity
// Note: When using the Cumulocity transport, ensure that the
// subscribeToOperations transport property is set to true
monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
on all Operation() as o {
log o.toString() at INFO;
// Update an operation
/*
o.status := Operation.STATUS_EXECUTING;
send o to Operation.SEND_CHANNEL;
*/
}
// Create an operation
/*
Operation operation := new Operation;
operation.source := "<MANAGED_OBJECT_ID>";
operation.status := Operation.STATUS_PENDING;
operation.params.add("c8y_Message", {"text": "Device Operation"});
send operation to Operation.SEND_CHANNEL;
*/
}
}