The MQTT Transport Connectivity Plug-in

About the MQTT transport

MQTT is a lightweight publish/subscribe messaging protocol designed for communication between constrained devices, for example, devices on low-bandwidth or unreliable networks. See http://mqtt.org/ for detailed information.

Info
While it is possible to use MQTT to communicate between Apama and Cumulocity, we recommend using the Cumulocity transport connectivity plug-in provided with Apama. See The Cumulocity IoT transport connectivity plug-in for detailed information.

Apama provides a connectivity plug-in, the MQTT transport, which can be used to communicate between the correlator and an MQTT broker, where the MQTT broker uses topics to filter the messages. MQTT messages can be transformed to and from Apama events by listening for and sending events to channels such as prefix:topic (where the prefix is configurable).

The MQTT transport automatically reconnects after a connection failure. Once reconnection has succeeded, the transport retries sending any messages that were sent after the connection was lost.

You configure the MQTT connectivity plug-in by editing the files that come with the MQTT bundle. The properties file defines the substitution variables that are used in the YAML configuration file which also comes with the bundle. See Adding the MQTT connectivity plug-in to a project for further information.

Info
In addition to using Apama Plugin for Eclipse to add bundles, you can also do this using the apama_project command-line tool. See Creating and managing an Apama project from the command line for more information.

This transport provides a dynamic chain manager which creates chains automatically when EPL subscribes or sends to a correlator channel with the configured prefix, typically mqtt:. For the MQTT transport, there must be exactly one chain definition provided in the dynamicChains section of the YAML configuration file.

For more information on YAML configuration files, see Using connectivity plug-ins and especially Configuration file for connectivity plug-ins.

Info
The MQTT connectivity plug-in does not support reliable messaging.

Using MQTT connectivity from EPL

The MQTT transport can either subscribe to or send to a particular topic, depending on whether your EPL is subscribing to or sending to a particular channel.

In EPL, in order to receive an MQTT message, you just need to subscribe to an MQTT topic with the appropriate prefix. For example:

monitor.subscribe("mqtt:topic_a");
on all A() as a {
   print a.toString();
}

To send an Apama event to the MQTT broker, you just need to use the send...to statement to deliver the event to the MQTT topic. For example:

send A("hello world") to "mqtt:topic_a";

As with all connectivity plug-ins, the EPL application is responsible for telling the system when it is ready to start receiving events by calling onApplicationInitialized. See also Sending and receiving events with connectivity plug-ins.

The samples/connectivity_plugin/application/genericsendreceive directory of your Apama installation includes a simple sample which provides an easy way to get started with sending and receiving messages to or from any connectivity plug-in. For more information, see the README.txt file in the above directory and Sending and receiving events with connectivity plug-ins.

Loading the MQTT transport

The MQTT transport is loaded with the following connectivityPlugins stanza:

connectivityPlugins:
  mqttTransport:
    libraryName: connectivity-mqtt
    class: MQTTTransport

Configuring the connection to MQTT

You configure one or more dynamicChainManagers to connect to different MQTT brokers. For example:

dynamicChainManagers:
  mqttManager:
    transport: mqttTransport
    managerConfig:
      brokerURL: tcp://localhost:1883

Connection-related configuration is specified in the managerConfig stanza on the dynamicChainManagers instance. The following configuration options are available for managerConfig:

Configuration option

Description

brokerURL

URL for the MQTT broker. For example, you can use the following URL for non-TLS connections:

tcp://localhost:1883

To enable SSL/TLS, use the ssl:// scheme in the broker URL. For example:

ssl://localhost:8883

Type: string.

channelPrefix

Prefix for dynamic mapping. If the prefix ends with a colon (:), it needs to be enclosed in quotation marks (see also Using YAML configuration files). When the channel is mapped to an MQTT topic, the prefix is not used. For example, if the prefix is "mqtt:", then the channel mqtt:test/a maps to the MQTT topic test/a.

Type: string.

Default: "mqtt:".

mqttClientId

Optional. By default, a random client identifier is generated during startup. To use your own client identifier instead, set this option to any alphanumeric value. Type: string.

cleanSession

Starts a clean session with the MQTT broker. Set this to false if the previous session is to be resumed. You should only do this in conjunction with setting the mqttClientId. Type: bool.

Default: true.

acceptUnrecognizedCertificates

Used with TLS. By default, connections to servers that present unrecognized certificates are terminated. Set this to true if non-validated server certificates are to be accepted. Type: bool.

Default: false.

certificateAuthorityFile

Used with TLS. By default, server certificates signed by all standard Certificate Authorities are validated. Optionally, you can set this option to provide a path to a CA certificate file in PEM format to authenticate the host with. Type: string.

authentication/username

User name for authentication. Type: string.

authentication/password

Password for authentication. Type: string.

authentication/certificateFile

Used by TLS. Optionally, you can set this option to provide a path to a client certificate file in PEM format to authenticate the client with. Type: string.

authentication/certificatePassword

Used by TLS. Optional password used to decrypt the client private key file, if encrypted. Type: string.

authentication/privateKeyFile

Used by TLS. Optional path to a PEM file containing the private key, if not already included in the certificate file. Type: string.

Important: If you provide a password for authentication via the configuration file, you must protect the configuration file against unauthorized access, because the password is readable in plain text.
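
As an illustration of how several of these options fit together, the following is a minimal sketch of a TLS-enabled managerConfig. The host name, the client identifier, and the ${MQTT_...} substitution variable names are hypothetical and would be defined in your own properties file. The sketch also assumes that option names written with a slash, such as authentication/username, denote nested YAML keys.

```yaml
dynamicChainManagers:
  mqttManager:
    transport: mqttTransport
    managerConfig:
      # The ssl:// scheme enables TLS (hypothetical host name)
      brokerURL: ssl://broker.example.com:8883
      # Quoted because the prefix ends with a colon
      channelPrefix: "mqtt:"
      # Fixed client identifier instead of the random default (hypothetical value)
      mqttClientId: apamaCorrelator01
      # Keep certificate validation enabled (this is the default)
      acceptUnrecognizedCertificates: false
      # CA certificate in PEM format (hypothetical substitution variable)
      certificateAuthorityFile: ${MQTT_caFile}
      # Assumed nesting for the authentication/... options
      authentication:
        username: ${MQTT_username}
        password: ${MQTT_password}
```

Keeping the credentials in substitution variables does not remove the need to protect the files that define them.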

Mapping events between MQTT messages and EPL

You can use the apama.eventMap host plug-in in a dynamic chain to translate events to or from nested messages like JSON data. You configure exactly one dynamicChains section to handle transforming messages from the MQTT broker into the correlator, and vice versa.

The following description shows how to configure the genericsendreceive sample to send/receive data to/from an MQTT broker. The advantage of this sample is that all required EPL code is already available (see also Writing EPL). The sample is located in the samples/connectivity_plugin/application/genericsendreceive directory of your Apama installation. To use the sample, do the following:

  1. Import the sample into Apama Plugin for Eclipse as an existing project (make sure to create a copy).
  2. Add the MQTT connectivity plug-in to that project (see also Adding the MQTT connectivity plug-in to a project) and edit the MQTT.yaml file as per your application.
  3. Configure both the input and output channels to apamax by sending the ConfigureSample event (in order to send continuous data, keepSending must be set to true).
  4. Send the AppReady event to start the application.

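We recommend setting the suppressLoopback configuration property of the apama.eventMap host plug-in, so that events your EPL sends to an MQTT channel are not also delivered directly to contexts that are subscribed to the same channel. See Host plug-ins and configuration for further information.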
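
A dynamicChains section for JSON payloads might look like the following sketch. It is not the configuration shipped with the bundle. It assumes that the JSON and String codecs have been loaded in connectivityPlugins under the names jsonCodec and stringCodec, and SensorReading is a hypothetical event type.

```yaml
dynamicChains:
  mqttChain:
    # Host plug-in: converts between EPL events and map payloads
    - apama.eventMap:
        # Hypothetical event type to use when a message does not name one
        defaultEventType: SensorReading
        suppressLoopback: true
    # Converts between map payloads and JSON strings (assumed plug-in name)
    - jsonCodec
    # Converts between strings and the byte-array MQTT payload (assumed plug-in name)
    - stringCodec
    # The MQTT transport loaded in connectivityPlugins
    - mqttTransport
```

The chain is listed from the host side down to the transport, so the host plug-in comes first and the transport last.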

Payload for the MQTT message

As with all other transports, the translation between EPL events and MQTT payloads is based on the choice of host plug-in and codecs. See Host plug-ins and configuration and Codec Connectivity Plug-ins for further information.

The payload for the MQTT message is a byte array. Therefore, the String codec should usually be used to convert a byte[] (Java) or buffer_t (C++) type payload into a hostward string event. The same String codec can also be used to convert a string event to a transportward message with a byte[] or buffer_t type.

Wildcard topic subscriptions

MQTT supports a hierarchical topic namespace and allows you to subscribe to every topic in a namespace using a wildcard symbol such as #. Any MQTT messages that are sent to the broker and that satisfy the topic namespace are sent to the correlator.

As a result, a single MQTT message that is sent to the broker may be received more than once by the correlator. For example, assume that Apama subscribes to both of the following channels:

"mqtt:SENSOR/#"
"mqtt:SENSOR/1"

If a single MQTT message is sent to the broker using the topic name SENSOR/1, then this MQTT message will be received twice by the correlator. You should be aware of such situations and write your EPL accordingly to handle this.

Metadata for the MQTT message

Messages coming from the transport have useful pieces of information inserted into their metadata. This information is stored as a map associated with the mqtt key. This map contains the following information:

Field

Description

metadata.mqtt.topic

Contains the full name of the topic from which the message originated. This allows you to differentiate between messages coming from different sources in the case of a transport subscribed with a wildcard.
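
One way to make the originating topic visible to EPL is to copy it from the metadata into a payload field with the Mapper codec. The following is a sketch under several assumptions: the Mapper, JSON and String codecs are loaded under the names mapperCodec, jsonCodec and stringCodec, and SensorReading is a hypothetical event type that has a string field named sourceTopic.

```yaml
dynamicChains:
  mqttChain:
    - apama.eventMap:
        # Hypothetical event type with a sourceTopic field
        defaultEventType: SensorReading
    - mapperCodec:
        # Apply the rule to every event type
        "*":
          towardsHost:
            mapFrom:
              # target field: source field
              - payload.sourceTopic: metadata.mqtt.topic
    - jsonCodec
    - stringCodec
    - mqttTransport
```

With a mapping like this, EPL that subscribes with a wildcard can inspect the sourceTopic field to tell which topic each event came from.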

Restrictions

Not all MQTT features are supported by the MQTT transport. The following features are not supported:

  • Reliable messaging, that is, session persistency and QoS (Quality of Service) level greater than 0.
  • Retained messages.
  • Last will and testament options.