C/C++ transport plug-in development
The transport layer is the front-end of the IAF. The transport layer’s purpose is to abstract away the differences between the programming interfaces exposed by different middleware message sources and sinks. It consists of one or more custom plug-in libraries that extract downstream messages from external message sources ready for delivery to the codec layer, and send Apama events already encoded by the codec layer upstream to the external message sink. See The Integration Adapter Framework for a full introduction to transport plug-ins and the IAF’s architecture.
An adapter should send events to the correlator only after its start
function is called and before the stop
function returns.
This section includes the C/C++ transport plug-in development specification and additional information for developers of event transports using C/C++. Transport plug-in development in Java provides information about developing transport plug-ins in Java.
To configure the build for a transport plug-in:
-
On Linux, copying and customizing an Apama makefile from a sample application is the easiest method.
-
On Windows, you might find it easiest to copy an Apama sample project. If you prefer to use a project you already have, be sure to add
$(APAMA_HOME)\include
as an include directory. To do this in Visual Studio, select your project and then select Project Properties > C/C++ > General > Additional Include Directories. Also, link against
apiaf.lib
. To do this in Visual Studio, select your project and then select Project Properties > Linker > Input > Additional Dependencies and add apiaf.lib;apcommon.lib
. Finally, select Project Properties > Linker > General > Additional Library Directories, and add
$(APAMA_HOME)\lib
.
The C/C++ transport plug-in development specification
A C/C++ transport layer plug-in is implemented as a dynamic shared library. In order for the IAF to be able to load and use it, it must comply with Apama’s transport plug-in development specification. This specification describes the structure of a transport layer plug-in, and the C/C++ functions it needs to implement so that it can be used with the IAF. The specification also provides a mechanism for startup and configuration parameters to be passed to the plug-in from the IAF’s configuration file.
Property names and values used by transport plug-ins must be in UTF-8 format.
A transport layer plug-in implementation must include the C header file EventTransport.h
. It also needs to include EventCodec.h
, to allow the event transport to pass messages to codecs within the IAF codec layer. You can find these files in the include
directory of your Apama installation.
Transport functions to implement
EventTransport.h
provides the definition for a number of functions whose implementation needs to be provided by the event transport author. See the AP_EventTransport_Functions
structure in the API reference for C++ (Doxygen) for detailed information on these functions.
When the start
function is invoked, the event transport is effectively signaled to start accepting incoming messages and pass them on to a codec. Events should not be sent to the correlator until the start
function is called.
It is up to the event transport to determine which codec to communicate with from the list of codecs made available to it through the addEventDecoder
and removeEventDecoder
functions. Typically, a configuration property would be used to specify the codec to be used. If a handle to the desired codec had been stored in a variable called decoder
(of type AP_EventDecoder*
) when addEventDecoder
was called, an event could be passed on to the codec using:
decoder->functions->sendTransportEvent(decoder, event);
This codec function is described in C/C++ codec plug-in development.
Events should not be sent to the correlator after the stop
function has returned. The stop
method must wait for any other threads sending events to complete before the stop
method returns.
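One way to meet this requirement is to track in-flight sends and have the stop implementation block until they have drained. The sketch below is illustrative only: the stop signature, the success return value and the synchronization scheme are assumptions, not the definitive API; check EventTransport.h and the AP_EventTransport_Functions reference for the real declarations.
#include <pthread.h>
#include <EventTransport.h>

/* Sketch only. Sending threads are assumed to increment activeSenders before
 * calling into the codec layer, decrement it afterwards, and signal
 * sendsDrained when the count reaches zero. */
static pthread_mutex_t sendLock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t sendsDrained = PTHREAD_COND_INITIALIZER;
static int activeSenders = 0;
static int stopping = 0;

static AP_EventTransportError stop(struct AP_EventTransport* transport)
{
	pthread_mutex_lock(&sendLock);
	stopping = 1;                /* sendTransportEvent refuses new work once set */
	while (activeSenders > 0)    /* wait for in-flight sends to complete */
		pthread_cond_wait(&sendsDrained, &sendLock);
	pthread_mutex_unlock(&sendLock);
	/* From here on, no further events are delivered to the correlator. */
	return 0;  /* assumed success value of AP_EventTransportError; see EventTransport.h */
}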
Defining the transport function table
The EventTransport.h
header file provides a definition for an AP_EventTransport_Functions
structure. This defines a function table whose elements must be set to point to the implementations of the above functions. See the AP_EventTransport_Functions
structure in the API reference for C++ (Doxygen) for more information.
Note that the order of the function pointers within the function table is critical to the reliable operation of the IAF. However, the order that the function definitions appear within the plug-in source code, and indeed the names of the functions, are not important. Apama recommends that the functions be declared static
, so that they are not globally visible and can only be accessed via the function table.
It is therefore not obligatory to implement the functions with the same names as per the definitions, as long as the mapping is performed correctly in an instantiation of AP_EventTransport_Functions
. A definition in an event transport implementation would look as follows:
static struct AP_EventTransport_Functions EventTransport_Functions
= {
updateProperties,
sendTransportEvent,
addEventDecoder,
removeEventDecoder,
flushUpstream,
flushDownstream,
start,
stop,
getLastError,
getStatus
};
The function table created above needs to be placed in an AP_EventTransport
object, and one such object needs to be created for every plug-in within its constructor function. See the AP_EventTransport_Functions
structure in the API reference for C++ (Doxygen) for more information.
The transport constructor, destructor and info functions
Every event transport needs to implement a constructor function, a destructor function and an “info” function. These methods are called by the IAF to (respectively) instantiate the event transport, to clean it up during unloading, and to provide information about the plug-in’s capabilities.
EventTransport.h
provides the following definitions:
- AP_EventTransportCtorPtr points to the constructor function. Typically part of the work of this constructor would be a call to updateProperties, in order to set up the initial configuration of the plug-in.
- AP_EventTransportDtorPtr points to the related destructor function.
- AP_EventTransportInfoPtr points to the info function.
The IAF will search for these functions by the names AP_EventTransport_ctor
, AP_EventTransport_dtor
and AP_EventTransport_info
when the library is loaded, so you must use these exact names when implementing them in a transport layer plug-in.
See the API reference for C++ (Doxygen) for more information on the above definitions.
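Putting the last two sections together, a constructor along these lines would allocate the plug-in object and attach the function table defined earlier. This is a sketch under assumptions: only the AP_EventTransport_ctor name, the functions member and the recommendation to call updateProperties come from this document; the parameter list, the types and the updateProperties signature should be checked against EventTransport.h.
#include <stdlib.h>
#include <EventTransport.h>

/* Sketch only -- verify the constructor and updateProperties signatures
 * against EventTransport.h before relying on this shape. */
AP_EventTransport* AP_EventTransport_ctor(AP_char8* name,
		AP_EventTransportProperties* properties,
		IAF_TimestampConfig* timestampConfig)
{
	AP_EventTransport* transport =
		(AP_EventTransport*)malloc(sizeof(AP_EventTransport));
	if (transport == NULL) return NULL;

	/* Point the IAF at the static function table defined above */
	transport->functions = &EventTransport_Functions;

	/* Apply the initial property set, as recommended for constructors */
	updateProperties(transport, properties, timestampConfig);

	return transport;
}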
Other transport definitions
EventTransport.h
also provides some additional definitions that the event transport author needs to be aware of:
- AP_EventTransportError defines the set of error codes that can be returned by the transport's functions.
- The AP_EventTransportProperty structure is a definition for a configuration property. This corresponds to the properties that can be passed in as initialization or re-configuration parameters from the configuration file of the IAF.
- Properties are passed to the event transport within an AP_EventTransportProperties structure.
- The status of a transport is reported in an AP_EventTransportStatus structure.
Transport utilities
The header files AP_EventParser.h
and AP_EventWriter.h
provide definitions for the Event Parser and Event Writer utilities. These utilities allow parsing and writing of the string form of reference types that are used by any <map type="reference">
elements in the adapters configuration file. These files are located in the include
directory of your Apama installation. See the contents of these files for more information.
Communication with the codec layer
If a transport layer plug-in is to be able to receive messages and then pass them on to the codec layer, it must be able to communicate with appropriate decoding codecs. A decoding codec is one that can accept messages from the transport layer and parse them (decode them) into the normalized event format accepted by the Semantic Mapper.
When a codec is loaded into the IAF, its details are passed to all transport layer plug-ins by calling their addEventDecoder
function. This tells the transport layer plug-in the name of the decoding codec and provides a reference to its AP_EventDecoder
structure.
The reference to AP_EventDecoder
gives the transport layer plug-in access to the following functions:
sendTransportEvent
getLastError
See AP_EventTransport_Functions
in the API reference for C++ (Doxygen) for more information on these functions.
Assuming the reference to the AP_EventDecoder
structure has been stored in a variable called decoder
, the functions can be called as follows:
errorCode = decoder->functions->sendTransportEvent(decoder, event);
errorMessage = decoder->functions->getLastError(decoder);
C/C++ codec plug-in development
The codec layer is a layer of abstraction between the transport layer and the IAF’s Semantic Mapper. It consists of one or more plug-in libraries that perform message encoding and/or decoding. Decoders translate downstream messages retrieved by the transport layer into the standard “normalized event” format on which the Semantic Mapper’s rules run. Encoders work in the opposite direction, encoding upstream normalized events into an appropriate format for transport layer plug-ins to send on. See The Integration Adapter Framework for a full introduction to codec plug-ins and the IAF’s architecture.
This topic includes the C/C++ codec plug-in development specification and additional information for developers of C/C++ event codecs. Java codec plug-in development provides analogous information about developing codec plug-ins in Java.
Before developing a new codec plug-in, it is worth considering whether one of the standard Apama IAF plug-ins could be used instead. Codec IAF plug-ins provides more information on the standard IAF codec plug-ins: StringCodec
and NullCodec
. The StringCodec
plug-in codes normalized events as formatted text strings. The NullCodec
plug-in is useful in situations where it does not make sense to decouple the codec and transport layers, and allows transport plug-ins to communicate with the Semantic Mapper directly using normalized events.
To configure the build for a codec plug-in:
-
On Linux, copying and customizing an Apama makefile from a sample application is the easiest method.
-
On Windows, you might find it easiest to copy an Apama sample project. If you prefer to use a project you already have, be sure to add
$(APAMA_HOME)\include
as an include directory. To do this in Visual Studio, select your project and then select Project Properties > C/C++ > General > Additional Include Directories. Also, link against
apiaf.lib
. To do this in Visual Studio, select your project and then select Project Properties > Linker > Input > Additional Dependencies and add apiaf.lib;apcommon.lib.
Finally, select Project Properties > Linker > General > Additional Library Directories, and add
$(APAMA_HOME)\lib
.
The C/C++ codec plug-in development specification
A codec plug-in needs to be structured as a dynamic shared library. In order for the IAF to be able to load and use it, it must comply with Apama’s codec plug-in development specification. This describes the overall format of a codec plug-in and the C/C++ functions it needs to implement so that its functionality is accessible by the IAF. The specification also provides a mechanism for startup and configuration parameters to be passed to the plug-in from the IAF’s configuration file.
Property names and values used by codec plug-ins must be in UTF-8 format.
A codec plug-in implementation must include the C header file EventCodec.h
. As a codec also needs to communicate both with a transport layer plug-in (or event transport) and with the Semantic Mapper, EventTransport.h
and SemanticMapper.h
also need to be included. You can find these files in the include
directory of your Apama installation.
Codec functions to implement
EventCodec.h
provides the definition for a number of functions whose implementation needs to be provided by the codec author.
However, in contrast to the Transport Layer Plug-in Development Specification, the set of functions that need to be implemented varies depending on whether the codec is to implement only a message decoder, only a message encoder, or a bidirectional encoder/decoder.
In all cases, implementations need to be provided for the following functions:
updateProperties
getLastError
getStatus
It is recommended that updateProperties
is invoked by the codec constructor.
See the AP_EventCodec_Functions
structure in the API reference for C++ (Doxygen) for detailed information on these functions.
Codec encoder functions
If the codec is to implement an encoder, implementations need to be provided for the following functions:
sendNormalisedEvent
flushUpstream
getLastError
addEventTransport
removeEventTransport
See the AP_EventEncoder_Functions
structure in the API reference for C++ (Doxygen) for detailed information on these functions.
Codec decoder functions
If the codec is to provide a decoder, implementations need to be provided for the following functions:
sendTransportEvent
setSemanticMapper
flushDownstream
getLastError
See the AP_EventDecoder_Functions
structure in the API reference for C++ (Doxygen) for detailed information on these functions.
Defining the codec function tables
In a transport layer plug-in, the plug-in author needs to provide a function table that tells the IAF which functions to call to invoke specific functionality.
The Codec Development Specification follows this model but depending on whether the codec being developed is an encoder, a decoder or an encoder/decoder, up to three function tables may need to be defined.
Note that the order of the function pointers within each function table is critical to the reliable operation of the IAF. However, the order that the function definitions appear within the plug-in source code, and indeed the names of the functions, are not important. Apama recommends that the functions be declared static
, so that they are not globally visible and can only be accessed via the function table.
The codec function table
Every codec needs to define a generic codec function table. The header file provides a definition for this as an AP_EventCodec_Functions
structure with the following functions:
updateProperties
getLastError
getStatus
where the library functions updateProperties, getLastError and getStatus are defined as the implementations of the Codec Development Specification's updateProperties, getLastError and getStatus function definitions respectively.
See the AP_EventCodec_Functions
structure in the API reference for C++ (Doxygen) for detailed information.
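For illustration, and by analogy with the encoder and decoder tables shown in the following sections, a generic codec function table might be defined as follows (getLastErrorCodec is simply an illustrative name for your static implementation of getLastError):
static struct AP_EventCodec_Functions EventCodec_Functions = {
	updateProperties,
	getLastErrorCodec,
	getStatus
};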
The codec encoder function table
If the codec being implemented is to act as an encoder, it needs to implement the encoder functions listed previously and map them in an encoder function table. This structure is defined in EventCodec.h
as an AP_EventEncoder_Functions
structure with the following functions:
sendNormalisedEvent
flushUpstream
getLastError
addEventTransport
removeEventTransport
See the AP_EventEncoder_Functions
structure in the API reference for C++ (Doxygen) for detailed information.
In the implementation of an encoding codec, this function table could be implemented as follows:
static struct AP_EventEncoder_Functions EventEncoder_Functions = {
sendNormalisedEvent,
flushUpstream,
getLastErrorEncoder,
addEventTransport,
removeEventTransport
};
This time, the library functions sendNormalisedEvent, flushUpstream, getLastErrorEncoder, addEventTransport and removeEventTransport are defined as the implementations of the Codec Development Specification's sendNormalisedEvent, flushUpstream, getLastError, addEventTransport and removeEventTransport function definitions respectively.
The codec decoder function table
If the codec being implemented is to act as a decoder, it needs to implement the decoder functions listed previously and map them in a decoder function table. This structure is defined in EventCodec.h
as an AP_EventDecoder_Functions
structure with the following functions:
sendTransportEvent
setSemanticMapper
flushDownstream
getLastError
See the AP_EventDecoder_Functions
structure in the API reference for C++ (Doxygen) for detailed information.
In the implementation of a decoding codec, this function table could be implemented as follows:
static struct AP_EventDecoder_Functions EventDecoder_Functions = {
sendTransportEvent,
setSemanticMapper,
flushDownstream,
getLastErrorDecoder
};
As before, this definition defines a number of library functions as the implementations of the function definitions specified in the Codec Development Specification.
Registering the codec function tables
The encoding and decoding function tables created above need to be placed in the relevant object, AP_EventEncoder
and AP_EventDecoder
. These, together with the generic function table, need to be placed in an AP_EventCodec
object. See the API reference for C++ (Doxygen) for detailed information on these structures.
An AP_EventCodec
object needs to be created for every plug-in within its constructor function. The encoder and decoder fields in it may be set to NULL
if the codec does not implement the respective functionality, although clearly it is meaningless to have both set to NULL
.
The codec constructor, destructor and info functions
Every event codec needs to implement a constructor function, a destructor function and an “info” function. These methods are called by the IAF to (respectively) instantiate the event codec, to clean it up during unloading, and to provide information about the plug-in’s capabilities.
EventCodec.h
provides the following definitions:
- AP_EventCodecCtorPtr points to the constructor function.
- AP_EventCodecDtorPtr points to the destructor function.
- AP_EventCodecInfoPtr points to the info function. Every codec needs to implement an info function. This is called by the IAF to obtain information as to the capabilities (encoder/decoder) of the codec.
The IAF will search for these functions by the names AP_EventCodec_ctor
and AP_EventCodec_dtor
when the library is loaded, and it will search for and call AP_EventCodec_info
. So you must use these exact names when implementing a codec plug-in.
See the API reference for C++ (Doxygen) for more information on the above definitions.
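Tying the previous two sections together, the constructor of a bidirectional codec might wire the tables into the AP_EventCodec object roughly as follows. The functions, encoder and decoder members are mentioned above, but the constructor parameters, the functions member of AP_EventEncoder and AP_EventDecoder, and the allocation approach are assumptions; consult EventCodec.h and the Doxygen reference for the real structure definitions.
/* Sketch only; error handling omitted and structure layouts assumed. */
AP_EventCodec* AP_EventCodec_ctor(AP_char8* name,
		AP_EventCodecProperties* properties,
		IAF_TimestampConfig* timestampConfig)
{
	AP_EventCodec* codec = (AP_EventCodec*)malloc(sizeof(AP_EventCodec));
	AP_EventEncoder* encoder = (AP_EventEncoder*)malloc(sizeof(AP_EventEncoder));
	AP_EventDecoder* decoder = (AP_EventDecoder*)malloc(sizeof(AP_EventDecoder));

	encoder->functions = &EventEncoder_Functions;  /* encoder table */
	decoder->functions = &EventDecoder_Functions;  /* decoder table */

	codec->functions = &EventCodec_Functions;      /* generic table */
	codec->encoder = encoder;   /* would be NULL for a decode-only codec */
	codec->decoder = decoder;   /* would be NULL for an encode-only codec */

	return codec;
}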
Other codec definitions
EventCodec.h
also provides some additional definitions that the codec author needs to be aware of.
First of these are the codec capability bits. These are returned by the info function to define whether the codec can decode or encode messages.
#define AP_EVENTCODEC_CAP_ENCODER 0x0001
#define AP_EVENTCODEC_CAP_DECODER 0x0002
- AP_EventCodecError defines the set of error codes that can be returned by the codec's functions.
- The AP_EventCodecProperty structure is a definition for a configuration property. This corresponds to the properties that can be passed in as initialization or re-configuration parameters from the configuration file of the IAF.
- Properties are passed to the event codec within an AP_EventCodecProperties structure.
- The status of a codec is reported in an AP_EventCodecStatus structure.
You are advised to peruse EventCodec.h
for the complete definitions. EventTransport.h
and SemanticMapper.h
are also relevant as they define the functions that a codec author can invoke within the transport layer and the Semantic Mapper, respectively.
Codec utilities
The header files AP_EventParser.h
and AP_EventWriter.h
provide definitions for the Event Parser and Event Writer utilities. These utilities allow parsing and writing of the string form of reference types that are used by any <map type="reference">
elements in the adapters configuration file. These files are located in the include
directory of your Apama installation. See the contents of these files for more information.
Communication with other layers
A decoding codec plug-in’s role is to decode messages from a transport layer plug-in into a normalized format that can be processed by the Semantic Mapper. To achieve this, it needs to be able to communicate with the Semantic Mapper. The accessible Semantic Mapper functionality is presented in SemanticMapper.h
.
When a decoding codec starts, it is passed a handle to an AP_SemanticMapper
object through its setSemanticMapper
function. This object is defined in SemanticMapper.h
, where functions
(of type AP_SemanticMapper_Functions*
) points to the definitions for two functions:
sendNormalisedEvent
getLastError
Code inside a decoding codec that calls these functions on the Semantic Mapper looks as follows. Assuming that mapper
holds a reference to the AP_SemanticMapper
object:
errorCode = mapper->functions->sendNormalisedEvent(mapper, NormalisedEvent);
and likewise for getLastError
.
AP_SemanticMapperError
defines the error codes that can be returned by sendNormalisedEvent
.
On the other hand, an encoding codec plug-in’s role is to encode messages in normalized format into some specific format that can then be accepted by a transport layer plug-in for transmission to an external message sink (like a message bus). To achieve this, it needs to be able to communicate with a transport layer plug-in loaded in the IAF.
When an encoding codec starts, its addEventTransport
function will be called once for each available transport. For each, it is passed a handle to an AP_EventTransport
object. This object is defined in EventTransport.h
and was described in detail in C/C++ transport plug-in development. It contains a pointer to AP_EventTransport_Functions
, which in turn references the functions available in the transport layer plug-in. Of these, only two are relevant to the author of an encoding codec:
sendTransportEvent
getLastError
Code inside an encoding codec that calls these functions on the transport layer plug-in looks as follows. Assuming that transport
holds a reference to the AP_EventTransport
object:
errorCode = transport->functions->sendTransportEvent(transport, event);
and likewise for getLastError
.
Working with normalized events in C++
The function of a decoding codec plug-in is to convert incoming messages into a standard normalized event format that can be processed by the Semantic Mapper. Events sent upstream to an encoding codec plug-in are provided to the plug-in in this same format.
Normalized events are essentially dictionaries of name-value pairs, where the names and values are both character strings. Each name-value pair nominally represents the name and content of a single field from an event, but users of the data structure are free to invent custom naming schemes to represent more complex event structures. Names must be unique within a given event. Values may be empty or NULL
.
Some examples of normalized event field values for different types are:
string "a string"
integer "1"
float "2.0"
decimal "100.0d"
sequence<boolean> "[true,false]"
dictionary<float,integer> "{2.3:2,4.3:5}"
SomeEvent "SomeEvent(12)"
Note: When assigning names to fields in normalized events, keep in mind that the fields
and transport
attributes for event mapping conditions and event mapping rules both use a list of fields delimited by spaces or commas. This means, for example, that <id fields="Exchange EX,foo" test="==" value="LSE"/>
will successfully match a field called Exchange
, EX
or foo
, but not a field called Exchange EX,foo
. While fields with spaces or commas in their names may be included in a payload dictionary in upstream or downstream directions, they cannot be referenced directly in mapping or id rules.
To construct strings for the normalized event fields representing container types (dictionaries, sequences, or nested events), use the Event Writer utility found in the AP_EventWriter.h
header file, which is located in the include
directory of the Apama installation. The following examples show how to add a sequence and a dictionary to a normalized event (note the escape character (\
) used in order to insert a quotation mark into a string).
#include <AP_EventWriter.h>
AP_EventWriter *map, *list;
AP_NormalisedEvent *event;
AP_EventWriterValue key, value;
list=AP_EventWriter_ctor(AP_SEQUENCE, NULL);
list->addString(list, "abc");
list->addString(list, "de\"f");
map=AP_EventWriter_ctor(AP_DICTIONARY, NULL);
key.stringValue="key1"; value.stringValue="value";
map->addDictValue(map, AP_STRING, key, AP_STRING, value);
key.stringValue="key\"{}2";
value.stringValue="value\"{}2";
map->addDictValue(map, AP_STRING, key, AP_STRING, value);
event=AP_NormalisedEvent_ctor();
event->functions->addQuick(event, "mySequenceField",
event->functions->list->toString(list));
event->functions->event->functions->addQuick(event,
"myDictionaryField", event->functions->map->toString(map));
AP_EventWriter_dtor(list);
AP_EventWriter_dtor(map);
An any
field and optional
field can be added as follows:
AP_EventWriter* event = AP_EventWriter_ctor(AP_EVENT, "MyEvent");
AP_EventWriterValue val;
val.refValue = NULL;
//add an 'empty' any
event->addAny(event, AP_EMPTY, val, NULL);
//add an 'empty' optional as the second field
event->addOptional(event, AP_EMPTY, val);
val.intValue = 100;
event->addAny(event, AP_INTEGER, val, NULL);
val.intValue = 200;
event->addOptional(event, AP_INTEGER, val);
//Add an 'any' field containing 'optional'
AP_EventWriter *opt = AP_EventWriter_ctor(AP_EVENT, "optional");
opt->addInt(opt, 1);
val.refValue = opt;
event->addAny(event, AP_EVENT, val, "optional<integer>");
Field names and values of normalized events are in UTF-8 format. This means that the writer of the codec needs to ensure that downstream events are correctly formed and the codec should expect to handle UTF-8 coming upstream.
The NormalisedEvent.h
header file defines objects and functions that make up a special programming interface for constructing and examining normalized events. It contains two main structures:
-
AP_NormalisedEvent
This structure represents a single normalized event. It has a pointer to a table of client-visible functions exported by the object called
AP_NormalisedEvent_Functions
. This function table provides access to the operations that may be performed on the event object. In addition, the
AP_NormalisedEvent_ctor
constructor function is provided to create a new event instance. AP_NormalisedEvent_dtor
destroys a normalized event object, and should be called when the event is no longer required to free up resources. -
AP_NormalisedEventIterator
This structure can be used to step through the contents of a normalized event structure, in forwards or reverse order. It contains a function table defined by
AP_NormalisedEventIterator_Functions
, which includes all of the functions exported by a normalized event iterator. AP_NormalisedEventIterator_dtor
destroys a normalized event iterator object, and should be called when the iterator is no longer required to free up resources. There is no public constructor function; iterators are created and returned only by AP_NormalisedEvent
functions.
In both AP_NormalisedEvent
and AP_NormalisedEventIterator
functions, there is always a pointer to the corresponding structure. This is analogous to the implicit this
pointer passed to a C++ object when a member function is invoked on it.
See the NormalisedEvent.h
header file for more information about the structures and functions.
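For example, a decoding codec might build a simple normalized event with the constructor, addQuick and destructor functions mentioned above (the field names here are purely illustrative):
AP_NormalisedEvent* evt = AP_NormalisedEvent_ctor();
evt->functions->addQuick(evt, "symbol", "APMA");
evt->functions->addQuick(evt, "price", "12.5");
/* ... hand the event to the Semantic Mapper via sendNormalisedEvent ... */
AP_NormalisedEvent_dtor(evt);  /* free it once it is no longer required */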
C/C++ plug-in support APIs
This section describes other programming interfaces provided with the Apama software that may be useful in implementing transport layer and codec plug-ins for the IAF.
Logging from IAF plug-ins in C/C++
This API provides a mechanism for recording status and error log messages from the IAF runtime and any plug-ins loaded within it. Plug-in developers are encouraged to make use of the logging API instead of custom logging solutions so that all the information may be logged together in the same standard format and log file(s) used by other plug-ins and the IAF runtime.
The logging API also allows control of logging verbosity, so that any messages below the configured logging level will not be written to the log. The logging level and file are initially set when an adapter first starts up; see Logging configuration (optional) for more information about the logging configuration.
The C/C++ interface to the logging system is declared in the header file AP_Logger.h
, which can be found in the include
directory of your Apama installation. All users of the logging system should include this header file. The types and functions of interest to IAF plug-in writers are:
-
AP_LogLevel
AP_LogLevel_NULL
means “no log level has been set” and should be interpreted by IAF and plug-ins as “use the default logging level”. -
AP_LogTrace
Along with the other logging functions below,
AP_LogTrace
is based on the standard C library printf
function. The message parameter may contain printf
formatting characters that will be filled in from the remaining arguments. -
AP_LogDebug
-
AP_LogInfo
-
AP_LogWarn
-
AP_LogError
-
AP_LogCrit
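For example, a transport plug-in might report its connection status as follows; the function names are those listed above, the printf-style formatting follows the AP_LogTrace description, and the exact parameter types are assumptions.
#include <AP_Logger.h>

static void logConnectionStatus(const char* host, int port, int dropped)
{
	AP_LogInfo("MyTransport: connected to %s:%d", host, port);
	if (dropped > 0)
		AP_LogWarn("MyTransport: dropped %d malformed messages", dropped);
}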
The logging API offers other functions to set and query the current logging level and output file. While these functions are available to plug-in code, it is recommended that plug-ins do not use them. The IAF core is responsible for updating the state of the logging system in response to adapter reconfiguration requests.
Using the latency framework
The latency framework API provides a way to measure adapter latency by attaching high-resolution timing data to events as they stream into, through, and out of the adapter. Developers can then use these events to compute upstream, downstream, and round-trip latency numbers, including latency across multiple adapters.
The sendNormalisedEvent()
and sendTransportEvent()
functions contain an AP_TimestampSet
parameter that carries the microsecond-accurate timestamps that can be used to compute the desired statistics.
C/C++ timestamp
A timestamp is an index-value pair. The index represents the point in the event processing chain at which the timestamp was recorded, for example “upstream entry to semantic mapper” and the value is a floating point number representing the time. The header file AP_TimestampSet.h
defines a set of standard indexes, but a custom plug-in can define additional indexes for even finer-grained measurements. When you add a custom index definition, be sure to preserve the correct order, for example, an index denoting an “entry” point should be less than one denoting an “exit” point from that component.
Timestamps are relative measurements and are meant to be compared only to other timestamps in the same or similar processes on the same computer. Timestamps have no relationship to real-world “wall time”.
C/C++ timestamp set
A timestamp set is the collection of timestamps that are associated with an event. The latency framework API provides functions that developers can use to add, inspect, and remove timestamps from an event’s timestamp set.
C/C++ timestamp configuration object
Constructors and updateProperties()
methods for transport and codec plug-ins take the following argument: IAF_TimestampConfig
.
A timestamp configuration object contains a set of fields that a plug-in can use to decide whether to record and/or log timestamp information. Although timestamp configuration objects are passed to all transport and codec plug-ins, it is up to the author of a plug-in to write the code that makes use of them.
See the IAF_TimestampConfig
structure in the API reference for C++ (Doxygen) for detailed information on the fields.
C/C++ latency framework API
The C/C++ interface for the latency framework is declared in the header file AP_TimestampSet.h
. Plug-ins using the latency framework should include this file and also include the IAF_TimestampConfig.h
header file, which declares the timestamp configuration object.
See the AP_TimestampSet_Functions
structure in the API reference for C++ (Doxygen) for detailed information on the available functions.
Transport plug-in development in Java
The transport layer is the front-end of the IAF. The transport layer’s purpose is to abstract away the differences between the programming interfaces exposed by different middleware message sources and sinks. It consists of one or more custom plug-in libraries that extract downstream messages from external message sources ready for delivery to the codec layer, and send Apama events already encoded by the codec layer upstream to the external message sink. See The Integration Adapter Framework for a full introduction to transport plug-ins and the IAF’s architecture.
An adapter should send events to the correlator only after its start
function is called and before the stop
function returns.
This section includes the transport plug-in development specification for Java and additional information for developers of Java event transports. C/C++ transport plug-in development provides analogous information about developing transport plug-ins using C/C++.
The transport plug-in development specification for Java
A Java transport layer plug-in is implemented as a Java class extending AbstractEventTransport
. Typically this class would be packaged up, together with any supporting classes, as a Java Archive (.jar
) file.
To comply with Apama’s transport plug-in development specification, an event transport class must satisfy two conditions:
-
It must have a constructor with the signature:
public AbstractEventTransport(String name, EventTransportProperty[] properties, TimestampConfig timestampConfig) throws TransportException
This will be used by the IAF to instantiate the plug-in.
-
It must extend the
com.apama.iaf.plugin.AbstractEventTransport
class, correctly implementing all of its abstract methods.
(These methods are mostly directly equivalent to the functions with the same names in the C/C++ transport plug-in development specification.)
Note that all Java plug-ins are dependent on classes in ap-iaf-extension-api.jar
, so this file must always be on the classpath during plug-in development. It is located in the lib
directory of your Apama installation.
Unless otherwise stated, Java classes referred to in this topic are members of the com.apama.iaf.plugin
package, whose classes and interfaces are contained in this .jar
.
Java transport functions to implement
HTML Javadoc documentation for AbstractEventTransport
and related classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc) for detailed information on the functions that a transport plug-in author needs to implement.
AbstractEventTransport
is the constructor. A typical constructor would create a logger using the plug-in name
provided (see Logging from IAF plug-ins in Java), make a call to the updateProperties
method to deal with the initial property set passed in, and perform any other initialization operations required for the particular transport being developed.
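A skeleton constructor following that pattern might look roughly like this. It is a sketch only: the updateProperties signature is assumed and the remaining abstract methods are omitted; see the AbstractEventTransport Javadoc for the exact declarations.
import com.apama.iaf.plugin.*;
import com.apama.util.Logger;
import com.apama.util.TimestampConfig;

public class MyTransport extends AbstractEventTransport {
    private final Logger logger;

    public MyTransport(String name, EventTransportProperty[] properties,
                       TimestampConfig timestampConfig) throws TransportException {
        super(name, properties, timestampConfig);
        logger = Logger.getLogger(name);
        // Apply the initial property set (signature assumed; see the Javadoc)
        updateProperties(properties, timestampConfig);
        logger.info("MyTransport constructed");
    }

    // ... implementations of updateProperties, start, stop, addEventDecoder,
    // removeEventDecoder and the other abstract methods go here ...
}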
See Communication with the codec layer for information on how the transport layer communicates with the codec layer in both the upstream and downstream directions.
Communication with the codec layer
This section discusses how the transport layer communicates with the codec layer in both the upstream and downstream directions.
Sending upstream messages received from a codec plug-in to a sink
When a codec plug-in has encoded an event ready for transmission by a transport plug-in, it will pass it on by calling the transport's sendTransportEvent
method (as defined above). It is then up to the transport plug-in to process the message (which will be of some type agreed by the codec and transport plug-in authors), and send it on to the external sink it provides access to.
Note that there are no guarantees about which threads might call this method, so plug-in authors will need to consider thread synchronization issues carefully.
If there is a problem sending the event on, the transport plug-in should throw a TransportException
.
Sending downstream messages received from a source on to a codec plug-in
In order that messages can be easily sent on to a codec plug-in, an event transport will usually have saved a reference to the event codec(s) it will be using before it establishes a connection to the external source.
Typically an event transport will build up a list of registered codec plug-ins from the parameters passed to the addEventDecoder
and removeEventDecoder
methods. If this is the case, the start
method of the plug-in can select one of these plug-ins on the basis of a plug-in property provided in the configuration file (for example, <property name="decoderName" value="MyCodec"/>
), and save it in an instance field (for example, currentDecoder
).
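Code following that pattern might look roughly like this; the method signatures and the way the property value reaches the decoderName field are assumptions, so verify them against the AbstractEventTransport Javadoc.
// Assumes java.util.Map and java.util.concurrent.ConcurrentHashMap are imported.
private final Map<String, EventDecoder> decoders = new ConcurrentHashMap<>();
private volatile EventDecoder currentDecoder;
private volatile String decoderName; // set in updateProperties from the "decoderName" property

public void addEventDecoder(String name, EventDecoder decoder) {
    decoders.put(name, decoder);
}

public void removeEventDecoder(String name) {
    decoders.remove(name);
}

public void start() throws TransportException {
    currentDecoder = decoders.get(decoderName);
    // ... connect to the external message source and begin delivering messages ...
}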
Once the plug-in has a reference to the event codec (or codecs) it will use, whenever an external message is received it should be passed on by calling the sendTransportEvent
method on the codec plug-in (from the EventDecoder
interface). See the API reference for Java (Javadoc) for more information on this method.
For example, part of the event processing code for a transport plug-in might be:
MyCustomMessageType message = myCustomMessageSource.getNextMessage();
currentDecoder.sendTransportEvent(message, timestamps);
If an error occurs in the codec or Semantic Mapper layers preventing the message from being converted into an Apama event, a CodecException
or SemanticMapperException
is thrown. Like all per-message errors, these should be logged at Warning
level, preferably with a full stack trace logged at Debug
level too. If necessary, transports may also send messages downstream to the correlator to inform running monitors about the error.
When a transport sends a message to the codec via the sendTransportEvent
method, it passes an Object
reference and this allows custom types to be passed between the two plug-ins. However, any custom types should be loaded via the main (parent) classloader, as each plug-in specified in the IAF configuration file is loaded with its own classloader. Consider, for example, the following three classes all loaded into a single jar file, MyAdapter.jar
, which is used in the IAF configuration file in the jarName
attribute of the <transport>
element:
MyTransport.class
MyCodec.class
MyContainer.class
(the container class used in the call tosendTransportEvent
)
When you load the transport and codec, a new classloader is used for each. This means both have their own copy of the MyContainer
class. When the transport creates an instance of MyContainer
and then passes it into the codec, the codec will recognize that the Object getClass().getName()
is MyContainer
, but will not be able to cast it to this type as its MyContainer
class is from a different classloader.
To prevent this from happening, make sure that all shared classes are in a separate jar that is specified by a <classpath>
element. The shared classes are then loaded by the parent classloader. This ensures that when a codec or transport references a shared class, they will both agree it is the same class.
Note that any codec plug-in called by a Java transport plug-in must also be written in Java.
Transport exceptions
TransportException
is the exception class that should be thrown by a transport plug-in whenever the IAF calls one of its methods and an error prevents the method from successfully completing — for example, a message that cannot be sent on to an external sink in sendTransportEvent
, or a serious problem that prevents the plug-in from initializing when start
is called.
A TransportException
object always has an associated message
, which is a String
explaining the problem (this may include information about another exception that caused the TransportException
to be thrown). There is also a code
field that specifies the kind of error that occurred; the possible codes are defined as constants in the TransportException
class.
TransportException
defines a number of constructors, to make it easy to set up the exception’s information quickly in different situations.
See the TransportException
class in the API reference for Java (Javadoc) for more information on these constants and constructors.
Logging
See Logging from IAF plug-ins in Java for information about how transport plug-ins should log error, status and debug information.
Java codec plug-in development
The codec layer is a layer of abstraction between the transport layer and the IAF’s Semantic Mapper. It consists of one or more plug-in libraries that perform message encoding and decoding. Decoding involves translating downstream messages retrieved by the transport layer into the standard “normalized event” format on which the Semantic Mapper’s rules run; encoding works in the opposite direction, converting upstream normalized events into an appropriate format for transport layer plug-ins to send on. Note that unlike the situation with C/C++, in Java codec plug-ins are always both encoders and decoders. See The Integration Adapter Framework for a full introduction to codec plug-ins and the IAF’s architecture.
This chapter includes the codec plug-in development specification for Java and additional information for developers of Java event codecs. C/C++ codec plug-in development provides analogous information about developing codec plug-ins using C/C++.
Before developing a new codec plug-in, it is worth considering whether one of the standard Apama plug-ins could be used instead. Codec IAF plug-ins provides more information on the standard IAF codec plug-ins: JStringCodec
and JNullCodec
. The JStringCodec
plug-in codes normalized events as formatted text strings. The JNullCodec
plug-in is useful in situations where it does not make sense to decouple the codec and transport layers, and allows transport plug-ins to communicate with the Semantic Mapper directly using normalized events.
The codec plug-in development specification for Java
A Java codec layer plug-in is implemented as a Java class extending AbstractEventCodec
. Typically this class would be packaged up, together with any supporting classes, as a Java Archive (.jar
) file.
To comply with Apama’s codec plug-in development specification, an event codec class must satisfy two conditions:
-
It must have a constructor with the signature:
public AbstractEventCodec(String name, EventCodecProperty[] properties, TimestampConfig timestampConfig) throws CodecException
This will be used by the IAF to instantiate the plug-in.
-
It must extend the
com.apama.iaf.plugin.AbstractEventCodec
class, correctly implementing all of its abstract methods.
(These methods are mostly directly equivalent to the functions with the same names in the C/C++ codec plug-in development specification.)
Note that all Java plug-ins are dependent on classes in ap-iaf-extension-api.jar
, so this file must always be on the classpath during plug-in development. It is located in the Apama installation’s lib
directory.
Unless otherwise stated, Java classes referred to in this chapter are members of the com.apama.iaf.plugin
package, whose classes and interfaces are contained in this .jar
.
Java codec functions to implement
HTML Javadoc documentation for AbstractEventCodec
and related classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc) for detailed information on the functions that a codec plug-in author needs to implement.
AbstractEventCodec
is the constructor. A typical constructor would create a logger using the plug-in name
provided (see Logging from IAF plug-ins in Java), make a call to the updateProperties
method to deal with the initial property set passed in, and perform any other initialization operations required for the particular event codec being developed.
Note that unlike event transports, codec plug-ins do not have start and stop methods.
See Communication with other layers for information on how the codec layer communicates with the transport layer and Semantic Mapper in upstream and downstream directions.
See also Working with normalized events for help working with NormalisedEvent
objects.
Communication with other layers
This section discusses how the codec layer communicates with the transport layer and Semantic Mapper in upstream and downstream directions.
Sending upstream messages received from the Semantic Mapper to a transport plug-in
When the Semantic Mapper produces normalized events, it sends them on to the codec layer by calling the codec plug-ins’ sendNormalisedEvent
methods (as defined above). The event codec must then encode the normalized event for transmission by the transport layer.
In order to send messages upstream to an event transport, a codec plug-in must have a reference to the transport plug-in object. Typically, an event codec does this by building up a map of registered transport plug-ins from the parameters passed to the addEventTransport
and removeEventTransport
methods. It might then use a property provided in the configuration file (for example, <property name="transportName" value="MyTransport"/>
) to determine which event transport to use when the sendNormalisedEvent
method is called.
Alternatively, if this codec plug-in will only ever be used in an adapter with just one transport plug-in, the EventTransport
object could be stored in an instance field when it is provided to the addEventTransport
method.
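For example, the registration methods could maintain the eventTransports map used by the sendNormalisedEvent example below; the exact method signatures are assumptions, so verify them against the AbstractEventCodec Javadoc.
// Assumes java.util.Map and java.util.concurrent.ConcurrentHashMap are imported.
private final Map<String, EventTransport> eventTransports = new ConcurrentHashMap<>();
private volatile String currentTransportName; // e.g. taken from the "transportName" property

public void addEventTransport(String name, EventTransport transport) {
    eventTransports.put(name, transport);
}

public void removeEventTransport(String name) {
    eventTransports.remove(name);
}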
Once the plug-in has a reference to the event transport (or transports) it will use, it can pass on normalized events it has encoded into transport messages by calling the transport plug-in's sendTransportEvent
method. See the EventTransport
interface in the API reference for Java (Javadoc) for more information on this method.
For example, the implementation of the event codec’s sendNormalisedEvent
could look something like this:
// Select EventTransport using saved plug-in property value
EventTransport transport = eventTransports.get(currentTransportName);
// Encode message
MyCustomMessageType message = myEncodeMessage(event);
// Send to Transport layer plug-in
transport.sendTransportEvent(message, timestamps);
If an error occurs in the transport layer, a TransportException
is thrown. Typically such exceptions do not need to be caught by the codec plug-in, unless the codec plug-in is able to somehow deal with the problem.
A CodecException
should be thrown if there is an error encoding the normalized event.
Note that there are no guarantees about which threads might call the sendNormalisedEvent
method, so plug-in authors will need to consider any thread synchronization issues arising from use of shared data structures.
Any transport plug-in called by a Java codec plug-in must also be written in Java.
Sending downstream messages received from a transport plug-in to the Semantic Mapper
When a transport plug-in configured to work with the event codec receives a message from its external message source, it will pass it on to the codec plug-in by calling the sendTransportEvent
method (as defined above). It is then up to the codec plug-in to decode the message from whatever custom format is agreed between the transport and codec plug-ins into a standard normalized event that can be passed on to the Semantic Mapper.
When the message has been decoded, it should be sent to the Semantic Mapper using its sendNormalisedEvent
method. See the SemanticMapper
interface in the API reference for Java (Javadoc) for more information on this method.
For example, the implementation of the event codec’s sendTransportEvent
could look something like this:
// (Assume there's an instance field: SemanticMapper semanticMapper)
// Decode message
NormalisedEvent normalisedEvent = myDecodeMessage(event);
// Send to the Semantic Mapper
semanticMapper.sendNormalisedEvent(normalisedEvent, timestamps);
If an error occurs in the Semantic Mapper, a SemanticMapperException
is thrown. Typically such exceptions do not need to be caught by the codec plug-in, unless the codec plug-in is able to somehow deal with the problem.
A CodecException
should be thrown if there is an error decoding the message.
Java codec exceptions
CodecException
is the exception class that should be thrown by a codec plug-in whenever one of its methods is called and an error prevents the method from successfully completing — for example, a message that cannot be encoded or decoded because it has an invalid format.
A CodecException
object always has an associated message
, which is a String
explaining the problem (this may include information about another exception that caused the CodecException
to be thrown). There is also a code
field that specifies the kind of error that occurred; the possible codes are defined as constants in the CodecException
class.
Like the TransportException
object, CodecException
defines a number of constructors, to make it easy to set up the exception’s information quickly in different situations.
See the CodecException
class in the API reference for Java (Javadoc) for more information on these constants and constructors.
Semantic Mapper exceptions
Codec plug-ins should never need to construct or throw SemanticMapperException
objects, but they need to be able to catch them if they are thrown from the SemanticMapper.sendNormalisedEvent
method when it is called by the event codec.
SemanticMapperException
has exactly the same set of constructors as the CodecException
class described above. The only significant difference is the set of error codes.
See the SemanticMapperException
class in the API reference for Java (Javadoc) for more information on the constructors and error codes.
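For example, a decoding codec's sendTransportEvent implementation might catch and log the exception like this (the logger field and the error-handling policy are illustrative):
try {
    semanticMapper.sendNormalisedEvent(normalisedEvent, timestamps);
} catch (SemanticMapperException e) {
    // Per-message errors: WARN for the summary, DEBUG for the stack trace
    logger.warnWithDebugStackTrace("Failed to map decoded event: ", e);
}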
Logging
See Logging from IAF plug-ins in Java for information about how codec plug-ins should log error, status and debug information.
Working with normalized events in Java
The function of a decoding codec plug-in is to convert incoming messages into a standard normalized event format that can be processed by the Semantic Mapper. Events sent upstream to an encoding codec plug-in are provided to the plug-in in this same format.
Normalized events are essentially dictionaries of name-value pairs, where the names and values are both character strings. Each name-value pair nominally represents the name and content of a single field from an event, but users of the data structure are free to invent custom naming schemes to represent more complex event structures. Names must be unique within a given event. Values may be empty or null
.
Some examples of normalized event field values for different types are:
string "a string"
integer "1"
float "2.0"
decimal "100.0d"
sequence<boolean> "[true,false]"
dictionary<float,integer> "{2.3:2,4.3:5}"
SomeEvent "SomeEvent(12)"
Note: When assigning names to fields in normalized events, keep in mind that the fields
and transport
attributes for event mapping conditions and event mapping rules both use a list of fields delimited by spaces or commas. This means, for example, that <id fields="Exchange EX,foo" test="==" value="LSE"/>
will successfully match a field called Exchange
, EX
or foo
, but not a field called Exchange EX,foo
. While fields with spaces or commas in their names may be included in a payload dictionary in upstream or downstream directions, they cannot be referenced directly in mapping or id rules.
To construct strings for the normalized event fields representing container types (dictionaries, sequences, or nested events), use the event parser/builder found in the ap-util.jar
file, which is located in the Apama installation’s lib
directory. The following examples show how to add a sequence and a dictionary to a normalized event (note the escape character (\
) used in order to insert a quotation mark into a string).
List<String> list = new ArrayList<String>();
list.add("abc");
list.add("de\"f");
Map<String,String> map = new HashMap<String,String>();
map.put("key1", "value1");
map.put("key\"{}2", "value\"{}2");
final SequenceFieldType STRING_SEQUENCE_FIELD_TYPE =
new SequenceFieldType(StringFieldType.TYPE);
final DictionaryFieldType STRING_DICT_FIELD_TYPE =
new DictionaryFieldType(StringFieldType.TYPE, StringFieldType.TYPE);
NormalisedEvent event = new NormalisedEvent();
event.add("mySequenceField",
STRING_SEQUENCE_FIELD_TYPE.format(list));
event.add("myDictionaryField", STRING_DICT_FIELD_TYPE.format(map));
The programming interface for constructing and using normalized events is made up of three Java classes:
-
NormalisedEvent
The
NormalisedEvent
class represents a single normalized event. This class is the most important part of the interface, and encapsulates the data and operations that can be performed on a single normalized event. Normalized events are not thread-safe. If your code will be accessing the same normalized event object (or associated iterators) from multiple threads, you must implement your own thread synchronization to prevent concurrent modification.
A public zero-argument constructor is provided for creation of new (initially empty)
NormalisedEvent
objects. -
NormalisedEventIterator
Several of the
NormalisedEvent
methods return an instance of theNormalisedEventIterator
class, which provides a way to step through the name-value pairs making up the normalized event, forwards or backwards. There is no public constructor. Iterators are created and returned only by
NormalisedEvent
methods. -
NormalisedEventException
Any errors encountered by
NormalisedEvent
result in instances of NormalisedEventException
being thrown.
See the API reference for Java (Javadoc) for detailed information on these classes.
Plug-in support APIs for Java
This section describes other programming interfaces provided with the Apama software that may be useful in implementing transport layer and codec plug-ins for the IAF.
Logging from IAF plug-ins in Java
This API provides a mechanism for recording status and error log messages from the IAF runtime and any plug-ins loaded within it. Plug-in developers are encouraged to make use of the logging API instead of custom logging solutions so that all the information may be logged together in the same standard format and log file(s) used by other plug-ins and the IAF runtime.
The logging API also allows control of logging verbosity, so that any messages below the configured logging level will not be written to the log. The logging level and file are initially set when an adapter first starts up – see Logging configuration (optional) for more information about the logging configuration.
The Java logging API is based around the Logger
class.
The recommended way of using the Logger
class is to have a private final com.apama.util.Logger
variable, and then create an instance in the transport or codec’s constructor based on the plug-in name, such as the following:
private final Logger logger;
public MyTransport(String name,...)
{ super(...);
logger = Logger.getLogger(name);
}
The Logger
class supports the following logging levels:
FORCE
CRIT
FATAL
ERROR
WARN
INFO
DEBUG
TRACE
It is recommended that you do not use the FATAL
or CRIT
log levels provided by the Logger
class, which are present only for historical reasons. It is better to use ERROR
for all error conditions regardless of how fatal they are, and INFO
for informational messages. See Setting correlator and plug-in log files and log levels in a YAML configuration file for information about configuring log levels in the correlator.
For each level, there are three main methods. For example, for logging at the DEBUG
level, here are the three main methods:
-
logger.debug(String)
— Logs a message, if this log level is currently enabled. -
logger.debug(String, Throwable)
— Logs the stack trace and message of a caught exception together with a high-level description of the problem. Apama strongly recommend logging exceptions like this to assist with debugging in the event of problems. -
logger.debugEnabled()
— Determines whether messages at this log level are currently enabled (this depends on the current IAF log level, which may be changed dynamically). Apama strongly recommend checking this method's result (particularly for DEBUG
messages) before logging messages where constructing the message string may be costly, for example:
if (logger.debugEnabled()) logger.debug("A huge message was received, and the string representation of it is: "+thing.toString()+ " and here is some other useful info: "+foo+", "+bar);
Note that there is no point using the Enabled()
methods if the log message is a simple string (or string plus exception), such as:
logger.debug("The operation completed with an error: ", exception);
To make it easier to diagnose any errors that may occur, Apama recommends one of the following methods to log the application’s stack trace:
- errorWithDebugStackTrace(java.lang.String msg, java.lang.Throwable ex) — Logs the specified message at the ERROR level followed by the exception's message string, and then logs the exception's stack trace at the DEBUG level.
- warnWithDebugStackTrace(java.lang.String msg, java.lang.Throwable ex) — Logs the specified message at the WARN level followed by the exception's message string, and then logs the exception's stack trace at the DEBUG level.
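For example (connectToSource and the message text are illustrative):
try {
    connectToSource();
} catch (Exception ex) {
    logger.errorWithDebugStackTrace("Could not connect to the external message source: ", ex);
}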
See the API reference for Java (Javadoc) for more information about the Logger
class.
Using the latency framework
The latency framework API provides a way to measure adapter latency by attaching high-resolution timing data to events as they stream into, through, and out of the adapter. Developers can then use these events to compute upstream, downstream, and round-trip latency numbers, including latency across multiple adapters.
The sendNormalisedEvent()
and sendTransportEvent()
methods contain a TimestampSet
parameter that carries the microsecond-accurate timestamps that can be used to compute the desired statistics.
Javadoc documentation for com.apama.util.TimestampSet
and com.apama.util.TimestampConfig
classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc).
Java timestamp
A timestamp is an index-value pair. The index represents the point in the event processing chain at which the timestamp was recorded, for example “upstream entry to semantic mapper” and the value is a floating point number representing the time. The TimestampSet
class defines a set of standard indexes, but a custom plug-in can define additional indexes for even finer-grained measurements. When you add a custom index definition, be sure to preserve the correct order, for example, an index denoting an “entry” point should be less than one denoting an “exit” point from that component.
Timestamps are relative measurements and are meant to be compared only to other timestamps in the same or similar processes on the same computer.
Java timestamp set
A timestamp set is the collection of timestamps that are associated with an event. The latency framework API provides functions that developers can use to add, inspect, and remove timestamps from an event’s timestamp set.
The timestamp set is represented as a dictionary
of integer-float pairs, where the integer index refers to the location at which the timestamp was added and the floating-point time gives the time at which an event was there.
Java timestamp configuration object
The constructors and updateProperties()
methods for transport and codec plug-ins take this additional argument: TimestampConfig
.
A timestamp configuration object contains a set of fields that a plug-in can use to decide whether to record and/or log timestamp information. See the API reference for Java (Javadoc) for more information on the fields of the TimestampConfig
class.
Java latency framework API
The Java interface for the latency framework is provided by the com.apama.util.TimestampSet
class.
See the TimestampSet
class in the API reference for Java (Javadoc) for detailed information on the available functions.