Chain components and messages
Connectivity chains consist of zero or more codecs and one transport.
Codecs perform some transformation of events, processing events going to and from the correlator, and passing them on to the next component in the chain. Examples of codecs include:
- Translating events from structured form into a serialized form such as JSON or XML.
- Changing the name of a field to translate between different naming conventions.
- Removing unnecessary fields.
- Filtering, merging or splitting events.
Transports are the end of a chain. They may deliver the events to an external system that is not part of the chain and may be out of process, or they may perform some operation and send the result back to the correlator. Examples of transports include:
- Sending HTTP requests and receiving responses.
- Receiving events from a message bus.
- Performing operations on a file system.
Codecs and transports may be written in Java or C++, and chains can contain a mixture of C++ and Java plug-ins. Messages will be automatically converted between C++ and Java forms. The language bindings of any libraries required by a plug-in and familiarity with the programming environment should be the primary factors when deciding on the language in which to write a new plug-in. Note that conversions between C++ and Java forms are copies and there are overheads in performing these conversions. As the Apama host plug-ins are implemented in C++ (as is the core of the correlator), a chain consisting of only C++ plug-ins will perform better. In particular, avoid mixing many interleaved Java and C++ plug-ins in the same chain. If possible, put the C++ plug-ins on the host side of the chain. Where adjacent plug-ins in a chain are of the same type (C++/Java), messages are passed by reference/pointer (they are not copied).
Plug-ins communicate with each other and with the correlator (also referred to as the host) by sending batches of messages. Messages are converted to and from events within the correlator. When a chain sends a message to the host, the host plug-in converts it to an event and sends it into the correlator. When the correlator emits an event to a channel on which a chain is listening, the host plug-in converts it from an event to a message and delivers it to the chain. The plug-ins in a chain may perform conversions along the way; for example, a codec such as a JSON codec may convert a map payload to a single string, which is still carried within a message. When the message reaches a transport, the payload may be taken out of the message and delivered in some other form (for example, as an HTTP request). A message consists of a payload (which can be of different types according to the needs of the plug-in) and a metadata map of strings. For more information, see the Message class in the API reference for Java (Javadoc) or the API reference for C++ (Doxygen).
Metadata holds data about the event, such as the channel on which the event was delivered or the type of the event. Plug-ins can use metadata to pass extra information about events that are not part of the event (for example, latency through a chain could be measured by adding timestamps to metadata and comparing that with the time needed for processing an event).
The message payload can be null. This means that the message is not a real event. This can be useful for passing non-event information between plug-ins; many plug-ins will ignore this. For example, a request to terminate a connection could be sent from one codec to signal the transport to disconnect, and intermediate codecs that perform transformations such as JSON encoding would ignore the event.
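For example, a codec could construct such a non-event message as follows (a minimal Java sketch; the metadata key used here is purely illustrative), and then send it towards the transport like any other message:

Message control = new Message(null); // null payload: this is not a real event
control.putMetadataValue("myplugin.control", "disconnect");

Intermediate codecs that do not recognize the metadata key would simply pass the message through.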
Messages are passed in batches so that transports (and codecs) can take advantage of amortizing costs if operating at high throughput. Most plug-ins can be written by subclassing AbstractSimpleCodec
or AbstractSimpleTransport
classes (see Requirements of a plug-in class) and only need to process a single Message
at a time. The delineation of messages into batches does not carry any significance beyond the events are all available at the same time. This is only an opportunity for optimizations, not for passing extra semantic information. Codecs are not required to maintain the same batches and can re-batch messages if desired.
Messages are not copied when they are passed between plug-ins, and no locking or synchronization is performed. If a codec wants to keep hold of a pristine copy of a message, it should store a copy of the message.
Every chain will need to work with one of the supplied host plug-ins. Most chains will use the apama.eventMap
plug-in which allows events to be sent without needing to know the exact event definition. See also Map contents used by the apama.eventMap host plug-in.
Requirements of a plug-in class
Java and C++ plug-ins are identified in the connectivityPlugins
section of the configuration file for the connectivity plug-ins. See Configuration file for connectivity plug-ins for detailed information.
The named class must be a descendant of either AbstractCodec or AbstractTransport, unless it is a transport with a dynamic chain manager, in which case the class must subclass AbstractChainManager (see Requirements of a transport chain manager plug-in class for more information about developing chain managers).
In most cases, the easiest way to write codec and transport classes is by subclassing AbstractSimpleCodec
or AbstractSimpleTransport
. However, in some cases, a plug-in can achieve better performance by directly subclassing the base class AbstractCodec
or AbstractTransport
; these classes support handling a batch of multiple messages in a single call.
The classes are summarized in the following table. They are all in the com.softwareag.connectivity
package (Java) or in the com::softwareag::connectivity
namespace (C++). See the API reference for Java (Javadoc) and API reference for C++ (Doxygen) for more information.
Base class | Subclasses deal in | Minimum methods subclasses need to implement
---|---|---
AbstractCodec | Batches of messages | sendBatchTowardsHost, sendBatchTowardsTransport
AbstractTransport | Batches of messages | sendBatchTowardsTransport
AbstractSimpleCodec | Individual messages | transformMessageTowardsHost, transformMessageTowardsTransport
AbstractSimpleTransport | Individual messages | deliverMessageTowardsTransport

All of these classes also provide the following members:

- Member chainId
- Member config
- Member logger for logging (see below)
- Member hostSide - the next component in the chain towards the host
- Member transportSide - the next component in the chain towards the transport (for codecs only)

The start, hostReady and shutdown methods can be overridden if required. See also Lifetime of connectivity plug-ins.
Plug-in class constructor
Subclasses should provide a constructor like the following, for Java:
public <ClassName>(org.slf4j.Logger logger,
TransportConstructorParameters params) throws Exception,
IllegalArgumentException {
super(logger, params);
...
}
or for C++:
public:
<ClassName>(const TransportConstructorParameters &params)
: AbstractSimpleTransport(params)
{
...
}
The constructors for codecs follow the same pattern as for transports. The TransportConstructorParameters
or CodecConstructorParameters
object provides access to the plug-in’s configuration and other useful information and capabilities, some of which are also made available as member fields for convenience.
Note that transports with an associated dynamic chain manager are created by the chain manager’s createTransport
method (for Java) or must have a public constructor with signature (const ManagedTransportConstructorParameters &,...)
where ...
are any extra parameters passed in the createChain
call (for C++).
In both Java and C++, there is a logger available using the logger
field (for Java, this is also passed into the constructor). Note that all messages logged using the logger will include the chainId
and pluginName
, so there is no need to explicitly add that information to each message. See the API reference for Java (Javadoc) and API reference for C++ (Doxygen) for detailed information.
Using AbstractSimpleCodec and AbstractSimpleTransport
The AbstractSimpleCodec
and AbstractSimpleTransport
classes handle batches by iterating through each message within a batch and calling one of the methods listed above for each message. For Java codecs, the result of the transform
method replaces that message in the batch. For C++ codecs, the transform
method is passed a reference to the message, which it can mutate; the message is discarded if the method returns false
. By default, messages with a null payload are ignored by the AbstractSimpleCodec
and AbstractSimpleTransport
classes, but subclasses may override methods to handle them (see the API reference for Java (Javadoc) and API reference for C++ (Doxygen) for details).
Exceptions from processing one message are logged by default (this behavior can be overridden by implementing handleException
) and the next message is then processed.
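For example, the following is a minimal sketch of a codec subclassing AbstractSimpleCodec. It assumes the per-message transform methods are named transformMessageTowardsHost and transformMessageTowardsTransport (check the API reference for the exact signatures); the metadata key it adds is purely illustrative:

public class TimestampingCodec extends AbstractSimpleCodec {
    public TimestampingCodec(org.slf4j.Logger logger, CodecConstructorParameters params) throws Exception {
        super(logger, params);
    }
    // Called once per hostward message; the returned message replaces the original in the batch.
    public Message transformMessageTowardsHost(Message message) {
        message.putMetadataValue("sample.receivedAt", Long.toString(System.currentTimeMillis()));
        return message;
    }
    // This codec only annotates hostward messages; transportward messages pass through unchanged.
    public Message transformMessageTowardsTransport(Message message) {
        return message;
    }
}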
To deliver events to the correlator, transports call the sendBatchTowardsHost
method on the hostSide
member of AbstractSimpleTransport
, passing a batch of messages as a List<Message>
(they can use Collections.singletonList()
if needed).
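For example, the following is a minimal sketch of a transport subclassing AbstractSimpleTransport. Instead of talking to a real external system, it simply echoes each transportward message straight back towards the host, which is enough to show the flow:

public class EchoTransport extends AbstractSimpleTransport {
    public EchoTransport(org.slf4j.Logger logger, TransportConstructorParameters params) throws Exception {
        super(logger, params);
    }
    public void deliverMessageTowardsTransport(Message message) {
        // a real transport would deliver the message to an external system here
        hostSide.sendBatchTowardsHost(Collections.singletonList(message));
    }
}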
Using AbstractCodec and AbstractTransport
Chains are bidirectional, passing events to and from the correlator. The order of plug-ins within a chain is defined by the configuration file: first the host plug-in, then codecs, and finally a transport. Plug-ins are connected such that the hostSide
and transportSide
members of AbstractCodec
point to the previous and next plug-in in the chain; and for AbstractTransport
, hostSide
points to the last codec (or the host plug-in if there are no codecs).
Events from the correlator are sent to the first codec (or transport if there are no codecs). Each codec should pass the message through to the next component, invoking the sendBatchTowardsTransport
method on the transportSide
member.
Events to the correlator originate from the transport and are delivered by invoking the sendBatchTowardsHost
method on the hostSide
member which delivers the events to the last codec. The last codec should invoke the sendBatchTowardsHost
method on its hostSide
object, thus traversing plug-ins in the reverse order. For Java, transports must always provide hostSide
a batch of messages as a List<Message>
(they can use Collections.singletonList()
if needed). For C++ plug-ins, the batches are passed as a pair of start and end pointers to Message
. The batch is defined as starting from the message pointed to by start
and up to just before the message pointed to by end
- similar to begin()
and end()
iterators on C++ containers. Thus, the messages in a batch can be iterated with a loop such as:
for (Message *it = start; it != end; ++it) {
handleMessage(*it);
}
Plug-ins are provided with a HostSide
and (for codecs only) TransportSide
interface view of the next component in the chain (as members of AbstractTransport
or AbstractCodec
).
Codecs are not required to maintain a one-to-one mapping of events going in and out. They may choose to discard or merge multiple messages or split a message into separate messages.
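As an illustration of a batch-level plug-in, the following sketch of an AbstractCodec subclass splits any message whose payload is a list into one message per element before passing the batch on. It assumes the batch methods are named sendBatchTowardsHost and sendBatchTowardsTransport, matching the HostSide and TransportSide interfaces described above:

public class SplittingCodec extends AbstractCodec {
    public SplittingCodec(org.slf4j.Logger logger, CodecConstructorParameters params) throws Exception {
        super(logger, params);
    }
    // Hostward direction: re-batch, producing one message per list element.
    public void sendBatchTowardsHost(List<Message> batch) {
        List<Message> out = new ArrayList<>();
        for (Message m : batch) {
            if (m.getPayload() instanceof List) {
                for (Object element : (List<?>) m.getPayload()) {
                    Message split = new Message(element);
                    split.getMetadataMap().putAll(m.getMetadataMap()); // keep the original metadata
                    out.add(split);
                }
            } else {
                out.add(m);
            }
        }
        hostSide.sendBatchTowardsHost(out);
    }
    // Transportward direction: pass the batch through unchanged.
    public void sendBatchTowardsTransport(List<Message> batch) {
        transportSide.sendBatchTowardsTransport(batch);
    }
}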
Exporting the required symbols from C++ plug-ins
C++ plug-ins also require a macro which exports the symbols that the correlator needs to create and manage the plug-in object. The macro has one of the following names:
-
SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(*class-name*)
This macro should not be used for transports with a chain manager.
-
SAG_DECLARE_CONNECTIVITY_CODEC_CLASS(*class-name*)
-
SAG_DECLARE_CONNECTIVITY_TRANSPORT_CHAIN_MANAGER_CLASS(*class-name*)
This macro is used for exporting a chain manager class.
The macro takes the base name of the class - the class's name excluding any namespace. It is recommended to declare codecs and transports in a namespace to avoid name collisions, and to use the macro within the namespace declaration, or where a using
statement applies. For example:
#include <sag_connectivity_plugins.hpp>
using namespace com::softwareag::connectivity;
namespace my_namespace {
class MyTransport: public AbstractSimpleTransport
{
public:
MyTransport(const TransportConstructorParameters &params)
: AbstractSimpleTransport(params)
{
...
}
virtual void deliverMessageTowardsTransport(Message &m)
{
logger.info("deliverMessageTowardsTransport()");
}
...
};
SAG_DECLARE_CONNECTIVITY_TRANSPORT_CLASS(MyTransport)
} // end my_namespace
Note:
For a chain manager, you should include the header file sag_connectivity_chain_managers.hpp
instead of sag_connectivity_plugins.hpp
which is used in the above example.
Requirements of a transport chain manager plug-in class
A transport plug-in can control the lifetime of chains involving that transport, by providing a dynamic chain manager. The chain manager can decide when to create or destroy chains, and is typically controlled by either listening to channel subscriptions from the correlator host, or by listening to external connections.
For example, any topic or queue on a message bus can be exposed dynamically without having to provide a list of the topics/queues to connect to. On a channel-created notification, the chain manager would check if there is a topic/queue to which it can connect, and create a chain instance to connect to that topic/queue on demand.
Alternatively, the chain manager may listen for new connections, creating a new chain instance for each one. For example, the chain manager could hold a server socket and, on accepting an incoming connection, create a suitable chain instance to handle messages on that connection. In both cases, the chain manager will typically hold some connection object, which it then needs to pass to transport instances when they are created. Thus, the chain manager and transport are usually tightly coupled, and a chain manager can only create chains using its own transport class.
A transport that uses a dynamic chain manager to create its instances consists of a subclass of AbstractTransport
(or AbstractSimpleTransport
), and a subclass of AbstractChainManager
which is the class that must be specified in the configuration file’s connectivityPlugins
section. See Configuration file for connectivity plug-ins and Requirements of a plug-in class.
The chain manager is responsible for:
-
Creating and destroying chains as needed, often in response to notifications about the channels that the EPL application is using or to handle new connections initiated from another process. Some managers will create a single chain for sending messages in both directions on a given channel (towards host and towards transport); others may create separate chains for each direction, or may only support one direction. For detailed information about this, see Creating dynamic chains from a chain manager plug-in. In summary, there are two main aspects to chain creation:
-
Selecting which chain definition to use when creating a new chain instance, if there is more than one chain definition available for this transport. For more information, see Creating dynamic chains from a chain manager plug-in.
-
Instantiating the transport plug-in during creation of the chain, by calling the transport’s constructor.
With Java, the chain manager can simply pass through the logger and params arguments to a transport constructor with the same signature as the createTransport method, or it can pass additional information that the transport needs - such as a reference to the chain manager or connection, or information about the host channel(s) the chain is sending to/from.
With C++, the transport's constructor is invoked directly, with a signature of (const ManagedTransportConstructorParameters &,...) where ... are any extra parameters passed in the createChain call.
-
-
Instantiating and managing the lifetime of any connection to an external server or other resources that should be shared by all associated transports. Usually, it is undesirable for each transport or chain to have its own separate connection to any external server that the transport is using, as the number of chains may be large. In many protocols, connections are heavyweight entities that you would not want to have lots of. The chain manager can create its connections at any time, but it is recommended to create the initial connection in the chain manager’s
start()
method if it is desirable for the correlator to delay coming up until the connection is established, and for the correlator to fail to start if an exception is thrown while making the initial connection. If not, it should happen on a background thread created by the start()
method. -
Optionally, reporting status information that applies to the chain manager rather than to individual transports. For example, status about a connection shared across all transports could be reported by the chain manager, as could aggregated KPI statistics from all transport chains.
The transport class is responsible only for sending and/or receiving messages, often making use of a connection owned by the chain manager. Transports can also report their own status values if desired, though if it is likely there will be a large number of transports, individual status for each may be less useful and more expensive to report than aggregated status for the whole chain manager.
Every chain manager is required to implement the following:
- A public constructor that will be called during correlator startup for each configured dynamicChainManager, with the same signature as the AbstractChainManager constructor.
- createTransport (Java only; for C++, the transport's constructor is invoked directly as described above)
- start
- shutdown
The AbstractChainManager
base class has a number of member fields that provide access to logging, the configuration for all dynamic chain definitions associated with its transport, and a ChainManagerHost
interface which supports creating chains and registering channel listeners.
A typical chain manager would use its start()
method to create any required connection(s) to external servers, and to add a ChannelLifecycleListener
providing notifications when channels with a specific prefix are created or destroyed.
It is possible to listen for all channels regardless of prefix, but using a prefix to limit the subset of channels monitored by each chain manager is recommended to improve performance. The ChannelLifecycleListener
will fire to indicate that a channel has been created when the channel name is used for the first time, typically as a result of the Apama application calling monitor.subscribe(*channel*)
or send *event* to *channel*
. When this happens, the manager must first decide whether it needs to have a chain for the specified channel, as some managers may only wish to take action if a channel with the specified name exists on the external system they are connected to. The manager must also check if it already has a chain for this channel in the specified direction, since in some situations the listener will notify about creation of the same channel more than once (see flushChannelCache
in Shutting down and managing components). If the manager has established that a chain is needed for this channel and none already exists, it should create one before returning from the listener callback. Or if a chain already exists for this channel, but is no longer needed, it should destroy it. In other cases, it should do nothing.
The first EPL monitor.subscribe(*channel*)
or send *event* to *channel*
call to use a channel with a registered listener will block until the listener returns to ensure that no messages are missed if the manager does decide to create a chain for that channel. If an error occurs in the chain manager’s implementation of the listener callback, it will be logged but no exception is thrown in the EPL application. See Creating dynamic chains from a chain manager plug-in for more details about how to create chains from a dynamic chain manager.
When the onChannelDestroyed
method of the ChannelLifecycleListener
is called to indicate that a channel has been destroyed (that is, there are no remaining EPL monitors using the channel for the specified direction), the chain manager should call destroy
on the chain to shut down and disconnect all associated transport and codec plug-ins. Chain managers should not implement reference counting, as the destroy notification will not be fired until all uses of the channel have finished.
Note that at present, channel destroy notifications are only sent for the TOWARDS_HOST
direction (monitor.subscribe()
) since in the TOWARDS_TRANSPORT
direction (send *event* to *channel*
) there is no unambiguous way of determining when a channel is no longer needed.
If using correlator persistence, the required channel lifecycle notifications for channels in use by any persistent monitors will be replayed to chain managers during recovery, so there is no need for chain managers to persist any state across restarts to support correct operation of persistence.
For detailed information about the classes and interfaces involved in creating a chain manager, including more detailed information about how to use the listener API correctly and safely, see the API reference for Java (Javadoc) on the com.softwareag.connectivity.chainmanagers
package, or see the API reference for C++ (Doxygen) on the com::softwareag::connectivity::chainmanagers
namespace.
For a complete example of a working Java chain manager and transport, see the Kafka sample in the samples/connectivity_plugin/java/KafkaTransport
directory of your Apama installation.
A skeleton sample for C++ is provided in the samples/connectivity_plugin/cpp/skeleton_chainmanager
directory of your Apama installation. You can use this sample as a starting point to write your own C++ chain manager and transport.
Building plug-ins
See the samples/connectivity_plugin
directory of your Apama installation for working samples of connectivity plug-in source code, Ant build files, makefiles, or Microsoft Visual Studio projects for building C++ plug-ins (note that the build instructions in the samples/connectivity_plugin
directory assume that you are using a recent version of Microsoft Visual Studio).
Building Java plug-ins
Java plug-ins require the connectivity-plugins-api.jar
file in the lib
directory of your Apama installation to be on the compiler’s classpath as it defines Message
, AbstractCodec
, AbstractTransport
, AbstractChainManager
and associated classes. The classes are in the com.softwareag.connectivity.*
packages.
All code samples shown in this connectivity plug-ins documentation assume either that the following lines of code are present in the source file, or that the classes are imported individually.
import com.softwareag.connectivity.*;
import java.util.*; // Map, HashMap, List, ArrayList
// are commonly used classes in these samples
You can develop Java-based connectivity plug-ins in Apama Plugin for Eclipse. To do so, you have to add the Apama Java support to your Apama project. See Creating Apama projects for more information. This will automatically take care of the classpath for you.
Building C++ plug-ins
C++ plug-ins require the header files in the include
directory of your Apama installation to be on the compiler’s include path. The plug-in should be linked as a shared library and it should link against the apclient
library in the lib
directory of your Apama installation. The resultant library will thus depend on the apclient
library.
All code samples shown in this connectivity plug-ins documentation assume either that the following lines of code are present in the source file, or that individual using
statements are used for each class.
#include <sag_connectivity_plugins.hpp>
using namespace com::softwareag::connectivity;
For chain manager classes, the following is also needed:
#include <sag_connectivity_chain_managers.hpp>
using namespace com::softwareag::connectivity::chainmanagers;
For information on the compilers that have been tested and are supported, refer to the Supported Platforms document for the current Apama version. This can be found at Supported platforms.
Connectivity plug-in headers are a wrapper around a C ABI. Unlike other plug-ins, the C++ plug-ins are therefore not sensitive to which C++ compiler product, compiler version and compiler configuration (for example, a debug or release build) is used. The C++ compiler used does need to correctly support parts of the C++11 standard, and exact settings required for each compiler will vary.
If you are building a shared library to be used by multiple plug-ins and using the plug-in-specific data structures as part of your API between the library and the plug-ins, then you must ensure that the library and all of the plug-ins are compiled using the same version of the Apama header files. This means that if you upgrade Apama and want to recompile one of them, you must recompile all of them. You can choose not to recompile anything and they will still work.
If you compile with headers from multiple service packs of Apama, then you may see errors similar to the following when you try to link them.
-
Linux :
undefined reference to `Foo::test(com::softwareag::connectivity10_5_3::data_t const&)'
Windows:
testlib2.obj : error LNK2019: unresolved external symbol "public: void __cdecl Foo::test(class com::softwareag::connectivity10_5_3::data_t const &)" (?test@Foo@@QEAAXAEBVdata_t@connectivity10_5_3@softwareag@com@@@Z) referenced in function "public: void __cdecl Bar::test(class com::softwareag::connectivity10_5_3::data_t const &)" (?test@Bar@@QEAAXAEBVdata_t@connectivity10_5_3@softwareag@com@@@Z)
testlib2.dll : fatal error LNK1120: 1 unresolved externals
If you encounter a similar error, try recompiling all your components with the same version of the headers.
If you are compiling a single plug-in, or multiple completely independent plug-ins, you can recompile them in any combination at any time.
If you want to develop plug-ins in C++, you have to use your own C++ compiler/development environment.
C++ data types
C++ plug-ins handle messages, whose payload can be any of the following:
- string (null terminated UTF-8) (const char*)
- integer (64 bit) (int64_t)
- float (64 bit) (double)
- decimal (64 bit) (decimal_t)
- boolean (bool)
- map of any of these types to any of these types (map_t)
- list of any of these types (list_t)
- byte buffer (buffer_t)
- a custom object - a pointer to a user-defined class (custom_t)
- an empty or "null" value
To facilitate this, the payload member of a message is of the com::softwareag::connectivity::data_t
class type. The data_t
type is a “smart union” that holds one of the above types, and knows which type it holds. It has a similar API to a boost variant. The data_t
class has constructors from each of the above types, and a no-argument constructor which creates an empty value. Accessing the data contained in a data_t
instance can be performed as described below.
- Use the get free template function. For example:

  data_t data("some text");
  const char *text = get<const char*>(data);
  map_t map;
  data_t mapdata(map);
  map_t &mapref = get<map_t>(mapdata);

  For compound types map_t, list_t, custom_t and buffer_t, this returns a reference to the object.

- You can attempt to convert integer, boolean, string or float values inside a data_t to each other, regardless of the underlying type. The following is an example for turning a string into its numerical representation:

  data_t data("10");
  int64_t i = convert_to<int64_t>(data);
  double f = convert_to<double>(data);

- Use a visitor and the apply_visitor free template function. A visitor is a class with operator() methods for each of the types (and no arguments for empty data_t). If you wish to use a visitor that only handles a few types and throws an error on all other types, then sub-class the provided visitor or const_visitor template and override one or more of the following methods: visitEmpty, visitString, visitInteger, visitDouble, visitBoolean, visitDecimal, visitBuffer, visitList, visitMap, visitCustom.

  The result of apply_visitor is of type visitor::result_type (typically a typedef), or the second template argument of visitor/const_visitor. For example:

  struct print_data: public const_visitor<print_data, void>
  {
      void visitString(const char *s) const { std::cout << s; }
      void visitList(const list_t &l) const
      {
          std::cout << "[";
          for (list_t::const_iterator it = l.begin(); it != l.end(); ++it) {
              apply_visitor(print_data(), *it);
              if (it+1 != l.end()) std::cout << ", ";
          }
          std::cout << "]";
      }
      void visitMap(const map_t &m) const
      {
          std::cout << "{";
          std::vector<std::string> keys;
          for (map_t::const_iterator it = m.begin(); it != m.end(); ++it) {
              keys.push_back(get<const char*>(it.key()));
          }
          std::sort(keys.begin(), keys.end());
          for (std::vector<std::string>::iterator it = keys.begin(); it != keys.end(); ++it) {
              std::cout << *it << "=";
              apply_visitor(print_data(), m.find(data_t(it->c_str())).value());
              if (it+1 != keys.end()) std::cout << ", ";
          }
          std::cout << "}";
      }
  };

  data_t data;
  apply_visitor(print_data(), data);
Containers and custom values
The list_t
and map_t
classes are containers that hold a list or unordered (hash) map of data_t
elements (and for map_t
, keys). These are similar in behavior to the C++ standard library classes std::vector
and std::unordered_map
- with a subset of the methods available. list_t
maintains order of its elements, and allows access with the operator[]
overload or iterators returned from begin
and end
(or rbegin
and rend
, or cbegin
and cend
). map_t
does not maintain ordering, and should give average O(1) cost for insertions and lookups. map_t
does not permit a data_t
holding a custom_t
value to be used as a key.
When using iterators over the map_t
and list_t
types, or references to entries within the container, you must not modify the parent container while iterating over it, or before accessing those references. Trying to use an iterator after modifying the parent container will assert, or throw an exception if asserts are disabled. There is no such protection for references. Note that if you have a non-const
 map_t
, then the operator[]
can count as a mutation - it will add an entry if the entry does not already exist.
The buffer_t
is similar to list_t
, but its element type is byte (uint8_t
). buffer_t
can be translated to and from a Java byte[]
, but not to host plug-ins as there is no correlator type that maps to or from them.
The custom_t
type behaves like a std::unique_ptr
to a user-specified class, with an explicit copy method. The class must have a copy constructor and destructor that do not throw exceptions. It is up to you to ensure that the correct type is used; but if all classes wrapped in custom_t
are virtual, then it is possible to use dynamic_cast
or typeinfo
to distinguish the types of the objects held by custom_t
. Note that visitors are called with a sag_underlying_custom_t
reference; this needs to be cast with static_cast
to the expected custom_t<Type>
reference. custom_t
values can only be passed between C++ plug-ins; they cannot be passed to host plug-ins or Java plug-ins (and you need to ensure that the plug-ins share the same definition of the class).
Decimals
The Apama decimal type is converted to/from a decimal_t struct. This has a single int64_t member which is the bit pattern of the IEEE 754 64-bit floating-point decimal. This can be serialized, copied or moved, but no facilities are provided for arithmetic or for conversion to string or normal floating-point types; a third-party decimal library is needed if such functionality is required.
Copying, moving and swapping
The data_t
and compound types list_t
, map_t
, buffer_t
and custom_t
deliberately hide access to the copy constructor and assignment operator to avoid accidental copies. Explicit copies are possible with the copy()
method, which performs a deep copy (that is, for a map or list value, it copies each element, and each element of those if they are compound types). Rather than copying values, consider if the move
constructor or move
assignment operator can be used (these leave the moved-from object empty). To call these, the argument needs to be wrapped in std::move().
Map contents used by the apama.eventMap host plug-in
The payloads that the apama.eventMap
generates for transportward messages and that it requires for hostward messages are maps. For Java chains, this is java.util.Map<Object, Object>
. For C++ chains, this is a map_t
.
Each key in the map is the name of a field in the EPL event definition, and the value is the corresponding EPL value. Each event containing other events is represented as a Map
value within the top-level field, allowing nesting of events, dictionaries and sequences. For events sent from chains into the correlator, all fields must have non-empty values and must be present as keys in the map, unless the configuration setting allowMissing
is set to true
. Keys that do not correspond to fields are ignored by default. There is an exception: an empty value that maps to an optional<*type*>
or any
in EPL is permitted even if allowMissing
is false
(see also the descriptions of the optional
and any
types in the API reference for EPL (ApamaDoc)).
Events can be annotated with the com.softwareag.connectivity.ExtraFieldsDict
annotation (see Adding predefined annotations) which names a dictionary field, in which case any unmapped keys are placed into this dictionary field. This can be disabled with the extraFields
configuration property. The dictionary must be one of:
- dictionary<string,string> - Keys and values are coerced into strings. Lists generate the string form of sequence<string>. Maps generate the string form of dictionary<string,string>.
- dictionary<any,any> - Values are mapped to the corresponding EPL type, or sequence<any> for lists and dictionary<any,any> for maps without names.
- dictionary<string,any> - Keys are coerced into strings. Values are mapped as described above.
The types are converted as described below:
EPL type | Transportward events will contain Java or C++ type | Hostward events can also convert from types
---|---|---
event | Map (Java), map_t (C++) | 
dictionary | Map (Java), map_t (C++) | 
sequence | List (Java), list_t (C++) | 
string | String (Java), const char* (C++) | all numeric types, boolean
integer | Long (Java), int64_t (C++) | all numeric types (except NaN float values), strings if they can be parsed as an integer
float | Double (Java), double (C++) | all numeric types, strings if they can be parsed as a float
decimal | BigDecimal (Java), decimal_t (C++) | all numeric types, strings if they can be parsed as a float
boolean | Boolean (Java), bool (C++) | string, if it can be parsed as a boolean
optional | EPL values of type optional are represented as a Java object or null (an empty data_t in C++) | 
any | EPL values of type any are represented as a Java object or data_t of the corresponding type (see also the note below for any) | See also the note below for any
Note:
An any type containing an Event is represented as either com.softwareag.connectivity.NamedMap or map_t, and the name field is set to the event type.
Non-native conversions (a floating point to integer conversion or vice versa) may lose precision, and conversions to/from strings or decimals are more expensive than float or integer conversions. If anything other than an exact match is found, a debug-level log message is generated; you may wish to verify that there are none if the conversion is performance-sensitive.
The following applies to Java only: an EPL decimal value which is NaN (not a number) or an infinity is mapped to/from a Double
representation of NaN or infinity, as the BigDecimal
Java type does not support them.
Events containing the following types cannot be sent into the correlator, as they cannot be serialized:
- chunk
- listener
- action variables
Events containing the following can be sent in, provided allowMissing is set to true in the host plug-in configuration and no value is provided for that field:
- context
- com.apama.exceptions.Exception
- com.apama.exceptions.StackTraceElement
Events containing cycles cannot be sent into or out of the correlator, but arbitrary nesting is permitted. Aliases will be flattened.
For Java plug-ins, handling messages from the apama.eventMap
plug-in thus involves casting the payload of the message from Object
to Map
, and then accessing members of that, casting as necessary (or, for flexibility, introspecting their types by using the instanceof
operator). For example, for the following event definition, the CustomerOrder
is translated to a map with deliveryAddress
, userId
and items
keys, and items
will be a list of maps containing itemId
, price
and qty
.
event LineItem {
string itemId;
float price;
integer qty;
}
event CustomerOrder {
string deliveryAddress;
string userId;
sequence<LineItem> items;
}
To print the total cost of an order (sum of product of qty
and price
for each item), the Java code would be as follows:
public void deliverMessageTowardsTransport(Message message) {
MapExtractor mList = new MapExtractor((Map)message.getPayload(),
"CustomerOrder");
List<MapExtractor> items = mList.getListOfMaps("items", false);
double total = 0.0;
for(MapExtractor item : items) {
double price = item.get("price", Double.class);
long qty = item.get("qty", Long.class);
total = total + price * qty;
}
LOGGER.info("Order value is "+total);
}
Note that due to type erasure, the type parameters on Map
or List
are not checked or guaranteed. In the above example, it is convenient to cast the list representing EPL field sequence<LineItem>
to List<Map>
to avoid having to cast the entries within it. The Map
, however, is still treated as a map of objects as it has different types (String
, Double
, Long
) in it.
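If flexibility over the numeric types is needed, instanceof checks can be used instead of a fixed cast. For instance (a minimal sketch, reusing the message from the example above):

Object rawQty = ((Map) message.getPayload()).get("qty");
long qty = (rawQty instanceof Number) ? ((Number) rawQty).longValue()
        : Long.parseLong(rawQty.toString());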
For C++ plug-ins, handling messages from the apama.eventMap
plug-in involves using the get<map_t>
function and accessing the members of that, using get<>
as necessary. If code needs to be flexible as to which type it accepts, then use the visitor pattern (see C++ data types). For example, using the event definition above, the following C++ code will print the total cost of the order:
virtual void deliverMessageTowardsTransport(Message &message) {
map_t &payload = get<map_t>(message.getPayload());
list_t &items = get<list_t>(payload[data_t("items")]);
double total = 0.0;
for(list_t::iterator it = items.begin(); it != items.end(); it++) {
MapExtractor m( get<map_t>( *it ), "LineItem" );
double price = m.get<double>("price");
long qty = m.get<int64_t>("qty");
total = total + price * qty;
}
logger.info("Order value is %f", total);
}
The following constructs and sends an order with one line item into the correlator:
Map<String,Object> payload = new HashMap<>();
payload.put("deliveryAddress", "1 Roadsworth Avenue");
payload.put("userId", "jbloggs");
List<Map> items = new ArrayList<>();
Map<String,Object> item = new HashMap<String,Object>();
item.put("itemId", "item1");
item.put("price", 3.14);
item.put("qty", 10);
items.add(item);
payload.put("items", items);
Map<String, String> metadata = new HashMap<String, String>();
metadata.put(Message.HOST_MESSAGE_TYPE, "CustomerOrder");
Message msg = new Message(payload, metadata);
hostSide.sendBatchTowardsHost(Collections.singletonList(msg));
The above can also be written more compactly:
hostSide.sendBatchTowardsHost(Collections.singletonList(new
Message(payload).putMetadataValue(Message.HOST_MESSAGE_TYPE,"CustomerOrder")));
This would typically be done in a more automated fashion, translating data from some other form, rather than laboriously setting each field as needed - though some combination will often be needed.
The equivalent C++ code is:
map_t payload;
payload.insert(data_t("deliveryAddress"), data_t("1 Roadsworth Avenue"));
payload.insert(data_t("userId"), data_t("jbloggs"));
list_t items;
map_t item;
item.insert(data_t("itemId"), data_t("item1"));
item.insert(data_t("price"), data_t(3.14));
item.insert(data_t("qty"), data_t((int64_t) 10));
items.push_back(data_t(std::move(item)));
payload[data_t("items")] = data_t(std::move(items));
Message msg(data_t(std::move(payload)));
msg.putMetadataValue(Message::HOST_MESSAGE_TYPE(), "CustomerOrder");
hostSide->sendBatchTowardsHost(&msg, (&msg)+1);
Metadata values
Every message has a metadata member, which for Java is a Map
object containing String
keys and Object
values. For C++, it is a map_t
which by convention only contains const char *
keys, but any type as values.
The metadata holds information about the event:
Value | Description
---|---
Message.HOST_MESSAGE_TYPE | The name of the event type. This is required when sending events into the correlator.
Message.CHANNEL | The name of the channel from which the event originated or to which it is to be delivered. This is optional for hostwards messages. Note: If the transport uses the same channel throughout its lifetime, it is recommended that you set the channel when the chain is created (for example, using subscribeChannels or defaultChannelTowardsHost) rather than setting it on each message.
Message.MESSAGE_ID | The message identifier. This is used for reliable receiving (that is, in reliable messages going towards the host). The message identifier should be unique within the scope of the chain and deployment and during the lifetime of the application. Typically it will be generated by the message bus to which the connectivity plug-in transport is connected. See also Using reliable transports. CAUTION: If you are using a codec to make the message identifier visible as an event field in EPL, it is important to copy the value from the metadata into the event field rather than move it, so that it remains available in the metadata.

In C++, these values are available via the following methods on Message:
- HOST_MESSAGE_TYPE()
- CHANNEL()
- MESSAGE_ID()
Plug-in components can use the metadata to pass other auxiliary data about a message between chain components. These could be headers from an HTTP connection, authentication tokens, or signalling for transaction boundaries. It is recommended that all metadata keys are namespaced. The sag
namespace is reserved for use by the product. Host plug-ins currently only use the metadata keys above.
The metadata contents can be manipulated directly by calling methods on the map returned by Message.getMetadataMap()
. A Message.getMetadata()
is also available in order to manipulate the stringified version of the metadata values. Values can be inserted into the metadata by using Message.putMetadataValue(...)
.
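For example, a codec could read the channel and attach its own namespaced metadata value as follows (a minimal sketch; the myplugin.receivedAt key is purely illustrative):

String channel = (String) message.getMetadataMap().get(Message.CHANNEL);
message.putMetadataValue("myplugin.receivedAt", Long.toString(System.currentTimeMillis()));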
Lifetime of connectivity plug-ins
Instances of connectivity chains can be created in different ways. See Static and dynamic connectivity chains for detailed information.
At correlator startup:
- Each codec, transport and dynamic chain manager class is loaded using the classpath or libraryName.
- Each dynamic chain manager listed in dynamicChainManagers is instantiated using its public constructor and passing the managerConfig from the configuration file.
- The start() method is called on any dynamic chain managers. Chain managers can create dynamic chains at any point after this, though in practice, dynamic chains are usually created after correlator startup, once the Apama application is injected and running.
- Each chain listed in startChains is created and started (see below).
The correlator is only pingable and available for external access after all of the above operations have completed.
Whenever a new chain instance is created (either during correlator startup if listed in startChains
, or at any time dynamically by EPL or a chain manager):
- The correlator determines the list of codec and transport plug-ins in the chain and the configuration for each as follows:
- If the chain is statically configured, the plug-ins and plug-in configurations listed under startChains are used.
- If the chain is being created dynamically, the chain manager implementation or EPL createDynamicChain call specifies which of the chain definitions listed under dynamicChains is to be used, and the configuration for this chain instance is prepared by replacing any @{*varname*} runtime substitution variables in the chain definition using the map passed in to createDynamicChain or supplied by the chain manager.
- A new instance of each transport and codec class in the chain is constructed using the public constructor, as described in Requirements of a plug-in class. If the transport has a dynamic chain manager, the manager’s
createTransport
method is used instead of calling the transport constructor directly (for Java), or extra parameters to the createChain call are passed through to the constructor (C++), which gives the chain manager the opportunity to pass extra information required by the managed transport (such as a reference to itself).
- hostSide and transportSide
members are set on all transport and codec plug-ins in the chain.- Static and EPL-created chains are started automatically once created. Chain managers must explicitly call
start()
on the newly created chain when they are ready. - The
start()
method is first called on all codecs in the chain. - Then the
start()
method is called on the transport. - Messages may begin flowing.
If any of the constructors or start()
methods invoked during correlator startup throw an exception, that will be logged as an error and the correlator will fail to start. These methods should complete quickly; delays here will delay the correlator starting up. Blocking or long running operations should be handled by a separate thread.
After start()
is called on all members of the chain, events may flow through the chain in either direction (if an EPL application is emitting events to the chain, they will be delivered as messages and delivered through the codecs towards the transport). The transport is permitted to send events hostwards, but they will be queued by the correlator until the application is ready for them.
Soon after the EPL application has been injected (and, if necessary, it has performed initialization), the EPL application should call ConnectivityPlugins.onApplicationInitialized()
. At this point:
hostReady()
is called on every codec.hostReady()
is called on the transport.
Dynamic chains that are created after onApplicationInitialized
has been called will have hostReady
called as soon as the chain is created.
If an exception is thrown by a plug-in’s hostReady()
method or by the start()
method of a dynamically instantiated plug-in, that will be logged as an error and the chain will be disconnected. These methods should complete quickly; delays here will delay the EPL application. Blocking or long running operations should be handled by a separate thread. Any events previously sent to the host will now be delivered, but the order of all events from a chain will be maintained.
When the correlator is shut down (for example, via engine_management -s
) or when the dynamic chain is destroyed by EPL or a dynamic chain manager, chains will be stopped:
shutdown()
is called on all chain managers (if any exist)shutdown()
is called on every codec.shutdown()
is called on the transport.
The shutdown()
method gives chain managers an opportunity to destroy any chains they are managing in an orderly fashion.
The shutdown
method on transports should make the transport discard any further messages sent to the transport, and unblock any threads that are currently blocked delivering messages to the transport. If possible, the sendBatchTowardsTransport
method should be written to allow any blocking behavior to be unblocked when a shutdown occurs. For example, if a socket is being used by a transport, it should be shut down or closed so that any threads reading or writing on the socket’s streams terminate.
Any messages delivered to a plug-in once the shutdown
method has been called may be discarded by the plug-in. Messages may be delivered to a plug-in even after the shutdown
call has completed, and the plug-in should not crash if that occurs.
If threads are required by a transport to deliver events to the transport or read from a connection, they would normally be started by the hostReady
method and stopped and joined in the shutdown
method.
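For example, a transport that reads messages from an external connection might manage its reader thread as follows (a minimal sketch; readLoop stands in for whatever actually reads from the connection and sends batches towards the host):

private Thread readerThread;

public void hostReady() {
    readerThread = new Thread(this::readLoop, chainId + ".reader");
    readerThread.start();
}

public void shutdown() {
    // closing the underlying connection here should unblock readLoop
    if (readerThread != null) {
        try {
            readerThread.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}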
Note: For C++ plug-ins only: the plug-in object of each plug-in is destroyed, so the plug-in class’s destructor (if defined) is called. No events should be flowing through a chain at this point.
Exceptions thrown from any of sendBatchTowards
, transformEvent
or deliverEvent
will be logged and not propagated to their callers. Exceptions are not a suitable means to provide information between plug-ins as they are ambiguous if a large batch of events are being processed, and some codecs may choose to send events on a separate thread. Use messages to send such events; these can be null payload with information stored in the metadata, in which case most codecs will ignore the messages and pass them through.
Creating dynamic chains from a chain manager plug-in
If a transport has an associated chain manager, the chain manager is responsible for creating all chains involving that transport. Note that this is the only way to create chains involving such a transport; they cannot be created using startChains
or from EPL’s ConnectivityPlugin.createDynamicChain
action.
Chain managers may create chains at any time after start()
has been called and before shutdown()
, and for any reason. However, most managers create chains in response to a notification that a channel has been created, which means it is in use for the first time. See Requirements of a transport chain manager plug-in class for more information about how to do this.
When a chain manager is ready to create a new chain, it does so by calling ChainManagerHost.createChain()
, usually making use of the host field on AbstractChainManager
. The following information must be supplied when creating a chain:
-
chainInstanceIdSuffix
- A string identifier which will be suffixed onto “*managerName*
-” to uniquely identify the new chain instance.CAUTION:
A small amount of memory is allocated for each unique chain instance identifier. This memory is not freed when the chain is destroyed. Therefore, if you are creating many chains, consider reusing old chain instance identifiers. If you create more than 1000 unique identifiers, a warning is written to the correlator log file to notify you of this. You cannot have two active chains with the same chain instance identifier, so only reuse identifiers which are no longer in use.
-
dynamicChainDefinition
- Specifies which of the chain definitions that contain this transport should be used. TheAbstractChainManager
providesgetChainDefinition()
helper methods to select a chain definition based on its identifier or by assuming that only one definition will be configured. For more complex cases, a collection of all the chain definitions for this transport is provided in thechainDefinitions
field which a manger can iterate over to find the one with the desired transport plug-in configuration. There are various possible approaches to selecting which chain definition to use to create a chain:- For some managers, it may not make sense to support multiple chain definitions and can be written to just use a singleton chain definition.
- Some managers may allow the user to specify a chain definition by providing a chain definition identifier as a configuration option for the manager in
managerConfig
. - Another approach is for the manager to search through the available chain definitions and use the transport plug-in’s configuration of each one to decide which to use, for example, by providing a channel prefix or regular expression pattern as part of the transport configuration.
-
substitutions
- The chain manager can provide zero or more@{*varname*}
variable replacement values. This provides a way to use information from the manager or transport to configure the host or codec plug-ins, for example, by having the Mapper codec set a field with details about the manager’s connection. -
defaultChannelTowardsHost
andsubscribeChannels
- Used to specify the channel or channels that this chain will send to (unless overridden in individual messages) or subscribe to. You can either use a single chain to send messages in both directions, or have a separate chain for each direction, that is, each transport instance will only be responsible for sending or receiving, but not both.
The transport from the chain definition should match the transport that contains the chain manager making the call. To create the transport object, the chain host will call createTransport
rather than the transport's constructor (for Java), or will call the transport's constructor directly, passing through any extra parameters passed to createChain
(for C++). Once the chain has been created, it needs to be started by calling the start()
method on the returned Chain
object (a Chain
pointer for C++).
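As a rough illustration, creating and starting a chain from a manager might look like the following sketch. The exact parameter list of createChain (and the ChainDefinition type name) is an assumption here - consult the com.softwareag.connectivity.chainmanagers API reference for the real signature - and the channel name and identifier suffix are purely illustrative:

ChainDefinition def = getChainDefinition(); // assumes only one chain definition is configured for this transport
Chain chain = host.createChain(
    "mychannel-towardsHost",                // chainInstanceIdSuffix
    def,                                    // which chain definition to instantiate
    Collections.singletonList("mychannel"), // subscribeChannels
    "mychannel",                            // defaultChannelTowardsHost
    Collections.emptyMap());                // @{varname} substitutions
chain.start(); // chains created by a manager must be started explicitly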
For more detailed information about the classes and interfaces involved in creating a chain manager, see the API reference for Java (Javadoc) on the com.softwareag.connectivity.chainmanagers
package, or see the API reference for C++ (Doxygen) on the com::softwareag::connectivity::chainmanagers
namespace.
User-defined status reporting from connectivity plug-ins
Connectivity plug-ins can add any number of user-defined status values which are reported as part of the correlator’s status information from the REST API, Prometheus, the engine_watch
tool, the Engine Client API, and from the EPL Management interface. Status values can be reported by transports, codecs, or dynamic chain managers.
For example, a transport plug-in might report a status value to indicate whether it is currently online and working correctly, or failed. Or it can report numeric KPIs indicating the number of messages sent towards the host (correlator) and towards the transport. A dynamic chain manager might report information about a connection it maintains, and perhaps provide some KPI statistics aggregated across all the transport instances it is managing.
To report status information, create a status item by calling the getStatusReporter().createStatusItem(...)
method on your plug-in class, specifying the key for this status item and its initial value, and store the resulting StatusItem
object in a field so its value can be updated as necessary. Status items are automatically removed when a transport or codec plug-in is shut down or when the chain is destroyed (in C++, this assumes the StatusItem
is held by a std::unique_ptr
in a member of the plug-in class, as we recommend). We recommend using the pluginName
as a prefix for transport and codec plug-ins and specifying the chainId
as a label, or the managerName
for chain managers. Status keys will have leading and trailing whitespace stripped. Keys cannot be empty. For example, in Java:
final StatusItem transportStatus =
getStatusReporter().createStatusItem("{chainId=" + chainName + "}."+pluginName
+ ".status", StatusReporter.STATUS_STARTING);
final StatusItem messagesTowardsHost =
getStatusReporter().createStatusItem("{chainId=" + chainName + "}."+pluginName
+".messagesTowardsHost", 0);
...
transportStatus.setStatus(StatusReporter.STATUS_ONLINE);
messagesTowardsHost.increment();
Or in C++:
std::unique_ptr<StatusReporter::StatusItem> transportStatus;
std::unique_ptr<StatusReporter::StatusItem> messagesTowardsHost;
MyPluginConstructor(...)
    : ...,
transportStatus(getStatusReporter().createStatusItem(
"{chainId=" + chainName + "}."+pluginName+".status",
StatusReporter::STATUS_STARTING())),
messagesTowardsHost(getStatusReporter().createStatusItem(
"{chainId=" + chainName + "}."+pluginName+".messagesTowardsHost", 0))
{...
}
...
transportStatus->setStatus(StatusReporter::STATUS_ONLINE());
messagesTowardsHost->increment();
We recommend using the STATUS_*
constants provided on StatusReporter
for values of ".status"
items, to provide consistency.
In addition to the StatusItem
interface, there is a separate method for atomically setting multiple related items in a single call (for example, a status and an error message). But as the StatusItem
method is more efficient, it should be used in most cases, especially for items that might be updated frequently such as message counters.
All user-defined status values are currently represented as strings, but for convenience when reporting KPI numbers, an overload of setStatus
exists that accepts an integer argument for the value, which is automatically converted to a string by the method. There is also an increment()
method.
For transports and codecs, status reporting is only permitted when your plug-in provides the TransportConstructorParameters
and CodecConstructorParameters
constructors. It is not supported when using the older deprecated constructors.
For examples of how to report status information from a connectivity plug-in, see the samples\connectivity_plugin\cpp\httpclient
and samples\connectivity_plugin\java\HTTPServer
directories of your Apama installation.
See the StatusReporter
interface in the API reference for Java (Javadoc) and API reference for C++ (Doxygen) for more information about how to report status.
See also Using the Management interface for information about how status values can be set and retrieved by EPL code.
For other ways to view the correlator's status, see Managing and monitoring over REST, Monitoring with Prometheus and Watching correlator runtime status.
For examples of how to specify user status metrics with names containing labels, see Monitoring with Prometheus.
Logging and configuration
For Java plug-ins, the plug-in’s constructor is passed a configuration object, the chain name and a logger object. The Abstract
classes supplied store these as members (the logger object is named logger
).
For C++ plug-ins, the Abstract
classes have a logger
member with methods to log at log levels from TRACE
to CRIT
, with a printf
format and variadic arguments. Expensive computations can be predicated on a check of is<Level>Enabled()
.
Plug-ins should use the SLF4J logger
object provided for logging. You should avoid using System.out
or System.err
for logging. For both plug-ins written in C++ and Java, log messages are prefixed with connectivity.*PluginName*.*ChainName*
, which is also the category to configure log levels using the correlatorLogging
section in the YAML configuration file (see Setting correlator and plug-in log files and log levels in a YAML configuration file). This means, it is not required to identify the plug-in or chain in every log statement.
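For example, an expensive computation used only for diagnostics can be guarded with an SLF4J level check (a minimal sketch; the message content and the expensiveSummaryOf helper and remoteAddress variable are illustrative):
// Only compute the summary when DEBUG logging is actually enabled.
if (logger.isDebugEnabled()) {
	logger.debug("Decoded batch towards host: {}", expensiveSummaryOf(batch));
}
// Normal informational logging; the plug-in and chain prefix is added automatically.
logger.info("Connected to {}", remoteAddress);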
If your plug-in uses a third-party library that logs with SLF4J or Log4j 2, then the log output goes to the main correlator log file automatically. You can customize log levels as needed using correlatorLogging
in the YAML configuration file (see Setting correlator and plug-in log files and log levels in a YAML configuration file). If a library uses some other logging implementation, such as Log4j 1, the JDK logger or Apache Commons Logging (JCL), add a bridging jar to convert it to SLF4J where possible. Several bridges are available in the common/lib/ext
and common/lib/ext/log4j
directories of your installation.
The configuration contains the definitions from the configuration file for connectivity plug-ins (any globalConfig
is merged with the per-chain configuration so that the per-chain configuration takes precedence). The configuration is a map with String
keys. The values will be one of the following classes:
- List<Object> (for C++, data_t of type list_t) for list YAML entries. Each element will be one of these types.
- Map<String, Object> (for C++, data_t of type map_t) for map YAML entries. Each value will be one of these types.
- String (for C++, data_t of type const char *). Even if the entry in the configuration file is numeric or a boolean, it will be provided as a string.
Plug-ins should use the MapExtractor class to extract values from the configuration map, which makes it easy to check for invalid configuration options and to produce helpful error messages if a required value is missing or of the wrong type.
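As a simple illustration of the string typing described above, a raw lookup on the configuration map might look as follows (a sketch only; the timeoutSecs and verbose keys are hypothetical, and it is assumed that the configuration is available as a Map<String, Object> named config, as stored by the Abstract classes):
// Values arrive as String even when the YAML entry looks numeric or boolean.
Object raw = config.get("timeoutSecs");
int timeoutSecs = (raw == null) ? 10 : Integer.parseInt((String) raw);
boolean verbose = Boolean.parseBoolean((String) config.getOrDefault("verbose", "false"));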
You can also use the Diagnostic codec to diagnose issues with connectivity plug-ins. See The Diagnostic codec connectivity plug-in for further information.
Developing reliable transports
This section explains how to develop transports that support reliable messaging. For information on how to use them, see Using reliable transports.
Reliable messaging uses control messages, which are special messages that are sent between the host and the transport. They are used to signal actions that the host or transport should take as well as the acknowledgments from these actions. The control messages have null
(Java) or empty (C++) payloads, and instead store all their information in the metadata.
The type of a control message is stored in a metadata field that can be accessed with the CONTROL_TYPE
constant of the com.softwareag.connectivity.Message
(Java) or com::softwareag::connectivity::Message
(C++) class. The value of this field should be one of the type names listed below. These names are also accessed by constants. For more information, see the Message
class in the API reference for Java (Javadoc) or API reference for C++ (Doxygen).
Type | Constant | Description |
---|---|---|
AckRequired | CONTROL_TYPE_ACK_REQUIRED | This control message is sent from the transport to the host. It is used to ask the host to acknowledge all events that have been sent towards the host before this AckRequired. |
AckUpTo | CONTROL_TYPE_ACK_UPTO | This control message is sent from the host to the transport, and it is the acknowledgment for the AckRequired control message. It is used to inform the transport that a particular AckRequired request has been fulfilled. |
Flush | CONTROL_TYPE_FLUSH | This control message is sent from the host to the transport. It is used to ask the transport to acknowledge all events that have been sent towards the transport before this Flush. |
FlushAck | CONTROL_TYPE_FLUSH_ACK | This control message is sent from the transport to the host, and it is the acknowledgment for the Flush control message. It is used to inform the host that a particular Flush request has been fulfilled. |
The control message metadata also contains fields that can be accessed with the following constants:
- MESSAGE_ID: This constant names a metadata field used for uniquely identifying non-control messages (that is, real events with payloads) that are being sent towards the host. This constant also names a metadata field on the AckRequired and AckUpTo control messages that are used for reliable receiving. In AckRequired, it contains the message identifier of the immediately preceding non-control message. In AckUpTo, it contains the message identifier of the AckRequired that is being responded to.
- REQUEST_ID: This constant names a metadata field on the Flush and FlushAck messages that are used for reliable sending. The field denotes a unique identifier for matching up a Flush with its corresponding FlushAck.
Transports receive and send the above-mentioned control messages. The exact logic of how they should be processed depends on the nature of the external system that the transport connects to. More information and examples are provided below.
Note: The Java examples below are not intended to be used as a starting point. They only illustrate the core concept of handling control messages.
Threading
For events being delivered from the correlator to a chain towards the transport, the correlator will only ever call sendBatchTowardsTransport
from a single thread at a time. Most codecs will call the next component in the chain in the thread that invoked them, but are not required to. A codec can queue events and drain the queue from a separate thread if desired.
Transports and codecs should only make a single call at a time to the hostSide
plug-in (and thus only one thread at a time passes events towards the host) as the next plug-in may not be thread-safe. Similarly, codecs should only make one call at a time to the transportSide
plug-in, though one codec may have threads invoking both hostSide
and transportSide
concurrently. Plug-ins should not assume that they are called on the same thread each time (in particular, the correlator will use different threads for sending batches of events), but they can assume that no more than one thread at a time sends events to the transport.
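For example, the following sketch shows a pass-through codec that queues messages heading towards the host and drains the queue from a single background thread, so that only one thread at a time calls the hostSide plug-in. This is a sketch only: the class and method names (AbstractCodec, CodecConstructorParameters, sendBatchTowardsHost, sendBatchTowardsTransport) follow the conventions used elsewhere in this section, and the exact signatures in your version of the API may differ.
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import com.softwareag.connectivity.*; // assumed package, as for the Message class above

public class QueueingPassThroughCodec extends AbstractCodec {
	private final BlockingQueue<Message> towardsHost = new LinkedBlockingQueue<>();
	private volatile boolean running = true;
	private Thread drainThread;

	public QueueingPassThroughCodec(Logger logger, CodecConstructorParameters params) throws Exception {
		super(logger, params);
	}

	@Override
	public void start() {
		// Single drain thread: the only caller of hostSide, satisfying the
		// "one call at a time towards the host" requirement described above.
		drainThread = new Thread(() -> {
			while (running) {
				try {
					Message first = towardsHost.poll(100, TimeUnit.MILLISECONDS);
					if (first == null) continue;
					List<Message> batch = new ArrayList<>();
					batch.add(first);
					towardsHost.drainTo(batch);
					hostSide.sendBatchTowardsHost(batch);
				} catch (InterruptedException e) {
					return; // shutdown in progress
				} catch (Exception e) {
					logger.error("Failed to deliver batch towards host", e);
				}
			}
		}, "QueueingPassThroughCodec-drain");
		drainThread.start();
	}

	@Override
	public void sendBatchTowardsHost(List<Message> messages) {
		towardsHost.addAll(messages); // queue instead of calling hostSide directly
	}

	@Override
	public void sendBatchTowardsTransport(List<Message> messages) {
		try {
			transportSide.sendBatchTowardsTransport(messages); // pass through unchanged
		} catch (Exception e) {
			logger.error("Failed to deliver batch towards transport", e);
		}
	}

	@Override
	public void shutdown() throws Exception {
		// Stop and join our thread, as required when the chain is destroyed.
		running = false;
		if (drainThread != null) {
			drainThread.interrupt();
			drainThread.join();
		}
	}
}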
Transports and codecs will typically be processing events towards the transport and towards the host in different threads concurrently. The start
, hostReady
and shutdown
methods will be called from different threads to any other operation and while other calls are in progress.
When a chain is disconnected or when the correlator is shut down, the shutdown
method on the plug-in is called. This should ensure the following:
- Any threads calling into the plug-in which are blocked in the plug-in (particularly for transports) should unblock and return.
- Any threads that the plug-in has started have been stopped and joined.
- The plug-in should ensure any in-progress calls out to other plug-ins have completed.
- The plug-in must ensure no more calls are made out of a plug-in to send messages to other plug-ins.
This is particularly important for C++ plug-ins: calls made out of the plug-in after shutdown has returned, or still in progress at the point the shutdown method completes, could cause a crash. In practice, a plug-in that starts a thread to read from a socket or other connection and send messages towards the host should close the socket and join the thread (waiting for it to terminate) to meet these requirements.
For C++ plug-ins, we recommend use of the standard libraries such as std::thread
and std::mutex
for managing threads and locking in plug-ins. If not available, we provide some simple macros in the sag_connectivity_threading.h
header file. See the API reference for C++ (Doxygen) for using it.
Writing a transport for reliable receiving
This section describes the obligations of a transport that wishes to see acknowledgments of messages that it is sending towards the host, so that it can pass those acknowledgments on to the reliable messaging system it is connected to. Such a transport must declare its reliability before any messaging can take place, that is, before the plug-in is fully started. This is achieved by calling the enableReliability
function on the PluginHost
member of the transport, either from the constructor or start()
method.
public MyReliableTransport(Logger logger, TransportConstructorParameters params)
throws IllegalArgumentException, Exception
{
super(logger, params);
host.enableReliability(Direction.TOWARDS_HOST);
}
A transport must place unique identifiers on any non-control messages (that is, real events) that it is sending towards the host. Ideally, these correspond to identifiers provided by the remote messaging system that your code is receiving from. While not strictly necessary, this makes tracing a message through the wider system much easier.
MyExternalMessage externalMessage = fictionalRemoteSystem.get();
Message msg = transformToMessage(externalMessage);
msg.putMetadataValue(Message.MESSAGE_ID,
externalMessage.getUniqueIdentifier());
hostSide.sendBatchTowardsHost(Collections.singletonList(msg));
A transport must decide how regularly it wishes to receive acknowledgments (AckUpTo
) from the host application, by deciding when it sends AckRequired
control messages towards the host. In general, you should attempt to space these messages as widely as possible, so as not to put too much burden on the EPL application. The steps taken to “commit” the effects of received events may be quite expensive. However, the frequency of acknowledgments will probably also be constrained by the nature of the remote messaging system your transport is connected to. For example, it may only permit 1,000 unacknowledged messages to be outstanding before blocking receipt of further messages. In this case, you will want to be sending out AckRequired
control messages after every n real messages where n is a large fraction of 1,000.
Time is another factor to consider. In the worst case, for example, if acknowledgments are too sparse, a reconnecting application may face 10 minutes of redelivered messages that did not get acknowledged in a previous session. So in general, a transport should make sure to issue AckRequired
control messages at least every few seconds, assuming that any non-control messages have been sent towards the host since the last AckRequired
.
An AckRequired
control message must also contain the message identifier of the preceding non-control message, in order to identify which tranche of previous messages is covered by a corresponding acknowledgment.
Message ackRequired = new Message(null);
ackRequired.putMetadataValue(Message.CONTROL_TYPE,
Message.CONTROL_TYPE_ACK_REQUIRED);
ackRequired.putMetadataValue(Message.MESSAGE_ID, lastId);
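The control message is then delivered towards the host in the same way as a regular message, for example:
// Send the AckRequired control message towards the host.
hostSide.sendBatchTowardsHost(Collections.singletonList(ackRequired));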
Finally, a transport should be prepared to act on acknowledgments received from the EPL application, that is, AckUpTo
control messages from the host. Each AckUpTo
corresponds exactly to a previously issued AckRequired
, with both containing the same MESSAGE_ID
. AckUpTo
messages are seen in the exact same order that the AckRequired
messages were issued.
public void deliverNullPayloadTowardsTransport(Message message)
throws Exception {
Map<String, Object> metadata = message.getMetadataMap();
if (metadata.containsKey(Message.CONTROL_TYPE))
{
String controlType = (String)metadata.get(Message.CONTROL_TYPE);
if(Message.CONTROL_TYPE_ACK_UPTO.equals(controlType))
{
String messageId = (String) metadata.get(Message.MESSAGE_ID);
fictionalRemoteSystem.ackUpToAndIncluding(messageId);
}
}
}
Writing a transport for reliable sending
This section describes the obligations of a transport that wishes to reliably acknowledge messages that are being sent to it from an EPL application, that is, from the host. As before, the transport should declare its reliable nature and direction.
public MyReliableTransport(Logger logger, TransportConstructorParameters params)
throws IllegalArgumentException, Exception
{
super(logger, params);
host.enableReliability(Direction.TOWARDS_TRANSPORT);
}
The transport should be prepared to act on Flush
control messages, ensuring that all preceding non-control messages are reliably delivered to a remote reliable messaging system. Once done, the transport should respond with a FlushAck
control message towards the host, with a REQUEST_ID
set to match it with the corresponding Flush
.
Frequent Flush
messages are automatically coalesced into individual messages that are more widely spaced. So a transport need not be concerned with the performance impact of responding to every Flush
request. Also, Flush
messages are subsumed by subsequent Flush
messages and their acknowledgments. For example, if a transport receives three Flush
messages, a FlushAck
corresponding to the final Flush
is interpreted as being a response to all three.
@Override
public void deliverNullPayloadTowardsTransport(Message message)
throws Exception {
Map<String, Object> metadata = message.getMetadataMap();
if (metadata.containsKey(Message.CONTROL_TYPE))
{
String controlType = (String)metadata.get(Message.CONTROL_TYPE);
if(Message.CONTROL_TYPE_FLUSH.equals(controlType))
{
fictionalRemoteSystem.commitEverythingSoFar();
Message response = new Message(null);
response.putMetadataValue(Message.CONTROL_TYPE,
Message.CONTROL_TYPE_FLUSH_ACK);
response.putMetadataValue(Message.REQUEST_ID,
(String) metadata.get(Message.REQUEST_ID));
hostSide.sendBatchTowardsHost(Collections.singletonList(response));
}
}
}
Flushing background queues in connectivity plug-ins
For most connectivity plug-ins, the correlator’s flushAllQueues
management request waits until all events have been processed by the plug-ins. However, some connectivity plug-ins implement an internal queue, in which case events are only flushed as far as that queue, and the request does not wait for them to actually be processed. To extend flushAllQueues
to any hidden queues, there is an additional mechanism that can be implemented in a plug-in. This has been done for all the relevant plug-ins shipped as part of the product.
This is done using a control message, which is a special type of message sent between the host and the transport. Control messages have empty payloads and store all their information in the metadata instead. To enable receiving these control messages, at least one plug-in in the chain must call the enableQueueFlushMessages
function on the host
pointer:
void start() override {
	host->enableQueueFlushMessages();
}
Note:
Flushing is only available for chains that do not use any of the Java-based connectivity plug-ins.
The control message for flushAllQueues
has the sag.control.type
metadata field set to QueueFlush
. It also has an opaque object in the sag.controlObject
metadata field. These constants are available as CONTROL_TYPE
, CONTROL_TYPE_QUEUE_FLUSH
, and CONTROL_OBJECT
constants on the Message
class.
The control message contains a custom_t
object in the CONTROL_OBJECT
metadata field. This is completely opaque to the user, but its lifetime controls the flush. It can be copied if there are multiple things to wait for. The flush is complete when all copies of the object have been destroyed. If you do nothing, this happens when the message is fully processed down the chain. To wait for an additional background queue to be flushed, store the object on the queue (either by moving or copying it from the metadata, or by placing the QueueFlush
message itself on the queue). When all previous messages on the queue have been processed, then destroy the object that was on the queue.
General notes for developing transports
OpenSSL
OpenSSL initialization and cleanup are handled internally by the correlator process itself. User-developed transports must not perform these tasks.