The DeviceService Bundle

About the DeviceService

The DeviceService bundle provides APIs for consuming device messages from the MQTT Service (from-device Pulsar topic) and publishing messages to devices connected to the MQTT Service (to-device Pulsar topic).

For more details on Pulsar topics and the MQTT Service, see the MQTT Service documentation.

Info
The DeviceService feature is currently in public preview and may be subject to change in the future.

The DeviceService bundle includes the following:

  • DeviceService.yaml - Connectivity chain configuration defining the codec and transport pipeline.

  • DeviceService.properties - Configuration template for connection properties and behavior.

  • DeviceServicePlugin.mon - This monitor provides the following events and APIs:

    • DeviceConsumer - Manages the consumer connection for a specific tenant and transport. It also provides details regarding the consumer partition and its subscription channel.

    • DevicePublisher - Represents a publisher connection for a specific tenant and transport. It also provides sender channel details.

    • DeviceMessage - Represents an event with MqttServiceMessage information. See MQTT Service Payload.

    • DeviceService - Provides APIs to connect consumers and to connect a publisher.

  • DeviceServiceTenantConnector.mon - A helper utility that manages the DeviceService chain configuration and cleanup upon tenant subscription/unsubscription.

For detailed information about the DeviceService APIs, see the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).

Configuring the DeviceService

  1. Use the apama_project tool to add the DeviceService Connectivity bundle to your project:

    apama_project add bundle DeviceService
    
  2. Configure the connection properties in the generated config/connectivity/DeviceService/DeviceService.properties file.

For detailed information, see The apama_project tool.

Configuration for DeviceService

The following is an example of a typical DeviceService.properties configuration file:

# The largest batch of messages that can be received from DeviceService at one time.
CUMULOCITY_DEVICESERVICE_MAX_BATCHSIZE=1000
 
# A unique name to identify the subscriber.
CUMULOCITY_DEVICESERVICE_SUBSCRIBER_NAME=deviceservice-subscriber

# Name of the subscription to use or create.
CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_NAME=deviceservice-subscriber

# The subscription type: Exclusive, Shared, or KeyShared.
CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_TYPE=KeyShared

# Create a number of parallel connections for receiving device messages.
CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS=1

When you add the DeviceService bundle, you must set the following required property:

Property

Description

CUMULOCITY_DEVICESERVICE_SUBSCRIBER_NAME

A unique name to identify the subscriber. Disconnecting and reconnecting or restarting with the same subscriber name resumes the message stream from the point it left off.

Required if not running in a microservice.

If running in a microservice, this is inferred from the microservice contextPath.

Type: string.

You can also add the following optional properties to the .properties configuration file:

Property

Description

CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_NAME

Name of the subscription to use or create. All correlators which use the same subscription name consume the same set of messages.

Type: string.

Default: deviceservice-subscriber.

CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_TYPE

The subscription type. An Exclusive subscription can only have one connection for each subscriber name; subsequent connections will fail. A Shared or KeyShared subscription can have multiple connections with the same subscriber name, in which case each message is delivered to only one of the set of connections. In Shared mode, any message can be delivered to any connection. In KeyShared mode, messages relating to a given device are all received by the same connection.

Type: string.

Values: Exclusive, Shared or KeyShared.

Default: KeyShared.

CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS

Create a number of parallel connections for receiving device messages. Requires the subscription type Shared or KeyShared. Increase this number above 1 if your application is not consuming device messages fast enough. For best performance, the number of clients should be a power of 2.

In multi-tenant deployments, this defaults to 1 per tenant to avoid resource exhaustion. In per-tenant deployments, this defaults to the number of CPUs.

Type: integer.

Default: Number of CPUs (per-tenant) or 1 (multi-tenant).

CUMULOCITY_DEVICESERVICE_MAX_BATCHSIZE

The largest batch of messages that can be received from the DeviceService at one time.

Type: integer.

Default: 1000.

CUMULOCITY_DEVICESERVICE_SERVICE_URL

The Pulsar service URL for connecting to the DeviceService. In most situations, you do not need to set this, because it is derived from C8Y_BASEURL_PULSAR.

When specified, use the format pulsar://hostname:port for non-TLS connections or pulsar+ssl://hostname:port for TLS connections.

Type: string.
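As a sketch of how the optional properties combine, the following fragment configures a Shared subscription with four parallel connections and an explicit service URL. The host name pulsar.example.com is a placeholder, and port 6651 is assumed here only because it is the conventional Pulsar TLS port. Substitute the values for your environment, or omit the service URL entirely to keep the derived default.

```properties
# Required when not running in a microservice: identifies this subscriber.
CUMULOCITY_DEVICESERVICE_SUBSCRIBER_NAME=deviceservice-subscriber

# Shared allows several connections under the same subscriber name;
# any message can be delivered to any connection.
CUMULOCITY_DEVICESERVICE_SUBSCRIPTION_TYPE=Shared

# More than one client requires Shared or KeyShared; 4 is a power of 2.
CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS=4

# Hypothetical host and port (assumption); use pulsar:// for non-TLS connections.
CUMULOCITY_DEVICESERVICE_SERVICE_URL=pulsar+ssl://pulsar.example.com:6651
```

Choose KeyShared instead of Shared if all messages relating to a given device must be received by the same connection.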

Using the DeviceService API

The DeviceService API provides EPL APIs for establishing connections to consume messages from devices and publish messages to devices via the MQTT Service.

The DeviceService event in the com.apama.cumulocity.devices package provides the following APIs:

  • connectPublisher: Connects a device publisher for a given tenant and transport.
  • connectConsumers: Connects device consumers for a given tenant and transport.

For detailed information, see the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).

Connecting consumers

Use the connectConsumers API to consume device messages from the MQTT Service (from-device Pulsar topic), as shown in the following example:

using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;

using com.apama.cumulocity.devices.DeviceService;
using com.apama.cumulocity.devices.DeviceConsumer;
using com.apama.cumulocity.devices.DeviceMessage;

monitor DeviceMessageReceiver {
    
    action onload {
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().onSubscription(tenantSubscribed);
    }

    action tenantSubscribed(TenantDetails tenant) {
        // Create one consumer per configured client connection
        sequence<DeviceConsumer> consumers := DeviceService.connectConsumers(tenant.tenantId, "mqtt");
        if consumers.size() > 0 {
            log "DeviceService Consumer started for tenant: " + tenant.tenantId at INFO;

            // Subscribe this monitor to every consumer channel
            DeviceConsumer consumer;
            for consumer in consumers {
                monitor.subscribe(consumer.getSubscribeChannel());
                log "Subscribed to channel: " + consumer.getSubscribeChannel() at INFO;
            }

            // Handle incoming device messages. A single listener is enough,
            // because it receives events from all subscribed channels.
            on all DeviceMessage() as msg {
                log "Received message from clientID: " + msg.clientID + " on topic: " + msg.topic at INFO;
                // Process the message...
            }
        } else {
            log "No DeviceService Consumer created for tenant: " + tenant.tenantId at ERROR;
        }
    }
}

The connectConsumers action:

  • Creates one or more consumers, based on the CUMULOCITY_DEVICESERVICE_NUMBER_CLIENTS setting.
  • Returns a sequence of DeviceConsumer objects.

For more information, see the connectConsumers API in the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).

Connecting a publisher

Use the connectPublisher API to publish messages to devices connected to the MQTT Service (to-device Pulsar topic), as shown in the following example:

using com.apama.cumulocity.TenantSubscriptionNotifier;
using com.apama.cumulocity.TenantDetails;

using com.apama.cumulocity.devices.DeviceService;
using com.apama.cumulocity.devices.DevicePublisher;
using com.apama.cumulocity.devices.DeviceMessage;

monitor DeviceMessageSender {
    
    constant string CLIENT_ID := "client123";
    constant string DEVICE_TOPIC := "device/sensor/temperature";
    constant string PAYLOAD := "eyJrZXkiOiAidmFsIn0=";

    action onload {
        TenantSubscriptionNotifier notifier := TenantSubscriptionNotifier.create().onSubscription(tenantSubscribed);
    }

    action tenantSubscribed(TenantDetails tenant) {
        DevicePublisher devicePublisher := DeviceService.connectPublisher(tenant.tenantId, "mqtt");
        log "DeviceService Publisher started for tenant: " + devicePublisher.getTenantID() at INFO;

        // Build a device message; the payload is Base64-encoded
        DeviceMessage msg := new DeviceMessage;
        msg.transportID := "mqtt";
        msg.clientID := CLIENT_ID;
        msg.topic := DEVICE_TOPIC;
        msg.payloadBase64 := PAYLOAD;
        msg.transportProperties := new dictionary<string, any>;
        
        log "Sending message " + msg.toString() + " to device" at INFO;
        send msg to devicePublisher.getSendChannel();
    }
}

The connectPublisher action:

  • Creates a single publisher connection per tenant/transport combination.
  • Returns a DevicePublisher object.

For more information, see the connectPublisher API in the com.apama.cumulocity.devices package in the API Reference for EPL (apamadoc).