Developing IAF Plug-ins

C/C++ transport plug-in development

The transport layer is the front-end of the IAF. The transport layer’s purpose is to abstract away the differences between the programming interfaces exposed by different middleware message sources and sinks. It consists of one or more custom plug-in libraries that extract downstream messages from external message sources ready for delivery to the codec layer, and send Apama events already encoded by the codec layer upstream to the external message sink. See The Integration Adapter Framework for a full introduction to transport plug-ins and the IAF’s architecture.

An adapter should send events to the correlator only after its start function is called and before the stop function returns.

This section includes the C/C++ transport plug-in development specification and additional information for developers of event transports using C/C++. Transport plug-in development in Java provides information about developing transport plug-ins in Java.

To configure the build for a transport plug-in:

  • On Linux, copying and customizing an Apama makefile from a sample application is the easiest method.

  • On Windows, you might find it easiest to copy an Apama sample project. If you prefer to use a project you already have, be sure to add $(APAMA_HOME)\include as an include directory. To do this in Visual Studio, select your project and then select Project Properties > C/C++ > General > Additional Include Directories.

    Also, link against apiaf.lib. To do this in Visual Studio, select your project and then select Project Properties > Linker > Input > Additional Dependencies and add apiaf.lib;apcommon.lib.

    Finally, select Project Properties > Linker > General > Additional Library Directories, and add $(APAMA_HOME)\lib.

The C/C++ transport plug-in development specification

A C/C++ transport layer plug-in is implemented as a dynamic shared library. In order for the IAF to be able to load and use it, it must comply with Apama’s transport plug-in development specification. This specification describes the structure of a transport layer plug-in, and the C/C++ functions it needs to implement so that it can be used with the IAF. The specification also provides a mechanism for startup and configuration parameters to be passed to the plug-in from the IAF’s configuration file.

Property names and values used by transport plug-ins must be in UTF-8 format.

A transport layer plug-in implementation must include the C header file EventTransport.h. It also needs to include EventCodec.h, to allow the event transport to pass messages to codecs within the IAF codec layer. You can find these files in the include directory of your Apama installation.

Transport functions to implement

EventTransport.h provides the definition for a number of functions whose implementation needs to be provided by the event transport author. See the AP_EventTransport_Functions structure in the API reference for C++ (Doxygen) for detailed information on these functions.

When the start function is invoked, the event transport is signaled to start accepting incoming messages and passing them on to a codec. Events should not be sent to the correlator until the start function is called.

It is up to the event transport to determine which codec to communicate with from the list of codecs made available to it through the addEventDecoder and removeEventDecoder functions. Typically, a configuration property would be used to specify the codec to be used. If a handle to the desired codec had been stored in a variable called decoder (of type AP_EventDecoder*) when addEventDecoder was called, an event could be passed on to the codec using:

decoder->functions->sendTransportEvent(decoder, event);

This codec function is described in C/C++ codec plug-in development.

Events should not be sent to the correlator after the stop function has returned. Before it returns, the stop function must wait for any other threads that are sending events to complete.
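The start/stop rule above can be enforced with a small gate that reader threads consult before every send. The following is a minimal sketch only: SendGate and the gate_* functions are hypothetical helpers, not part of the Apama API, and it assumes a C11 compiler with <stdatomic.h> and a POSIX sched_yield.

```c
#include <stdatomic.h>
#include <stdbool.h>
#include <sched.h>

/* Hypothetical bookkeeping for one transport instance (not an Apama type). */
typedef struct {
    atomic_bool running;   /* true between start and stop */
    atomic_int  in_flight; /* sends currently in progress */
} SendGate;

void gate_init(SendGate *g) {
    atomic_init(&g->running, false);
    atomic_init(&g->in_flight, 0);
}

/* Call from the transport's start function. */
void gate_start(SendGate *g) {
    atomic_store(&g->running, true);
}

/* A reader thread calls this before passing an event to the decoder.
   Returns false if the transport is not running, in which case the
   caller must not send the event. */
bool gate_begin_send(SendGate *g) {
    atomic_fetch_add(&g->in_flight, 1);   /* announce the send first... */
    if (!atomic_load(&g->running)) {      /* ...then check the flag */
        atomic_fetch_sub(&g->in_flight, 1);
        return false;
    }
    return true;
}

/* Call after the send has completed. */
void gate_end_send(SendGate *g) {
    atomic_fetch_sub(&g->in_flight, 1);
}

/* Call from the transport's stop function: refuse new sends, then
   wait until every send already in progress has finished. */
void gate_stop(SendGate *g) {
    atomic_store(&g->running, false);
    while (atomic_load(&g->in_flight) > 0) {
        sched_yield();
    }
}
```

A reader thread would wrap each call to the decoder's sendTransportEvent between gate_begin_send and gate_end_send. Incrementing the counter before checking the flag means that stop either sees the in-progress send and waits for it, or the sender sees that the transport has stopped and backs out. A production transport would more likely block on a condition variable than spin.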

Defining the transport function table

The EventTransport.h header file provides a definition for an AP_EventTransport_Functions structure. This defines a function table whose elements must be set to point to the implementations of the above functions. See the AP_EventTransport_Functions structure in the API reference for C++ (Doxygen) for more information.

Note that the order of the function pointers within the function table is critical to the reliable operation of the IAF. However, the order in which the function definitions appear within the plug-in source code, and indeed the names of the functions, are not important. Apama recommends that the functions be declared static, so that they are not globally visible and can only be accessed via the function table.

It is therefore not obligatory to implement the functions with the same names as per the definitions, as long as the mapping is performed correctly in an instantiation of AP_EventTransport_Functions. A definition in an event transport implementation would look as follows:

static struct AP_EventTransport_Functions EventTransport_Functions
  = {
  updateProperties,
  sendTransportEvent,
  addEventDecoder,
  removeEventDecoder,
  flushUpstream,
  flushDownstream,
  start,
  stop,
  getLastError,
  getStatus
};

The function table created above needs to be placed in an AP_EventTransport object, and one such object needs to be created for every plug-in within its constructor function. See the AP_EventTransport_Functions structure in the API reference for C++ (Doxygen) for more information.
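The relationship between the function table, the transport object and the constructor can be sketched as follows. The types here are simplified stand-ins written for this illustration (the Sketch names, the three-entry table and the reserved field are assumptions); the real definitions, signatures and constructor arguments are those in EventTransport.h.

```c
#include <stdlib.h>

/* Simplified stand-ins for the types in EventTransport.h.
   These are assumptions for illustration, not the real definitions. */
struct SketchTransport;
struct SketchTransport_Functions {
    int         (*start)(struct SketchTransport *t);
    int         (*stop)(struct SketchTransport *t);
    const char *(*getLastError)(struct SketchTransport *t);
};
struct SketchTransport {
    void *reserved;                              /* per-instance state owned by the plug-in */
    struct SketchTransport_Functions *functions; /* function table shared by all instances */
};

/* Hypothetical per-instance state. */
struct SketchState {
    int  started;
    char lastError[64];
};

/* The function names are free; only their position in the table matters. */
static int my_start(struct SketchTransport *t) {
    ((struct SketchState *)t->reserved)->started = 1;
    return 0;
}
static int my_stop(struct SketchTransport *t) {
    ((struct SketchState *)t->reserved)->started = 0;
    return 0;
}
static const char *my_last_error(struct SketchTransport *t) {
    return ((struct SketchState *)t->reserved)->lastError;
}

/* Entries must follow the field order of the table structure. */
static struct SketchTransport_Functions sketch_functions = {
    my_start,
    my_stop,
    my_last_error
};

/* Constructor: allocate the object and its state, then attach the table. */
struct SketchTransport *sketch_transport_ctor(void) {
    struct SketchTransport *t = malloc(sizeof *t);
    struct SketchState *s = calloc(1, sizeof *s);
    if (t == NULL || s == NULL) {
        free(t);
        free(s);
        return NULL;
    }
    t->reserved = s;
    t->functions = &sketch_functions;
    return t;
}

/* Destructor: release everything the constructor allocated. */
void sketch_transport_dtor(struct SketchTransport *t) {
    if (t != NULL) {
        free(t->reserved);
        free(t);
    }
}
```

Keeping the implementation functions static and reaching them only through the table matches the recommendation given earlier for the transport function table.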

The transport constructor, destructor and info functions

Every event transport needs to implement a constructor function, a destructor function and an “info” function. These methods are called by the IAF to (respectively) instantiate the event transport, to clean it up during unloading, and to provide information about the plug-in’s capabilities.

EventTransport.h provides the following definitions:

  • AP_EventTransportCtorPtr points to the constructor function. Typically part of the work of this constructor would be a call to updateProperties, in order to set up the initial configuration of the plug-in.
  • AP_EventTransportDtorPtr points to the related destructor function.
  • AP_EventTransportInfoPtr points to the info function.

The IAF will search for these functions by the names AP_EventTransport_ctor, AP_EventTransport_dtor and AP_EventTransport_info when the library is loaded, so you must use these exact names when implementing them in a transport layer plug-in.

See the API reference for C++ (Doxygen) for more information on the above definitions.

Other transport definitions

EventTransport.h also provides some additional definitions that the event transport author needs to be aware of:

  • AP_EventTransportError defines the set of error codes that can be returned by the transport’s functions.
  • The AP_EventTransportProperty structure is a definition for a configuration property. This corresponds to the properties that can be passed in as initialization or re-configuration parameters from the configuration file of the IAF.
  • Properties are passed to the event transport within an AP_EventTransportProperties structure.
  • The status of a transport is reported in an AP_EventTransportStatus structure.

Transport utilities

The header files AP_EventParser.h and AP_EventWriter.h provide definitions for the Event Parser and Event Writer utilities. These utilities allow parsing and writing of the string form of reference types that are used by any <map type="reference"> elements in the adapter’s configuration file. These files are located in the include directory of your Apama installation. See the contents of these files for more information.

Communication with the codec layer

If a transport layer plug-in is to be able to receive messages and then pass them on to the codec layer, it must be able to communicate with appropriate decoding codecs. A decoding codec is one that can accept messages from the transport layer and parse them (decode them) into the normalized event format accepted by the Semantic Mapper.

When a codec is loaded into the IAF, its details are passed to all transport layer plug-ins by calling their addEventDecoder function. This tells the transport layer plug-in the name of the decoding codec and provides a reference to its AP_EventDecoder structure.

The reference to AP_EventDecoder gives the transport layer plug-in access to the following functions:

  • sendTransportEvent
  • getLastError

See AP_EventDecoder_Functions in the API reference for C++ (Doxygen) for more information on these functions.

Assuming the reference to the AP_EventDecoder structure has been stored in a variable called decoder, the functions can be called as follows:

errorCode = decoder->functions->sendTransportEvent(decoder, event);
errorMessage = decoder->functions->getLastError(decoder);
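Putting these pieces together, a transport might remember the decoder whose name matches a configured codec name and check the result of each send. This is a sketch under stated assumptions: the Sketch types are simplified stand-ins for AP_EventDecoder and its function table, the two-argument sendTransportEvent follows the call shown above, the convention that 0 means success is assumed, and the toy decoder exists only so that the example is self-contained.

```c
#include <stdio.h>
#include <string.h>

/* Simplified stand-ins for AP_EventDecoder and its function table (assumed shapes). */
struct SketchDecoder;
struct SketchDecoder_Functions {
    int         (*sendTransportEvent)(struct SketchDecoder *d, const char *event);
    const char *(*getLastError)(struct SketchDecoder *d);
};
struct SketchDecoder {
    void *reserved;
    struct SketchDecoder_Functions *functions;
};

/* Hypothetical transport state: the codec name taken from a configuration
   property, and the decoder selected for it (NULL until one is offered). */
struct SketchTransportState {
    const char *wantedCodec;
    struct SketchDecoder *decoder;
};

/* Mirrors addEventDecoder: keep the decoder only if its name matches. */
void sketch_add_decoder(struct SketchTransportState *s, const char *name,
                        struct SketchDecoder *d) {
    if (strcmp(name, s->wantedCodec) == 0) {
        s->decoder = d;
    }
}

/* Mirrors removeEventDecoder: forget the decoder when it is unloaded. */
void sketch_remove_decoder(struct SketchTransportState *s, const char *name) {
    if (strcmp(name, s->wantedCodec) == 0) {
        s->decoder = NULL;
    }
}

/* Passes one message to the selected decoder.
   Returns 0 on success, -1 if there is no decoder or it reported an error. */
int sketch_deliver(struct SketchTransportState *s, const char *event) {
    if (s->decoder == NULL) {
        return -1;
    }
    int errorCode = s->decoder->functions->sendTransportEvent(s->decoder, event);
    if (errorCode != 0) {
        fprintf(stderr, "decode failed: %s\n",
                s->decoder->functions->getLastError(s->decoder));
        return -1;
    }
    return 0;
}

/* A toy decoder used only to exercise the sketch: it rejects empty messages. */
static int toy_send(struct SketchDecoder *d, const char *event) {
    (void)d;
    return (event != NULL && event[0] != '\0') ? 0 : 1;
}
static const char *toy_last_error(struct SketchDecoder *d) {
    (void)d;
    return "empty message";
}
static struct SketchDecoder_Functions toy_functions = { toy_send, toy_last_error };
```

In a real plug-in the error message would normally go through the IAF logging API rather than to stderr.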

C/C++ codec plug-in development

The codec layer is a layer of abstraction between the transport layer and the IAF’s Semantic Mapper. It consists of one or more plug-in libraries that perform message encoding and/or decoding. Decoders translate downstream messages retrieved by the transport layer into the standard “normalized event” format on which the Semantic Mapper’s rules run. Encoders work in the opposite direction, encoding upstream normalized events into an appropriate format for transport layer plug-ins to send on. See The Integration Adapter Framework for a full introduction to codec plug-ins and the IAF’s architecture.

This topic includes the C/C++ codec plug-in development specification and additional information for developers of C/C++ event codecs. Java codec plug-in development provides analogous information about developing codec plug-ins in Java.

Before developing a new codec plug-in, it is worth considering whether one of the standard Apama IAF plug-ins could be used instead. Codec IAF plug-ins provides more information on the standard IAF codec plug-ins: StringCodec and NullCodec. The StringCodec plug-in codes normalized events as formatted text strings. The NullCodec plug-in is useful in situations where it does not make sense to decouple the codec and transport layers, and allows transport plug-ins to communicate with the Semantic Mapper directly using normalized events.

To configure the build for a codec plug-in:

  • On Linux, copying and customizing an Apama makefile from a sample application is the easiest method.

  • On Windows, you might find it easiest to copy an Apama sample project. If you prefer to use a project you already have, be sure to add $(APAMA_HOME)\include as an include directory. To do this in Visual Studio, select your project and then select Project Properties > C/C++ > General > Additional Include Directories.

    Also, link against apiaf.lib. To do this in Visual Studio, select your project and then select Project Properties > Linker > Input > Additional Dependencies and add:

    apiaf.lib;apcommon.lib

    Finally, select Project Properties > Linker > General > Additional Library Directories, and add $(APAMA_HOME)\lib.

The C/C++ codec plug-in development specification

A codec plug-in needs to be structured as a dynamic shared library. In order for the IAF to be able to load and use it, it must comply with Apama’s codec plug-in development specification. This describes the overall format of a codec plug-in and the C/C++ functions it needs to implement so that its functionality is accessible by the IAF. The specification also provides a mechanism for startup and configuration parameters to be passed to the plug-in from the IAF’s configuration file.

Property names and values used by codec plug-ins must be in UTF-8 format.

A codec plug-in implementation must include the C header file EventCodec.h. As a codec also needs to communicate both with a transport layer plug-in (or event transport) and with the Semantic Mapper, EventTransport.h and SemanticMapper.h also need to be included. You can find these files in the include directory of your Apama installation.

Codec functions to implement

EventCodec.h provides the definition for a number of functions whose implementation needs to be provided by the event codec author.

However, in contrast to the Transport Layer Plug-in Development Specification, the set of functions that need to be implemented varies depending on whether the codec is to implement only a message decoder, only a message encoder, or a bidirectional encoder/decoder.

In all cases, implementations need to be provided for the following functions:

  • updateProperties
  • getLastError
  • getStatus

It is recommended that updateProperties is invoked by the codec constructor.

See the AP_EventCodec_Functions structure in the API reference for C++ (Doxygen) for detailed information on these functions.

Codec encoder functions

If the codec is to implement an encoder, implementations need to be provided for the following functions:

  • sendNormalisedEvent
  • flushUpstream
  • getLastError
  • addEventTransport
  • removeEventTransport

See the AP_EventEncoder_Functions structure in the API reference for C++ (Doxygen) for detailed information on these functions.

Codec decoder functions

If the codec is to provide a decoder, implementations need to be provided for the following functions:

  • sendTransportEvent
  • setSemanticMapper
  • flushDownstream
  • getLastError

See the AP_EventDecoder_Functions structure in the API reference for C++ (Doxygen) for detailed information on these functions.

Defining the codec function tables

In a transport layer plug-in, the plug-in author needs to provide a function table that tells the IAF which functions to call to invoke specific functionality.

The Codec Development Specification follows this model but depending on whether the codec being developed is an encoder, a decoder or an encoder/decoder, up to three function tables may need to be defined.

Note that the order of the function pointers within each function table is critical to the reliable operation of the IAF. However, the order in which the function definitions appear within the plug-in source code, and indeed the names of the functions, are not important. Apama recommends that the functions be declared static, so that they are not globally visible and can only be accessed via the function table.

The codec function table

Every codec needs to define a generic codec function table. The header file provides a definition for this as an AP_EventCodec_Functions structure with the following functions:

  • updateProperties
  • getLastError
  • getStatus

In an instantiation of this structure, the library functions updateProperties, getLastError and getStatus are mapped as the implementations of the Codec Development Specification’s updateProperties, getLastError and getStatus function definitions respectively.

See the AP_EventCodec_Functions structure in the API reference for C++ (Doxygen) for detailed information.

The codec encoder function table

If the codec being implemented is to act as an encoder, it needs to implement the encoder functions listed previously and map them in an encoder function table. This structure is defined in EventCodec.h as an AP_EventEncoder_Functions structure with the following functions:

  • sendNormalisedEvent
  • flushUpstream
  • getLastError
  • addEventTransport
  • removeEventTransport

See the AP_EventEncoder_Functions structure in the API reference for C++ (Doxygen) for detailed information.

In the implementation of an encoding codec, this function table could be implemented as follows:

static struct AP_EventEncoder_Functions EventEncoder_Functions = {
  sendNormalisedEvent,
  flushUpstream,
  getLastErrorEncoder,
  addEventTransport,
  removeEventTransport
};

This time, the library functions sendNormalisedEvent, flushUpstream, getLastErrorEncoder, addEventTransport and removeEventTransport are being defined as the implementations of the Codec Development Specification’s sendNormalisedEvent, flushUpstream, getLastError, addEventTransport and removeEventTransport function definitions respectively.

The codec decoder function table

If the codec being implemented is to act as a decoder, it needs to implement the decoder functions listed previously and map them in a decoder function table. This structure is defined in EventCodec.h as an AP_EventDecoder_Functions structure with the following functions:

  • sendTransportEvent
  • setSemanticMapper
  • flushDownstream
  • getLastError

See the AP_EventDecoder_Functions structure in the API reference for C++ (Doxygen) for detailed information.

In the implementation of a decoding codec, this function table could be implemented as follows:

static struct AP_EventDecoder_Functions EventDecoder_Functions = {
  sendTransportEvent,
  setSemanticMapper,
  flushDownstream,
  getLastErrorDecoder
};

As before, this definition defines a number of library functions as the implementations of the function definitions specified in the Codec Development Specification.

Registering the codec function tables

The encoding and decoding function tables created above need to be placed in the relevant object, AP_EventEncoder and AP_EventDecoder. These, together with the generic function table, need to be placed in an AP_EventCodec object. See the API reference for C++ (Doxygen) for detailed information on these structures.

An AP_EventCodec object needs to be created for every plug-in within its constructor function. The encoder and decoder fields in it may be set to NULL if the codec does not implement the respective functionality, although clearly it is meaningless to have both set to NULL.

The codec constructor, destructor and info functions

Every event codec needs to implement a constructor function, a destructor function and an “info” function. These functions are called by the IAF to (respectively) instantiate the event codec, to clean it up during unloading, and to provide information about the plug-in’s capabilities.

EventCodec.h provides the following definitions:

  • AP_EventCodecCtorPtr points to the constructor function.
  • AP_EventCodecDtorPtr points to the destructor function.
  • AP_EventCodecInfoPtr points to the info function. Every codec needs to implement an info function. This is called by the IAF to obtain information as to the capabilities (encoder/decoder) of the codec.

The IAF will search for these functions by the names AP_EventCodec_ctor, AP_EventCodec_dtor and AP_EventCodec_info when the library is loaded, so you must use these exact names when implementing them in a codec plug-in.

See the API reference for C++ (Doxygen) for more information on the above definitions.

Other codec definitions

EventCodec.h also provides some additional definitions that the codec author needs to be aware of.

The first of these are the codec capability bits. These are returned by the info function to indicate whether the codec can decode or encode messages.

#define AP_EVENTCODEC_CAP_ENCODER  0x0001
#define AP_EVENTCODEC_CAP_DECODER  0x0002

  • AP_EventCodecError defines the set of error codes that can be returned by the codec’s functions.
  • The AP_EventCodecProperty structure is a definition for a configuration property. This corresponds to the properties that can be passed in as initialization or re-configuration parameters from the configuration file of the IAF.
  • Properties are passed to the event codec within an AP_EventCodecProperties structure.
  • The status of a codec is reported in an AP_EventCodecStatus structure.
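As an illustration of how the capability bits relate to the encoder and decoder fields of the codec object, the sketch below derives the bit mask from whichever fields are non-NULL. SketchCodec and sketch_capabilities are hypothetical names used only here; the two macro values are the ones quoted above, and the real info function signature is the one declared in EventCodec.h.

```c
#include <stddef.h>

/* Capability bits, with the values quoted from EventCodec.h above. */
#ifndef AP_EVENTCODEC_CAP_ENCODER
#define AP_EVENTCODEC_CAP_ENCODER 0x0001
#define AP_EVENTCODEC_CAP_DECODER 0x0002
#endif

/* Simplified stand-in for the codec object: only the two optional parts. */
struct SketchCodec {
    void *encoder; /* NULL if the codec cannot encode */
    void *decoder; /* NULL if the codec cannot decode */
};

/* Builds the capability mask that an info function could report. */
unsigned sketch_capabilities(const struct SketchCodec *codec) {
    unsigned caps = 0;
    if (codec->encoder != NULL) {
        caps |= AP_EVENTCODEC_CAP_ENCODER;
    }
    if (codec->decoder != NULL) {
        caps |= AP_EVENTCODEC_CAP_DECODER;
    }
    return caps;
}
```

A caller tests for a capability with a bitwise AND, for example (caps & AP_EVENTCODEC_CAP_DECODER) != 0. Deriving the mask from the object keeps the reported capabilities consistent with the function tables that are actually present.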

You are advised to peruse EventCodec.h for the complete definitions. EventTransport.h and SemanticMapper.h are also relevant as they define the functions that a codec author can invoke within the transport layer and the Semantic Mapper, respectively.

Codec utilities

The header files AP_EventParser.h and AP_EventWriter.h provide definitions for the Event Parser and Event Writer utilities. These utilities allow parsing and writing of the string form of reference types that are used by any <map type="reference"> elements in the adapter’s configuration file. These files are located in the include directory of your Apama installation. See the contents of these files for more information.

Communication with other layers

A decoding codec plug-in’s role is to decode messages from a transport layer plug-in into a normalized format that can be processed by the Semantic Mapper. To achieve this, it needs to be able to communicate with the Semantic Mapper. The accessible Semantic Mapper functionality is presented in SemanticMapper.h.

When a decoding codec starts, it is passed a handle to an AP_SemanticMapper object through its setSemanticMapper function. This object is defined in SemanticMapper.h. Its functions field (of type AP_SemanticMapper_Functions*) points to the definitions for two functions:

  • sendNormalisedEvent
  • getLastError

Code inside a decoding codec that calls these functions on the Semantic Mapper looks as follows. Assuming that mapper holds a reference to the AP_SemanticMapper object:

errorCode = mapper->functions->sendNormalisedEvent(mapper, NormalisedEvent);

and likewise for getLastError.

AP_SemanticMapperError defines the error codes that can be returned by sendNormalisedEvent.

On the other hand, an encoding codec plug-in’s role is to encode messages in normalized format into some specific format that can then be accepted by a transport layer plug-in for transmission to an external message sink (like a message bus). To achieve this, it needs to be able to communicate with a transport layer plug-in loaded in the IAF.

When an encoding codec starts, its addEventTransport function will be called once for each available transport. For each, it is passed a handle to an AP_EventTransport object. This object is defined in EventTransport.h and was described in detail in C/C++ transport plug-in development. It contains a pointer to AP_EventTransport_Functions, which in turn references the functions available in the transport layer plug-in. Of these, only two are relevant to the author of an encoding codec:

  • sendTransportEvent
  • getLastError

Code inside an encoding codec that calls these functions on the transport layer plug-in looks as follows. Assuming that transport holds a reference to the AP_EventTransport object:

errorCode = transport->functions->sendTransportEvent(transport, event);

and likewise for getLastError.

Working with normalized events in C++

The function of a decoding codec plug-in is to convert incoming messages into a standard normalized event format that can be processed by the Semantic Mapper. Events sent upstream to an encoding codec plug-in are provided to the plug-in in this same format.

Normalized events are essentially dictionaries of name-value pairs, where the names and values are both character strings. Each name-value pair nominally represents the name and content of a single field from an event, but users of the data structure are free to invent custom naming schemes to represent more complex event structures. Names must be unique within a given event. Values may be empty or NULL.

Some examples of normalized event field values for different types are:

  • string   "a string"
  • integer  "1"
  • float    "2.0"
  • decimal "100.0d"
  • sequence<boolean>  "[true,false]"
  • dictionary<float,integer> "{2.3:2,4.3:5}"
  • SomeEvent  "SomeEvent(12)"

Note: When assigning names to fields in normalized events, keep in mind that the fields and transport attributes for event mapping conditions and event mapping rules both use a list of fields delimited by spaces or commas. This means, for example, that <id fields="Exchange EX,foo" test="==" value="LSE"/> will successfully match a field called Exchange, EX or foo, but not a field called Exchange EX,foo. While fields with spaces or commas in their names may be included in a payload dictionary in upstream or downstream directions, they cannot be referenced directly in mapping or id rules.
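The effect of the delimiter rule can be seen by splitting a fields attribute the way the note describes. This is a minimal sketch of space- and comma-delimited matching, not the Semantic Mapper's implementation; sketch_field_listed is a hypothetical helper.

```c
#include <stdbool.h>
#include <string.h>

/* Returns true if name is exactly one of the tokens in fields, where
   tokens are separated by spaces or commas. */
bool sketch_field_listed(const char *fields, const char *name) {
    const char *delims = " ,";
    size_t nameLen = strlen(name);
    const char *p = fields;

    while (*p != '\0') {
        p += strspn(p, delims);             /* skip any run of delimiters */
        size_t tokLen = strcspn(p, delims); /* length of the next token */
        if (tokLen == 0) {
            break;                          /* only delimiters were left */
        }
        if (tokLen == nameLen && strncmp(p, name, tokLen) == 0) {
            return true;
        }
        p += tokLen;
    }
    return false;
}
```

With the list "Exchange EX,foo", the names Exchange, EX and foo are each found, while the single name "Exchange EX,foo" is not, because it can never appear as one token.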

To construct strings for the normalized event fields representing container types (dictionaries, sequences, or nested events), use the Event Writer utility found in the AP_EventWriter.h header file, which is located in the include directory of the Apama installation. The following examples show how to add a sequence and a dictionary to a normalized event (note the escape character (\) used in order to insert a quotation mark into a string).

#include <AP_EventWriter.h>

AP_EventWriter *map, *list;
AP_NormalisedEvent *event;
AP_EventWriterValue key, value;

list=AP_EventWriter_ctor(AP_SEQUENCE, NULL);
list->addString(list, "abc");
list->addString(list, "de\"f");

map=AP_EventWriter_ctor(AP_DICTIONARY, NULL);
key.stringValue="key1"; value.stringValue="value";
map->addDictValue(map, AP_STRING, key, AP_STRING, value);
key.stringValue="key\"{}2";
value.stringValue="value\"{}2";
map->addDictValue(map, AP_STRING, key, AP_STRING, value);

event=AP_NormalisedEvent_ctor();
event->functions->addQuick(event, "mySequenceField",
       list->toString(list));
event->functions->addQuick(event, "myDictionaryField",
       map->toString(map));

AP_EventWriter_dtor(list);
AP_EventWriter_dtor(map);
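To show why the Event Writer is preferable to building such strings by hand, the sketch below renders a sequence of strings into its string form. It assumes that strings are enclosed in double quotes and that embedded quotation marks and backslashes are escaped with a backslash. The sketch_ functions are hypothetical and are not how AP_EventWriter is implemented.

```c
#include <string.h>

/* Appends src to the string in dst as a double-quoted value, escaping
   '"' and '\' with a backslash. Returns 0 on success, or -1 if dst
   (capacity cap, including the terminator) might be too small. */
int sketch_append_quoted(char *dst, size_t cap, const char *src) {
    size_t n = strlen(dst);
    /* Worst case: every character escaped, two quotes, one terminator. */
    if (n + 2 * strlen(src) + 3 > cap) {
        return -1;
    }
    dst[n++] = '"';
    for (; *src != '\0'; ++src) {
        if (*src == '"' || *src == '\\') {
            dst[n++] = '\\';
        }
        dst[n++] = *src;
    }
    dst[n++] = '"';
    dst[n] = '\0';
    return 0;
}

/* Writes items as [ "a","b",... ] without spaces into dst. Returns 0 or -1. */
int sketch_sequence_to_string(char *dst, size_t cap,
                              const char *const *items, size_t count) {
    if (cap < 3) {
        return -1;
    }
    strcpy(dst, "[");
    for (size_t i = 0; i < count; ++i) {
        if (i > 0) {
            if (strlen(dst) + 2 > cap) {
                return -1;
            }
            strcat(dst, ",");
        }
        if (sketch_append_quoted(dst, cap, items[i]) != 0) {
            return -1;
        }
    }
    if (strlen(dst) + 2 > cap) {
        return -1;
    }
    strcat(dst, "]");
    return 0;
}
```

Under these assumptions, the two strings used in the sequence example above (abc and de"f) would be rendered as ["abc","de\"f"]. Dictionaries and nested events need the same care for every key and value, which is the work the Event Writer takes over.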

An any field and optional field can be added as follows:

AP_EventWriter* event = AP_EventWriter_ctor(AP_EVENT, "MyEvent");

AP_EventWriterValue val;
val.refValue = NULL;

//add an 'empty' any
event->addAny(event, AP_EMPTY, val, NULL);

//add an 'empty' optional as the second field
event->addOptional(event, AP_EMPTY, val);

val.intValue = 100;
event->addAny(event, AP_INTEGER, val, NULL);
val.intValue = 200;
event->addOptional(event, AP_INTEGER, val);

//Add an 'any' field containing 'optional'
AP_EventWriter *opt = AP_EventWriter_ctor(AP_EVENT, "optional");
opt->addInt(opt, 1);

val.refValue = opt;
event->addAny(event, AP_EVENT, val, "optional<integer>");

Field names and values of normalized events are in UTF-8 format. This means that the writer of the codec needs to ensure that downstream events are correctly formed, and the codec should expect to handle UTF-8 coming upstream.

The NormalisedEvent.h header file defines objects and functions that make up a special programming interface for constructing and examining normalized events. It contains two main structures:

  • AP_NormalisedEvent

    This structure represents a single normalized event. It has a pointer to a table of client-visible functions exported by the object called AP_NormalisedEvent_Functions. This function table provides access to the operations that may be performed on the event object.

    In addition, the AP_NormalisedEvent_ctor constructor function is provided to create a new event instance. AP_NormalisedEvent_dtor destroys a normalized event object, and should be called when the event is no longer required to free up resources.

  • AP_NormalisedEventIterator

    This structure can be used to step through the contents of a normalized event structure, in forwards or reverse order. It contains a function table defined by AP_NormalisedEventIterator_Functions, which includes all of the functions exported by a normalized event iterator.

    AP_NormalisedEventIterator_dtor destroys a normalized event iterator object, and should be called when the iterator is no longer required to free up resources. There is no public constructor function; iterators are created and returned only by AP_NormalisedEvent functions.

The functions of both AP_NormalisedEvent and AP_NormalisedEventIterator always take a pointer to the corresponding structure as an argument. This is analogous to the implicit this pointer passed to a C++ object when a member function is invoked on it.

See the NormalisedEvent.h header file for more information about the structures and functions.

C/C++ plug-in support APIs

This section describes other programming interfaces provided with the Apama software that may be useful in implementing transport layer and codec plug-ins for the IAF.

Logging from IAF plug-ins in C/C++

This API provides a mechanism for recording status and error log messages from the IAF runtime and any plug-ins loaded within it. Plug-in developers are encouraged to make use of the logging API instead of custom logging solutions so that all the information may be logged together in the same standard format and log file(s) used by other plug-ins and the IAF runtime.

The logging API also allows control of logging verbosity, so that any messages below the configured logging level will not be written to the log. The logging level and file are initially set when an adapter first starts up; see Logging configuration (optional) for more information about the logging configuration.

The C/C++ interface to the logging system is declared in the header file AP_Logger.h, which can be found in the include directory of your Apama installation. All users of the logging system should include this header file. The types and functions of interest to IAF plug-in writers are:

  • AP_LogLevel

    AP_LogLevel_NULL means “no log level has been set” and should be interpreted by IAF and plug-ins as “use the default logging level”.

  • AP_LogTrace

    Along with the other logging functions below, AP_LogTrace is based on the standard C library printf function. The message parameter may contain printf formatting characters that will be filled in from the remaining arguments.

  • AP_LogDebug

  • AP_LogInfo

  • AP_LogWarn

  • AP_LogError

  • AP_LogCrit

The logging API offers other functions to set and query the current logging level and output file. While these functions are available to plug-in code, it is recommended that plug-ins do not use them. The IAF core is responsible for updating the state of the logging system in response to adapter reconfiguration requests.
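The combination of printf-style formatting and level filtering described above can be sketched as follows. This is an illustration of the idea only: the SketchLevel enumeration, its ordering, and the sketch_ functions are assumptions made for this example and are not the declarations in AP_Logger.h. The sketch writes into a caller-supplied buffer so that the result is easy to inspect.

```c
#include <stdarg.h>
#include <stdio.h>

/* Hypothetical levels, ordered from most to least verbose.
   The real AP_LogLevel values are defined in AP_Logger.h. */
typedef enum {
    SK_TRACE, SK_DEBUG, SK_INFO, SK_WARN, SK_ERROR, SK_CRIT
} SketchLevel;

/* Configured threshold. In the IAF this state belongs to the IAF core. */
static SketchLevel sketch_threshold = SK_INFO;

void sketch_set_level(SketchLevel level) {
    sketch_threshold = level;
}

/* Formats a message into out (capacity cap) if level is at or above
   the threshold. Returns 1 if the message was written, 0 if filtered. */
int sketch_log(SketchLevel level, char *out, size_t cap, const char *fmt, ...) {
    if (level < sketch_threshold) {
        return 0;
    }
    va_list args;
    va_start(args, fmt);
    vsnprintf(out, cap, fmt, args);
    va_end(args);
    return 1;
}
```

With the threshold at SK_INFO, a call such as sketch_log(SK_WARN, buf, sizeof buf, "retry %d of %d", 2, 5) fills buf with "retry 2 of 5", while the same call at SK_DEBUG is dropped without formatting the message.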

Using the latency framework

The latency framework API provides a way to measure adapter latency by attaching high-resolution timing data to events as they stream into, through, and out of the adapter. Developers can then use these events to compute upstream, downstream, and round-trip latency numbers, including latency across multiple adapters.

The sendNormalisedEvent() and sendTransportEvent() functions contain an AP_TimestampSet parameter that carries the microsecond-accurate timestamps that can be used to compute the desired statistics.

C/C++ timestamp

A timestamp is an index-value pair. The index represents the point in the event processing chain at which the timestamp was recorded, for example “upstream entry to semantic mapper” and the value is a floating point number representing the time. The header file AP_TimestampSet.h defines a set of standard indexes, but a custom plug-in can define additional indexes for even finer-grained measurements. When you add a custom index definition, be sure to preserve the correct order, for example, an index denoting an “entry” point should be less than one denoting an “exit” point from that component.

Timestamps are relative measurements and are meant to be compared only to other timestamps in the same or similar processes on the same computer. Timestamps have no relationship to real-world “wall time”.

C/C++ timestamp set

A timestamp set is the collection of timestamps that are associated with an event. The latency framework API provides functions that developers can use to add, inspect, and remove timestamps from an event’s timestamp set.
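Conceptually, a timestamp set maps indexes to time values, and a latency figure is the difference between the values recorded at two indexes. The sketch below models that idea with a small fixed-size table. SketchTimestampSet and the sketch_ functions are hypothetical and do not reproduce the AP_TimestampSet API; the index numbers used with it are arbitrary.

```c
#include <stdbool.h>
#include <stddef.h>

#define SKETCH_MAX_STAMPS 16

/* Hypothetical stand-in for a timestamp set: parallel arrays of
   index-value pairs. */
typedef struct {
    int    index[SKETCH_MAX_STAMPS];
    double value[SKETCH_MAX_STAMPS];
    size_t count;
} SketchTimestampSet;

/* Records a timestamp, replacing any existing value for the same index.
   Returns false if the set is full. */
bool sketch_add_time(SketchTimestampSet *set, int index, double value) {
    for (size_t i = 0; i < set->count; ++i) {
        if (set->index[i] == index) {
            set->value[i] = value;
            return true;
        }
    }
    if (set->count == SKETCH_MAX_STAMPS) {
        return false;
    }
    set->index[set->count] = index;
    set->value[set->count] = value;
    set->count++;
    return true;
}

/* Looks up the value recorded for index. Returns false if absent. */
bool sketch_find_time(const SketchTimestampSet *set, int index, double *value) {
    for (size_t i = 0; i < set->count; ++i) {
        if (set->index[i] == index) {
            *value = set->value[i];
            return true;
        }
    }
    return false;
}

/* Computes the time between an "entry" point and a later "exit" point.
   Fails if either timestamp is missing or the indexes are out of order. */
bool sketch_latency(const SketchTimestampSet *set, int entryIndex,
                    int exitIndex, double *latency) {
    double entered, left;
    if (entryIndex >= exitIndex) {
        return false;
    }
    if (!sketch_find_time(set, entryIndex, &entered) ||
        !sketch_find_time(set, exitIndex, &left)) {
        return false;
    }
    *latency = left - entered;
    return true;
}
```

The check that the entry index is lower than the exit index relies on the ordering convention for indexes: an entry point has a lower index than the exit point of the same component. Because timestamps are relative, only differences such as the one computed here are meaningful.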

C/C++ timestamp configuration object

Constructors and updateProperties() methods for transport and codec plug-ins take the following argument: IAF_TimestampConfig.

A timestamp configuration object contains a set of fields that a plug-in can use to decide whether to record and/or log timestamp information. Although timestamp configuration objects are passed to all transport and codec plug-ins, it is up to the authors of plug-ins to write the code that makes use of them.

See the IAF_TimestampConfig structure in the API reference for C++ (Doxygen) for detailed information on the fields.

C/C++ latency framework API

The C/C++ interface for the latency framework is declared in the header file AP_TimestampSet.h. Plug-ins using the latency framework should include this file and also include the IAF_TimestampConfig.h header file, which declares the timestamp configuration object.

See the AP_TimestampSet_Functions structure in the API reference for C++ (Doxygen) for detailed information on the available functions.

Transport plug-in development in Java

The transport layer is the front-end of the IAF. The transport layer’s purpose is to abstract away the differences between the programming interfaces exposed by different middleware message sources and sinks. It consists of one or more custom plug-in libraries that extract downstream messages from external message sources ready for delivery to the codec layer, and send Apama events already encoded by the codec layer upstream to the external message sink. See The Integration Adapter Framework for a full introduction to transport plug-ins and the IAF’s architecture.

An adapter should send events to the correlator only after its start function is called and before the stop function returns.

This section includes the transport plug-in development specification for Java and additional information for developers of Java event transports. C/C++ transport plug-in development provides analogous information about developing transport plug-ins using C/C++.

The transport plug-in development specification for Java

A Java transport layer plug-in is implemented as a Java class extending AbstractEventTransport. Typically this class would be packaged up, together with any supporting classes, as a Java Archive (.jar) file.

To comply with Apama’s transport plug-in development specification, an event transport class must satisfy two conditions:

  1. It must have a constructor with the signature:

    public AbstractEventTransport(
              String name,
              EventTransportProperty[] properties,
              TimestampConfig timestampConfig)
           throws TransportException
    

    This will be used by the IAF to instantiate the plug-in.

  2. It must extend the com.apama.iaf.plugin.AbstractEventTransport class, correctly implementing all of its abstract methods.

(These methods are mostly directly equivalent to the functions with the same names in the C/C++ transport plug-in development specification.)

Note that all Java plug-ins are dependent on classes in ap-iaf-extension-api.jar, so this file must always be on the classpath during plug-in development. It is located in the lib directory of your Apama installation.

Unless otherwise stated, Java classes referred to in this topic are members of the com.apama.iaf.plugin package, whose classes and interfaces are contained in this .jar.

Java transport functions to implement

HTML Javadoc documentation for AbstractEventTransport and related classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc) for detailed information on the functions that a transport plug-in author needs to implement.

AbstractEventTransport is the constructor. A typical constructor would create a logger using the plug-in name provided (see Logging from IAF plug-ins in Java), make a call to the updateProperties method to deal with the initial property set passed in, and perform any other initialization operations required for the particular transport being developed.

See Communication with the codec layer for information on how the transport layer communicates with the codec layer in both the upstream and downstream directions.

Communication with the codec layer

This section discusses how the transport layer communicates with the codec layer in both the upstream and downstream directions.

Sending upstream messages received from a codec plug-in to a sink

When a codec plug-in has encoded an event ready for transmission by a transport plug-in, it passes it on by calling the transport’s sendTransportEvent method (as defined above). It is then up to the transport plug-in to process the message (which will be of some type agreed by the codec and transport plug-in authors), and send it on to the external sink it provides access to.

Note that there are no guarantees about which threads might call this method, so plug-in authors will need to consider thread synchronization issues carefully.

If there is a problem sending the event on, the transport plug-in should throw a TransportException.
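The threading and error-handling points above can be sketched as follows. This is only an illustration under stated assumptions, not Apama code: SynchronizedSink, its send method, and SendFailedException are hypothetical stand-ins for a transport class, its sendTransportEvent method, and TransportException, and a plain OutputStream plays the role of the external sink. The real sendTransportEvent also receives a timestamp set, which is omitted here.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch: serializes access to a sink that is not thread-safe.
class SynchronizedSink {
    // Hypothetical stand-in for the IAF's TransportException.
    static class SendFailedException extends Exception {
        SendFailedException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    private final Object sendLock = new Object();
    private final OutputStream sink;

    SynchronizedSink(OutputStream sink) {
        this.sink = sink;
    }

    // Plays the role of sendTransportEvent: may be called from any thread.
    void send(Object message) throws SendFailedException {
        byte[] bytes = (message + "\n").getBytes(StandardCharsets.UTF_8);
        // Only one thread at a time may write to the shared sink.
        synchronized (sendLock) {
            try {
                sink.write(bytes);
                sink.flush();
            } catch (IOException e) {
                // Report the failure to the caller with the original cause attached.
                throw new SendFailedException("Could not send message to sink", e);
            }
        }
    }

    public static void main(String[] args) throws Exception {
        SynchronizedSink transport = new SynchronizedSink(System.out);
        transport.send("first message");
        transport.send("second message");
    }
}
```

The message is converted to bytes outside the lock so that the critical section covers only the write to the shared sink.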

Sending downstream messages received from a source on to a codec plug-in

In order that messages can be easily sent on to a codec plug-in, an event transport will usually have saved a reference to the event codec(s) it will be using before it establishes a connection to the external source.

Typically an event transport will build up a list of registered codec plug-ins from the parameters passed to the addEventDecoder and removeEventDecoder methods. If this is the case, the start method of the plug-in can select one of these plug-ins on the basis of a plug-in property provided in the configuration file (for example, <property name="decoderName" value="MyCodec"/>), and save it in an instance field (for example, currentDecoder).
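The bookkeeping described above might look something like the following sketch. It is self-contained for illustration only: DecoderRegistry and its nested Decoder interface are hypothetical stand-ins (the real plug-in would extend AbstractEventTransport and work with the EventDecoder interface), the method parameters shown are assumptions, and the timestamp set argument is omitted.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of how a transport might track its registered codecs.
class DecoderRegistry {
    // Hypothetical stand-in for the IAF's EventDecoder interface.
    interface Decoder {
        void sendTransportEvent(Object message);
    }

    private final Map<String, Decoder> decoders = new ConcurrentHashMap<>();
    private volatile Decoder currentDecoder;

    // Plays the role of addEventDecoder: remember the codec under its name.
    void addEventDecoder(String name, Decoder decoder) {
        decoders.put(name, decoder);
    }

    // Plays the role of removeEventDecoder: forget the named codec.
    void removeEventDecoder(String name) {
        decoders.remove(name);
    }

    // Would be called from start(): pick the codec named by the decoderName property.
    void selectDecoder(String decoderName) {
        Decoder decoder = decoders.get(decoderName);
        if (decoder == null) {
            throw new IllegalStateException("No codec registered as " + decoderName);
        }
        currentDecoder = decoder;
    }

    Decoder currentDecoder() {
        return currentDecoder;
    }

    public static void main(String[] args) {
        DecoderRegistry registry = new DecoderRegistry();
        registry.addEventDecoder("MyCodec", message -> System.out.println("decoding " + message));
        registry.selectDecoder("MyCodec");
        registry.currentDecoder().sendTransportEvent("hello");
    }
}
```

Failing fast in selectDecoder when the configured name is unknown surfaces a configuration mistake at startup rather than when the first message arrives.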

Once the plug-in has a reference to the event codec (or codecs) it will use, whenever an external message is received it should be passed on by calling the sendTransportEvent method on the codec plug-in (from the EventDecoder interface). See the API reference for Java (Javadoc) for more information on this method.

For example, part of the event processing code for a transport plug-in might be:

MyCustomMessageType message = myCustomMessageSource.getNextMessage();
currentDecoder.sendTransportEvent(message, timestamps);

If an error occurs in the codec or Semantic Mapper layers preventing the message from being converted into an Apama event, a CodecException or SemanticMapperException is thrown. Like all per-message errors, these should be logged at Warning level, preferably with a full stack trace logged at Debug level too. If necessary, transports may also send messages downstream to the correlator to inform running monitors about the error.

When a transport sends a message to the codec via the sendTransportEvent method, it passes an Object reference and this allows custom types to be passed between the two plug-ins. However, any custom types should be loaded via the main (parent) classloader, as each plug-in specified in the IAF configuration file is loaded with its own classloader. Consider, for example, the following three classes all loaded into a single jar file, MyAdapter.jar, which is used in the IAF configuration file in the jarName attribute of the <transport> element:

  • MyTransport.class
  • MyCodec.class
  • MyContainer.class (the container class used in the call to sendTransportEvent)

When you load the transport and codec, a new classloader is used for each. This means both have their own copy of the MyContainer class. When the transport creates an instance of MyContainer and then passes it into the codec, the codec will recognize that the Object getClass().getName() is MyContainer, but will not be able to cast it to this type as its MyContainer class is from a different classloader.

To prevent this from happening, make sure that all shared classes are in a separate jar that is specified by a <classpath> element. The shared classes are then loaded by the parent classloader. This ensures that when a codec or transport references a shared class, they will both agree it is the same class.

Note that any codec plug-in called by a Java transport plug-in must also be written in Java.

Transport exceptions

TransportException is the exception class that should be thrown by a transport plug-in whenever the IAF calls one of its methods and an error prevents the method from successfully completing — for example, a message that cannot be sent on to an external sink in sendTransportEvent, or a serious problem that prevents the plug-in from initializing when start is called.

A TransportException object always has an associated message, which is a String explaining the problem (this may include information about another exception that caused the TransportException to be thrown). There is also a code field that specifies the kind of error that occurred; the possible codes are defined as constants in the TransportException class.

TransportException defines a number of constructors, to make it easy to set up the exception’s information quickly in different situations.

See the TransportException class in the API reference for Java (Javadoc) for more information on these constants and constructors.

Logging

See Logging from IAF plug-ins in Java for information about how transport plug-ins should log error, status and debug information.

Java codec plug-in development

The codec layer is a layer of abstraction between the transport layer and the IAF’s Semantic Mapper. It consists of one or more plug-in libraries that perform message encoding and decoding. Decoding involves translating downstream messages retrieved by the transport layer into the standard “normalized event” format on which the Semantic Mapper’s rules run; encoding works in the opposite direction, converting upstream normalized events into an appropriate format for transport layer plug-ins to send on. Note that unlike the situation with C/C++, in Java codec plug-ins are always both encoders and decoders. See The Integration Adapter Framework for a full introduction to codec plug-ins and the IAF’s architecture.

This chapter includes the codec plug-in development specification for Java and additional information for developers of Java event codecs. C/C++ codec plug-in development provides analogous information about developing codec plug-ins using C/C++.

Before developing a new codec plug-in, it is worth considering whether one of the standard Apama plug-ins could be used instead. Codec IAF plug-ins provides more information on the standard IAF codec plug-ins: JStringCodec and JNullCodec. The JStringCodec plug-in codes normalized events as formatted text strings. The JNullCodec plug-in is useful in situations where it does not make sense to decouple the codec and transport layers, and allows transport plug-ins to communicate with the Semantic Mapper directly using normalized events.

The codec plug-in development specification for Java

A Java codec layer plug-in is implemented as a Java class extending AbstractEventCodec. Typically this class would be packaged up, together with any supporting classes, as a Java Archive (.jar) file.

To comply with Apama’s codec plug-in development specification, an event codec class must satisfy two conditions:

  1. It must have a constructor with the signature:

    public AbstractEventCodec(
          String name,
          EventCodecProperty[] properties,
          TimestampConfig timestampConfig)
       throws CodecException
    

    This will be used by the IAF to instantiate the plug-in.

  2. It must extend the com.apama.iaf.plugin.AbstractEventCodec class, correctly implementing all of its abstract methods.

(These methods are mostly directly equivalent to the functions with the same names in the C/C++ codec plug-in development specification.)

Note that all Java plug-ins are dependent on classes in ap-iaf-extension-api.jar, so this file must always be on the classpath during plug-in development. It is located in the Apama installation’s lib directory.

Unless otherwise stated, Java classes referred to in this chapter are members of the com.apama.iaf.plugin package, whose classes and interfaces are contained in this .jar.

Java codec functions to implement

HTML Javadoc documentation for AbstractEventCodec and related classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc) for detailed information on the functions that a codec plug-in author needs to implement.

AbstractEventCodec is the constructor. A typical constructor would create a logger using the plug-in name provided (see Logging from IAF plug-ins in Java), make a call to the updateProperties method to deal with the initial property set passed in, and perform any other initialization operations required for the particular event codec being developed.

Note that unlike event transports, codec plug-ins do not have start and stop methods.

See Communication with other layers for information on how the codec layer communicates with the transport layer and Semantic Mapper in upstream and downstream directions.

See also Working with normalized events for help working with NormalisedEvent objects.

Communication with other layers

This section discusses how the codec layer communicates with the transport layer and Semantic Mapper in upstream and downstream directions.

Sending upstream messages received from the Semantic Mapper to a transport plug-in

When the Semantic Mapper produces normalized events, it sends them on to the codec layer by calling the codec plug-ins’ sendNormalisedEvent methods (as defined above). The event codec must then encode the normalized event for transmission by the transport layer.

In order to send messages upstream to an event transport, a codec plug-in must have a reference to the transport plug-in object. Typically, an event codec does this by building up a map of registered transport plug-ins from the parameters passed to the addEventTransport and removeEventTransport methods. It might then use a property provided in the configuration file (for example, <property name="transportName" value="MyTransport"/>) to determine which event transport to use when the sendNormalisedEvent method is called.

Alternatively, if this codec plug-in will only ever be used in an adapter with just one transport plug-in, the EventTransport object could be stored in an instance field when it is provided to the addEventTransport method.

Once the plug-in has a reference to the event transport (or transports) it will use, it can pass on normalized events it has encoded into transport messages by calling the transport plug-in sendTransportEvent method. See the EventTransport interface in the API reference for Java (Javadoc) for more information on this method.

For example, the implementation of the event codec’s sendNormalisedEvent could look something like this:

// Select EventTransport using saved plug-in property value
EventTransport transport = eventTransports.get(currentTransportName);
// Encode message
MyCustomMessageType message = myEncodeMessage(event);
// Send to Transport layer plug-in
transport.sendTransportEvent(message, timestamps);

If an error occurs in the transport layer, a TransportException is thrown. Typically such exceptions do not need to be caught by the codec plug-in, unless the codec plug-in is able to somehow deal with the problem.

A CodecException should be thrown if there is an error encoding the normalized event.

Note that there are no guarantees about which threads might call the sendNormalisedEvent method, so plug-in authors will need to consider any thread synchronization issues arising from use of shared data structures.

Any transport plug-in called by a Java codec plug-in must also be written in Java.

Sending downstream messages received from a transport plug-in to the Semantic Mapper

When a transport plug-in configured to work with the event codec receives a message from its external message source, it will pass it on to the codec plug-in by calling the sendTransportEvent method (as defined above). It is then up to the codec plug-in to decode the message from whatever custom format is agreed between the transport and codec plug-ins into a standard normalized event that can be passed on to the Semantic Mapper.

When the message has been decoded, it should be sent to the Semantic Mapper using its sendNormalisedEvent method. See the SemanticMapper interface in the API reference for Java (Javadoc) for more information on this method.

For example, the implementation of the event codec’s sendTransportEvent could look something like this:

// (Assume there's an instance field: SemanticMapper semanticMapper)
// Decode message
NormalisedEvent normalisedEvent = myDecodeMessage(event);
// Send to the Semantic Mapper
semanticMapper.sendNormalisedEvent(normalisedEvent, timestamps);

If an error occurs in the Semantic Mapper, a SemanticMapperException is thrown. Typically such exceptions do not need to be caught by the codec plug-in, unless the codec plug-in is able to somehow deal with the problem.

A CodecException should be thrown if there is an error decoding the message into a normalized event.

Java codec exceptions

CodecException is the exception class that should be thrown by a codec plug-in whenever one of its methods is called and an error prevents the method from successfully completing, for example, a message that cannot be encoded or decoded because it has an invalid format.

A CodecException object always has an associated message, which is a String explaining the problem (this may include information about another exception that caused the CodecException to be thrown). There is also a code field that specifies the kind of error that occurred; the possible codes are defined as constants in the CodecException class.

Like the TransportException object, CodecException defines a number of constructors, to make it easy to set up the exception’s information quickly in different situations.

See the CodecException class in the API reference for Java (Javadoc) for more information on these constants and constructors.

Semantic Mapper exceptions

Codec plug-ins should never need to construct or throw SemanticMapperException objects, but they need to be able to catch them if they are thrown from the SemanticMapper.sendNormalisedEvent method when it is called by the event codec.

SemanticMapperException has exactly the same set of constructors as the CodecException class described above. The only significant difference is the set of error codes.

See the SemanticMapperException class in the API reference for Java (Javadoc) for more information on the constructors and error codes.

Logging

See Logging from IAF plug-ins in Java for information about how codec plug-ins should log error, status and debug information.

Working with normalized events in Java

The function of a decoding codec plug-in is to convert incoming messages into a standard normalized event format that can be processed by the Semantic Mapper. Events sent upstream to an encoding codec plug-in are provided to the plug-in in this same format.

Normalized events are essentially dictionaries of name-value pairs, where the names and values are both character strings. Each name-value pair nominally represents the name and content of a single field from an event, but users of the data structure are free to invent custom naming schemes to represent more complex event structures. Names must be unique within a given event. Values may be empty or null.

Some examples of normalized event field values for different types are:

  • string   "a string"
  • integer  "1"
  • float    "2.0"
  • decimal "100.0d"
  • sequence<boolean>  "[true,false]"
  • dictionary<float,integer> "{2.3:2,4.3:5}"
  • SomeEvent  "SomeEvent(12)"

Note: When assigning names to fields in normalized events, keep in mind that the fields and transport attributes for event mapping conditions and event mapping rules both use a list of fields delimited by spaces or commas. This means, for example that <id fields="Exchange EX,foo" test="==" value="LSE"/> will successfully match a field called Exchange, EX or foo, but not a field called Exchange EX,foo. While fields with spaces or commas in their names may be included in a payload dictionary in upstream or downstream directions, they cannot be referenced directly in mapping or id rules.
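To see why such names cannot be referenced, consider a minimal sketch of splitting a fields value on spaces and commas. This is not the IAF's own parser: FieldListDemo and splitFieldList are hypothetical names, and the regular expression is an assumption that simply mirrors the delimiter rule described in the note.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of the "spaces or commas" delimiter rule for field lists.
class FieldListDemo {
    // Splits a fields attribute value on runs of spaces and/or commas.
    static List<String> splitFieldList(String fields) {
        return Arrays.asList(fields.trim().split("[ ,]+"));
    }

    public static void main(String[] args) {
        // Prints [Exchange, EX, foo]: three separate names, not one.
        System.out.println(splitFieldList("Exchange EX,foo"));
    }
}
```

Because the value is always broken into separate names at each delimiter, a single field named Exchange EX,foo can never appear in the resulting list.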

To construct strings for the normalized event fields representing container types (dictionaries, sequences, or nested events), use the event parser/builder found in the ap-util.jar file, which is located in the Apama installation’s lib directory. The following examples show how to add a sequence and a dictionary to a normalized event (note the escape character (\) used in order to insert a quotation mark into a string).

List<String> list = new ArrayList<String>();
list.add("abc");
list.add("de\"f");
Map<String,String> map = new HashMap<String,String>();
map.put("key1", "value1");
map.put("key\"{}2", "value\"{}2");
final SequenceFieldType STRING_SEQUENCE_FIELD_TYPE =
    new SequenceFieldType(StringFieldType.TYPE);
final DictionaryFieldType STRING_DICT_FIELD_TYPE =
    new DictionaryFieldType(StringFieldType.TYPE, StringFieldType.TYPE);
NormalisedEvent event = new NormalisedEvent();
event.add("mySequenceField",
    STRING_SEQUENCE_FIELD_TYPE.format(list));
event.add("myDictionaryField", STRING_DICT_FIELD_TYPE.format(map));

The programming interface for constructing and using normalized events is made up of three Java classes:

  • NormalisedEvent

    The NormalisedEvent class represents a single normalized event. This class is the most important part of the interface, and encapsulates the data and operations that can be performed on a single normalized event.

    Normalized events are not thread-safe. If your code will be accessing the same normalized event object (or associated iterators) from multiple threads, you must implement your own thread synchronization to prevent concurrent modification.

    A public zero-argument constructor is provided for creation of new (initially empty) NormalisedEvent objects.

  • NormalisedEventIterator

    Several of the NormalisedEvent methods return an instance of the NormalisedEventIterator class, which provides a way to step through the name-value pairs making up the normalized event, forwards or backwards.

    There is no public constructor. Iterators are created and returned only by NormalisedEvent methods.

  • NormalisedEventException

    Any errors encountered by NormalisedEvent result in instances of NormalisedEventException being thrown.

See the API reference for Java (Javadoc) for detailed information on these classes.

Plug-in support APIs for Java

This section describes other programming interfaces provided with the Apama software that may be useful in implementing transport layer and codec plug-ins for the IAF.

Logging from IAF plug-ins in Java

This API provides a mechanism for recording status and error log messages from the IAF runtime and any plug-ins loaded within it. Plug-in developers are encouraged to make use of the logging API instead of custom logging solutions so that all the information may be logged together in the same standard format and log file(s) used by other plug-ins and the IAF runtime.

The logging API also allows control of logging verbosity, so that any messages below the configured logging level will not be written to the log. The logging level and file are initially set when an adapter first starts up – see Logging configuration (optional) for more information about the logging configuration.

The Java logging API is based around the Logger class.

The recommended way of using the Logger class is to have a private final com.apama.util.Logger variable, and then create an instance in the transport or codec’s constructor based on the plug-in name, such as the following:

private final Logger logger;
public MyTransport(String name, ...)
{
    super(...);
    logger = Logger.getLogger(name);
}

The Logger class supports the following logging levels:

  • FORCE
  • CRIT
  • FATAL
  • ERROR
  • WARN
  • INFO
  • DEBUG
  • TRACE

It is recommended that you do not use the FATAL or CRIT log levels provided by the Logger class, which are present only for historical reasons. It is better to use ERROR for all error conditions regardless of how fatal they are, and INFO for informational messages. See Setting correlator and plug-in log files and log levels in a YAML configuration file for information about configuring log levels in the correlator.

For each level, there are three main methods. For example, for logging at the DEBUG level, here are the three main methods:

  • logger.debug(String) — Logs a message, if this log level is currently enabled.

  • logger.debug(String, Throwable) — Logs the stack trace and message of a caught exception together with a high-level description of the problem. Apama strongly recommend logging exceptions like this to assist with debugging in the event of problems.

  • logger.debugEnabled() — Determines whether messages at this log level are currently enabled (this depends on the current IAF log level, which may be changed dynamically). Apama strongly recommend checking this method’s result (particularly for DEBUG messages) before logging messages where constructing the message string may be costly, for example:

    if (logger.debugEnabled())
      logger.debug("A huge message was received, and the string " +
          "representation of it is: " + thing.toString() +
          " and here is some other useful info: " + foo + ", " + bar);
    

Note that there is no point using the Enabled() methods, such as debugEnabled(), if the log message is a simple string (or string plus exception), such as:

logger.debug("The operation completed with an error: ", exception);
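The saving from the guard can be demonstrated with a small self-contained sketch. It deliberately uses the standard java.util.logging API as a stand-in for com.apama.util.Logger so that it runs without the Apama libraries; isLoggable(Level.FINE) plays the role of debugEnabled(), and GuardedLoggingDemo and its members are hypothetical names.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: shows that a level check avoids building costly messages.
class GuardedLoggingDemo {
    // Counts how often the costly string representation is built.
    static int expensiveCalls = 0;

    static String expensiveDescription() {
        expensiveCalls++;
        return "a very large message rendered as text";
    }

    static void logMessage(Logger logger) {
        // Guard: only build the costly string if debug-like (FINE) logging is enabled.
        if (logger.isLoggable(Level.FINE)) {
            logger.fine("Received: " + expensiveDescription());
        }
    }

    public static void main(String[] args) {
        Logger logger = Logger.getLogger("GuardedLoggingDemo");
        logger.setLevel(Level.INFO);   // debug-like output disabled
        logMessage(logger);
        System.out.println("expensive calls at INFO: " + expensiveCalls);
        logger.setLevel(Level.FINE);   // debug-like output enabled
        logMessage(logger);
        System.out.println("expensive calls at FINE: " + expensiveCalls);
    }
}
```

The counter only increases once the debug-like level is enabled, which is the cost the guard is meant to avoid.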

To make it easier to diagnose any errors that may occur, Apama recommends one of the following methods to log the application’s stack trace:

  • errorWithDebugStackTrace(java.lang.String msg, java.lang.Throwable ex) — Logs the specified message at the ERROR level followed by the exception’s message string, and then logs the exception’s stack trace at the DEBUG level.
  • warnWithDebugStackTrace(java.lang.String msg, java.lang.Throwable ex) — Logs the specified message at the WARN level followed by the exception’s message string, and then logs the exception’s stack trace at the DEBUG level.

See the API reference for Java (Javadoc) for more information about the Logger class.

Using the latency framework

The latency framework API provides a way to measure adapter latency by attaching high-resolution timing data to events as they stream into, through, and out of the adapter. Developers can then use these events to compute upstream, downstream, and round-trip latency numbers, including latency across multiple adapters.

The sendNormalisedEvent() and sendTransportEvent() methods contain a TimestampSet parameter that carries the microsecond-accurate timestamps that can be used to compute the desired statistics.

Javadoc documentation for com.apama.util.TimestampSet and com.apama.util.TimestampConfig classes is provided as part of the Apama documentation set. See the API reference for Java (Javadoc).

Java timestamp

A timestamp is an index-value pair. The index represents the point in the event processing chain at which the timestamp was recorded, for example “upstream entry to semantic mapper”, and the value is a floating point number representing the time. The TimestampSet class defines a set of standard indexes, but a custom plug-in can define additional indexes for even finer-grained measurements. When you add a custom index definition, be sure to preserve the correct order: for example, an index denoting an “entry” point should be less than one denoting an “exit” point from that component.

Timestamps are relative measurements and are meant to be compared only to other timestamps in the same or similar processes on the same computer.

Java timestamp set

A timestamp set is the collection of timestamps that are associated with an event. The latency framework API provides functions that developers can use to add, inspect, and remove timestamps from an event’s timestamp set.

The timestamp set is represented as a dictionary of integer-float pairs, where the integer index refers to the location at which the timestamp was added and the floating-point time gives the time at which an event was there.
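As a rough sketch of how such index-value pairs yield a latency figure, the following example uses a plain Map as a stand-in for a timestamp set. It does not use the real TimestampSet class: LatencySketch, latencyBetween, and the index values 1000 and 2000 are hypothetical, and the result is expressed in whatever unit the timestamps themselves use.

```java
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch: computes the time spent between two timestamp indexes.
class LatencySketch {
    // Hypothetical custom indexes; the entry index is lower than the exit index.
    static final int ENTRY_INDEX = 1000;
    static final int EXIT_INDEX = 2000;

    // Returns exit time minus entry time, or NaN if either timestamp is missing.
    static double latencyBetween(Map<Integer, Double> timestamps, int entryIndex, int exitIndex) {
        Double entry = timestamps.get(entryIndex);
        Double exit = timestamps.get(exitIndex);
        if (entry == null || exit == null) {
            return Double.NaN;
        }
        return exit - entry;
    }

    public static void main(String[] args) {
        Map<Integer, Double> timestamps = new TreeMap<>();
        timestamps.put(ENTRY_INDEX, 10.5);
        timestamps.put(EXIT_INDEX, 10.75);
        System.out.println("latency: " + latencyBetween(timestamps, ENTRY_INDEX, EXIT_INDEX));
    }
}
```

Only the difference between two timestamps is meaningful here, which matches the point that timestamps are relative measurements.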

Java timestamp configuration object

The constructors and updateProperties() methods for transport and codec plug-ins take this additional argument: TimestampConfig.

A timestamp configuration object contains a set of fields that a plug-in can use to decide whether to record and/or log timestamp information. See the API reference for Java (Javadoc) for more information on the fields of the TimestampConfig class.

Java latency framework API

The Java interface for the latency framework is provided by the com.apama.util.TimestampSet class.

See the TimestampSet class in the API reference for Java (Javadoc) for detailed information on the available functions.