About the MQTT transport
MQTT is a publish/subscribe-based “lightweight” message protocol designed for communication between constrained devices, for example, devices with limited network bandwidth or unreliable networks. See http://mqtt.org/ for detailed information.
Apama provides a connectivity plug-in, the MQTT transport, which can be used to communicate between the correlator and an MQTT broker, where the MQTT broker uses topics to filter the messages. MQTT messages can be transformed to and from Apama events by listening for and sending events to channels such as prefix:topic
(where the prefix is configurable).
The MQTT transport automatically reconnects in case of a connection failure. The transport will retry sending any messages sent after the connection has been lost when reconnection has succeeded
You configure the MQTT connectivity plug-in by editing the files that come with the MQTT bundle. The properties file defines the substitution variables that are used in the YAML configuration file which also comes with the bundle. See Adding the MQTT connectivity plug-in to a project for further information.
apama_project
command-line tool. See Creating and managing an Apama project from the command line for more information.This transport provides a dynamic chain manager which creates chains automatically when EPL subscribes or sends to a correlator channel with the configured prefix, typically mqtt:
. For the MQTT transport, there must be exactly one chain definition provided in the dynamicChains
section of the YAML configuration file.
For more information on YAML configuration files, see Using connectivity plug-ins and especially Configuration file for connectivity plug-ins.
Using MQTT connectivity from EPL
The MQTT transport can either subscribe to or send to a particular topic, depending on whether your EPL is subscribing to or sending to a particular channel.
In EPL, in order to receive an MQTT message, you just need to subscribe to an MQTT topic with the appropriate prefix. For example:
monitor.subscribe("mqtt:topic_a");
on all A() as a {
print a.toString();
}
To send an Apama event to the MQTT broker, you just need to use the send...to
statement to deliver the event to the MQTT topic. For example:
send A("hello world") to "mqtt:topic_a";
As with all connectivity plug-ins, the EPL application is responsible for telling the system when it is ready to start receiving events with onApplicationInitialized
. See also Sending and receiving events with connectivity plug-ins.
The samples/connectivity_plugin/application/genericsendreceive
directory of your Apama installation includes a simple sample which provides an easy way to get started with sending and receiving messages to or from any connectivity plug-in. For more information, see the README.txt
file in the above directory and Sending and receiving events with connectivity plug-ins.
Loading the MQTT transport
The MQTT transport is loaded with the following connectivityPlugins
stanza:
mqttTransport:
libraryName: connectivity-mqtt
class: MQTTTransport
Configuring the connection to MQTT
You configure one or more dynamicChainManagers
to connect to different MQTT brokers. For example:
dynamicChainManagers:
mqttManager:
transport: mqttTransport
managerConfig:
brokerURL: tcp://localhost:1883
Connection-related configuration is specified in the managerConfig
stanza on the dynamicChainManagers
instance. The following configuration options are available for managerConfig
:
Configuration option |
Description |
---|---|
|
URL for the MQTT broker. For example, you can use the following URL for non-TLS connections:
To enable SSL/TLS, you simply indicate this in the broker URL. For example:
Type: |
|
Prefix for dynamic mapping. If the prefix ends with a colon (:), it needs to be enclosed in quotation marks (see also Using YAML configuration files). When the channel is mapped to an MQTT topic, the prefix is not used. For example, if the prefix is Type: Default: |
|
Optional. By default, a random client identifier is generated during startup. If you do not want to use this random identifier, you can set this option to configure your own client identifier. This can be any alphanumerical value. Type: |
|
Starts a clean session with the MQTT broker. Set this to Default: |
|
Used with TLS. By default, connections to unrecognized certificates are terminated. Set this to Default: |
|
Used with TLS. By default, server certifications signed by all standard Certificate Authorities are validated. Optionally, you can set this option to provide a path to a CA certificate file in PEM format to authenticate the host with. Type: |
|
User name for authentication. Type: |
|
Password for authentication. Type: |
|
Used by TLS. Optionally, you can set this option to provide a path to a CA certificate file in PEM format to authenticate the client with. Type: |
|
Used by TLS. Optional password used to decrypt the client private key file, if encrypted. Type: |
|
Used by TLS. Optional path to a PEM file containing the private key, if not already included in the certificate file. Type: |
Important: If you provide a password for authentication via the configuration file, you must ensure to protect the configuration file against any unauthorized access, since the password will be readable in plain text.
Mapping events between MQTT messages and EPL
You can use the apama.eventMap
host plug-in in a dynamic chain to translate events to or from nested messages like JSON data. You configure exactly one dynamicChains
section to handle transforming messages from the MQTT broker into the correlator, and vice versa.
The following description shows how to configure the genericsendreceive
sample to send/receive data to/from an MQTT broker. The advantage of this sample is that all required EPL code is already available (see also Writing EPL). The sample is located in the samples/connectivity_plugin/application/genericsendreceive
directory of your Apama installation. To use the sample, do the following:
- Import the sample into Apama Plugin for Eclipse as an existing project (make sure to create a copy).
- Add the MQTT connectivity plug-in to that project (see also Adding the MQTT connectivity plug-in to a project) and edit the
MQTT.yaml
file as per your application. - Configure both the input and output channels to
apamax
by sending theConfigureSample
event (in order to send continuous data,keepSending
must be set totrue
). - Send the
AppReady
event to start the application.
We recommend use of the suppressLoopback
configuration property to prevent undesirable behavior. See Host plug-ins and configuration for further information.
Mapping events between MQTT messages and EPL
You can use the apama.eventMap
host plug-in in a dynamic chain to translate events to or from nested messages like JSON data. You configure exactly one dynamicChains
section to handle transforming messages from the MQTT broker into the correlator, and vice versa.
The following description shows how to configure the genericsendreceive
sample to send/receive data to/from an MQTT broker. The advantage of this sample is that all required EPL code is already available (see also Writing EPL). The sample is located in the samples/connectivity_plugin/application/genericsendreceive
directory of your Apama installation. To use the sample, do the following:
- Import the sample into Apama Plugin for Eclipse as an existing project (make sure to create a copy).
- Add the MQTT connectivity plug-in to that project (see also Adding the MQTT connectivity plug-in to a project) and edit the
MQTT.yaml
file as per your application. - Configure both the input and output channels to
apamax
by sending theConfigureSample
event (in order to send continuous data,keepSending
must be set totrue
). - Send the
AppReady
event to start the application.
We recommend use of the suppressLoopback
configuration property to prevent undesirable behavior. See Host plug-ins and configuration for further information.
Payload for the MQTT message
As with all other transports, the translation between EPL events and MQTT payloads is based on the choice of host plug-in and codecs. See Host plug-ins and configuration and Codec Connectivity Plug-ins for further information.
The payload for the MQTT message is a byte array. Therefore, the String codec should usually be used to convert a byte[]
(Java) or buffer_t
(C++) type payload into a hostward string event. The same String codec can also be used to convert a string event to a transportward message with a byte[]
or buffer_t
type.
Wildcard topic subscriptions
MQTT supports a hierarchical topic namespace and allows you to subscribe to every topic in a namespace using a wildcard symbol such as #
. Any MQTT messages that are sent to the broker and that satisfy the topic namespace are sent to the correlator.
A potential result of this may be that a single MQTT message that is sent to the broker is received more than once by the correlator. For example, assume that Apama subscribes to both of the following channels:
"mqtt:SENSOR/#"
"mqtt:SENSOR/1"
If a single MQTT message is sent to the broker using the topic name SENSOR/1
, then this MQTT message will be received twice by the correlator. You should be aware of such situations and write your EPL accordingly to handle this.
Metadata for the MQTT message
Messages coming from the transport have useful pieces of information inserted into their metadata. This information is stored as a map associated with the mqtt
key. This map contains the following information:
Field | Description |
---|---|
metadata.mqtt.topic |
Contains the full name of the topic from which the message originated. This allows you to differentiate between messages coming from different sources in the case of a transport subscribed with a wildcard. |
Restrictions
Not all MQTT features are supported by the MQTT transport. The following features are not supported:
- Reliable messaging, that is, session persistency and QoS (Quality of Service) level greater than 0.
- Retained messages.
- Last will and testament options.