MQTT Service

Requirements
To use the MQTT Service your tenant must be subscribed to the Mqtt-service microservice. This may have been done automatically, depending on how your Cumulocity environment was configured. To check the subscription, open the Administration application and navigate to Ecosystem > Microservices. If you do not see the Mqtt-service microservice listed, contact product support (for public environments) or your Cumulocity administrator (for dedicated environments) to request the subscription for your tenant.

The MQTT Service provides a single, unified endpoint for integrating MQTT devices with the Cumulocity platform. It is not a general-purpose MQTT broker. It is optimized for scenarios where there are a large number of connected devices, and a high aggregate throughput of messages from devices into the platform.

This documentation is aimed at developers who want to integrate MQTT devices with Cumulocity, or to build clients that communicate with their devices. It does not describe the basics of MQTT communication. If you are unfamiliar with MQTT, there are numerous introductions available, starting with the MQTT website.

Overview and architecture

The MQTT Service works together with the Messaging Service to quickly and securely integrate MQTT devices with the Cumulocity platform. Devices that already understand the Cumulocity domain model can use the Core MQTT protocols (SmartREST and JSON-over-MQTT) to communicate directly with Cumulocity. Alternatively, devices can send and receive messages with arbitrary payloads on arbitrary MQTT topics. For these generic devices, the tenant is responsible for converting between the device’s protocol and the Cumulocity domain model. This conversion can be implemented in a microservice running inside the platform, or in an external client application.

The MQTT Service should be regarded as an MQTT endpoint rather than a full MQTT broker. It is optimized for the IoT device integration use case, which has some highly asymmetric properties:

  • A large number (up to tens of millions) of simultaneously connected devices publishing messages into the Cumulocity platform
  • A large number (up to tens of millions) of unique MQTT topics
  • A high aggregate throughput (up to millions per second) of messages published into the Cumulocity platform
  • A small number of high-throughput message consumers within the Cumulocity platform
  • Individual devices have a smaller message throughput (up to hundreds per second)
  • Smaller aggregate message throughput from the Cumulocity platform to devices (up to thousands per second)
  • No direct peer-to-peer device communication
  • Devices are geographically distributed, often with limited physical security
  • The Cumulocity platform is deployed on secure, managed, highly available infrastructure

Other, more symmetric server-to-server or application integration use cases may exceed the limits enforced by the MQTT Service. For optimal performance, these use cases should be implemented using a more traditional publish/subscribe architecture with direct connections to the Messaging Service.

Key features

Protocols and clients

Connection protocols TCP only.
MQTT protocol versions 3.1.1 and 5.0. See MQTT protocol implementation for more details.
Generic MQTT device protocols MQTT devices can publish and subscribe arbitrary payloads on arbitrary MQTT topics.
Cumulocity Core MQTT protocols Preview support for the SmartREST 1.0, SmartREST 2.0 and JSON-over-MQTT protocols. See Core MQTT device support for more details.
Apache Pulsar Microservices and external clients connect directly to the Messaging Service to convert between device protocols and the Cumulocity domain model.
Messaging Service clients are also responsible for registering devices as Cumulocity Managed Objects if required.
Thin Edge Out of the box support in Thin Edge for generic and Core MQTT protocols on the same device.

Security and isolation

Multi tenancy MQTT devices from multiple tenants connect to the same endpoint. Each tenant’s connections, topics and messages are fully isolated.
Device isolation MQTT devices within a tenant cannot communicate directly with each other. Each device effectively has its own private topic space.
Client access Microservices and external applications connecting to the Messaging Service have access to all topics and messages within a tenant.
Bi-directional TLS Certificate trust anchors are managed within Cumulocity. Certificates are tightly bound to individual MQTT devices.

Performance and scaling

Horizontal scaling The MQTT Service can be scaled independently from the Cumulocity core.
Benchmark results Validated to scale up to 100 million concurrent device connections, with throughput of 1 million unique messages per second.
Limits and quotas Per-tenant and per-client limits and quotas ensure service stability and prevent “noisy neighbour” problems.

Architecture

The diagram below illustrates the MQTT Service data flows within a tenant.

All messages published by MQTT devices are forwarded to the Messaging Service, where they are persisted until they are consumed. Cumulocity domain model messages published to the MQTT topics used by the Core MQTT protocols are consumed directly by the Cumulocity core. Messages published to other MQTT topics are consumed by microservices and/or external clients that are responsible for mapping the messages to the Cumulocity domain model. Similarly, the Cumulocity core and clients can publish messages to the Messaging Service that will be consumed by the MQTT Service and forwarded to devices.

MQTT Service architecture

Device isolation

Because of the device isolation feature, there is no interaction between topics with the same name used by different clients. Effectively, every device has its own private topic space that can only be accessed by that device. This can be seen in the diagram where device 1 and device N are both publishing and subscribing on topic A. Because devices are isolated, device 1 cannot see any of the messages published by device N, and vice-versa. However, a microservice or external application client connecting directly to the Messaging Service has full access to the topics used by all devices, and can forward messages between clients if required.

Connecting MQTT devices

This section covers the details of connecting and authenticating an MQTT device to the MQTT Service. It will be of interest to anyone integrating MQTT devices with Cumulocity. In general, the MQTT Service behaves the same way for all devices, whether they use the Cumulocity Core MQTT protocols or a non-Cumulocity protocol. If there are differences related to the application protocol used by the device, these will be documented where relevant.

Ports

MQTT connections to the MQTT Service must use TCP. WebSocket connections are not supported. Use your tenant domain as the target host for the connection, for example <MY-TENANT>.cumulocity.com.

Because the MQTT Service operates alongside the pre-existing Core MQTT, devices using the MQTT Service must connect to different ports:

  • Port 9883 (TLS) is the default port for secure, encrypted communication with the MQTT Service. Both one-way (server certificate only) and two-way (both client and server certificates) TLS are supported. When client certificates are not used, the server authenticates the client using basic authentication.

  • Port 2883 (non-TLS) is not enabled in Cumulocity shared public environments due to the security risks of allowing unencrypted traffic. To enable port 2883 in a dedicated environment, please contact Product support.

Client Identifiers

Every device connecting to the MQTT Service within a given tenant must use a unique Client Identifier (client ID). If a device connects using a client ID that is already connected, the existing connection will be terminated, in accordance with the MQTT specification. Devices in different tenants can be connected at the same time using the same client ID. Empty client IDs are not permitted.

Note that in general, the client ID will be treated as an unstructured identifier that is not interpreted by the MQTT Service in any way. However, some special handling is done for previously registered Core MQTT devices connecting to the MQTT Service. See the Core MQTT device support section for more details.

See the table of limits and quotas for details of the maximum allowed client ID length.

Authentication

The MQTT Service supports the following authentication methods. In all cases it is important to ensure that the MQTT username is set correctly so that the MQTT Service can identify the Cumulocity tenant associated with the device:

  • Username and password (basic authentication)
    The credentials of any user on the tenant can be used to authenticate a device to the MQTT Service. The username in the MQTT CONNECT packet must include the tenant ID and username in the format <tenantID>/<username>. The password in the CONNECT packet must be the unencrypted password of the user. The user associated with this authentication method must be assigned to the ADMIN permission for the Mqtt service permission type.

  • X.509 device certificates (certificate authentication)
    To authenticate using a certificate, a device must provide a certificate chain that is trusted by a trust anchor configured for the Cumulocity tenant. The Common Name (CN) field of the certificate must match the client ID field in the MQTT CONNECT packet. The device should specify the tenant ID in the username field of the MQTT CONNECT packet. See the Using TLS certificates section below for more details on creating and managing trust anchors and device certificates.

Using TLS certificates

This section contains a simplified overview of the TLS certificate support in the MQTT Service. For more details, see the general device certificates documentation for Cumulocity.

Server certificates

The MQTT Service uses the same server certificates that are assigned to the main Cumulocity environment domain. Enterprise tenants are not able to customize these certificates via the SSL Management feature.

Device (client) certificates

Device certificates used with the MQTT Service share the same prerequisites outlined in the general documentation. In addition, for devices connecting to the MQTT Service:

  1. The Common Name (CN) field of the certificate must match the client ID field in the MQTT CONNECT packet. If the client ID and the certificate CN do not match, the connection will be rejected.
  2. The device should specify the tenant ID in the username field of the MQTT CONNECT packet. If the tenant ID is provided, it must correspond to a tenant that trusts the given certificate; otherwise, the connection will be rejected. Similarly, if a trust anchor is trusted by multiple tenants and the tenant ID is not provided, the connection will be rejected. Multi-tenant trust anchors are not currently supported in Cumulocity, but this feature may be introduced in the future. We recommend always specifying the tenant ID in the username field so that your devices will continue to connect if the trust anchor configuration changes.

Configuring certificate trust anchors

TLS trust anchors in the Cumulocity platform are defined per tenant. To use device certificates for authentication, the root or intermediate certificate that signs the device certificates must be uploaded to the platform and added to the tenant’s list of trusted certificates. For example, if only a root certificate has been configured as a trust anchor, the device should send a certificate chain containing (at least) the unique certificate for the the device, and an intermediate certificate trusted by the root. Conversely, if the intermediate certificate has been configured as a trust anchor, the device can send only the unique per-device certificate that is trusted by the intermediate certificate.

Trust anchors can be configured through the Trusted certificates page in the UI, or through the REST API.

Additionally, ensure that the Auto registration option is enabled when adding certificates. This allows any device presenting a valid certificate to be automatically registered on the platform when it first connects.

Creating self-signed certificates

In order to self-sign the device certificates, the root Certificate Authority (CA) certificate must be created. Using the OpenSSL CLI tool, create a private key and then generate a self-signed root certificate from it.

openssl genpkey -algorithm RSA -out ca.key
openssl req -x509 -new -nodes -key ca.key -sha256 -days 3650 -out ca.crt -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=MQTTServiceCA"

Then create a private key for the device, generate the Certificate Signing Request (CSR) from this private key, and then sign the CSR.

openssl genpkey -algorithm RSA -out client.key
openssl rsa -in client.key -out client-key.pem -outform PEM
openssl req -new -key client.key -out client.csr -subj "/C=UK/O=YourCompany/OU=YourOrg/CN=mqtt-client"
openssl x509 -req -in client.csr -CA ca.crt -CAkey ca.key -CAcreateserial -out client.crt -days 3650 -sha256
cat client.crt ca.crt > client-chain.pem

If you have more advanced requirements regarding certificate creation, see Generating and signing certificates.

You may also be able to use Cumulocity’s built-in Certificate Authority, if your devices can support the Enrollment over Secure Transport (EST) protocol.

Using certificates

Once the CA certificate has been uploaded and trusted in Cumulocity, devices can authenticate using client certificates signed by your trusted CA. To connect using any MQTT client, use the previously generated client certificate and key. This example uses the Mosquitto MQTT client:

mosquitto_pub --cafile cumulocity.com.pem -d -q 1 \
  -h "cumulocity.com" -p "9883" -i myclient \
  -u t11101 \
  -t "v1/devices/me/telemetry" \
  --key client-key.pem \
  --cert client-chain.pem \
  -m '{"temperature":25}'

Explanation:

  • --cafile cumulocity.com.pem: This file contains the CA certificate of Cumulocity’s MQTT Service broker, used to validate the server’s identity.
  • --key client-key.pem and --cert client-chain.pem: These are your client certificate and private key, signed by your trusted CA.
  • -u t11101: (Optional) Specifies the MQTT username, which must be your tenant ID.
Downloading the CA certificate

Cumulocity uses certificates signed by well-known public CAs. Some clients (like Mosquitto) require explicitly providing the CA file, while others (like MQTTX) trust these certificates automatically. To download the Cumulocity MQTT Service broker’s CA certificate:

  1. Open cumulocity.com in a browser.
  2. Click the padlock icon in the address bar and view the certificate details.
  3. Download or export the root certificate, and save it as cumulocity.com.pem.

Alternatively, you can use openssl to retrieve and extract the certificate:

echo | openssl s_client -connect cumulocity.com:9883 -showcerts 2>/dev/null | \
    sed -ne '/-BEGIN CERTIFICATE-/,/-END CERTIFICATE-/p' > cumulocity.com.pem

MQTT protocol features

This section presents details of the MQTT protocol versions and features supported by the MQTT Service. As with Connecting MQTT devices it will be of interest to anyone integrating MQTT devices with Cumulocity. See the Core MQTT device support section for specific information about using Cumulocity MQTT protocols (SmartREST and JSON-over-MQTT) with the MQTT Service, but note that Core MQTT support shares the same general features and restrictions documented in this section.

Connecting to the service

The MQTT Service supports connections from clients using version 3.1, 3.1.1 or 5.0 of the MQTT protocol. Refer to the MQTT specifications for details of the differences between these versions.

MQTT version 3.1 is obsolete and not recommended. Most of the details below for version 3.1.1 will be valid for version 3.1; however, the specific differences in protcol version 3.1 are not explicitly documented.

MQTT version 3.1.1 features

These features are also applicable to devices connecting using version 5.0 of the MQTT protocol.

Clean Session

The MQTT Service requires devices to connect with the Clean Session flag in the MQTT CONNECT packet set to “1” (true). This flag is called Clean Start in MQTT version 5.0. If this flag is not set, the client connection will be rejected by the MQTT Service.

Caution
This means that messages sent to a device while it is disconnected will not be automatically delivered to it when it reconnects. Your devices and clients should implement an application-level protocol to send missed messages if this is important for your use case. This limitation also applies to the experimental support for Core MQTT devices. Pending Cumulocity device operations will not be sent to a Core MQTT device when it connects.

Quality of Service

The MQTT Service supports two levels of MQTT Quality of Service (QoS). The desired QoS level is specified in the MQTT PUBLISH packet when a device sends a message to the MQTT Service, and in the MQTT SUBSCRIBE packet when a device subscribes to a MQTT topic.

Level Supported Description
QoS 0 (at most once) Yes The service does not acknowledge messages sent by the device, and there is no guarantee that messages will be delivered.
For subscriptions, the service does not expect any acknowledgement from the device and will not send any message more than once.
QoS 1 (at least once) Yes The service will acknowledge messages sent by the device, and the device may re-send a message if no acknowledgement is received.
Acknowledged messages are guaranteed to be delivered at least once to Messaging Service clients.
For subscriptions, the device must acknowledge messages sent to it by the service, and the service may send the same message more than once.(1)
QoS 2 (exactly once) No Not supported

Notes:

  1. Because the MQTT Service requires devices to connect with a clean session, unacknowledged messages will not be re-sent by the MQTT Service after a device has disconnected and reconnected.

Duplicate Message Indicator

The Duplicate Message Indicator (DUP flag) in an MQTT PUBLISH packet indicates that this may be an attempted re-delivery of an earlier attempt to send the same packet. It will only ever be set on messages sent using QoS level 1. The DUP flag is supported by the MQTT Service in accordance with the MQTT specification.

Will Message

The MQTT Will Message feature allows a device to provide a message in the CONNECT packet that will be published on behalf of the device if it disconnects unexpectedly. Will Message is supported by the MQTT Service with these restrictions:

  • Because of device isolation, the Will Message will not be delivered to any other connected MQTT device. The Will Message will be published onto the Messaging Service where it can be consumed by a microservice or external application client.
  • The QoS level of the Will Message can be QoS 0 (at most once) or Qos 1 (at least once). QoS level 2 (exactly once) is not supported.
  • Retained Will Messages are not supported. If the retain flag is set on the Will Message, the message will not be accepted.

Retained Message

The MQTT Retained Message feature is not supported by the MQTT Service. If the RETAIN flag is set on a PUBLISH message from a device, the message will not be accepted and the connection will be closed.

Messages published by the MQTT Service to devices will never have the retain flag set.

Wildcard subscriptions

A wildcard subscription allows a device to subscribe to multiple MQTT topics using a pattern instead of a fixed topic name. The MQTT Service supports wildcard subscriptions in compliance with MQTT version 3.1.1 and 5.0 standards.

  • Single-level (+): Matches exactly one topic level. For example, sensors/+/temperature matches sensors/room1/temperature but not sensors/room1/basement/temperature.
  • Multi-level (#): Matches any number of nested levels. It must be the last character in the topic string. For example, sensors/# matches sensors/room1/temperature and sensors/room2/basement/humidity.
Device isolation

Device isolation remains active when using wildcards. A wildcard subscription does not grant access to topics that the client is not already authorized to see.

Overlapping subscriptions

If a client has multiple overlapping subscriptions that match the same topic (for example, a subscription to sensors/+/status and another to sensors/thermostat/status), the MQTT Service delivers the message only once. In cases where the overlapping subscriptions have different QoS settings, the service delivers the message using the highest QoS level defined among the matching subscriptions.

Caution
While the MQTT Service will accept wildcard subscriptions to Core MQTT topics, this is not supported. Using wildcards with Core MQTT topics can lead to unpredictable behavior. We strongly recommend using fixed topic strings for all subscriptions to Core MQTT topics.

MQTT version 5.0 features

These features apply to devices using version 5.0 of the MQTT protocol, in addition to the MQTT version 3.1.1 features described above.

Device connection properties

The MQTT version 5.0 CONNECT packet allows many optional properties of the MQTT session to be configured when a device connects. The level of support in the MQTT Service for these features varies, as shown in the table below. When a feature is described as “ignored”, this means that it can be requested at connection time but this will have no effect on the behaviour of the MQTT Service. Using features described as “not supported” may cause messages to be rejected, or the device to be disconnected.

Feature Support level Notes
Client Identifier Mandatory As for version 3.1.1.
Clean Start Mandatory As for version 3.1.1. Clean Start is required on all device connections.
Will Message Supported With the same restrictions on QoS level, retained messages and device isolation as for version 3.1.1.
These additional version 5.0 properties on the Will Message are supported:
Delay Interval, Payload Format Indicator, Content Type, Response Topic, Correlation Data and User Property.
The Message Expiry Interval property on the Will Message is ignored.
Receive Maximum Supported The MQTT Service will limit the number of unacknowledged QoS 1 messages for the device to the requested maximum.
Maximum Packet Size Supported The MQTT Service will not send any message larger than the requested size to this device.
Note that messages larger than the reqeusted size will be silently discarded.
Request Problem Information Supported A device should not assume that the MQTT Service will send a reason string, even when this has been requested.
Session Expiry Interval Ignored The requirement to set Clean Start on all connections means that session data is not retained.
Topic Alias Maximum Ignored The MQTT Service will not use topic aliases on messages sent to devices.
Request Response Information Ignored The MQTT Service will not send request/response hints in the CONNACK packet.
User Property Ignored User properties on the CONNECT packet will be ignored by the MQTT Service.
Authentication Method Ignored Extended authentication methods are not supported.
Authentication Data Ignored Extended authentication methods are not supported.

Message publishing features

These features are relevant for PUBLISH packets sent from a device to the MQTT Service, or from the MQTT Service to a device. In many cases, the additional MQTT version 5.0 properties on a message will be “passed through” from the device to a Messaging Service client, or vice-versa. It is the responsibilty of the device or client receiving the message to handle these properties appropriately.

Feature Support level Notes
Quality of Service level QoS 0 and 1 As for version 3.1.1. QoS level 2 is not supported.
Duplicate Message Indicator Supported As for version 3.1.1. Supported according to the MQTT specification.
Payload Format Indicator Supported Passed through between MQTT devices and Messaging Service clients.
Response Topic Supported Passed through between MQTT devices and Messaging Service clients.
Clients are responsible for sending a response message on the specified topic.
Correlation Data Supported Passed through between MQTT devices and Messaging Service clients.
Clients are responsible for including the correlation data on any response message(s).
User Property Supported Passed through between MQTT devices and Messaging Service clients.
Content Type Supported Passed through between MQTT devices and Messaging Service clients.
Message Expiry Interval Ignored The message expiry interval on messages published from devices will have no effect.
All messages published to devices will use the same message expiry interval.
Retained Message Not supported As for version 3.1.1. Retained messages are not supported.
Topic Alias Not supported Messages published using a topic alias will be rejected by the MQTT Service.
Subscription Identifier Not supported The subscription identifier will not be set on messages published by the MQTT Service.

Topic subscription features

These features relate to MQTT version 5.0 flags and properties that can be included in a SUBSCRIBE packet sent to the MQTT Service.

Feature Support level Notes
Maximum QoS level QoS 0 and 1 As for version 3.1.1. QoS level 2 is not supported.
User Property Ignored User properties on the SUBSCRIBE packet will be ignored by the MQTT Service.
No Local Ignored Local forwarding is not supported regardless of the setting of this option.
Retained As Published Ignored Retained messages are not supported, so this option has no effect.
Retain Handling Ignored Retained messages are not supported, so this option has no effect.
Subscription Identifier Not supported Subscriptions using a subscription identifier will be rejected by the MQTT Service.
Shared Subscription Not supported Subscriptions to topic names beginning with $share are not supported.

Topics

In general, the MQTT Service does not impose any restrictions on topic structure, and devices may use any topic name allowed by the MQTT specification. However, there are a small number of topic names that are reserved for historical compatibility or potential future use. These topic name cannot be used by devices:

  • All system topics (topic name beginning with $) unless specifically documented

There is a hard limit on the maximum length of a topic name. See the Service Quotas section for details of the limit.

Certain topics are reserved for devices using the Core MQTT protcols. See Core MQTT topics for the complete list. There is no overlap between the Core MQTT and generic device topic spaces, and all other topics are available for use by “generic” MQTT devices. Generic devices should avoid using any topic name starting with the Core MQTT prefixes listed below, even though some topics under those prefixes are not used by Core MQTT. This will help to avoid situations where it is not obvious how a given topic should be handled, which may be difficult to debug.

Payloads

The MQTT Service does not impose any specific message payload format. Message payloads are treated as opaque sequences of bytes that are delivered exactly as they were received. The content of a message payload will not have any effect on the behaviour of the MQTT Service.

There is a per-tenant hard quota on the maximum size of an MQTT message. The message size calculation includes the message header as well as the payload. Message header size can vary significantly, particularly for MQTT version 5.0 devices, but it will always be at least 2 bytes, and usually more. See the Service quotas section for details of the current quota values.

Error reporting

The MQTT Service follows the MQTT specification for responses from the server to devices.

According to the specification, if the server receives a malformed packet or a protocol error, it must disconnect the device. For MQTT version 3.1.1 devices, the device will simply be disconnected with no warning. For MQTT version 5.0 devices, the MQTT Service may send the device a packet containing a reason code, indicating the reason for the disconnection, before closing the connection. This will be a CONNACK packet in response to an error in a CONNECT packet, or a DISCONNECT packet in response to any other incorrect packet.

The server may receive a packet that is correct according to the protocol, but that it rejects for some other reason, such as a limit being exceeded. For devices using MQTT version 3.1.1, the protocol provides no way to indicate why a packet was rejected, so the connection will simply be dropped. The only exception is the SUBACK packet, which can indicate that a subscription failed, although without giving any more detailed reason. For devices using MQTT version 5.0, the protocol allows a reason code to be sent in response packets including SUBACK and PUBACK. The reason code provides the device with more information about why a specific request was rejected. The connection may still be dropped after sending the response packet.

The available reason codes are listed in section 2.4 of the MQTT version 5.0 specification.

MQTT device quotas and limits

The MQTT Service enforces several different quotas and limits on MQTT devices. See the Service Quotas section for details of the current values. As with other error conditions, a device exceeding a quota or limit will be handled according to the MQTT specification.

Unless specified otherwise, limits are enforced at the tenant level. For example, this means that the incoming message rate limit applies to the total rate across all devices connected for a tenant.

For devices using MQTT version 3.1.1, the protocol provides no way to indicate that a limit has been reached, so the connection will simply be dropped. The only exception is the SUBACK packet, which can indicate that a subscription failed, although without giving any more detailed reason.

For devices using MQTT version 5.0, where the protocol allows a reason code to be sent, the code 0x97 (Quota exceeded) will be used. The connection may still be dropped after sending this reason code.

For all protocol versions, an alarm will also be raised, subject to the rate limiting described in Alarms.

See also the discussion of Messaging Service quotas and limits imposed by the MQTT Service. These do not affect device connections directly, but “back pressure” from the Messaging Service can lead to device errors, for example if the Messaging Service is unable to accept more messages from a device.

Alarms

The MQTT Service raises Cumulocity alarms in response to some error conditions on device connections. This gives better visibility of problems to tenant users and applications, which is especially useful when obtaining good diagnostic data from a device is difficult. Alarms are rate limited, to avoid overloading the Cumulocity platform with too many alarms. This means that if, for example, many devices publish messages larger than the allowed maximum size in a short period of time, an alarm will not be raised for every instance of the problem. However, tenant users will still be aware that devices are publishing too-large messages, and can take steps to correct this.

The table below describes the alarms that will be raised for problems related to device connections:

Alarm type Description
c8y_MqttService_MaximumPacketSize_Connect A device sent a CONNECT packet larger than the allowed maximum size.
c8y_MqttService_MaximumPacketSize_Publish A device sent a PUBLISH packet larger than the allowed maximum size.
c8y_MqttService_MaximumPacketSize_Subscribe A device sent a SUBSCRIBE packet larger than the allowed maximum size.
c8y_MqttService_MaximumPacketSize_Unsubscribe A device sent an UNSUBSCRIBE packet larger than the allowed maximum size.
c8y_MqttService_TenantConnectionsLimitExceeded The number of connected devices has exceeded the allowed maximum.
c8y_MqttService_TenantConnectionRateLimitExceeded The number of device connection attempts per second has exceeded the allowed maximum.
c8y_MqttService_IncomingPublishRateLimitExceeded The number of incoming (from device) messages per second has exceeded the allowed maximum.
c8y_MqttService_OutgoingPublishRateLimitExceeded The number of outgoing (to device) messages per second has exceeded the allowed maximum.

Core MQTT device support

Feature preview

This feature is in Public Preview. That is, it is not yet generally available and may be subject to change in the future.

Core MQTT support is disabled by default and must be explicitly enabled for your tenant. To enable Core MQTT support, navigate to Settings > Feature toggles in the Administration application and set the mqtt-service.smartrest toggle key status to Enabled. While Core MQTT support is disabled, any messages published to Core MQTT topics will be treated as invalid and may cause the MQTT client to be disconnected.

The preview Core MQTT support in the MQTT Service has some differences in behaviour, compared to connecting devices directly to the Cumulocity core. Some of these differences happen because the MQTT Service is decoupled from the Cumulocity core, with messages transferred asynchronously between them, as shown in the architecture diagram. This means firstly that messages received, and potentially acknowledged, by the MQTT Service have not necessarily been processed by the Core MQTT implementation yet. Secondly, the Core MQTT implementation does not have full visiblity of the connection lifecycle and topic subscriptions made by devices connected to the MQTT Service. We expect to resolve many of these differences before the Core MQTT support in the MQTT Service reaches Generally Available status.

Using Core MQTT protocols through the MQTT Service shares the same features and restrictions documented elsewhere in this section, which may differ from accessing Core MQTT directly through the Cumulocity core. In particular:

  • WebSocket connections are not supported
  • QoS level 2 is not supported
  • MQTT version 5.0 clients are supported
  • Messages with the RETAIN flag set will be rejected, rather than the flag being ignored

Core MQTT topics

The Core MQTT protocols use a specific set of topics defined in the MQTT quick reference. All message publication and subscription on these topics is assumed to be for Core MQTT devices and will be routed to and from the Cumulocity core.

  • s/
  • t/
  • q/
  • c/
  • alarm/alarms/
  • event/events/
  • measurement/measurements/
  • inventory/managedObjects/
  • error
  • devicecontrol/notifications

Connect-time behaviour

Client identifiers

The structured MQTT client identifiers allowed by Core MQTT are not supported by the MQTT Service.

  • If the client ID includes a :defaultTemplateIdentifier suffix, this will be treated simply as part of the client ID, with no special handling.
  • If the client ID includes a d: (connection type) prefix, this will be recognized to allow previously registered Core MQTT devices to connect to the MQTT Service and be handled as the same device by Core MQTT. However, the d: prefix will not be handled specially for new devices connecting to the Cumulocity platform for the first time. When a new device connects for the first time, any d: prefix will be treated as simply part of the client ID, with no special handling.
Pending operations

Pending operations will not be automatically sent to a device when it connects to the MQTT Service. Devices should send a Get PENDING operations SmartREST message (template 500) to request an update on any pending operations to be sent.

Device registration

When a device authenticates to the MQTT Service using a certificate, it will be automatically registered as a new device by the Cumulocity core the first time it interacts with a Core MQTT topic. This automatic behaviour cannot be disabled.

Device error handling

As mentioned above, the decoupled, asynchronous architecture of the MQTT Service means that the Cumulocity core has less visiblity of connected MQTT devices. This means that a device will not be automatically disconnected even if it:

  1. Subscribes to an invalid or unavailable Core MQTT Topic.
  2. Sends an invalid Core MQTT message.

In these cases, the device will remain connected, but invalid messages will not be processed and no messages will be received from invalid topics. A device can subscribe to the s/e topic to monitor any error messages sent by the Core MQTT implementation in these cases.

Connection monitoring

Connection monitoring for “send connection” traffic from the device will work as expected.

However, monitoring of “push connection” traffic to the device is not supported by the MQTT Service.

Rate limiting

The Cumulocity rate limiting mechanism will not be used for MQTT devices connected through the MQTT Service. See the table of limits and quotas for details of the rate and other limits applied to the MQTT Service.

Integrating with microservices and external applications

Cumulocity microservices and external applications can consume messages published by devices connected to the MQTT Service, and publish messages back to those devices. To do this, your microservice or external application connects to the Cumulocity Messaging Service, a deployment of Apache Pulsar, and uses the Pulsar protocol to publish and consume MQTT messages. The diagram below shows the important interfaces and data flows used when interacting with the MQTT Service through Pulsar.

MQTT Service Pulsar connections

Info
An MQTT Service messaging client is a software component that interacts with the MQTT Service through Pulsar. It can be deployed as a microservice hosted by the Cumulocity platform, or as part of an external application hosted outside the platform. This documentation refers to such a component simply as a client. If the implementation or behaviour differs depending on where the client is hosted, those differences are documented where relevant.

The MQTT Service implements device isolation, meaning that MQTT devices connected to the MQTT Service cannot communicate directly with each other using the MQTT protocol. All inter-device communication must be managed explicitly by the client, as shown in the diagram.

This documentation does not cover the publish-subscribe messaging concepts and architecture implemented by Pulsar, nor any features of the Pulsar client libraries beyond those needed to implement a simple MQTT Service client. To learn more about those subjects, refer to the Pulsar product documentation.

Connecting to the Messaging Service

To connect your client to the Messaging Service, you will need:

  1. A Pulsar client library.
  2. The URL of the Messaging Service (Pulsar broker) in your Cumulocity environment.
  3. Credentials for a user in your tenant with permission to access MQTT Service data on the Messaging Service.

Each of these prerequisites is explained in detail below.

Pulsar client library

Open-source Pulsar client libraries are available for a number of different languages and protocols. The example code in this documentation will use the Java client library. Pulsar has strong cross-version compatibility. Use the latest version of your chosen client library regardless of the server version used by the Messaging Service. Integration with the MQTT Service does not require advanced Pulsar features that may only be available in the latest server version.

Caution
Currently only “basic” (username/password) authentication is supported for clients connecting to the Messaging Service through Pulsar. Therefore, you must ensure that your chosen Pulsar client library supports this authentication scheme.

Pulsar URL

For a microservice client, the URL should be obtained from the C8Y_BASEURL_PULSAR environment variable that will be passed to the microservice when it starts running. For an external application client, the URL has the general form pulsar+ssl://<tenant_domain>:6651/, where <tenant_domain> is the domain of your Cumulocity tenant, for example my-tenant.cumulocity.com. As implied by the pulsar+ssl protocol name, all external application client connections will use SSL/TLS security. Currently, only one-way TLS is supported. The server provides a certificate that the client can verify. Client certificates cannot be used. Implementing an external application client so that it reads the Pulsar URL from the C8Y_BASEURL_PULSAR environment variable makes it easier to develop a client that can be deployed as either a microservice or an external application.

Pulsar authentication

Authentication credentials identify both the Cumulocity tenant and the user within that tenant. Currently, only “basic” (username and password) authentication is supported for clients connecting to the Messaging Service through Pulsar. For a microservice client, you should use the credentials of the per-tenant service user that will be passed to the microservice when the tenant is subscribed to it. For an external application user, you can use the credentials of any tenant user with the appropriate authorization roles assigned, as described below. The username must be in the form <tenantID>/<user> where <tenantID> is the tenant ID (not the tenant name), and <user> is a user within that tenant. If two-factor authentication (TFA) is enabled for your tenant, your user must have the devices role assigned to disable the TFA check for that user. See TFA Settings for more information. Note that the devices role may be shown as “Device User” in the Cumulocity user interface.

Role-based access control

Pulsar client connections will be granted access to Messaging Service resources based on the roles and permissions assigned to the authenticated user. The following roles and permissions should be used for MQTT Service messaging clients:

Role and permission Access granted
Mqtt service messaging topics, Read Consume messages from MQTT devices connected to the MQTT Service
Mqtt service messaging topics, Update Publish messages to MQTT devices connected to the MQTT Service

For microservice clients, the required permissions should be added to the requiredRoles section of the microservice manifest, which will grant the requested permissions to the per-tenant service user. For example:

{
    "apiVersion": "v2",
    "name": "my-mqtt-service-client",
    "version": "1.0.0",
    ...
    "requiredRoles": [
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_READ",
        "ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE"
    ],
    ...
}

For external application clients, the required permissions should be configured for the authenticating user through the Administration application.

Assign only the minimum permissions needed for the client to operate. For example, if your microservice only consumes messages, do not include the ROLE_MQTT_SERVICE_MESSAGING_TOPICS_UPDATE permission in the manifest.

Example code – connecting to the Messaging Service

The code snippet below shows how to use the Pulsar Java client library to connect to the Messaging Service with basic authentication. It assumes that the Pulsar URL is in the C8Y_BASEURL_PULSAR environment variable and that the tenant identifier, username and password are provided on the command line. Note that the client library will not actually attempt to connect to the Pulsar server immediately when the PulsarClient object is created. In the interest of brevity and clarity, this example does no error handling. A realistic implementation would need to handle exceptions thrown by the Pulsar client library methods.

package c8y.example.mqttservice;

import java.text.MessageFormat;
import java.nio.charset.StandardCharsets;

import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.impl.auth.AuthenticationBasic;

public class SimplePulsarClient {
    public static void main(String[] args) throws Exception {
        // Validate command line.
        if (args.length != 2) {
            System.err.println("Usage: SimplePulsarClient <tenantID> <username>");
            System.err.println("The Pulsar URL will be read from the C8Y_BASEURL_PULSAR environment variable");
            System.err.println("The password will be read from the console");
            System.exit(-1);
        }

        // Collect all the configuration properties.
        final String pulsarUrl = System.getenv("C8Y_BASEURL_PULSAR");
        final String tenantID = args[0];
        final String username = args[1];
        final String password = new String(System.console().readPassword("Password for user %s/%s: ", tenantID, username));

        // Create the basic authentication credentials object.
        final AuthenticationBasic basicAuth = new AuthenticationBasic();
        basicAuth.configure(MessageFormat.format("'{'\"userId\":\"{0}/{1}\",\"password\":\"{2}\"'}'", tenantID, username, password));

        // Create a Pulsar client using the basic authentication credentials.
        // The client will *not* try to connect and authenticate immediately.
        final PulsarClient client = PulsarClient.builder()
            .serviceUrl(pulsarUrl)
            .authentication(basicAuth)
            .build();
        System.out.println("Created Pulsar client");

        // The rest of the example will go here...
    }
}

Message payloads and properties

Pulsar messages consist of a payload and set of properties.

The payload is a sequence of zero or more bytes, identical to the payload of the MQTT PUBLISH message that the Pulsar message corresponds to. It is the client’s responsibility to understand the format of the payloads produced and accepted by the MQTT devices it communicates with.

For messages received from devices, the Pulsar eventTime field holds the time that the MQTT PUBLISH message arrived at the MQTT Service. The time is represented as a Unix timestamp (the number of milliseconds since 1 January 1970 00:00:00 UTC). This ensures downstream consumers have a consistent time source for processing.

Pulsar message properties are name-value pairs, where both the name and the value are text strings. The properties recognised by the MQTT Service are listed in the table below. Messages received from MQTT devices will always include the properties marked as required, and may include any of the optional properties. Received messages will not include any properties other than those listed here. Messages published to MQTT devices must include all of the required properties, and may include any of the optional properties. If a published message includes any properties other than those listed here, those properties will be ignored by the MQTT Service.

Property name Required Value type and encoding Purpose
topic YES String MQTT topic name
clientID YES(1) String MQTT client identifier
tx.clientUsername(2) NO(3) String MQTT client username, or common name in case of certificate authentication
tx.clientAuthType NO(3) String MQTT client authentication method (BASIC or X509)
tx.payloadFormatIndicator NO Single byte with two permitted values, encoded as strings “0” and “1” MQTT v5 Payload Format Indicator
tx.contentType NO String MQTT v5 Content Type
tx.responseTopic NO String MQTT v5 Response Topic
tx.correlationData NO Sequence of bytes, encoded as a Base64 string MQTT v5 Correlation Data
tx.userProperties.<name> NO String MQTT v5 User Property with name name(4)

Notes:

  1. The clientID property can be omitted from a published message only in special case of a broadcast message, described below in broadcast messages.
  2. The tx. prefix indicates that a property is specific to a transport, in this case the MQTT Service. Other transports will define their own transport-specific properties, but all transports will use topic and clientID.
  3. When a device connects to the MQTT Service using certificate authentication, the service enforces a strict binding, ensuring that the certificate’s Common Name matches the clientID. However, when a device connects using basic authentication, there is no automatic binding between the authenticated user and the clientID. To prevent client spoofing, it is the responsibility of the consumer to implement authorization validation. By checking the tx.clientAuthType and tx.clientUsername properties, downstream consumers (like your microservice) can verify whether the authenticated user is actually authorized to publish messages on behalf of the asserted clientID.
  4. The MQTT version 5 specification allows a message to include more than one user property with the same name. This feature is not supported by the MQTT Service. If a device publishes a message containing multiple user properties with the same name, only one of these will be copied into the Pulsar message. It is undefined which property will be copied.

Consuming messages from MQTT devices

All messages published by devices connected to the MQTT Service for a given tenant will be published to a single Pulsar topic, identified by the URL persistent://<tenantID>/mqtt/from-device. The topic URL can be broken down into 4 components:

Component Description
persistent Indicates that this is a persistent topic that will be preserved by the Messaging Service across component failures and restarts, to provide “at least once” delivery guarantees
<tenantID> The Pulsar tenant ID, which will match the Cumulocity tenant ID
mqtt The Pulsar namespace within the tenant, which will always be mqtt for the MQTT Service
from-device The Pulsar topic within the namespace, which will always be from-device for message from devices connected to the MQTT Service

Your client will only be able to consume from this topic if the authenticated user has the “read” permission on the “Mqtt service messaging topics” role. The client will not be able to consume from any other topic.

The client identifier of the device that published the messages, and the MQTT topic it was published on, can be obtained from the message properties clientID and topic as described above. The Pulsar eventTime field provides the exact time when the message was received by the MQTT Service. This means that your client must consume every message published by every device connected to the MQTT Service for the tenant, even those you are not interested in. Messages that are not of interest to the client can simply be acknowledged without further processing.

Caution
Your client must be trusted to safely handle every message published by every device connected to the MQTT Service in your tenant. If untrusted users have access to your tenant, these users should not be permitted to upload microservices, nor to connect external application clients to the Messaging Service. This recommendation also applies in the case of multiple customers, who do not mutually trust each other, sharing a single tenant.

Durable subscriptions and message acknowledgement

Subscribing a consumer to a topic establishes a durable subscription to the topic. This means that the Messaging Service will retain messages published to the topic until they have been delivered to, and acknowledged by, a client. The subscription will remain until it is explicitly deleted. It will not be removed simply because the client is not currently running. Messages that are published while the client is disconnected will be available for it to consume when it reconnects. After consuming each message, the client must explicitly acknowledge it. Acknowledging a message tells the Messaging Service that the client has no further interest in it, allowing the message to be discarded. See the section on best practices below for more information on managing durable subscriptions correctly.

Example code – consuming messages

The code snippet below shows how to use the Pulsar Java client library to consume messages from the MQTT Service from-device topic. It extends the previous example that showed how to set up the connection to the Pulsar server.

To consume messages from the topic, your client should create a Pulsar Consumer and subscribe it to the topic. The consumer should register a MessageListener callback that will be called whenever a new message arrives on the topic. The MessageListener implementation shows how to access the payload, properties, and arrival time of the received messages. For simplicity, the application messages in the example are simple text strings. However, the payload of the Pulsar message will always be an array of bytes, that must be converted to the format used by the application.

        // Create a simple message listener that will log some details of
        // each message received, when registered with a consumer.
        final MessageListener<byte[]> listener = new MessageListener<byte[]>() {
            @Override
            public void received(Consumer<byte[]> consumer, Message<byte[]> message) {
                final String clientId = message.getProperty("clientID");
                final String topic = message.getProperty("topic");
                final long eventTime = message.getEventTime();
                System.out.println(MessageFormat.format("Received message from MQTT device {0} on MQTT topic {1}", clientId, topic));
                System.out.println(MessageFormat.format("MQTT PUBLISH arrival timestamp: {0}", eventTime));
                System.out.println(MessageFormat.format("Message payload: {0}", new String(message.getValue(), StandardCharsets.UTF_8)));
                System.out.println(MessageFormat.format("Message properties: {0}", message.getProperties()));
                try {
                    // Acknowledge the message.
                    consumer.acknowledge(message);
                } catch (PulsarClientException e) {
                    e.printStackTrace();
                }
            }
        };

        // Create a Pulsar consumer on the from-device topic for the tenant,
        // using the listener defined above to process each message.
        // This will trigger connection and authentication by the client.
        final Consumer<byte[]> consumer = client.newConsumer(Schema.BYTES)
            .topic(MessageFormat.format("persistent://{0}/mqtt/from-device", tenantID))
            .subscriptionName("demoSubscription")
            .messageListener(listener)
            .subscribe();
        System.out.println("Created Pulsar consumer");

Publishing messages to MQTT devices

Any messages that your client wants to send to devices connected to the MQTT Service for a given tenant must be published to a single Pulsar topic, identified by the URL persistent://<tenantID>/mqtt/to-device. The components of the URL should be interpreted as described in Consuming messages from MQTT devices above.

Your client will only be able to publish to this topic if the authenticated user has the “update” permission on the “Mqtt service messaging topics” role. The client will not be able to publish to any other topic.

Messages published to the to-device topic are routed to connected MQTT devices using the two required message properties:

Property name Purpose
clientID Client identifier of the MQTT device that should receive the message
topic Name of the MQTT topic that the message should be published to

If the topic property is empty or missing, the message will not be published to any MQTT client. The message will only be published to a device with an active subscription to the named MQTT topic. The message will only be published to a client that is connected at the time the MQTT Service processes the published message.

Successfully publishing a message to the Messaging Service does not mean that the message has been successfully delivered to any MQTT device. Onward publishing to MQTT devices happens asynchronously and without any feedback to the Pulsar client. Messages will be delivered to devices according to the MQTT protocol specification, using the QoS level of the MQTT subscription made by the device. However, because MQTT devices are required to use a clean session when connecting to the MQTT Service, messages published to a device while it is disconnected will not be delivered.

Broadcast messages

To enforce device-level isolation, messages are published only to the specific MQTT client identified by the clientID property, provided that client has an active subscription to the relevant MQTT topic. If the clientID property is not present, the message is broadcast to all connected MQTT clients with active subscriptions to that topic.

Broadcast publishing is potentially expensive when many clients are connected and may deliver messages to unexpected devices. Use broadcast only when the application must publish the same message to every device subscribed to a topic.

Message keys

To facilitate efficient delivery and correct ordering of messages sent to MQTT devices, clients must also set the key of a Pulsar message published to the to-device topic. The key should be set as follows:

  • When the clientID message property is set, the key should have the same value as this property.
  • When the clientID message property is not set, the key should have the same value as the topic message property.

Handling of invalid messages

Published messages that do not follow the rules for message properties and keys documented above will not be delivered to any MQTT device. In particular this applies to messages with the following invalid configuration:

  • The message key is not set.
  • The message key is set but does not match the clientID or topic property as described in message keys.
  • The clientID property is set but has an empty value.
  • The topic property is not set, or it is set but has an empty value.

An alarm will be raised in the Cumulocity tenant when one of these invalid messages is detected and discarded. The rate of alarm sending is limited to avoid overloading the tenant with redundant alarms alerting about the same error on different messages. The following alarms are raised for invalid messages on the Pulsar to-device topic:

Alarm type Description
c8y_MqttService_ToDevice_NoKey The message key is not set.
c8y_MqttService_ToDevice_InvalidKey The message key is set but does not match the clientID or topic.
c8y_MqttService_ToDevice_EmptyClientId The clientID property is set but has an empty value.
c8y_MqttService_ToDevice_MissingTopic The topic property is not set.
c8y_MqttService_ToDevice_EmptyTopic The topic property is set but has an empty value.
c8y_MqttService_ToDevice_InvalidTopic The topic property is invalid, for example, SmartREST topic is used.

A message with a non-empty clientID property referring to an MQTT device that is not currently connected is not considered to be invalid. However, this message will not be delivered to the device, even if it connects later, because of the requirement for devices to use a clean session when connecting. Similarly, a message published to a connected MQTT device that is not currently subscribed to the MQTT topic specified in the topic property is not considered to be invalid. In these situations, the message will not be delivered but no alarms will be raised.

Example code – publishing messages

The code snippet below shows how to use the Pulsar Java client library to publish messages to the MQTT Service to-device topic. It extends the previous examples that set up the connection to the Pulsar server and created a message consumer.

To publish messages to the topic, your client should first create a Pulsar Producer associated with the topic. Then, the Producer can be used to create new Message objects that will be published to the topic. The example code shows how to correctly set the message properties and message key for messages targeted at a single device, and for “broadcast” messages. Again, the example assumes that the application messages are simple text strings, that must be converted to the byte array expected by the MQTT Service. For clarity, most error-handling code is omitted from the example. See Handling Messaging Service errors for advice on dealing with errors in a production client.

        // Wrap all the operations that might fail after we create the
        // durable subscription in a try-catch, so that we can delete the
        // subscription if something goes wrong.
        try {
            // Create a Pulsar producer on the to-device topic for the tenant.
            final Producer<byte[]> producer = client.newProducer(Schema.BYTES)
                .topic(MessageFormat.format("persistent://{0}/mqtt/to-device", tenantID))
                .create();
            System.out.println("Created Pulsar producer");

            // Publish a message to a single MQTT device.
            producer.newMessage()
                .property("clientID", "demoClient")
                .property("topic", "demoTopicB")
                .key("demoClient")
                .value("Message sent to a single device".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to single device");

            // Publish a message to all MQTT devices subscribed to a topic.
            // Note that the "clientID" property is omitted here.
            producer.newMessage()
                .property("topic", "demoTopicB")
                .key("demoTopicB")
                .value("Message sent to all subscribed devices".getBytes(StandardCharsets.UTF_8))
                .send();
            System.out.println("Sent message to all subscribed devices");

            // Close the producer.
            producer.close();
        }

Messaging Service quotas and limits

Messages published to a Pulsar topic are stored persistently by the Messaging Service until they have been delivered to, and acknowledged by, all interested consumers. For messages published to the from-device topic by the MQTT Service, the consumers are any clients that have created durable subscriptions on the topic. For messages published to the to-device topic by clients, the consumers are the instances of the MQTT Service that will deliver the messages to devices.

To optimize resource usage, the Messaging Service imposes storage limits and a message time-to-live (TTL) on persistently stored messages.

See the service quotas documentation for details on the default limits. These limits are configurable on a per-tenant basis. If your use case requires a different configuration, or if you have any questions or concerns, contact product support.

Message backlog quota

Persistent messages are stored in a backlog until they are delivered to any interested consumers. The maximum size of the backlog is set by the backlog quota limit, which directly affects the number of messages that can be stored and therefore the resource consumption of the platform.

A separate backlog exists for each Pulsar topic, so for the MQTT Service the from-device and to-device topics for a tenant will each have their own independent backlog. The backlog is shared by all subscriptions on a topic. If the backlog quota limit is reached, no new messages can be added to the backlog until some older messages have been delivered, or deleted due to their TTL expiring.

If the backlog quota limit for the Pulsar from-device topic is reached, new MQTT PUBLISH packets from connected devices will be rejected. If the PUBLISH packet was sent with QoS level 0, the message will be lost. If the PUBLISH packet was sent with QoS level 1, the behaviour depends on the MQTT protocol version used by the device:

  • For devices using MQTT version 3, the device will be disconnected.
  • For devices using MQTT version 5, the device will receive a PUBACK packet with reason code 0x97, Quota exceeded.

If the backlog quota limit for the Pulsar to-device topic is reached, clients calling the Producer.send() method, or its equivalent in the Pulsar library used by the client, will receive an appropriate exception or error response from the client library.

Message time-to-live

Any undelivered messages will be automatically deleted if they have been on the backlog for longer than the time-to-live (TTL) limit. This policy helps to limit overall resource usage and reduces the need to process outdated data after a prolonged disconnection of a consumer.

No undelivered message will ever be deleted from the backlog unless it reaches its TTL limit. Messages will always be delivered to the consumer in the order they were published to the topic.

Best practices for reliable message delivery from devices

If a topic reaches its backlog quota limit, it stops accepting new messages and messages may be lost. To avoid this:

  • Process and acknowledge messages from the from-device topic as quickly as possible. Every message must be explicitly acknowledged, even if the client is not interested in it. Do not acknowledge a message until processing is complete or the message has been stored securely for later processing. Acknowledged messages will not be re-delivered after a client failure or restart.
  • Manage subscription lifecycles. Subscribing a consumer creates a durable subscription that remains until explicitly deleted. Messages published while the client is disconnected will be retained for the subscription and delivered when the client reconnects. Because subscriptions persist, a topic can reach its backlog quota even when no clients are running.
    1. Use the same subscription name each time the client connects. Avoid creating random subscription names on each run. That leaves inactive subscriptions accumulating and may exhaust the backlog.
    2. Explicitly delete subscriptions when they are no longer required. For example, when taking a client out of service for an extended period, call the consumer unsubscribe() method or use the Messaging Service monitoring and management interface to delete the subscription.

Example code – deleting the subscription

The code snippet below shows how to delete the subscription and close the other Pulsar client objects created by the earlier code examples.

        finally {
            // Delete the durable subscription.
            // This is only necessary if messages should *not* be retained
            // on the topic while the client is disconnected.
            consumer.unsubscribe();
        }

        // Close the other Pulsar objects that we created.
        consumer.close();
        client.close();

Handling Messaging Service errors

The Cumulocity Messaging Service is a complex, distributed service running remotely from your client. In common with all distributed systems, perfect reliability cannot be guaranteed, and a client should be prepared to handle errors reported by the Pulsar client library. These errors can be split into two general categories:

  1. Configuration or logical errors in the client implementation. Errors in this category are usually “fatal” and prevent the client from connecting to the Messaging Service, or publishing or consuming any messages. Some typical examples of this type of error include:
    • Attempting to connect with an incorrect Pulsar URL.
    • Using invalid authentication credentials.
    • Using the credentials of a user that is not authorized to access the Messaging Service.
    • Attempting to consume from the to-device topic, or publish to the from-device topic.
    • Attempting to publish to or consume from any other topic.
    • Attempting to publish incorrectly constructed messages. The most likely cause for this is attempting to publish a message with a payload that was not explicitly created as a byte array.
  2. Transient errors in the Messaging Service. Errors in this category usually reflect a temporary issue with the Messaging Service server, that will be resolved either automatically or by administrator action. Some transient errors that a client may experience include:
    • Connections may be dropped when Messaging Service components are restarted during upgrades, or during unplanned outages of the Messaging Service. This will cause publish or consume operations to fail, and it may be necessary to re-connect, or re-establish the producer or consumer, before retrying the operation.
    • Published messages will be rejected when the backlog quota limit on the to-device topic has been reached. See reliable delivery best practices for advice on avoiding this situation.
    • Published messages may be rejected if other limits or quotas on the Messaging Service are reached.

If your client is using the Java client library, almost all errors will be reported as a PulsarClientException thrown by a client library method. In some very rare cases a SchemaSerializationException runtime error might also be thrown, if the client has not used the Schema.BYTES schema and byte array payloads exclusively. The PulsarClientException class has many sub-classes that allow a client to determine the cause of the error more precisely. Other client libraries will have similar language-specific error reporting mechanisms.

In general, it is not possible to recover from a fatal configuration or logic error in the client implementation. The client will need to be restarted after the error has been corrected. For transient errors, a strategy of retrying after a delay is usually appropriate. When an operation on a producer or a consumer has failed, it may be difficult to identify the exact root cause and the optimal response. A simple recovery approach that covers most scenarios is to delete the failed producer or consumer and create a new one before retrying the operation. This avoids cases where the producer or consumer cannot reconnect after an error. A more sophisticated strategy can tailor the response to the specific subclass of PulsarClientException thrown. Use an exponential backoff strategy to increase the delay between retries until the service recovers.

Example MQTT Service clients

Source code for several example clients for the MQTT Service can be found in the cumulocity-examples GitHub repository. These examples may be good starting points for developing your own clients. In addition, the Cumulocity Tech Community is an excellent source of advice and examples for all aspects of Cumulocity development.

Refer to the README.md files included with each example for more information on how to build and run it.

A complete example Java client based on the code snippets in the Integrating with microservices and external applications section is available. Alongside this, there is a simple Python MQTT client that can be used to simulate an MQTT device and test the operation of the Java client. Start the Python client first to ensure messages sent to a device are received, then start the Java client.

For microservice developers, two examples are available. The example Java microservice uses the Cumulocity microservice SDK to connect to Pulsar from a Java-based microservice, consume messages from the MQTT Service, and transform messages into Cumulocity measurements. Likewise, there is an example Python microservice that implements similar functionality using the Python language. Both microservice examples also demonstrate how to determine the external ID of a device from the MQTT messages, map this to a Cumulocity managed object, and create the managed object if it does not already exist.

The examples repository is maintained continuously, and additional examples may have been added since this documentation was written.