The Cumulocity MQTT Service

About the Cumulocity MQTT Service

The Cumulocity MQTT Service is an MQTT endpoint to Cumulocity which supports publishing and subscribing arbitrary payloads. These need to be processed within Cumulocity into a format that can be consumed by the platform. This connectivity allows for that processing to be done in Apama or in Cumulocity Streaming Analytics. The MQTT Service allows for messages to be processed by Apama before being stored in the platform.

This API can be used both when connecting to the Cumulocity platform from a custom external deployment and when deployed into the platform as a custom microservice. It can also be used within the Cumulocity Streaming Analytics EPL Apps. More details for the latter option will be in the Cumulocity Streaming Analytics guide.

Info
The Cumulocity MQTT Service feature is currently in private preview. If you would like to have it enabled for your tenant, please contact Cumulocity IoT Operations.

Connecting to the MQTT Service

Adding MQTT Service support to your project

To add MQTT Service support to your project, you need to add the Cumulocity Notifications 2.0 bundle to your project. You will need to fill out the Notifications configuration in the CumulocityNotifications2.properties file which will be added to your project. For more details see Configuration for Cumulocity Notifications 2.0.

Creating a subscription to the MQTT Service

MQTT Service subscriptions are created using the various createSubscription actions on the com.apama.cumulocity.mqttservice.MQTTService type. Each subscription action takes a call-back functor that is called once the subscription is created.

The basic form of the createSubscription methods are:

static action createSubscription(string serviceTopic, string subscriberName, string messageFormat, string eventType, boolean producerOnly, any onConnected);
static action createMultiTenantSubscription(TenantDetails tenant, string serviceTopic, string subscriberName, string messageFormat, string eventType, boolean producerOnly, any onConnected);

There are also several helper actions which wrap these and are described in Data formats on MQTT Service.

The onConnected callback should be an action<MQTTSubscription> or a generic or partial functor of the same type. This is passed a com.apama.cumulocity.mqttservice.MQTTSubscription object that provides the following actions:

event MQTTSubscription
{
	action getTopic() returns string;
	action getSubscribeChannel() returns string;
	action getSendChannel() returns string;
	action disconnect();
	action unsubscribe();
}

The getTopic action returns the topic this subscription is subscribed to. It will match the topic added to received events. The getSubscribeChannel action returns a channel which can be used with monitor.subscribe to receive messages from the MQTT Service. The getSendChannel action returns a channel which can be used with the send keyword to send messages to the MQTT Service. The disconnect action is used to temporarily stop receiving messages from the MQTT Service while preserving message history. The unsubscribe action is used to stop receiving messages from the MQTT Service and clears message history.

The serviceTopic should match the MQTT topic that the device is sending to. The subscriberName parameter is an arbitrary string to uniquely identify this receiver.

The messageFormat and eventType parameters determine how data from the device is parsed and delivered to EPL. For more details on these, see Data formats on MQTT Service.

For applications being deployed in a multi-tenant microservice you should use the createMultiTenantSubscription action. You can see the Working with multi-tenant deployments section for more details.

More details on all of the EPL APIs can be found in the API Reference for EPL (apamadoc).

Managing subscriptions to MQTT Service

Subscriptions to the MQTT Service can be disconnected using the disconnect action or unsubscribed by using the unsubscribe action on the MQTTSubscription object. disconnect preserves message history, but unsubscribe clears message history. It’s important to unsubscribe subscriptions when they are no longer needed to avoid resource leaks.

On disconnect the subscription will temporarily stop receiving messages from the MQTT Service while preserving message history. Messages published to the topic while disconnected will be queued and delivered when you resume listening to the same topic with the same subscriber name. You will need to create a new MQTT Subscription for the topic with the same subscriber name to resume listening.

action ondie()
{
	subscription.disconnect();
}

On unsubscribe the subscription will stop receiving messages from the MQTT Service and clears message history. Messages published to the topic after unsubscribing will not be queued or delivered for this subscriber name. Creating a new MQTT Subscription will allow you to see new messages on the same topic.

action ondie()
{
	subscription.unsubscribe();
}

MQTT Service Example

A simple example of connecting to the MQTT Service and receiving messages is shown below:

using com.apama.cumulocity.mqttservice.MQTTService;
using com.apama.cumulocity.mqttservice.MQTTSubscription;

event JSONMeasurement
{
	string topic;
	string deviceId;
	float timestamp;
	string type;
	string name;
	float value;
}

monitor TestMQTTService
{
	MQTTSubscription subscription;
	action onload()
	{
		MQTTService.createSubscription("mqtttopicname", "MySubscriber", MQTTService.JSON_FORMAT, JSONMeasurement.getName(), false, onConnected);
	}
	action onConnected(MQTTSubscription sub)
	{
		subscription := sub;
		monitor.subscribe(subscription.getSubscribeChannel());
		on all JSONMeasurement(topic=sub.getTopic()) as m {
			log "Received measurement: " + m;
		}
	}
	action ondie()
	{
		subscription.disconnect();
	}
}

Data formats on the MQTT Service

The Cumulocity MQTT Service supports a number of different data formats for messages. These are specified in the messageFormat parameter of the createSubscription and createMultiTenantSubscription actions. The following formats are supported:

  • MQTTService.JSON_FORMAT - The message is expected to be a JSON object. The object is parsed and the fields are delivered to EPL as fields of the event.
  • MQTTService.TEXT_FORMAT - The message is expected to be text. The text is delivered to EPL as a field of the event.
  • MQTTService.BASE64_FORMAT - The message can be any binary data. The data is encoded in Base64 and delivered to EPL as a field of the event.

The eventType parameter specifies the name of an event type that is defined in EPL. This event type will be used to deliver the data to EPL. The fields of the event are then populated with the data from the message according to the messageFormat parameter. For JSON format, the keys in the JSON object should correspond to the field names in the EPL event. For text format, the data will be provided in a single field called textData. For Base64 format, it should be called base64Data.

While it’s recommended, particularly for JSON format, to use a custom event type to match the structure of the data, it is possible to use the com.apama.cumulocity.mqttservice.MQTTServiceMessage event type. This will map all of the data into a dictionary field called data where the JSON keys, textData or base64Data fields will be keys in the dictionary. This is useful for cases where the data structure is not known in advance.

In addition, the following two fields exist in MQTTServiceMessage: topic and properties. The topic field contains the topic that the message was received on. The properties field contains any properties that were sent with the message to MQTT. These two fields can also be added to custom event types if required.

Convenience subscription actions

There are several convenience action for creating subscriptions to the MQTT Service with specific data formats. These are provided only for per-tenant usage, and are as follows:

static action createJSONSubscription(string serviceTopic, string subscriberName, any onConnected)
static action createCustomJSONSubscription(string serviceTopic, string subscriberName, string eventType, any onConnected)
static action createTextSubscription(string serviceTopic, string subscriberName, any onConnected)
static action createBase64Subscription(string serviceTopic, string subscriberName, any onConnected)

These actions are wrappers around the createSubscription action, and provide a more convenient way to create subscriptions with specific data formats. The subscription can be used to receive from and send messages to the MQTT Service. By default using the same serviceTopic with a different subscriberName causes messages to be received by each subscription, increasing the cpu and memory usage. So where possible it is more efficient to avoid creating multiple subscriptions and instead to forward messages to other monitor instances and contexts that need them.

The createJSONSubscription action creates a subscription with the JSON format and using the default MQTTServiceMessage type. The createCustomJSONSubscription action creates a subscription with the JSON format and a custom event type. The createTextSubscription action creates a subscription with the Text format and the default MQTTServiceMessage type. The createBase64Subscription action creates a subscription with the Base64 format and the default MQTTServiceMessage type.

There are also corresponding convenience actions for sending only. These will not subscribe to the topic, so cannot be used to receive messages, but are useful when the intention is only to send towards the MQTT Service.

static action createJSONProducer(string serviceTopic, string producerName, any onConnected)
static action createCustomJSONProducer(string serviceTopic, string producerName, string eventType, any onConnected)
static action createTextProducer(string serviceTopic, string producerName, any onConnected)
static action createBase64Producer(string serviceTopic, string producerName, any onConnected)

Text format example

A full example of a subscription to the MQTT Service with the Text format that publishes the data as a Measurement to Cumulocity is shown below:

using com.apama.cumulocity.mqttservice.MQTTService;
using com.apama.cumulocity.mqttservice.MQTTServiceMessage;
using com.apama.cumulocity.mqttservice.MQTTSubscription;
using com.apama.cumulocity.MeasurementFragment;

monitor TestMQTTService
{
	action onload()
	{
		MQTTService.createTextSubscription("mqtttopicname", "MySubscriber", onConnected);
	}
	action onConnected(MQTTSubscription sub)
	{
		monitor.subscribe(sub.getSubscribeChannel());
		on all MQTTServiceMessage(topic=sub.getTopic()) as m {
			log "Received measurement: " + m.data["textData"];
			send MeasurementFragment("", 
					<string>m.properties["type"],
					<string>m.properties["deviceId"],
					float.parse(<string>m.properties["time"]),
					<string>m.properties["fragment"],
					<string>m.properties["series"],
					float.parse(<string>m.data["textData"]),
					<string>m.properties["unit"],
					new dictionary<string, any>) to MeasurementFragment.SEND_CHANNEL;
		}
	}
}

JSON format example

A full example of a subscription to the MQTT Service with the JSON format that publishes the data as a Measurement to Cumulocity is shown below:

using com.apama.cumulocity.mqttservice.MQTTService;
using com.apama.cumulocity.mqttservice.MQTTSubscription;
using com.apama.cumulocity.MeasurementFragment;

event JSONMeasurement
{
	string topic;
	string deviceId;
	float timestamp;
	string type;
	string name;
	float value;
}

monitor TestMQTTService
{
	action onload()
	{
		MQTTService.createCustomJSONSubscription("mqtttopicname", "MySubscriber", JSONMeasurement.getName(), onConnected);
	}
	action onConnected(MQTTSubscription sub)
	{
		monitor.subscribe(sub.getSubscribeChannel());
		on all JSONMeasurement(topic=sub.getTopic(), type="typeToConvert") as m {
			log "Received measurement: " + m.toString();
			send MeasurementFragment("", 
					m.type,
					m.deviceId,
					m.timestamp,
					"data_fragment",
					m.name,
					m.value,
					"unit",
					new dictionary<string, any>) to MeasurementFragment.SEND_CHANNEL;

		}
	}
}

Base64 format example

A full example of a subscription to the MQTT Service with the Base64 format that publishes the data as a measurement to Cumulocity is shown below. Because EPL has no native handling of binary data, the data is encoded in Base64 and delivered as a string. This must be passed to a plug-in in order to be parsed and converted into a data type which EPL can handle natively:

using com.apama.cumulocity.mqttservice.MQTTService;
using com.apama.cumulocity.mqttservice.MQTTServiceMessage;
using com.apama.cumulocity.mqttservice.MQTTSubscription;
using com.apama.cumulocity.MeasurementFragment;

monitor TestMQTTService
{
	import "base64decoder" as base64decoder;
	action onload()
	{
		MQTTService.createBase64Subscription("mqtttopicname", "MySubscriber", onConnected);
	}
	action onConnected(MQTTSubscription sub)
	{
		monitor.subscribe(sub.getSubscribeChannel());
		on all MQTTServiceMessage(topic=sub.getTopic()) as m {
			chunk c := base64decoder.decode(m.data["base64Data"]);

			send MeasurementFragment("", 
					base64decoder.extractString(c, "type"),
					<string>m.properties["deviceId"],
					base64decoder.extractFloat(c, "timestamp"),
					<string>m.properties["fragment"],
					<string>m.properties["series"],
					base64decoder.extractFloat(c, "value"),
					base64decoder.extractString(c, "unit"),
					new dictionary<string, any>) to MeasurementFragment.SEND_CHANNEL;
		}
	}
}

More details on all of the EPL APIs can be found in the API Reference for EPL (apamadoc).

Sending messages to devices

As well as subscribing to messages from devices, MQTT Service can be used to send messages back to devices. Typically these would map onto Operation objects in Cumulocity. EPL can be used to turn Operations into MQTT messages to send to devices.

To do this you should subscribe using the MQTTService actions as normal, but then use the getSendChannel() action on the MQTTSubscription object to get a channel to send messages to the MQTT Service. You can then use the send keyword to send messages to the MQTT Service.

Sending messages to devices example

A simple example of sending a message to a device is shown below:

using com.apama.cumulocity.mqttservice.MQTTService;
using com.apama.cumulocity.Operation;

event JSONMessageToDevice
{
	any deviceAction;
}

monitor TestMQTTService
{
	constant string DEVICE_ID := "12345";
	action onload()
	{
		MQTTService.createCustomJSONSubscription("mqtttopicname", "MySubscriber", JSONMessageToDevice.getName(), onConnected);
	}
	action onConnected(MQTTSubscription sub)
	{
		monitor.subscribe(Operation.SUBSCRIBE_CHANNEL);
		on all Operation(source=DEVICE_ID) as op {
			send JSONMessageToDevice(op.params["deviceAction"]) to sub.getSendChannel();
		}
	}
}