MQTT Service

Feature preview
This feature is in Public Preview. That is, it is not yet generally available and may be subject to change in the future.
Requirements

To work with the MQTT Service, the following requirements must be met:

  • The Cumulocity Messaging Service must be deployed in your Cumulocity environment.
  • The Cumulocity MQTT Service must be deployed in your Cumulocity environment.
  • Your tenant must be subscribed to the mqtt-service microservice. This may have been done automatically, depending on how your Cumulocity environment was configured. To check the subscription, open the Administration application and navigate to Ecosystem > Microservices. If you do not see the mqtt-service microservice listed, contact product support (for public environments) or your Cumulocity administrator (for dedicated environments) to request the subscription for your tenant.

The MQTT Service is a new MQTT endpoint implementation for Cumulocity that provides the following benefits:

  • Sending and receiving arbitrary payloads on any MQTT topic. Note that the topics used by the Cumulocity Core MQTT implementation currently cannot be used with the MQTT Service.
  • User-provided microservices can send and receive messages on MQTT topics, and map messages to and from the Cumulocity data model. The typical use case for such a microservice is to map between MQTT device payloads, and the Cumulocity REST and Notifications 2.0 APIs.
  • Multi-tenancy support. A single endpoint serves multiple tenants and tenants are completely isolated from each other.
  • Bi-directional TLS support. All MQTT traffic is encrypted and clients can authenticate using X.509 certificates.

The MQTT Service does not replace the existing Core MQTT capability of Cumulocity that supports sending device data already in the Cumulocity domain model directly into the platform. The new capability provided by the MQTT Service allows for easier integration of MQTT devices that cannot use the Cumulocity domain model. It also supports more flexible communication patterns between devices, applications, and the Cumulocity platform, controlled by user-provided microservices.

Device isolation is the default behaviour for the MQTT Service. Each MQTT client has its own private topic space and cannot directly receive messages from other clients, enhancing security and isolation between devices.

This documentation does not describe the basics of MQTT communication. If you are unfamiliar with MQTT, we recommend consulting one of the many introductions available online. Some references can be found on the MQTT website.

Overview

Architecture

The MQTT Service works together with the Messaging Service to provide a framework for highly customizable and flexible MQTT message processing solutions. The diagram below illustrates how a message flows, starting from the device, through the Messaging Service, then to a user-provided microservice where it is converted to the Cumulocity JSON format and delivered to Cumulocity using the standard REST API.

[Diagram: MQTT Service send]

All MQTT messages published to the MQTT Service are forwarded to the Messaging Service, where they are persisted, waiting to be consumed. A custom microservice or Streaming Analytics app that understands the topic and payload structure can consume the MQTT messages, and then translate and push them into Cumulocity.

Similarly, a custom microservice or Streaming Analytics app can send messages to devices, as shown in the diagram below. In this case, the user-provided microservice receives messages from Cumulocity through a Notifications 2.0 subscription. These messages are mapped to the payload structure used by the MQTT devices, then published to MQTT topics.

[Diagram: MQTT Service push]

As with MQTT messages published by devices, messages published from a microservice will be forwarded to the Messaging Service, where they can be consumed by MQTT devices subscribed to the relevant topics.

Custom microservices may use the Java client to publish to or consume from MQTT topics. They can use the Microservice SDK to push data into Cumulocity.

MQTT Service compared to Core MQTT

The table below presents a basic comparison between the Cumulocity Core MQTT functionality and that of the MQTT Service.

Feature | Core MQTT | MQTT Service
QoS | 0, 1, 2 | 0, 1
Clean session | Starting with clean session is recommended | Starting with clean session is required
Retained flag | Not supported | Not supported
Last will | Supported | Supported
MQTT 5.0 features | Not supported | MQTT 5.0 clients can connect. Partial support for MQTT 5.0 features
Authentication | Basic and TLS device certificates | Basic and TLS device certificates
Scalability | Horizontal | Currently a single instance. Horizontal scaling will be available in the GA release
Topic format | Determined by the SmartREST 2.0 protocol | Unrestricted. SmartREST topic names are reserved and cannot currently be used
Payload | Determined by the SmartREST 2.0 protocol | Unrestricted. The maximum message size is 128 KiB including all headers
Extensibility | Limited by SmartREST 2.0 custom templates | Streaming Analytics apps or custom mapping microservices can support arbitrary MQTT-based protocols
Message processors/consumers | Built-in message processor for each SmartREST 2.0 topic | Streaming Analytics apps or custom mapping microservices can support multiple processors for a topic
JSON via MQTT | Limited feature set | Streaming Analytics apps or custom mapping microservices can support arbitrary JSON payloads

MQTT protocol implementation

This section covers some implementation details of the MQTT Service. The MQTT Service implementation supports clients connecting using MQTT versions 3.1, 3.1.1 and 5.0, although not all MQTT 5.0 protocol features are currently supported.

Connecting to the service

Important
The MQTT Service requires clients to connect with the clean session flag set to “1” (true). Otherwise, the server rejects the client connection.

MQTT connections to the MQTT Service must use TCP. Use your tenant domain as the target host for the connection, for example {my-tenant}.cumulocity.com.

Available ports:

Transport security | TCP port
TLS | 9883
no TLS | 2883

Port 9883 (TLS) is the default port and should be used for secure, encrypted communication. Both one-way (server certificate only) and two-way (both client and server certificates) TLS are supported. When client certificates are not used, the server authenticates the client using standard username and password credentials. Port 2883 (no TLS) is not enabled in Cumulocity shared public environments due to the security risks of unencrypted traffic. To enable port 2883 in a dedicated environment, please contact Product support.

Topics

MQTT Service topics are mapped to Messaging Service subscriptions with identical names, apart from additional URL encoding. These subscriptions reliably store the topic messages for asynchronous processing. The stored messages can be consumed using a dedicated Java Client.

Topic restrictions

The MQTT Service does not impose any topic structure. There are just a few topic names which are reserved for historic purposes and future use, namely:

  • All SmartREST 2.0 related topics
  • error
  • devicecontrol/notifications
Info
Wildcard topics (+, #) and system topics starting with $ are not currently supported.

Other than these restrictions you are free to use any topic name which is compatible with the MQTT specification.

Topic limits

The MQTT Service imposes several topic-related limits. See the Service Quotas section for details of the current limits in force.

There is a limit on the total number of topics that a single tenant can create. A topic is created when a client publishes a message to, or subscribes to, a topic that does not yet exist. If creating the topic would breach the topic limit, delivery of the packet is prevented.

The different MQTT protocol versions provide different feedback when this limit is exceeded.

MQTT 5 clients:

  • Have access to the reason code and reason string describing the failure when using QoS 1 with acknowledgements, where the reason code is QUOTA_EXCEEDED: 0x97.

MQTT 3.1 and 3.1.1 clients:

  • Have access only to the return code describing the failure when using QoS 1 with acknowledgements, and only for SUBSCRIBE packets, where the return code is 0x80.
  • For PUBLISH packets, the client will be disconnected with no further information as per the MQTT specification.
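
The feedback rules above can be captured in a small helper that a client wrapper might consult when it receives an acknowledgement. This is only a sketch: the class and method names are hypothetical, and the protocol level values (3, 4 and 5) are the ones carried in the MQTT CONNECT packet. Note that for MQTT 3.1 and 3.1.1 the code 0x80 is a generic failure, so it can only hint at a breached topic limit.

```java
/** Illustrative helper only; the class and method names are hypothetical. */
final class TopicLimitFeedback {
    /** MQTT 5 reason code QUOTA_EXCEEDED, as named in the text above. */
    static final int MQTT5_QUOTA_EXCEEDED = 0x97;
    /** Failure code returned to MQTT 3.1 and 3.1.1 clients for SUBSCRIBE packets. */
    static final int MQTT3_SUBACK_FAILURE = 0x80;

    /**
     * @param protocolLevel level from the CONNECT packet: 3 (MQTT 3.1), 4 (MQTT 3.1.1) or 5 (MQTT 5.0)
     * @param ackCode       code carried by the acknowledgement packet
     * @return true if the code is the one associated with a breached topic limit
     */
    static boolean mayIndicateTopicLimit(int protocolLevel, int ackCode) {
        if (protocolLevel >= 5) {
            return ackCode == MQTT5_QUOTA_EXCEEDED;
        }
        // Older protocol versions only report a generic failure, and only for SUBSCRIBE.
        return ackCode == MQTT3_SUBACK_FAILURE;
    }

    /** Returns a short human-readable description for logging. */
    static String describe(int protocolLevel, int ackCode) {
        if (!mayIndicateTopicLimit(protocolLevel, ackCode)) {
            return "not a topic limit failure";
        }
        return protocolLevel >= 5
            ? "QUOTA_EXCEEDED (0x97): topic limit reached"
            : "SUBSCRIBE failure (0x80): possibly topic limit reached";
    }
}
```

A client could call `TopicLimitFeedback.describe(5, 0x97)` when logging a failed QoS 1 publish, and treat a sudden disconnect after a PUBLISH on an MQTT 3.1.1 connection as a possible sign of the same condition.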

In addition to the topic count, the MQTT Service also limits the size of the message backlog on each topic. The message backlog contains all messages that have been published on the topic but not yet received and acknowledged by all subscribers to the topic. When the backlog limit is reached, further attempts to publish to the topic will fail until some messages have been consumed.

Each message in a topic backlog also has a time-to-live (TTL) that starts at the moment the message is published. When the TTL of a message expires, that message will be deleted from the backlog regardless of whether all subscribers have received it or not. MQTT clients do not receive any notification that messages have been discarded from a backlog due to TTL expiry.

Error topic

The MQTT Service provides clients the ability to review errors through messages received by subscribing to the error topic, $debug/$error. When subscribing to the topic it will act as a per-client topic, meaning the client will only receive messages exclusively related to their client ID. For example, if a client was attempting to subscribe to a new topic, and the creation of the topic would exceed the topic limit, only that client would receive an error.

According to the MQTT 3.1.1 specification, if either the server or the client encounters a protocol violation, it must close the network connection on which it received the control packet which caused the violation.

In such instances, MQTT clients must reconnect before they can receive error messages from the error topic via the subscription. Error messages received after this reconnection relate to the previous session, which can lead to confusion when attempting corrective actions. Therefore, we strongly recommend that you either build a microservice which uses the MQTT Service SDK to consume error messages, or use MQTT 5 clients and make use of the reason codes feature.

Topic cleanup

The MQTT Service automatically removes topics which are no longer active. A topic is considered inactive when it has no subscriptions and the internal publisher to the topic is closed. The publisher is responsible for publishing the modified MQTT Service messages to the correct topic. Publishers live in a cache and expire after one hour, so it can take up to an hour after all subscriptions have been removed from a topic for it to be deleted automatically.

Payload

MQTT protocol messages map bidirectionally to the internal MQTT Service message format which includes the original payload and additional metadata fields. Assuming Java types, the packed message structure looks as follows:

MqttServiceMessage

Field name | Type | Description
payload | byte[] | MQTT payload
metadata | MqttServiceMetadata | Metadata from the MQTT message

MqttServiceMetadata

Field name | Type | Description
clientId | String | Unique MQTT client identifier, usually used as an external identifier
messageId | int | Unique MQTT message ID per client, available only with QoS 1 and 2
dupFlag | boolean | Indicates this message is a resend by the MQTT client
userProperties | Map | Reserved for future use of MQTT 5.0 features
payloadFormatIndicator | enum | Reserved for future use of MQTT 5.0 features
contentType | String | Reserved for future use of MQTT 5.0 features
correlationData | byte[] | Reserved for future use of MQTT 5.0 features
responseTopic | String | Reserved for future use of MQTT 5.0 features
topic | String | The name of the MQTT topic to which the client published the message

The Java Client contains classes representing the above model.
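
To make the structure above concrete, the following sketch expresses the two tables as Java records. These are not the classes shipped with the Java Client. The record names, the `PayloadFormat` enum and its values, and the choice of `Map<String, String>` for the user properties are all assumptions made for illustration.

```java
import java.util.Map;

/** Hypothetical stand-in for the payload format indicator; the real enum may differ. */
enum PayloadFormat { UNSPECIFIED_BYTES, UTF8_TEXT }

/** Sketch of the metadata fields listed in the MqttServiceMetadata table. */
record MqttServiceMetadataSketch(
        String clientId,                      // unique MQTT client identifier
        int messageId,                        // only meaningful with QoS 1 and 2
        boolean dupFlag,                      // true if the client re-sent the message
        Map<String, String> userProperties,   // assumed key and value types
        PayloadFormat payloadFormatIndicator, // reserved for MQTT 5.0 features
        String contentType,                   // reserved for MQTT 5.0 features
        byte[] correlationData,               // reserved for MQTT 5.0 features
        String responseTopic,                 // reserved for MQTT 5.0 features
        String topic) {}                      // topic the client published to

/** Sketch of the packed message: raw payload bytes plus metadata. */
record MqttServiceMessageSketch(byte[] payload, MqttServiceMetadataSketch metadata) {}
```

A mapping microservice would typically inspect `metadata().topic()` and `metadata().clientId()` first, and only then decode `payload()` according to the device's own format.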

Payload restrictions

The MQTT Service does not impose any specific payload format. All the incoming MQTT messages must meet the specification in terms of fixed and variable headers, but the payload for published messages is unrestricted. A Streaming Analytics app or a custom microservice will receive the exact same set of bytes that was sent by an MQTT device, and is responsible for converting these to a Cumulocity compatible format.

The size of the MQTT payload is limited to a maximum value that includes both the message header and body. The size of an MQTT packet header varies, but it will be at least 2 bytes. See the Service Quotas section for details of the current limit in force.
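
As a rough illustration of how headers count towards the limit, the sketch below estimates the size of an MQTT 3.1.1 PUBLISH packet before it is sent. It assumes the MQTT 3.1.1 packet layout (MQTT 5.0 packets carry additional properties that are not counted here) and uses the 128 KiB figure from the comparison table as a default. Check the Service Quotas section for the value actually in force. The class and method names are hypothetical.

```java
import java.nio.charset.StandardCharsets;

/** Illustrative size estimate for MQTT 3.1.1 PUBLISH packets; names are hypothetical. */
final class PublishSizeEstimator {
    /** Assumed default, taken from the comparison table; the real quota may differ. */
    static final int ASSUMED_LIMIT_BYTES = 128 * 1024;

    /** Number of bytes used by the variable-length "remaining length" field. */
    static int remainingLengthFieldSize(int remainingLength) {
        if (remainingLength < 0 || remainingLength > 268_435_455) {
            throw new IllegalArgumentException("remaining length out of range");
        }
        if (remainingLength < 128) return 1;
        if (remainingLength < 16_384) return 2;
        if (remainingLength < 2_097_152) return 3;
        return 4;
    }

    /** Total packet size: fixed header + topic name + optional packet id + payload. */
    static int publishPacketSize(String topic, int payloadLength, int qos) {
        int topicBytes = topic.getBytes(StandardCharsets.UTF_8).length;
        int packetIdBytes = qos > 0 ? 2 : 0;          // packet identifier only for QoS 1 and 2
        int remaining = 2 + topicBytes + packetIdBytes + payloadLength;
        return 1 + remainingLengthFieldSize(remaining) + remaining;
    }

    /** Returns true if the estimated packet size does not exceed the given limit. */
    static boolean fitsWithin(String topic, int payloadLength, int qos, int limitBytes) {
        return publishPacketSize(topic, payloadLength, qos) <= limitBytes;
    }
}
```

For example, a payload of exactly 128 KiB would not fit under a 128 KiB limit, because the topic name and fixed header add several bytes on top of the payload.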

Features

Authentication and authorization

The MQTT Service supports the following authentication methods:

  • Username and password: The MQTT username must include the tenant ID and username in the format <tenantID>/<username>.
  • Device certificates: For secure communication, devices must contain the entire chain of certificates leading to the trusted root certificate, or if only the device certificate is provided, then the immediate issuer certificate must be uploaded to the platform’s truststore. You can do this via the Trusted certificates page in the UI or via REST. Moreover, the devices must contain the server certificate in their truststore.
    If the trust anchor (that is, the trusted root or intermediate certificate) used to validate the device certificate is trusted by multiple tenants, the device must also specify the tenant ID in the MQTT username field. This ensures that the platform can correctly identify which tenant the device is attempting to connect to. While multi-tenant trust anchors are not currently supported in Cumulocity, this feature may be introduced in the future. If the tenant ID is provided, it must correspond to a tenant that trusts the given certificate; otherwise, the connection will be rejected.

ClientId

The MQTT ClientID field identifies the connected client. A ClientID may consist of up to 128 alphanumeric characters. Each client connecting to the MQTT Service must have a unique client identifier. Connecting a second client with the same identifier will result in the previous client’s disconnection.
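
A device provisioning tool might check candidate identifiers against this rule before attempting to connect. The sketch below is one possible check. It assumes that "alphanumeric" means ASCII letters and digits and that an empty identifier is not acceptable. Neither assumption is confirmed by this documentation, and the class name is hypothetical.

```java
import java.util.regex.Pattern;

/** Illustrative ClientID check; the exact character set accepted by the service is an assumption. */
final class ClientIdCheck {
    // 1 to 128 ASCII letters or digits (assumed interpretation of "alphanumeric").
    private static final Pattern ALLOWED = Pattern.compile("[A-Za-z0-9]{1,128}");

    /** Returns true if the identifier satisfies the assumed rule. */
    static boolean isValid(String clientId) {
        return clientId != null && ALLOWED.matcher(clientId).matches();
    }
}
```

With this check, `ClientIdCheck.isValid("myclient")` is accepted while an identifier containing a hyphen or longer than 128 characters is rejected.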

Quality of Service (QoS)

The MQTT Service implementation supports two levels of MQTT QoS:

  • QoS 0: At most once:
    • The client sends the message once (fire and forget).
    • There is no response from the server.
    • There is no guarantee that subscribers will receive the message.
  • QoS 1: At least once:
    • The client awaits server acknowledgment for each published message.
    • The client should re-send the message if there was no acknowledgement from the server.
    • It is guaranteed that subscribers will receive a message that was acknowledged by the server.
    • Subscribers may receive more than one copy of a message.
  • QoS 2: Exactly once:
    • not supported

For subscriptions, the MQTT Service will deliver messages in the QoS that the client defined when subscribing to the topic (QoS 0 or 1).

Clean session

The MQTT Service requires the clean session flag to be set to “1” (true). Disabling clean session will result in client connections being rejected by the server.
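
Most MQTT client libraries expose clean session as a connection option, so you will rarely build the CONNECT packet yourself. To show where the flag lives on the wire, the sketch below assembles an MQTT 3.1.1 CONNECT packet using only the JDK. It sets the clean session bit and uses the <tenantID>/<username> username format described under Authentication and authorization. The class name and parameters are hypothetical, and the code is not taken from any Cumulocity SDK.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

/** Wire-level illustration of an MQTT 3.1.1 CONNECT packet with clean session enabled. */
final class ConnectPacketSketch {
    static final int FLAG_USERNAME = 0x80;
    static final int FLAG_PASSWORD = 0x40;
    static final int FLAG_CLEAN_SESSION = 0x02;

    static byte[] build(String clientId, String tenantId, String user,
                        String password, int keepAliveSeconds) {
        ByteArrayOutputStream body = new ByteArrayOutputStream();
        writeString(body, "MQTT");                 // protocol name
        body.write(0x04);                          // protocol level 4 = MQTT 3.1.1
        body.write(FLAG_USERNAME | FLAG_PASSWORD | FLAG_CLEAN_SESSION);
        body.write((keepAliveSeconds >> 8) & 0xFF);
        body.write(keepAliveSeconds & 0xFF);
        writeString(body, clientId);               // payload: client identifier
        writeString(body, tenantId + "/" + user);  // payload: <tenantID>/<username>
        writeString(body, password);               // payload: password

        ByteArrayOutputStream packet = new ByteArrayOutputStream();
        packet.write(0x10);                        // CONNECT packet type
        writeRemainingLength(packet, body.size());
        packet.write(body.toByteArray(), 0, body.size());
        return packet.toByteArray();
    }

    /** Writes a UTF-8 string prefixed with its two-byte length. */
    private static void writeString(ByteArrayOutputStream out, String s) {
        byte[] bytes = s.getBytes(StandardCharsets.UTF_8);
        out.write((bytes.length >> 8) & 0xFF);
        out.write(bytes.length & 0xFF);
        out.write(bytes, 0, bytes.length);
    }

    /** Writes the MQTT variable-length "remaining length" field. */
    private static void writeRemainingLength(ByteArrayOutputStream out, int length) {
        do {
            int digit = length % 128;
            length /= 128;
            if (length > 0) digit |= 0x80;         // continuation bit
            out.write(digit);
        } while (length > 0);
    }
}
```

The connect flags byte follows the protocol name and level. With username, password and clean session set, it has the value 0xC2. A packet whose flags byte has bit 0x02 cleared would be rejected by the MQTT Service.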

Retained flag

The retained flag is currently ignored. Publishing data with the retained flag on the topic is allowed but has no practical difference to sending it without the flag.

Last will

In MQTT, the “last will” is a message that is specified at connection time and published when the client loses the connection. Last will is fully supported by the MQTT Service and, as with any other published message, you can use any unreserved topic and any payload.

Return codes

The MQTT Service follows the MQTT specification for server responses. For example, if invalid credentials are sent in the CONNECT message, the server response CONNACK message contains the 0x05 return code. The return code can be treated similarly to REST API HTTP codes, such as 401.

MQTT 5.0 features

Clients can connect using version 5.0 of the MQTT protocol. Support for additional MQTT 5.0 features will be added in future releases.

MQTT TLS certificates

Server certificates

The MQTT Service uses the same server certificates that are assigned to the main Cumulocity environment domain. It always sends these certificates during TLS handshake to devices. Moreover, Enterprise tenants are not able to customize those certificates via the SSL Management feature.

Device (client) certificates

Using device certificates with the MQTT Service shares the same requirements as outlined in Device certificates.
If the trust anchor (that is, the trusted root or intermediate certificate) used to validate the device certificate is trusted by multiple tenants, the device must also specify the tenant ID in the MQTT username field. This ensures that the platform can correctly identify which tenant the device is attempting to connect to. For more information, see Authentication and authorization.

Adding and trusting CA certificate

TLS trust anchors in the Cumulocity platform are defined per tenant. To use device certificates for authentication, the CA or intermediate certificate that signs the device certificates must be uploaded to the platform and added to the tenant’s list of trusted certificates. You can do this via the Trusted certificates page in the UI or via REST.
Additionally, ensure that the Auto registration option is enabled when adding certificates. This allows any device presenting a valid certificate to be automatically registered on the platform when it first connects.

Creating self-signed certificates

To self-sign device certificates, you first need a root CA certificate. Using the OpenSSL CLI tool, create a private key and then generate a self-signed root certificate from it.

openssl genpkey -algorithm RSA -out ca.key
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=MQTTServiceCA"

Then create a private key for the device, generate the certificate signing request from this private key, and then sign the CSR.

openssl genpkey -algorithm RSA -out client.key
openssl rsa -in client.key -out client-key.pem -outform PEM
openssl req -new -key client.key -out client.csr -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=mqtt-client"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 3650 -sha256
cat client.crt ca.crt > client-chain.pem

If you have more advanced requirements regarding certificate creation, see Generating and signing certificates.

Using certificates

Once the CA certificate has been uploaded and trusted in Cumulocity, devices can authenticate using client certificates signed by your trusted CA. To connect using any MQTT client, use the previously generated client certificate and key. This example uses the Mosquitto MQTT client:

mosquitto_pub --cafile cumulocity.com.pem -d -q 1 \
  -h "cumulocity.com" -p "9883" -i myclient \
  -u t11101 \
  -t "v1/devices/me/telemetry" \
  --key client-key.pem \
  --cert client-chain.pem \
  -m '{"temperature":25}'

Explanation:

  • --cafile cumulocity.com.pem: This file contains the CA certificate of Cumulocity’s MQTT Service broker, used to validate the server’s identity.
  • --key client-key.pem and --cert client-chain.pem: These are your client private key and client certificate chain. The client certificate is signed by your trusted CA.
  • -u t11101: (Optional) Specifies the MQTT username, which must be your tenant ID as described in Authentication and authorization. In this example, t11101 is the tenant ID.

Downloading the CA certificate (cumulocity.com.pem):

To download the Cumulocity MQTT Service broker’s CA certificate:

  1. Open cumulocity.com in a browser.
  2. Click the padlock icon in the address bar and view the certificate details.
  3. Download or export the root certificate, and save it as cumulocity.com.pem.

Alternatively, you can use openssl to retrieve and extract the certificate:

echo | openssl s_client -connect cumulocity.com:9883 -showcerts 2>/dev/null | \
    sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > cumulocity.com.pem
Info
Cumulocity uses certificates signed by well-known public CAs. Some clients (like Mosquitto) require explicitly providing the CA file, while others (like MQTTX) trust these certificates automatically.

Connecting microservices and applications

Cumulocity microservices and external applications can consume messages published by devices connected to the MQTT Service, and publish messages back to those devices. To do this, your microservice or external application connects to the Cumulocity Messaging Service, a deployment of Apache Pulsar, and uses the Pulsar protocol to publish and consume MQTT messages. The diagram below shows the important interfaces and data flows used when interacting with the MQTT Service through Pulsar.

[Diagram: MQTT Service Pulsar connections]

Info
An MQTT Service messaging client is a software component that interacts with the MQTT Service through Pulsar. It can be deployed as a microservice hosted by the Cumulocity platform, or as part of an external application hosted outside the platform. This documentation refers to such a component simply as a client. If the implementation or behaviour differs depending on where the client is hosted, those differences are documented where relevant.

The MQTT Service implements device isolation, meaning that MQTT devices connected to the MQTT Service cannot communicate directly with each other using the MQTT protocol. All inter-device communication must be managed explicitly by the client, as shown in the diagram.

This documentation does not cover the publish-subscribe messaging concepts and architecture implemented by Pulsar, nor any features of the Pulsar client libraries beyond those needed to implement a simple MQTT Service client. To learn more about those subjects, refer to the Pulsar product documentation.

Connecting to the Messaging Service

To connect your client to the Messaging Service, you will need:

  1. A Pulsar client library.
  2. The URL of the Messaging Service (Pulsar broker) in your Cumulocity environment.
  3. Credentials for a user in your tenant with permission to access MQTT Service data on the Messaging Service.

Each of these prerequisites is explained in detail below.

Pulsar client library

Open-source Pulsar client libraries are available for a number of different languages and protocols. The example code in this documentation will use the Java client library. Pulsar has strong cross-version compatibility. Use the latest version of your chosen client library regardless of the server version used by the Messaging Service. Integration with the MQTT Service does not require advanced Pulsar features that may only be available in the latest server version.

Caution
Currently only “basic” (username/password) authentication is supported for clients connecting to the Messaging Service through Pulsar. Therefore, you must ensure that your chosen Pulsar client library supports this authentication scheme.

Pulsar URL

For a microservice client, the URL should be obtained from the C8Y_BASEURL_PULSAR environment variable that will be passed to the microservice when it starts running. For an external application client, the URL has the general form pulsar+ssl://<tenant_domain>:6651/, where <tenant_domain> is the domain of your Cumulocity tenant, for example my-tenant.cumulocity.com. As implied by the pulsar+ssl protocol name, all external application client connections will use SSL/TLS security. Currently, only one-way TLS is supported. The server provides a certificate that the client can verify. Client certificates cannot be used. Implementing an external application client so that it reads the Pulsar URL from the C8Y_BASEURL_PULSAR environment variable makes it easier to develop a client that can be deployed as either a microservice or an external application.
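
One way to follow this advice is to read the environment variable first and build the external URL only when the variable is absent. The sketch below shows that approach. The class name, the method names and the fallback behaviour are illustrative choices and not part of any Cumulocity SDK.

```java
/** Illustrative resolution of the Pulsar service URL; names and fallback logic are assumptions. */
final class PulsarUrlResolver {
    /** Port used in the external URL form given in the text above. */
    static final int EXTERNAL_TLS_PORT = 6651;

    /** Builds the external form pulsar+ssl://<tenant_domain>:6651/. */
    static String externalUrl(String tenantDomain) {
        return "pulsar+ssl://" + tenantDomain + ":" + EXTERNAL_TLS_PORT + "/";
    }

    /**
     * @param envValue     value of C8Y_BASEURL_PULSAR, or null if it is not set
     * @param tenantDomain tenant domain used as a fallback for external applications
     */
    static String resolve(String envValue, String tenantDomain) {
        if (envValue != null && !envValue.isBlank()) {
            return envValue.trim();               // microservice case, or explicitly configured
        }
        if (tenantDomain == null || tenantDomain.isBlank()) {
            throw new IllegalStateException("No Pulsar URL and no tenant domain configured");
        }
        return externalUrl(tenantDomain.trim());  // external application case
    }
}
```

A client would typically call `PulsarUrlResolver.resolve(System.getenv("C8Y_BASEURL_PULSAR"), "my-tenant.cumulocity.com")` once at startup and pass the result to the Pulsar client builder.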

Pulsar authentication

Authentication credentials identify both the Cumulocity tenant and the user within that tenant. Currently, only “basic” (username and password) authentication is supported for clients connecting to the Messaging Service through Pulsar. For a microservice client, you should use the credentials of the per-tenant service user that will be passed to the microservice when the tenant is subscribed to it. For an external application user, you can use the credentials of any tenant user with the appropriate authorization roles assigned, as described below. The username must be in the form <tenantID>/<user> where <tenantID> is the tenant ID (not the tenant name), and <user> is a user within that tenant. If two-factor authentication (TFA) is enabled for your tenant, your user must have the devices role assigned to disable the TFA check for that user. See TFA Settings for more information. Note that the devices role may be shown as “Device User” in the Cumulocity user interface.

Role-based access control

Pulsar client connections will be granted access to Messaging Service resources based on the roles and permissions assigned to the authenticated user. The following roles and permissions should be used for MQTT Service messaging clients:

Role and permission | Access granted
Mqtt service messaging topics, Read | Consume messages from MQTT devices connected to the MQTT Service
Mqtt service messaging topics, Update | Publish messages to MQTT devices connected to the MQTT Service

For microservice clients, the required permissions should be added to the requiredRoles section of the microservice manifest, which will grant the requested permissions to the per-tenant service user. For example:

{
    "apiVersion": "v2",
    "name": "my-mqtt-service-client",
    "version": "1.0.0",
    ...
    "requiredRoles": [
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_READ",
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE"
    ],
    ...
}

For external application clients, the required permissions should be configured for the authenticating user through the Administration application.

Assign only the minimum permissions needed for the client to operate. For example, if your microservice only consumes messages, do not include the ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE permission in the manifest.

Example code – connecting to the Messaging Service

The code snippet below shows how to use the Pulsar Java client library to connect to the Messaging Service with basic authentication. It assumes that the Pulsar URL is in the C8Y_BASEURL_PULSAR environment variable, that the tenant identifier and username are provided on the command line, and that the password is read from the console. Note that the client library will not actually attempt to connect to the Pulsar server immediately when the PulsarClient object is created. In the interest of brevity and clarity, this example does no error handling. A realistic implementation would need to handle exceptions thrown by the Pulsar client library methods.

package c8y.example.mqttservice;

import java.text.MessageFormat;
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

public class SimplePulsarClient {
    public static void main(String[] args) throws Exception {
        // Validate command line.
        if (args.length != 2) {
            System.err.println("Usage: SimplePulsarClient <tenantID> <username>");
            System.err.println("The Pulsar URL will be read from the C8Y_BASEURL_PULSAR environment variable");
            System.err.println("The password will be read from the console");
            System.exit(-1);
        }

        // Collect all the configuration properties.
        final String pulsarUrl = System.getenv("C8Y_BASEURL_PULSAR");
        final String tenantID = args[0];
        final String username = args[1];
        final String password = new String(System.console().readPassword("Password for user %s/%s: ", tenantID, username));

        // Create the basic authentication credentials object.
        final AuthenticationBasic basicAuth = new AuthenticationBasic();
        basicAuth.configure(MessageFormat.format("'{'\"userId\":\"{0}/{1}\",\"password\":\"{2}\"'}'", tenantID, username, password));

        // Create a Pulsar client using the basic authentication credentials.
        // The client will *not* try to connect and authenticate immediately.
        final PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarUrl)
            .authentication(basicAuth)
            .build();
        System.out.println("Created Pulsar client");

        // The rest of the example will go here...
    }
}
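
The example above assembles the credential string with MessageFormat, which inserts the password as-is. A password containing a double quote or a backslash would therefore produce invalid JSON. The sketch below is one possible way to build the same {"userId": ..., "password": ...} string with minimal escaping, using only the JDK. The class name is hypothetical. In a real project a JSON library would normally do this job.

```java
/** Illustrative builder for the basic authentication parameter string; the class name is hypothetical. */
final class BasicAuthParams {
    /** Escapes the characters that would otherwise break a JSON string literal. */
    static String escapeJson(String value) {
        StringBuilder out = new StringBuilder(value.length() + 8);
        for (int i = 0; i < value.length(); i++) {
            char c = value.charAt(i);
            if (c == '"' || c == '\\') {
                out.append('\\').append(c);       // quote and backslash need a leading backslash
            } else if (c < 0x20) {
                out.append(String.format("\\u%04x", (int) c)); // control characters
            } else {
                out.append(c);
            }
        }
        return out.toString();
    }

    /** Builds the JSON object with userId in the form tenantID/username. */
    static String toJson(String tenantId, String username, String password) {
        return "{\"userId\":\"" + escapeJson(tenantId + "/" + username)
            + "\",\"password\":\"" + escapeJson(password) + "\"}";
    }
}
```

With this helper, the call in the example could become `basicAuth.configure(BasicAuthParams.toJson(tenantID, username, password));`.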

Message payloads and properties

Pulsar messages consist of a payload and set of properties.

The payload is a sequence of zero or more bytes, identical to the payload of the MQTT PUBLISH message that the Pulsar message corresponds to. It is the client’s responsibility to understand the format of the payloads produced and accepted by the MQTT devices it communicates with.

Pulsar message properties are name-value pairs, where both the name and the value are text strings. The properties recognised by the MQTT Service are listed in the table below. Messages received from MQTT devices will always include the properties marked as required, and may include any of the optional properties. Received messages will not include any properties other than those listed here. Messages published to MQTT devices must include all of the required properties, and may include any of the optional properties. If a published message includes any properties other than those listed here, those properties will be ignored by the MQTT Service.

Property name | Required | Value type and encoding | Purpose
topic | YES | String | MQTT topic name
clientID | YES(1) | String | MQTT client identifier
tx.payloadFormatIndicator(2) | NO | Single byte with two permitted values, encoded as strings “0” and “1” | MQTT v5 Payload Format Indicator
tx.contentType | NO | String | MQTT v5 Content Type
tx.responseTopic | NO | String | MQTT v5 Response Topic
tx.correlationData | NO | Sequence of bytes, encoded as a Base64 string | MQTT v5 Correlation Data
tx.userProperties.<name> | NO | String | MQTT v5 User Property with name <name>(3)

Notes:

  1. The clientID property can be omitted from a published message only in the special case of a broadcast message, described below in broadcast messages.
  2. The tx. prefix indicates that a property is specific to a transport, in this case the MQTT Service. Other transports will define their own transport-specific properties, but all transports will use topic and clientID.
  3. The MQTT version 5 specification allows a message to include more than one user property with the same name. This feature is not supported by the MQTT Service. If a device publishes a message containing multiple user properties with the same name, only one of these will be copied into the Pulsar message. It is undefined which property will be copied.
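
The property conventions above can be handled with a few small helpers operating on the string map that the Pulsar Java client returns from Message.getProperties(). The following is a sketch under two assumptions: the Base64 value uses the standard (not URL-safe) alphabet, and the class and method names are purely illustrative.

```java
import java.util.Base64;
import java.util.HashMap;
import java.util.Map;

/** Illustrative helpers for reading MQTT Service message properties; names are hypothetical. */
final class DeviceMessageProperties {
    static final String USER_PROPERTY_PREFIX = "tx.userProperties.";

    /** Returns a required property such as "topic" or "clientID", or fails if it is missing. */
    static String require(Map<String, String> props, String name) {
        String value = props.get(name);
        if (value == null) {
            throw new IllegalArgumentException("Missing required property: " + name);
        }
        return value;
    }

    /** Decodes tx.correlationData, or returns null if the property is absent. */
    static byte[] correlationData(Map<String, String> props) {
        String encoded = props.get("tx.correlationData");
        // Assumes the standard Base64 alphabet.
        return encoded == null ? null : Base64.getDecoder().decode(encoded);
    }

    /** Collects all tx.userProperties.<name> entries into a map keyed by <name>. */
    static Map<String, String> userProperties(Map<String, String> props) {
        Map<String, String> result = new HashMap<>();
        for (Map.Entry<String, String> e : props.entrySet()) {
            if (e.getKey().startsWith(USER_PROPERTY_PREFIX)) {
                result.put(e.getKey().substring(USER_PROPERTY_PREFIX.length()), e.getValue());
            }
        }
        return result;
    }
}
```

Because only one value survives per user property name, a plain map is sufficient to hold the result.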

Consuming messages from MQTT devices

All messages published by devices connected to the MQTT Service for a given tenant will be published to a single Pulsar topic, identified by the URL persistent://<tenantID>/mqtt/from-device. The topic URL can be broken down into 4 components:

Component | Description
persistent | Indicates that this is a persistent topic that will be preserved by the Messaging Service across component failures and restarts, to provide “at least once” delivery guarantees
<tenantID> | The Pulsar tenant ID, which will match the Cumulocity tenant ID
mqtt | The Pulsar namespace within the tenant, which will always be mqtt for the MQTT Service
from-device | The Pulsar topic within the namespace, which will always be from-device for messages from devices connected to the MQTT Service
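The components above can be assembled with a small helper, sketched below. The class name MqttServiceTopics and its methods are hypothetical and exist only for illustration. The to-device topic it also builds is the one used for publishing to devices, described later in this document.

```java
// Hypothetical helper for building the MQTT Service topic URLs of a tenant.
final class MqttServiceTopics {
    private MqttServiceTopics() {
    }

    // URL of the topic that carries messages published by devices.
    static String fromDevice(String tenantId) {
        return topicUrl(tenantId, "from-device");
    }

    // URL of the topic that carries messages addressed to devices.
    static String toDevice(String tenantId) {
        return topicUrl(tenantId, "to-device");
    }

    // persistent://<tenantID>/mqtt/<topic>
    private static String topicUrl(String tenantId, String topic) {
        if (tenantId == null || tenantId.isEmpty()) {
            throw new IllegalArgumentException("tenantId must not be empty");
        }
        return "persistent://" + tenantId + "/mqtt/" + topic;
    }
}
```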

Your client will only be able to consume from this topic if the authenticated user has the “read” permission on the “Mqtt service messaging topics” role. The client will not be able to consume from any other topic.

The client identifier of the device that published a message, and the MQTT topic it was published on, can be obtained from the message properties clientID and topic as described above. Because all devices in the tenant publish to the same Pulsar topic, your client must consume every message published by every device connected to the MQTT Service for the tenant, even those you are not interested in. Messages that are not of interest to the client can simply be acknowledged without further processing.
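One possible way to decide which messages are of interest is to match the topic property against an MQTT-style topic filter inside the client. The sketch below is an assumption about how a client might do this. It is not a feature of the MQTT Service, and the class name TopicInterest is hypothetical. For brevity it ignores the MQTT rule that wildcards do not match topics beginning with $.

```java
// Hypothetical client-side filter; not part of the MQTT Service or Pulsar APIs.
final class TopicInterest {
    private TopicInterest() {
    }

    // Returns true when the MQTT topic name matches the MQTT-style filter.
    // '+' matches exactly one topic level; '#' matches all remaining levels
    // and is only honoured as the last level of the filter.
    static boolean matches(String filter, String topic) {
        String[] filterLevels = filter.split("/", -1);
        String[] topicLevels = topic.split("/", -1);
        for (int i = 0; i < filterLevels.length; i++) {
            if (filterLevels[i].equals("#")) {
                return i == filterLevels.length - 1;
            }
            if (i >= topicLevels.length) {
                return false;
            }
            if (!filterLevels[i].equals("+") && !filterLevels[i].equals(topicLevels[i])) {
                return false;
            }
        }
        return filterLevels.length == topicLevels.length;
    }
}
```

In a message listener, the client would process the message only when TopicInterest.matches(filter, message.getProperty("topic")) returns true, and acknowledge the message in either case.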

Caution
Your client must be trusted to safely handle every message published by every device connected to the MQTT Service in your tenant. If untrusted users have access to your tenant, these users should not be permitted to upload microservices, nor to connect external application clients to the Messaging Service. This recommendation also applies in the case of multiple customers, who do not mutually trust each other, sharing a single tenant.

Durable subscriptions and message acknowledgement

Subscribing a consumer to a topic establishes a durable subscription to the topic. This means that the Messaging Service will retain messages published to the topic until they have been delivered to, and acknowledged by, a client. The subscription will remain until it is explicitly deleted. It will not be removed simply because the client is not currently running. Messages that are published while the client is disconnected will be available for it to consume when it reconnects. After consuming each message, the client must explicitly acknowledge it. Acknowledging a message tells the Messaging Service that the client has no further interest in it, allowing the message to be discarded. See the section on best practices below for more information on managing durable subscriptions correctly.

Example code – consuming messages

The code snippet below shows how to use the Pulsar Java client library to consume messages from the MQTT Service from-device topic. It extends the previous example that showed how to set up the connection to the Pulsar server.

To consume messages from the topic, your client should create a Pulsar Consumer and subscribe it to the topic. The consumer should register a MessageListener callback that will be called whenever a new message arrives on the topic. The MessageListener implementation shows how to access the payload and properties of the received messages. For simplicity, the application messages in the example are simple text strings. However, the payload of the Pulsar message will always be an array of bytes that must be converted to the format used by the application.

        // Create a simple message listener that will log some details of
        // each message received, when registered with a consumer.
        final MessageListener<byte[]> listener = new MessageListener<byte[]>() {
            @Override
            public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                final String clientId = message.getProperty("clientID");
                final String topic = message.getProperty("topic");
                System.out.println(MessageFormat.format("Received message from MQTT device {0} on MQTT topic {1}", clientId, topic));
                System.out.println(MessageFormat.format("Message payload: {0}", new String(message.getValue(), StandardCharsets.UTF_8)));
                System.out.println(MessageFormat.format("Message properties: {0}", message.getProperties()));
                try {
                    // Acknowledge the message.
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        };

        // Create a Pulsar consumer on the from-device topic for the tenant,
        // using the listener defined above to process each message.
        // This will trigger connection and authentication by the client.
        final Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
            .topic(MessageFormat.format("persistent://{0}/mqtt/from-device", tenantID))
            .subscriptionName("demoSubscription")
            .messageListener(listener)
            .subscribe();
        System.out.println("Created Pulsar consumer");

Publishing messages to MQTT devices

Any messages that your client wants to send to devices connected to the MQTT Service for a given tenant must be published to a single Pulsar topic, identified by the URL persistent://<tenantID>/mqtt/to-device. The components of the URL should be interpreted as described in Consuming messages from MQTT devices above.

Your client will only be able to publish to this topic if the authenticated user has the “update” permission on the “Mqtt service messaging topics” role. The client will not be able to publish to any other topic.

Messages published to the to-device topic are routed to connected MQTT devices using the two required message properties:

Property name | Purpose
clientID | Client identifier of the MQTT device that should receive the message
topic | Name of the MQTT topic that the message should be published to

If the topic property is empty or missing, the message will not be published to any MQTT client. The message will only be published to a device with an active subscription to the named MQTT topic. The message will only be published to a client that is connected at the time the MQTT Service processes the published message.

Successfully publishing a message to the Messaging Service does not mean that the message has been successfully delivered to any MQTT device. Onward publishing to MQTT devices happens asynchronously and without any feedback to the Pulsar client. Messages will be delivered to devices according to the MQTT protocol specification, using the QoS level of the MQTT subscription made by the device. However, because MQTT devices are required to use a clean session when connecting to the MQTT Service, messages published to a device while it is disconnected will not be delivered.
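The following sketch shows one way a client might assemble the property map for a message addressed to a single device, including some of the optional tx. properties from the table earlier in this document. The class name OutgoingProperties and its method signature are hypothetical. The resulting map could then be passed to the Pulsar message builder, for example through its properties(Map) method, alongside the key and value.

```java
import java.util.Base64;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper; the names are illustrative, not a Cumulocity API.
final class OutgoingProperties {
    private OutgoingProperties() {
    }

    // Builds the Pulsar message properties for a message sent to one device.
    // correlationData and userProperties are optional and may be null.
    static Map<String, String> forDevice(String clientId, String topic,
                                         byte[] correlationData,
                                         Map<String, String> userProperties) {
        Map<String, String> properties = new LinkedHashMap<>();
        // The two required routing properties.
        properties.put("clientID", clientId);
        properties.put("topic", topic);
        // Optional MQTT v5 Correlation Data, encoded as a Base64 string.
        // Assumption: the standard Base64 alphabet is expected.
        if (correlationData != null) {
            properties.put("tx.correlationData",
                    Base64.getEncoder().encodeToString(correlationData));
        }
        // Optional MQTT v5 User Properties, one Pulsar property per name.
        if (userProperties != null) {
            for (Map.Entry<String, String> entry : userProperties.entrySet()) {
                properties.put("tx.userProperties." + entry.getKey(), entry.getValue());
            }
        }
        return properties;
    }
}
```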

Broadcast messages

To enforce device-level isolation, messages are published only to the specific MQTT client identified by the clientID property, provided that client has an active subscription to the relevant MQTT topic. If the clientID property is not present, the message is broadcast to all connected MQTT clients with active subscriptions to that topic.

Broadcast publishing is potentially expensive when many clients are connected and may deliver messages to unexpected devices. Use broadcast only when the application must publish the same message to every device subscribed to a topic.

Message keys

To facilitate efficient delivery and correct ordering of messages sent to MQTT devices, clients must also set the key of a Pulsar message published to the to-device topic. The key should be set as follows:

  • When the clientID message property is set, the key should have the same value as this property.
  • When the clientID message property is not set, the key should have the same value as the topic message property.
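The two rules above can be expressed as a small function, sketched here. The class name MessageKeys and the method keyFor are hypothetical names chosen for illustration.

```java
import java.util.Map;

// Hypothetical helper that derives the Pulsar message key from the
// message properties, following the two rules listed above.
final class MessageKeys {
    private MessageKeys() {
    }

    // Returns the clientID when it is set, otherwise the topic.
    // Returns null when neither property is present.
    static String keyFor(Map<String, String> properties) {
        String clientId = properties.get("clientID");
        if (clientId != null) {
            return clientId;
        }
        return properties.get("topic");
    }
}
```

Deriving the key from the same map that is used to set the properties keeps the key and the properties from drifting apart.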

Handling of invalid messages

Published messages that do not follow the rules for message properties and keys documented above will not be delivered to any MQTT device. In particular, this applies to messages with any of the following invalid configurations:

  • The message key is not set.
  • The message key is set but does not match the clientID or topic property as described in message keys.
  • The clientID property is set but has an empty value.
  • The topic property is not set, or it is set but has an empty value.

An alarm will be raised in the Cumulocity tenant when one of these invalid messages is detected and discarded. The rate of alarm sending is limited to avoid overloading the tenant with redundant alarms alerting about the same error on different messages.

A message with a non-empty clientID property referring to an MQTT device that is not currently connected is not considered to be invalid. However, this message will not be delivered to the device, even if it connects later, because of the requirement for devices to use a clean session when connecting. Similarly, a message published to a connected MQTT device that is not currently subscribed to the MQTT topic specified in the topic property is not considered to be invalid. In these situations, the message will not be delivered but no alarms will be raised.
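Because invalid messages are discarded asynchronously, a client may prefer to check a message against these rules before publishing it. The sketch below is one possible pre-publish check. The class name ToDeviceValidator is hypothetical, and treating an empty key the same as an unset key is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical pre-publish check for messages sent to the to-device topic.
final class ToDeviceValidator {
    private ToDeviceValidator() {
    }

    // Returns a list of rule violations; an empty list means the key and
    // properties follow the documented rules.
    static List<String> problems(String key, Map<String, String> properties) {
        List<String> problems = new ArrayList<>();
        String clientId = properties.get("clientID");
        String topic = properties.get("topic");
        if (topic == null || topic.isEmpty()) {
            problems.add("topic property is missing or empty");
        }
        if (clientId != null && clientId.isEmpty()) {
            problems.add("clientID property is set but empty");
        }
        if (key == null || key.isEmpty()) {
            // Assumption: an empty key is treated like an unset key.
            problems.add("message key is not set");
        } else {
            // The key must equal clientID when set, otherwise the topic.
            String expected = clientId != null ? clientId : topic;
            if (!key.equals(expected)) {
                problems.add("message key does not match clientID or topic");
            }
        }
        return problems;
    }
}
```

This check cannot detect the cases that are not considered invalid, such as a target device that is disconnected or not subscribed to the topic.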

Example code – publishing messages

The code snippet below shows how to use the Pulsar Java client library to publish messages to the MQTT Service to-device topic. It extends the previous examples that set up the connection to the Pulsar server and created a message consumer.

To publish messages to the topic, your client should first create a Pulsar Producer associated with the topic. Then, the Producer can be used to create new Message objects that will be published to the topic. The example code shows how to correctly set the message properties and message key for messages targeted at a single device, and for “broadcast” messages. Again, the example assumes that the application messages are simple text strings that must be converted to the byte array expected by the MQTT Service. For clarity, most error-handling code is omitted from the example. See Handling Messaging Service errors for advice on dealing with errors in a production client.

        // Wrap all the operations that might fail after we create the
        // durable subscription in a try-finally, so that we can delete the
        // subscription if something goes wrong.
        try {
            // Create a Pulsar producer on the to-device topic for the tenant.
            final Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic(MessageFormat.format("persistent://{0}/mqtt/to-device", tenantID))
                .create();
            System.out.println("Created Pulsar producer");

            // Publish a message to a single MQTT device.
            producer.newMessage()
                .property("clientID", "demoClient")
                .property("topic", "demoTopicB")
                .key("demoClient")
                .value("Message sent to a single device".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to single device");

            // Publish a message to all MQTT devices subscribed to a topic.
            // Note that the "clientID" property is omitted here.
            producer.newMessage()
                .property("topic", "demoTopicB")
                .key("demoTopicB")
                .value("Message sent to all subscribed devices".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to all subscribed devices");

            // Close the producer.
            producer.close();
        }

Messaging Service quotas and limits

Messages published to a Pulsar topic are stored persistently by the Messaging Service until they have been delivered to, and acknowledged by, all interested consumers. For messages published to the from-device topic by the MQTT Service, the consumers are any clients that have created durable subscriptions on the topic. For messages published to the to-device topic by clients, the consumers are the instances of the MQTT Service that will deliver the messages to devices.

To optimize resource usage, the Messaging Service imposes storage limits and a message time-to-live (TTL) on persistently stored messages.

See the service quotas documentation for details on the default limits. These limits are configurable on a per-tenant basis. If your use case requires a different configuration, or if you have any questions or concerns, contact product support.

Message backlog quota

Persistent messages are stored in a backlog until they are delivered to any interested consumers. The maximum size of the backlog is set by the backlog quota limit, which directly affects the number of messages that can be stored and therefore the resource consumption of the platform.

A separate backlog exists for each Pulsar topic, so for the MQTT Service the from-device and to-device topics for a tenant will each have their own independent backlog. The backlog is shared by all subscriptions on a topic. If the backlog quota limit is reached, no new messages can be added to the backlog until some older messages have been delivered, or deleted due to their TTL expiring.

If the backlog quota limit for the Pulsar from-device topic is reached, new MQTT PUBLISH packets from connected devices will be rejected. If the PUBLISH packet was sent with QoS level 0, the message will be lost. If the PUBLISH packet was sent with QoS level 1, the behaviour depends on the MQTT protocol version used by the device:

  • For devices using MQTT version 3, the device will be disconnected.
  • For devices using MQTT version 5, the device will receive a PUBACK packet with reason code 0x97, Quota exceeded.

If the backlog quota limit for the Pulsar to-device topic is reached, clients calling the Producer.send() method, or its equivalent in the Pulsar library used by the client, will receive an appropriate exception or error response from the client library.

Message time-to-live

Any undelivered messages will be automatically deleted if they have been on the backlog for longer than the time-to-live (TTL) limit. This policy helps to limit overall resource usage and reduces the need to process outdated data after a prolonged disconnection of a consumer.

No undelivered message will ever be deleted from the backlog unless it reaches its TTL limit. Messages will always be delivered to the consumer in the order they were published to the topic.

Best practices for reliable message delivery from devices

If a topic reaches its backlog quota limit, it stops accepting new messages and messages may be lost. To avoid this:

  • Process and acknowledge messages from the from-device topic as quickly as possible. Every message must be explicitly acknowledged, even if the client is not interested in it. Do not acknowledge a message until processing is complete or the message has been stored securely for later processing. Acknowledged messages will not be re-delivered after a client failure or restart.
  • Manage subscription lifecycles. Subscribing a consumer creates a durable subscription that remains until explicitly deleted. Messages published while the client is disconnected will be retained for the subscription and delivered when the client reconnects. Because subscriptions persist, a topic can reach its backlog quota even when no clients are running.
    1. Use the same subscription name each time the client connects. Avoid creating random subscription names on each run. Doing so leaves behind inactive subscriptions that continue to retain messages and may exhaust the backlog quota.
    2. Explicitly delete subscriptions when they are no longer required. For example, when taking a client out of service for an extended period, call the consumer unsubscribe() method or use the Messaging Service monitoring and management interface to delete the subscription.

Example code – deleting the subscription

The code snippet below shows how to delete the subscription and close the other Pulsar client objects created by the earlier code examples.

        finally {
            // Delete the durable subscription.
            // This is only necessary if messages should *not* be retained
            // on the topic while the client is disconnected.
            consumer.unsubscribe();
        }

        // Close the other Pulsar objects that we created.
        consumer.close();
        client.close();

Handling Messaging Service errors

The Cumulocity Messaging Service is a complex, distributed service running remotely from your client. In common with all distributed systems, perfect reliability cannot be guaranteed, and a client should be prepared to handle errors reported by the Pulsar client library. These errors can be split into two general categories:

  1. Configuration or logical errors in the client implementation. Errors in this category are usually “fatal” and prevent the client from connecting to the Messaging Service, or publishing or consuming any messages. Some typical examples of this type of error include:
    • Attempting to connect with an incorrect Pulsar URL.
    • Using invalid authentication credentials.
    • Using the credentials of a user that is not authorized to access the Messaging Service.
    • Attempting to consume from the to-device topic, or publish to the from-device topic.
    • Attempting to publish to or consume from any other topic.
    • Attempting to publish incorrectly constructed messages. The most likely cause for this is attempting to publish a message with a payload that was not explicitly created as a byte array.
  2. Transient errors in the Messaging Service. Errors in this category usually reflect a temporary issue with the Messaging Service server that will be resolved either automatically or by administrator action. Some transient errors that a client may experience include:
    • Connections may be dropped when Messaging Service components are restarted during upgrades, or during unplanned outages of the Messaging Service. This will cause publish or consume operations to fail, and it may be necessary to re-connect, or re-establish the producer or consumer, before retrying the operation.
    • Published messages will be rejected when the backlog quota limit on the to-device topic has been reached. See reliable delivery best practices for advice on avoiding this situation.
    • Published messages may be rejected if other limits or quotas on the Messaging Service are reached.

If your client is using the Java client library, almost all errors will be reported as a PulsarClientException thrown by a client library method. In some very rare cases a SchemaSerializationException runtime error might also be thrown, if the client has not used the Schema.BYTES schema and byte array payloads exclusively. The PulsarClientException class has many sub-classes that allow a client to determine the cause of the error more precisely. Other client libraries will have similar language-specific error reporting mechanisms.

In general, it is not possible to recover from a fatal configuration or logic error in the client implementation. The client will need to be restarted after the error has been corrected. For transient errors, a strategy of retrying after a delay is usually appropriate. When an operation on a producer or a consumer has failed, it may be difficult to identify the exact root cause and the optimal response. A simple recovery approach that covers most scenarios is to delete the failed producer or consumer and create a new one before retrying the operation. This avoids cases where the producer or consumer cannot reconnect after an error. A more sophisticated strategy can tailor the response to the specific subclass of PulsarClientException thrown. Use an exponential backoff strategy to increase the delay between retries until the service recovers.
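The retry strategy described above can be sketched with plain Java as follows. The class name Backoff, its method names, and the delay parameters are assumptions made for illustration and are not recommended values. The sketch retries on any exception, whereas a production client would retry only on errors it considers transient.

```java
import java.util.concurrent.Callable;

// Hypothetical retry helper illustrating exponential backoff.
final class Backoff {
    private Backoff() {
    }

    // Delay before the retry that follows the given attempt (0-based).
    // The delay doubles with each attempt and is capped at maxMillis.
    static long delayMillis(int attempt, long initialMillis, long maxMillis) {
        long delay = initialMillis;
        for (int i = 0; i < attempt; i++) {
            if (delay >= maxMillis / 2) {
                return maxMillis; // doubling would reach or exceed the cap
            }
            delay *= 2;
        }
        return Math.min(delay, maxMillis);
    }

    // Calls the operation until it succeeds or maxAttempts is reached,
    // sleeping with exponential backoff between failed attempts.
    // Rethrows the last failure when every attempt has failed.
    static <T> T retry(Callable<T> operation, int maxAttempts,
                       long initialMillis, long maxMillis) throws Exception {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        Exception last = null;
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            try {
                return operation.call();
            } catch (Exception e) {
                last = e;
                if (attempt < maxAttempts - 1) {
                    Thread.sleep(delayMillis(attempt, initialMillis, maxMillis));
                }
            }
        }
        throw last;
    }
}
```

The operation passed to retry could, for example, close the failed producer, create a new one, and send the message again, in line with the simple recovery approach described above.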

Example client

A complete example Java client based on the code snippets above can be found in the cumulocity-examples repository. The README.md file provided with the example explains how to build and run it.

The examples repository also contains a simple Python MQTT client that can be used to simulate an MQTT device and test the operation of the Java client. See the README.md file included with the example for more details. Start the Python client first to ensure messages sent to a device are received, then start the Java client.

Java Client

Caution

The MQTT Service Java SDK is deprecated and should not be used for new development. It will be replaced by direct connections to the Cumulocity Messaging Service.

See Connecting microservices and applications for more information.

The MQTT Service Java Client library provides the classes necessary to interact with the MQTT Service. The following operations are supported by the client:

  • Publishing messages to the MQTT Service via the WebSocket protocol.
  • Subscribing to messages from the MQTT Service via the WebSocket protocol.

Repositories and dependencies

Follow the Microservice SDK documentation for guidance on how to configure Maven repositories. To include the MQTT Service Java Client into your project, add the following dependency inside the <dependencies> node:

<dependency>
    <groupId>com.cumulocity.sdk.mqtt</groupId>
    <artifactId>mqtt-service-ws</artifactId>
    <version>${c8y.version}</version>
</dependency>

Examples

Example of publishing messages to the MQTT Service via WebSocket:

// Message to be sent
final String payload = "Hello World";

// Construct a new MqttServiceMessage and set the payload
final MqttServiceMessage message = new MqttServiceMessage();
message.setPayload(payload.getBytes());

// Create an instance of MqttServiceApi by specifying the server URI to connect to along with TokenApi
final MqttServiceApi mqttServiceApi = MqttServiceApi.webSocket()
        .url(webSocketBaseUrl)
        .tokenApi(tokenApi)
        .build();

// Build PublisherConfig with topic to which the message is to be sent
final PublisherConfig config = PublisherConfig.publisherConfig().topic(topic).build();

// Build Publisher and publish MqttServiceMessage. Close the resource either by using a try-with-resources block
// (https://docs.oracle.com/javase/tutorial/essential/exceptions/tryResourceClose.html) as below or by calling publisher.close() explicitly
try (final Publisher publisher = mqttServiceApi.buildPublisher(config)) {
    publisher.publish(message);
} catch (Exception e) {
    log.error("Could not send message to {}", topic, e);
}
mqttServiceApi.close();

Example of subscribing to messages from the MQTT Service via WebSocket:

// Create an instance of MqttServiceApi by specifying the server URI to connect to along with TokenApi
final MqttServiceApi mqttServiceApi = MqttServiceApi.webSocket()
        .url(webSocketBaseUrl)
        .tokenApi(tokenApi)
        .build();

// Build SubscriberConfig with topic and subscriber name
final SubscriberConfig config = SubscriberConfig.subscriberConfig().topic(topic).subscriber(subscriberName).build();

// Build Subscriber
final Subscriber subscriber = mqttServiceApi.buildSubscriber(config);

// Subscribe by passing implementation of MessageListener to handle messages from the MQTT Service.
subscriber.subscribe(new MessageListener() {
    @Override
    public void onMessage(MqttServiceMessage message) {
        log.info("Message Received: {}", new String(message.getPayload()));
    }
});

// Close the resources after usage
subscriber.close();
mqttServiceApi.close();

Frequently Asked Questions

Q: How can I obtain device credentials for my MQTT devices?
A: The MQTT Service is not yet integrated with the Cumulocity device bootstrap process. This support is planned for a future release. In the meantime, follow the Integration lifecycle to bootstrap the device and obtain device credentials. Once the device credentials are obtained, the device can use them to connect to the MQTT Service.

Q: Does the MQTT Service support the SmartREST 2.0 protocol?
A: Not yet. Support for SmartREST 2.0 is planned for a future release.

Q: Why does the MQTT Service not use the standard MQTT ports 1883 and 8883?
A: Those ports are already used by Cumulocity Core MQTT. While both MQTT implementations are operating in parallel, the MQTT Service must use different ports.

Q: What other ways are there to map my MQTT device payloads to Cumulocity, other than a Streaming Analytics app or a custom microservice?
A: One option is to use the Dynamic Mapping Service for Cumulocity. This is a community-supported open-source component that allows many different payload formats and encodings to be mapped to the Cumulocity domain model. Mappings can be configured using a graphical UI or by writing JavaScript code.