About the DeviceService
The DeviceService bundle provides APIs for consuming device messages from the MQTT Service (from-device Pulsar topic) and publishing messages to devices connected to the MQTT Service (to-device Pulsar topic).
For more details on Pulsar topics and the MQTT Service, see the MQTT Service Doc.
The DeviceService bundle includes:
- DeviceService.yaml - Connectivity chain configuration defining the codec and transport pipeline.
- DeviceService.properties - Configuration template for connection properties and behavior.
- DeviceServicePlugin.mon - This monitor provides the following events and APIs:
  - DeviceConsumer - Manages the consumer connection for a specific tenant and transport. It also provides details regarding the consumer partition and its subscription channel.
  - DevicePublisher - Represents a publisher connection for a specific tenant and transport. It also provides sender channel details.
  - DeviceMessage - Represents an event with MqttServiceMessage information. See MQTT Service Payload.
  - DeviceService - Provides APIs to connect consumers and to act as a producer (connectConsumers and connectPublisher).
- DeviceServiceTenantConnector.mon - A helper utility that manages the DeviceService chain configuration and cleanup upon tenant subscription/unsubscription.
For detailed information about the DeviceService APIs, see the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).
Configuring the DeviceService
- Use the apama_project tool to add the DeviceService Connectivity bundle to your project:
  apama_project add bundle DeviceService
- Configure the connection properties in the generated config/connectivity/DeviceService/DeviceService.properties file.

For detailed information, see The apama_project tool.
Configuration for DeviceService
The following is an example of a typical DeviceService.properties configuration file:
# The largest batch of messages that can be received from DeviceService at one time.
CUMULOCITY_DEVICESERVICE_MAX_BATCHSIZE=1000
# A unique name to identify the subscriber.
CUMULOCITY_DEVICESERVICE_SUBSCRIBER_NAME=deviceservice-subscriber
# Name of the subscription to use or create.
CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_NAME=deviceservice-subscriber
# The subscription type: Exclusive, Shared, or KeyShared.
CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_TYPE=KeyShared
# Create a number of parallel connections for receiving notifications.
CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS=1
When you add the DeviceService bundle, you must set the following required property:
| Property | Description |
|---|---|
| CUMULOCITY_DEVICESERVICE_SUBSCRIBER_NAME | A unique name to identify the subscriber. Disconnecting and reconnecting or restarting with the same subscriber name resumes the message stream from the point it left off. Required if not running in a microservice. If running in a microservice, this is inferred from the microservice Type: |
You can also add the following optional properties to the .properties configuration file:
| Property | Description |
|---|---|
| CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_NAME | Name of the subscription to use or create. All correlators that use the same subscription name consume the same set of messages. Type: Default: |
| CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_TYPE | The subscription type. Type: Values: Exclusive, Shared, or KeyShared. Default: |
| CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS | Create a number of parallel connections for receiving device messages. Requires the subscription type In multi-tenant deployments, this defaults to 1 per tenant to avoid resource exhaustion. In per-tenant deployments, this defaults to the number of CPUs. Type: Default: Number of CPUs (per-tenant) or 1 (multi-tenant). |
| CUMULOCITY_DEVICESERVICE_MAX_BATCHSIZE | The largest batch of messages that can be received from the DeviceService at one time. Type: Default: 1000. |
| | The Pulsar service URL for connecting to the DeviceService. In most situations, you do not need to set this. It is derived from the When specified, use the format Type: |
Using the DeviceService API
The DeviceService API provides EPL APIs for establishing connections to consume messages from devices and publish messages to devices via the MQTT Service.
The DeviceService event in the com.apama.cumulocity.devices package provides the following APIs:
- connectPublisher: Connects a device publisher for a given tenant and transport.
- connectConsumers: Connects device consumers for a given tenant and transport.
For detailed information, see the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).
Connecting consumers
Use the connectConsumers API to consume device messages from the MQTT Service (from-device Pulsar topic), as in the following example:
using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.devices.DeviceService;
using com.apama.cumulocity.devices.DeviceConsumer;
using com.apama.cumulocity.devices.DeviceMessage;

monitor DeviceMessageReceiver {
    action onload {
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().onSubscription(tenantSubscribed);
    }

    action tenantSubscribed(TenantDetails tenant) {
        // Create the consumers for this tenant and the "mqtt" transport
        sequence<DeviceConsumer> consumers := DeviceService.connectConsumers(tenant.tenantId, "mqtt");
        if consumers.size() > 0 {
            log "DeviceService Consumer started for tenant: " + tenant.tenantId at INFO;
            DeviceConsumer consumer;
            for consumer in consumers {
                monitor.subscribe(consumer.getSubscribeChannel());
                log "Subscribed to channel: " + consumer.getSubscribeChannel() at INFO;
            }
            // Handle incoming device messages. One listener covers every
            // subscribed channel; a listener per consumer would process
            // each message once per consumer.
            on all DeviceMessage() as msg {
                log "Received message from clientID: " + msg.clientID + " on topic: " + msg.topic at INFO;
                // Process the message...
            }
        } else {
            log "No DeviceService Consumer created for tenant: " + tenant.tenantId at ERROR;
        }
    }
}
The connectConsumers action:
- Creates multiple consumers based on the CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS configuration
- Returns a sequence of DeviceConsumer objects
For more information, see the connectConsumers API in the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).
Connecting a publisher
Use the connectPublisher API to publish messages to devices connected to the MQTT Service (to-device Pulsar topic), as in the following example:
using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;
using com.apama.cumulocity.devices.DeviceService;
using com.apama.cumulocity.devices.DevicePublisher;
using com.apama.cumulocity.devices.DeviceMessage;

monitor DeviceMessageSender {
    constant string CLIENT_ID := "client123";
    constant string DEVICE_TOPIC := "device/sensor/temperature";
    // Base64 encoding of the JSON text {"key": "val"}
    constant string PAYLOAD := "eyJrZXkiOiAidmFsIn0=";

    action onload {
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().onSubscription(tenantSubscribed);
    }

    action tenantSubscribed(TenantDetails tenant) {
        DevicePublisher devicePublisher := DeviceService.connectPublisher(tenant.tenantId, "mqtt");
        log "DeviceService Producer started for tenant: " + devicePublisher.getTenantID() at INFO;

        // Build a device message carrying the Base64-encoded payload
        DeviceMessage msg := new DeviceMessage;
        msg.transportID := "mqtt";
        msg.clientID := CLIENT_ID;
        msg.topic := DEVICE_TOPIC;
        msg.payloadBase64 := PAYLOAD;
        msg.transportProperties := new dictionary<string, any>;

        // EPL does not convert events to strings implicitly, so call toString()
        log "Sending message " + msg.toString() + " to device" at INFO;
        send msg to devicePublisher.getSendChannel();
    }
}
The connectPublisher action:
- Creates a single publisher connection per tenant/transport combination
- Returns a DevicePublisher object
For more information, see the connectPublisher API in the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).
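
The payloadBase64 field in the publisher example carries the message body as a Base64 string. The following Python sketch shows one way such a value could be produced and checked outside the correlator, for example when preparing test data. It assumes the payload is UTF-8 encoded JSON, which holds for the sample value but is not stated as a requirement of the DeviceService. The helper names encode_payload and decode_payload are hypothetical and are not part of the DeviceService API.

```python
import base64
import json


def encode_payload(obj) -> str:
    """Serialize obj to JSON text and return it as a Base64 string.

    Assumption: the device expects UTF-8 encoded JSON. Adjust the
    serialization step if your devices use another payload format.
    """
    raw = json.dumps(obj).encode("utf-8")
    return base64.b64encode(raw).decode("ascii")


def decode_payload(payload_base64: str):
    """Reverse encode_payload: Base64 string back to a Python object."""
    return json.loads(base64.b64decode(payload_base64))


if __name__ == "__main__":
    encoded = encode_payload({"key": "val"})
    print(encoded)                  # eyJrZXkiOiAidmFsIn0=
    print(decode_payload(encoded))  # {'key': 'val'}
```

The encoded string matches the PAYLOAD constant used in the DeviceMessageSender example, so the same helper can be used to inspect the payloadBase64 value of messages logged by a consumer.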