Codec Connectivity Plug-ins

The String codec connectivity plug-in

The String codec can be used with transports that deal with binary payloads, or with map payloads that carry binary data in one or more specific fields reflecting the structure of the event. It performs bidirectional translation between binary format and string format, and can operate on arbitrary fields of the message or on the entire payload.

  • The field of a message going from the host to the transport should be of java.lang.String (Java) or const char* (C++) type. The String codec translates the fields of such a message to the byte[] (Java) or buffer_t (C++) type.
  • The field of a message going from the transport to the host should be of byte[] or buffer_t type. The String codec translates the fields of such a message to the java.lang.String or const char* type.

By default, the String codec does UTF-8 encoding and decoding of a string:

  • When converting to a buffer_t or byte[], the end result is UTF-8 encoded.
  • When converting to a java.lang.String or const char*, the String codec assumes that the source (buffer_t or byte[]) is UTF-8 encoded.
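As an illustration of this default behavior, the two directions can be sketched in Python (the function names here are illustrative, not part of the codec API):

```python
def towards_transport(field: str) -> bytes:
    # Host -> transport: a string field becomes UTF-8 encoded bytes.
    return field.encode("utf-8")

def towards_host(field: bytes) -> str:
    # Transport -> host: the bytes are assumed to be UTF-8 and decoded.
    return field.decode("utf-8")

encoded = towards_transport("café")   # b'caf\xc3\xa9'
decoded = towards_host(encoded)       # 'café'
```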

To reference the String codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

stringCodec:
  libraryName: connectivity-string-codec
  class: StringCodec

You then need to add the String codec into your connectivity chain with the configuration for that instance. An example configuration may look as follows:

startChains:
  testChain:
     - apama.eventString
     - stringCodec:
         nullTerminated: false
         eventTypes:
            - com.apamax.test.MyEventType
         encoding: Latin-1
         fields:
            - metadata.foo
            - payload.bar.baz
            - payload.zot
     - myBinaryTransport

The following configuration options are available for the String codec:

Configuration option

Description

nullTerminated

If set to true, a null-terminator character is added to the end of the buffer when sending messages towards the transport. It is only permitted to set this option to true when the encoding is UTF-8.

If set to false, messages sent towards the transport do not include a null-terminator.

This option only affects the conversion to bytes for messages sent from the host towards the transport. When messages are converted from bytes (on the transport side) to strings (on the host side), a terminating null character is always permitted but never required, regardless of the value of this configuration option.

Default: false.

eventTypes

Specifies which event types this codec will handle. Messages with a type that is not listed or where sag.type is not set will be ignored by this codec. If omitted, the codec attempts to encode/decode the payload of all messages.

fields

The list of metadata or payload fields that are to be converted by this codec. Listed fields that are not present in the event will be ignored by this codec. It is recommended that you prefix each field with either payload or metadata, for example: payload.myfield or metadata.myfield. If omitted, the codec attempts to encode the entire payload.

encoding

The character set to be used for encoding/decoding. Default: UTF-8.

If charset is set in the metadata of a message (in either direction), it silently overrides the encoding option. For example, if the encoding option is set to Latin-1 and the message carries charset in the metadata (for example, metadata.charset=CP1252), then the payload/metadata field or the entire payload of the message is converted using the character-set value of metadata.charset.

The Base64 codec connectivity plug-in

The Base64 codec performs bidirectional translation between binary payloads and a string that is the Base64-encoding of that payload. The codec is able to operate on arbitrary fields of the message, or the entire payload.

  • The field of a message going from the host to the transport should be of byte[] (Java) or buffer_t (C++) type. The Base64 codec encodes the fields of such a message to their Base64 encoding, which will be of java.lang.String (Java) or const char* (C++) type.
  • The field of a message going from the transport to the host should be of java.lang.String (Java) or const char* (C++) type in Base64 format. The Base64 codec decodes the fields of such a message to the byte[] (Java) or buffer_t (C++) type.

The Base64 codec follows the MIME (Multipurpose Internet Mail Extensions) specification, which lists Base64 as one of two binary-to-text encoding schemes.
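The translation itself follows the standard Base64 scheme, which can be sketched in Python (the function names are illustrative, not the codec's API):

```python
import base64

def towards_transport(field: bytes) -> str:
    # Host -> transport: a binary field becomes its Base64 text form.
    return base64.b64encode(field).decode("ascii")

def towards_host(field: str) -> bytes:
    # Transport -> host: Base64 text is decoded back to raw bytes.
    return base64.b64decode(field)

raw = b"\x00\x01binary\xff"
text = towards_transport(raw)
restored = towards_host(text)      # round-trips back to the original bytes
```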

To reference the Base64 codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

base64Codec:
  libraryName: connectivity-base64-codec
  class: Base64Codec

You then need to add the Base64 codec into your connectivity chain with the configuration for that instance. An example configuration may look as follows:

startChains:
  testChain:
     - apama.eventMap
     - base64Codec:
         eventTypes:
            - com.apamax.MyEvent
         fields:
            - metadata.baz
            - payload.foo.asdf
     - myTransport

The following configuration options are available for the Base64 codec:

Configuration option

Description

eventTypes

Specifies which event types this codec will handle. Messages with a type that is not listed or where sag.type is not set will be ignored by this codec. If omitted, the codec attempts to encode/decode the payload of all messages.

fields

The list of metadata or payload fields that are to be converted by this codec. Listed fields that are not present in the event will be ignored by this codec. It is recommended that you prefix each field with either payload or metadata, for example: payload.myfield or metadata.myfield. If omitted, the codec attempts to encode the entire payload.
Info
Similar functionality is available in EPL using the Base64 EPL plug-in. The EPL plug-in can be used for converting between Base64 and UTF-8 strings. It cannot handle arbitrary binary data. See Using the Base64 plug-in for more information.

The JSON codec connectivity plug-in

The JSON codec can be used if you have a transport that is dealing with messages in the JSON format and you want your EPL application to be able to interact with it by sending and listening for events. It does not matter whether the transport

  • takes JSON-formatted data from an external system and sends it towards the host, or
  • wants to be given JSON-formatted data from the direction of the host, and then sends it to an external system, or
  • even does both of the above.

The JSON codec does bidirectional translation between JSON documents in the payload of a message (transport side) and an object in the payload of a message (host side). If the JSON codec is adjacent to the eventMap plug-in, then the JSON document on the transportwards side should be an object with fields for the event members. The Mapper codec can help move fields to match the event structure, map parts of the metadata into the payload, and support JSON values other than objects.

For example, a JSON document like

{"a":2,"b":["x","y","z"]}

on the transport side corresponds to a java.util.Map (Java) or map_t (C++) on the host side. This java.util.Map or map_t maps a string to an integer for one map entry and a string to a list of strings for the other entry. When the apama.eventMap host plug-in sees an object like this, it will be able to map it to/from an EPL event type such as the following:

event E {
  integer a;
  sequence<string> b;
}

The above assumes that either metadata.sag.type is set to E (see also Metadata values) or the apama.eventMap host plug-in has been configured with defaultEventType: E. Remember that this is completely bidirectional.
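The correspondence between the JSON document and the host-side map can be sketched in Python, where a dict plays the role of java.util.Map or map_t:

```python
import json

doc = '{"a":2,"b":["x","y","z"]}'

# Transport -> host: the JSON text becomes a map with an integer entry
# and a list-of-strings entry, matching the fields of event E above.
obj = json.loads(doc)

# Host -> transport: the same map serializes back to an equivalent document.
round_trip = json.loads(json.dumps(obj))
```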

Taking a closer look at the inbound response, a typical chain would start with the HTTP client transport (see also The HTTP Client Transport Connectivity Plug-in). This transport returns a byte array containing the response body which is normally transformed into a UTF-8 string using the String codec (see also The String codec connectivity plug-in). This UTF-8 string, held in the payload, is then passed to the JSON codec and transformed into a data structure that the JSON represents. At this point, the metadata.http map holds the headers and other elements of the HTTP response, and is used to set the metadata.sag.type and add to the response payload. After the mapping rules have been applied, the payload is passed to the apama.eventMap host plug-in and converted to the event that is defined in the metadata.sag.type.

The apama.eventMap host plug-in can convert a map into an event. This map will either be created from the JSON if it contains an object, or it will need to be created by mapping in the chain. In the example below, we get a JSON object in the response and it maps to the EPL event shown:

// Example response payload contains a JSON object.

response = {"hostname":"host","name":"correlator","value":"expected value"}

// Maps to HTTPResponse event putting "value" into extra fields dictionary
// and adding the id from metadata.

HTTPResponse(id,"host","correlator",{value:"expected value"})

The top-level value in the JSON will normally be an object, which can be mapped directly to an Apama event. However, it is also possible to use other JSON types such as string, array, boolean or number. In those cases, you will need to map the decoded payload before it can be received by Apama. For example:

// Other valid JSON responses that require mapping
// payload = "valid"
// payload = 3.14
// payload = []

HTTPResponse:
  towardsHost:
    mapFrom:
    - payload.contents: payload

// The event can define a field or use @ExtraFieldsDict
// and provide a map for the contents.

event MyEvent{ any contents; }

It is not supported to have a JSON null as the top-level value in the payload. Empty payloads are treated as special control messages in connectivity chains and the JSON codec will ignore such messages. Nulls can occur elsewhere in the JSON structure and will be mapped to the empty any type in EPL.

The content of the event that forms the JSON request will be transformed similarly, so care needs to be taken over how the content will end up in the request JSON:

event Example{}

// Creates an empty JSON object in the payload.

{}

// Any and Optional need careful handling if the values
// are not set. They default to null.

event Example{ optional<string> test1; any test2 }

// Maps to

{"test1":null,"test2":null}

Since the JSON standard does not permit floating point “special” values such as Infinity and NaN (see Support for IEEE 754 special values), float fields containing such values are represented as strings when generating JSON using this codec. For example, a sequence of floats containing the values 1.2, 3.4, Infinity and NaN is represented as [1.2, 3.4, "Infinity", "NaN"] in the generated JSON. If you need different handling of special values (for example, representing them as a JSON null), add a custom codec to your connectivity chain before the JSON codec to make any necessary conversions. In the other direction, when parsing JSON, the codec gives an error if requested to parse one of these special values that is not wrapped as a string.
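A sketch of this wrapping behavior in Python (the helper is illustrative; the real codec performs this internally when generating JSON):

```python
import json
import math

def wrap_specials(value):
    # Replace IEEE 754 special values with their string forms, mirroring
    # the codec's behavior, since standard JSON cannot represent them.
    if isinstance(value, float) and not math.isfinite(value):
        if value == math.inf:
            return "Infinity"
        if value == -math.inf:
            return "-Infinity"
        return "NaN"
    if isinstance(value, list):
        return [wrap_specials(v) for v in value]
    return value

generated = json.dumps(wrap_specials([1.2, 3.4, math.inf, math.nan]))
```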

There are two identically behaving versions of the JSON codec, one implemented in C++ and the other in Java. The C++ version usually gives better performance. In particular, switching between Java and C++ plug-ins within a chain is expensive, so you should match the implementation language of the codec to that of the adjacent codecs. See Deploying plug-in libraries for more information.

A Java sample is provided in the samples/connectivity_plugin/java/JSON-Codec directory of your Apama installation. This provides a more detailed end-to-end example of this codec (along with the source code to this codec), which allows an EPL application to consume and emit JSON as EPL events.

To reference the JSON codec, an entry such as the one below is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins). You can then just have a JSON codec in a chain in between the host and a transport. No configuration is required for this plug-in.

Configuration file entry for Java:

jsonCodec:
  directory: ${APAMA_HOME}/lib/
  classpath:
        - json-codec.jar
  class: com.softwareag.connectivity.plugins.JSONCodec

Configuration file entry for C++:

jsonCodec:
  libraryName: connectivity-json-codec
  class: JSONCodec

The following configuration option is available for the JSON codec:

Configuration option

Description

filterOnContentType

This option can be used in chains that carry messages of multiple different types, where a transport or Mapper configuration sets the metadata.contentType field on the messages. It allows the JSON codec to process only those messages which (hostwards) are JSON-encoded or (transportwards) should be encoded as JSON; the remaining messages must be handled by another codec in the chain. If set to true and metadata.contentType is not set, or is set to anything that does not match the pattern "^application/([^/]*[+])?json(;[^=;]+=(\".*\"|[^=;]+))*$", then the codec ignores the message in either direction and passes it on unmodified.

If set to false, then the codec will attempt to process the message no matter what the value of metadata.contentType is.

Default: false.
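The content-type pattern quoted above can be exercised directly; this Python sketch uses the re module (the codec's own regex engine may differ, but the pattern is compatible):

```python
import re

# The filterOnContentType pattern from the table above.
pattern = re.compile(r'^application/([^/]*[+])?json(;[^=;]+=(".*"|[^=;]+))*$')

matches = pattern.match("application/json") is not None        # plain JSON
suffix = pattern.match("application/hal+json") is not None     # +json suffix
params = pattern.match("application/json;charset=utf-8") is not None
plain = pattern.match("text/plain") is not None                # not JSON
```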

Info
Equivalent functionality is available with the JSON EPL plug-in. See Using the JSON plug-in for detailed information.

The Classifier codec connectivity plug-in

The Classifier codec can be used to take messages from a transport and assign them message types suitable for the host. Typically this is the type of an EPL event, which corresponds to the metadata field sag.type (see also Metadata values) when you are using the apama.eventMap host plug-in. The Classifier codec can inspect the message payload and metadata in order to classify messages to the correct type. If it finds a match, it sets the sag.type appropriately, overwriting anything which was there before.

To reference the Classifier codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

classifierCodec:
  libraryName: ClassifierCodec
  class: ClassifierCodec

You then need to add classifierCodec into your connectivity chains with the configuration for that instance. An example configuration may look as follows:

classifierCodec:
  rules:
    - SomeType:
        - payload.someField: someValue
        - payload.anotherField:
    - AnotherType:
        - metadata.metadataField: true
    - ThirdType:
        - regex:payload.field2: /bar/([0-9]{3})([a-zA-Z]{3})/[!@#%\^&\*]+
        - payload.field3: something
    - FourthType:
        - regex:payload.field1: Valid PLAIN String
    - FallbackType:

The top-level configuration element is just rules. It contains a list of one or more types which can be assigned to messages. Each type contains a list of rules that a message must match in order to be assigned that type. The rules behave as follows:

  • Types are evaluated in order and the first one to match is assigned to a message.
  • If the list of rules for a type is empty, then it matches any message. There should be only one such type and it must be last. With FallbackType in the above example configuration, it is always guaranteed that a type is set on a message (because FallbackType is a last resort).
  • Rules within a type are evaluated in order, and comparisons stop on the first failure so that common cases are evaluated first.
  • Field names must start with “metadata.” or “payload.”, or with “regex:metadata.” or “regex:payload.” if you are using regular expressions.
  • Metadata and payload field names which contain a period (.) refer to nested map structures within the metadata or payload. For example, metadata.http.headers.accept refers to a map called “http” within the metadata, which contains a map called “headers”, which contains an element called “accept”.
  • Empty rules match if the field is present (even if empty) with any value. With SomeType in the above example configuration, this rule matches if anotherField in the payload contains any value. This does not apply for regular expressions where empty rules are not allowed.
  • A non-empty rule usually looks for an exact string match, unless the field name begins with “regex:”. In this case, the rule looks for a regular expression match against the entire field value. For example, if field2 of the payload in the above example configuration is equal to /bar/123aBc/&& or another matching string, and field3 contains something, then the message can be classified as being of type ThirdType.
  • Regular expression matches are performed using the ICU library (see the ICU User Guide at https://unicode-org.github.io/icu/userguide/strings/regexp.html for detailed information) with the default flags - single line and case sensitive.
  • All rules within a type must be true to match that type. For the above example configuration, this means that if anotherField in the payload exists, but someField does not contain someValue, then SomeType does not match.
  • If no types match, then the sag.type metadata field remains unchanged.
  • For “payload.” rules to match, the payload must be a java.util.Map (Java) or map_t (C++) with string keys.
  • Messages coming from the direction of the host do not interact with the Classifier codec at all. Use the apama.eventMap host plug-in, which always sets a sag.type for messages going from the host towards the transport.

If you want to encode an or rule, then you can list the same type twice with different sets of rules.
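A much-simplified sketch of this evaluation order in Python (the types, rules and one-level field lookup are illustrative; the real codec also supports nested fields and operates on chain messages):

```python
import re

# First matching type wins; an empty rule list is a catch-all;
# a None expected value means "field must merely be present".
rules = [
    ("SomeType", [("payload.someField", "someValue"),
                  ("payload.anotherField", None)]),
    ("AnotherType", [("metadata.metadataField", "true")]),
    ("FallbackType", []),
]

def lookup(message, field):
    section, _, name = field.partition(".")
    return message.get(section, {}).get(name)

def rule_matches(message, field, expected):
    if field.startswith("regex:"):
        value = lookup(message, field[len("regex:"):])
        return value is not None and re.fullmatch(expected, value) is not None
    value = lookup(message, field)
    if expected is None:
        return value is not None       # empty rule: presence check only
    return value == expected

def classify(message):
    for type_name, type_rules in rules:
        # All rules within a type must hold for the type to match.
        if all(rule_matches(message, f, v) for f, v in type_rules):
            return type_name
    return None

msg = {"metadata": {},
       "payload": {"someField": "someValue", "anotherField": ""}}
```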

The Mapper codec connectivity plug-in

The Mapper codec can be used to take messages from a transport which do not match the schema expected by the host and turn them into messages which are suitable for the host. Typically this means making sure that the fields in the message have the same names as the fields in the corresponding EPL event type if you are using the apama.eventMap host plug-in (see also Translating EPL events using the apama.eventMap host plug-in). This codec can move fields around between the payload and the metadata and set the values of fields which have no value from the transport. It is bidirectional and can also map messages coming from the host into a format suitable for the transport.

The source code for this plug-in is also shipped as a sample in the samples/connectivity_plugin/cpp/mapper directory of your Apama installation.

To reference the Mapper codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

mapperCodec:
  libraryName: MapperCodec
  class: MapperCodec

You then need to add mapperCodec into your connectivity chains with the configuration for that instance. If you are also using the Classifier codec to assign types to incoming messages, then you must have that codec on the transport side of the Mapper codec. An example configuration may look as follows:

- mapperCodec:
    allowMissing: true
    SomeType:
      towardsHost:
        mapFrom:
          - metadata.targetField1: metadata.sourceField1
          - payload.myField2: metadata.myField2
          - metadata.targetField3: payload.sourceField3
          # to set the correlator channel on a per-message basis:
          - metadata.sag.channel: payload.mychannel
          # to move all fields from payload to metadata, use:
          # - metadata: payload
        copyFrom:
          - metadata.targetField4: metadata.sourceField4
        forEach:
          - payload.listA:
              mapFrom:
                - targetFieldA: sourceFieldA
              copyFrom:
                - targetFieldB: sourceFieldB
        set:
          - payload.fieldName: An overridden value
        defaultValue:
          - payload.targetFieldX: A default value

      towardsTransport:
        mapFrom:
          - metadata.myField2: payload.myField2
          - payload.sourceField3: metadata.targetField3

    "*":
      towardsHost:
        defaultValue:
          - payload.targetFieldY: A different value

An example message input and output for the above mapping a SomeType event towards the host, as logged by the Diagnostic codec (with extra spacing to make it clearer), is:

[premap] Towards Host: {myField2:Field2,
                        sag.type:SomeType,
                        sourceField4:Field4,
                        sourceField1:Field1} /
                       {listA:[{sourceFieldB:Beta,
                                sourceFieldA:Alpha},
                               {sourceFieldB:Brian,
                                sourceFieldA:Andrew}],
                        fieldName:Gamma,
                        sourceField3:Field3}

[postmap] Towards Host: {targetField3:Field3,
                         sag.type:SomeType,
                         targetField1:Field1,
                         sourceField4:Field4,
                         targetField4:Field4} /
                        {listA:[{targetFieldA:Alpha,
                                 sourceFieldB:Beta,
                                 targetFieldB:Beta},
                                {targetFieldA:Andrew,
                                 sourceFieldB:Brian,
                                 targetFieldB:Brian}],
                         fieldName:An overridden value,
                         myField2:Field2,
                         targetFieldX:A default value,
                         targetFieldY:A different value}

The configuration of the Mapper codec is nested, with a map of type names, each containing directions, each containing actions, each containing rules. The type name corresponds to the sag.type metadata field (see Metadata values). Instead of the name of a type, the special symbol "*" (which must include the quotes so that it can be parsed in the YAML format correctly) can be used to list rules to apply to all types. Messages are first processed with any rules matching their specific type, then any rules under "*" are applied.

The rules for a type are split into two directions:

  • towardsHost - Messages going from the transport to the host.
  • towardsTransport - Messages going from the host to the transport.

If you are writing a bidirectional chain, these rules will usually be the converse of each other.

Within a direction, the following actions can be applied. Each action operates on one or two fields. Each field can be the entire payload, a field within the payload or metadata, or a nested field; see below for details of field names.

  • mapFrom - Move a value to a new field from the specified field (target: source).

    For the above example configuration, this means that if a message of type SomeType is being passed from the transport towards the host, then the field sourceField1 from the metadata is removed and its value is put into the field targetField1 of the metadata. The second mapFrom rule moves the value of myField2 from the metadata to the payload, using the same field name. It is always the field on the left-hand side of the rule which is the target of the move operation, regardless of whether messages are moving towards the transport or towards the host. For bidirectional message types, it is quite common to have rules in towardsTransport that are the inverse of the towardsHost rules, as is the case for myField2 in this example.

  • copyFrom - This action is exactly the same as the mapFrom action except that the source field is not removed after the copy.

  • forEach - Iterate through all elements of a given sequence and apply the supplied mapping actions.

    For the above example configuration, this means that if a message of type SomeType is being passed from the transport towards the host and if the message payload contains a sequence field listA, then for each element of listA the subrules of mapFrom and copyFrom are applied. That is, for every child element of listA, the field sourceFieldA is mapped to targetFieldA (mapFrom) and the value of field sourceFieldB is copied to targetFieldB (copyFrom).

    Note that the rules can only be applied if the child elements of the sequence are another event or a dictionary (with string keys).

  • set - Set a metadata field, payload field or the top-level payload to a specific value, regardless of whether that field already exists or not. For the above example configuration, this means that if a message of the type SomeType is being passed from the transport towards the host, then the field fieldName will be set to An overridden value.

  • defaultValue - If a metadata field, payload field or top-level payload is unset or empty/null, then set it to a specified value. For the above example configuration, this means that if a message of any type is being passed from the transport towards the host, then the field targetFieldY of its payload is set to A different value if - and only if - that field does not already exist in the map. The following applies:

    • A top-level payload can be mapped to a string or map.
    • A payload field or metadata field can be mapped to a string, list or map.

Each of the above actions has a list of rules of the form target_field: source (where source is either a field name or a string value). Notes:

  • The actions are applied in the following order: copyFrom, mapFrom, forEach, set, defaultValue.

  • Rules are applied in order within each action section, so you can move the contents out of a field and then replace it with a different value.

  • The left-hand side is the field whose value is being set.

    In the case of forEach, the left-hand side field corresponds to the sequence to which the subrules are applied.

  • Field names must start with “metadata.” or “payload.”, or must be the string “payload” or “metadata” - except for those within a forEach action, in which case they only name a field within an element of the sequence.

  • Field names which contain a period (.) refer to nested map structures within the metadata or payload. For example, payload.http.headers.accept refers to a map called “http” within the payload, which contains a map called “headers”, which contains an element called “accept”.

    Special cases: in metadata source expressions, a field name containing a period (.) is first looked up as a single top-level key and used if found; otherwise it is treated as a nested name. Using the sag. prefix as a target does not create a new map within the metadata.

  • A copyFrom or mapFrom rule whose source field does not exist falls back to a defaultValue rule, or to a subsequent copyFrom or mapFrom rule, for the same destination field. If no such fallback exists, the message is discarded with an error.

  • The Mapper codec also accepts an allowMissing configuration item at the top level. This affects all rules in the Mapper codec and defaults to false. If allowMissing is set to true, an error is not raised when a defaultValue (or a subsequent copyFrom or mapFrom rule) has not been set and a source field is missing. allowMissing needs to be defined at the same level as the event types.

  • If setting a payload field on a payload that is not a map, the payload is first overwritten with an empty map.

  • payload in the left-hand side or right-hand side of a rule (rather than payload.*fieldname*) refers to the entire payload object. This allows you, for example, to map an entire string payload into a field in a map in the resulting message’s payload, or vice-versa.

  • Any rules that mention payload.*fieldname* assume that the payload is a java.util.Map (Java) or map_t (C++) with string keys.
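The actions above can be sketched in Python, applied in the documented order (forEach and nested field names are omitted for brevity; a flat dict stands in for a message, and all names are illustrative):

```python
def apply_rules(message, actions):
    for target, source in actions.get("copyFrom", []):
        if source in message:
            message[target] = message[source]      # copy, source kept
    for target, source in actions.get("mapFrom", []):
        if source in message:
            message[target] = message.pop(source)  # move, source removed
    for target, value in actions.get("set", []):
        message[target] = value                    # overwrite unconditionally
    for target, value in actions.get("defaultValue", []):
        message.setdefault(target, value)          # set only if missing
    return message

result = apply_rules(
    {"sourceField1": "Field1", "fieldName": "Gamma"},
    {
        "mapFrom": [("targetField1", "sourceField1")],
        "set": [("fieldName", "An overridden value")],
        "defaultValue": [("targetFieldX", "A default value")],
    },
)
```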

The Batch Accumulator codec connectivity plug-in

Events being written from the correlator to a connectivity transport are automatically batched for performance reasons. Many transports also read batches of incoming messages and send them into the correlator as a batch. However, some transports do not perform this batching and deliver messages one at a time. For such transports, performance can be increased by batching messages before they are parsed by the correlator. This can be done with the Batch Accumulator codec.

Of the transports that are provided out of the box, all the message-bus transports such as MQTT and Kafka already provide batching. The HTTP client, however, does not benefit from batching because of its request/response nature. If you are using the HTTP server in submission-only mode and want to achieve a high rate of requests with multiple clients, then the Batch Accumulator codec can be useful. It may also be useful with any custom transports you write.

The Batch Accumulator codec automatically tunes batch sizes from one up, depending on the rate of incoming requests, and requires no manual tuning. It does not wait to accumulate a batch for a certain period of time and so does not introduce unnecessary latency. The batching is performed for messages going from the transport to the host. Messages from the host to the transport are passed through verbatim since they are already in batches.
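This adaptive behavior can be modelled as a queue drained in batches: each delivery takes whatever has accumulated since the last one, so batch sizes track the incoming rate without any timer-based latency. A toy Python model (illustrative, not the codec's implementation):

```python
from collections import deque

class BatchAccumulator:
    def __init__(self, max_batch_size=1000):
        self.queue = deque()
        self.max_batch_size = max_batch_size

    def on_message(self, msg):
        # Transport side: messages arrive one at a time and are queued.
        self.queue.append(msg)

    def take_batch(self):
        # Host-bound side: drain everything queued so far, up to the cap.
        batch = []
        while self.queue and len(batch) < self.max_batch_size:
            batch.append(self.queue.popleft())
        return batch

acc = BatchAccumulator()
for i in range(5):
    acc.on_message(i)
first = acc.take_batch()    # everything accumulated so far, as one batch
second = acc.take_batch()   # nothing queued since, so empty
```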

To load the Batch Accumulator codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

batchAccumulatorCodec:
  libraryName: connectivity-batch-accumulator-codec
  class: BatchAccumulator

You then need to insert the batchAccumulatorCodec in your connectivity chain just before the transport. For example:

dynamicChains:
    http:
      - apama.eventMap
      - mapperCodec:
            ...
      - classifierCodec:
            ...
      - jsonCodec
      - stringCodec
      - batchAccumulatorCodec
      - httpServer:
           automaticResponses: true

The Batch Accumulator codec can be inserted anywhere in the chain, but it is best placed close to the transport. It is entirely transparent to the plug-ins on either side.

By default, the Batch Accumulator codec has a maximum batch size of 1000. This means if more than 1000 messages are waiting to be processed by the host-bound thread, requests from the transport will block. It also means you can be using up to 1000 times your message size in memory in outstanding events. You can configure a different batch size with the maxBatchSize configuration option (see below).

The Batch Accumulator codec exposes the actual size of the queue via a user-defined status value. This is available through the various monitoring tools for the correlator with the name user-chain.batchAccumulator.queueSize. This will be the most recent batch size removed from the queue. See also User-defined status reporting from connectivity plug-ins.

The following configuration option is available for the Batch Accumulator codec:

Configuration option

Description

maxBatchSize

Optional. The maximum number of messages in a batch. This must be a positive integer.

Default: 1000.

The Message List codec connectivity plug-in

The Message List codec can be used with a service which supports processing a batch of events in a single request by combining multiple requests into a single list. This requires support from the connected service since the Message List codec changes the type of events being sent.

At high event rates, the correlator will produce batches of messages rather than a single message. Some transports, such as the HTTP client, are inherently single-event based and the maximum rate they can send at depends on the speed of processing a single message. The Message List codec can combine a batch of messages being sent from the correlator into a single message whose body is a list of the original messages in the batch. If the destination service supports this, then the whole batch can be delivered in a single round-trip from the transport.

If the service produces replies which are also a list of replies, then the Message List codec splits them up and delivers them back to the correlator as separate messages.

You need a transport or downstream codec which expects the lists produced by the Message List codec as well as a service which supports them. Often this will be by encoding the lists in something like JSON and then using the String codec to produce a binary message for the transport.

To load the Message List codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

messageListCodec:
   libraryName: connectivity-message-list-codec
   class: MessageListCodec

You then need to add the messageListCodec to your connectivity chain in the appropriate place. A typical use might be for accessing a web service which has been configured to process lists of messages in JSON format. The HTTP client chain for such a service might look like this:

startChains:
   webServiceChain:
      - apama.eventMap
      - mapperCodec:
            ...
      - classifierCodec:
            ...
      - messageListCodec:
            metadataMode: first
      - jsonCodec
      - stringCodec
      - httpClient:
           host: ${WEBSERVICE_HOST}
           port: ${WEBSERVICE_PORT}

With the above example, the lists produced by the Message List codec are being encoded into JSON to be consumed by the web service.

The following configuration options are available for the Message List codec:

maxBatchSize
    Optional. The maximum number of events that are to be combined into a single list. The actual number will depend on the correlator's accumulation of messages to send. This must be a positive integer.
    Default: 1000.

metadataMode
    Required. The strategy for handling the metadata of multiple requests. This must be one of the following:
    • first - Use the metadata from the first message in the batch as the metadata for the list message.
    • splitBatch - Only add items from the batch whose metadata is identical to the same list. Create a new list message when the metadata changes.
    • requestIdList - Use the metadata from the first message, but set metadata.requestId to a list containing the requestId of each message.
    • member - Instead of creating a list of payloads, create a list of maps, each with two members: metadata refers to the metadata for that message, and payload refers to the payload of that message.

When converting a list back to separate messages, the above mapping is performed in reverse to create the individual messages.

The main choice to make is how to handle the metadata when combining multiple messages into a single message. Which strategy you choose depends on your application. Let us assume that we have a batch of messages of the following form:
metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = { name: "Matt", age: 30 }

The payload values and the requestId vary with each message. The examples below show how the Message List codec combines two messages in a batch using the different metadataMode strategies.

metadataMode: first

metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 }, { name: "James", age: 21 } ]

metadataMode: requestIdList

metadata = { requestId: [5, 6], http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 }, { name: "James", age: 21 } ]

metadataMode: splitBatch

metadata = { requestId: 5, http: { method: PUT, path: /add } }
payload = [ { name: "Matt", age: 30 } ]

metadata = { requestId: 6, http: { method: PUT, path: /add } }
payload = [ { name: "James", age: 21 } ]

metadataMode: member

metadata = { }
payload = [
   {
     metadata: { requestId: 5, http: { method: PUT, path: /add } },
     payload: { name: "Matt", age: 30 }
   },
   {
      metadata: { requestId: 6, http: { method: PUT, path: /add } },
      payload: { name: "James", age: 21 }
   }
]

You need to design your application and the web service it calls around the strategy that you have chosen. In some cases, you may need an additional Mapper codec to set some of the metadata for the combined message on the transport side of the Message List codec.
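As an illustration, the combining strategies above can be sketched in Python (combine_batch is a hypothetical helper, not the codec's actual implementation, which is a Java plug-in):

```python
def combine_batch(messages, metadata_mode):
    """Sketch of combining a batch of (metadata, payload) pairs into
    list messages, mirroring the metadataMode strategies described
    above. Illustrative only; not the codec's actual implementation."""
    if metadata_mode == "first":
        # Metadata of the first message covers the whole list.
        return [(dict(messages[0][0]), [p for _, p in messages])]
    if metadata_mode == "requestIdList":
        # Like "first", but requestId becomes a list of all requestIds.
        meta = dict(messages[0][0])
        meta["requestId"] = [m.get("requestId") for m, _ in messages]
        return [(meta, [p for _, p in messages])]
    if metadata_mode == "splitBatch":
        # Start a new list message whenever the metadata changes.
        result = []
        for meta, payload in messages:
            if result and result[-1][0] == meta:
                result[-1][1].append(payload)
            else:
                result.append((dict(meta), [payload]))
        return result
    if metadata_mode == "member":
        # Each list element carries its own metadata and payload.
        return [({}, [{"metadata": m, "payload": p} for m, p in messages])]
    raise ValueError("unknown metadataMode: " + metadata_mode)

batch = [
    ({"requestId": 5, "http": {"method": "PUT", "path": "/add"}},
     {"name": "Matt", "age": 30}),
    ({"requestId": 6, "http": {"method": "PUT", "path": "/add"}},
     {"name": "James", "age": 21}),
]
for meta, payload in combine_batch(batch, "requestIdList"):
    print(meta["requestId"], payload)
```

Running the sketch on the two-message batch reproduces the requestIdList example shown above: one message whose metadata.requestId is [5, 6] and whose payload is the list of both original payloads.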

The Unit Test Harness codec connectivity plug-in

The Unit Test Harness codec and the Null Transport transport make it easy to send test messages into a connectivity chain and/or to write received messages out to a text file, without the need to write any EPL in the correlator. This is very useful for writing unit tests for connectivity plug-ins, especially codecs.

  • Unit Test Harness. This is a Java codec that is useful for unit testing your plug-ins in isolation, by writing messages from the chain to a text file, and/or sending test messages into the chain from a text file (in either direction) without the need to use or configure the host or transport at either end of the chain.
  • Null Transport. This is a trivial Java transport that does not send any messages towards the host and ignores any messages received from the host direction. If you are unit-testing a codec plug-in, no sending or receiving functionality is required for the transport. However, as a transport needs to exist at the end of a connectivity chain, you should use the Null Transport as the chain’s transport.

The following example configuration shows how the harness can be used to test the behavior of a codec plug-in in the “towards host” direction, by sending test messages through the plug-in being tested and writing the messages from the plug-in out to a file:

connectivityPlugins:
  unitTestHarness:
    classpath: ${APAMA_HOME}/lib/connectivity-unit-test-harness.jar
    class: com.softwareag.connectivity.testplugins.UnitTestHarness
  nullTransport:
    classpath: ${APAMA_HOME}/lib/connectivity-unit-test-harness.jar
    class: com.softwareag.connectivity.testplugins.NullTransport
  # plug-in being tested would also be defined here

startChains:
  MyTestChain:
    - apama.eventMap
    # this is a unit test, so the host is not used

    - unitTestHarness:
        pluginUnderTest: towardsTransport
        writeTo: output-towards-host.txt

    - myCodecPluginBeingTested

    - unitTestHarness:
        pluginUnderTest: towardsHost
        readFrom: ${TEST_INPUT_DIR}/input-towards-host.txt

    - nullTransport
    # this is a codec unit test, so no transport functionality is required

Apama ships examples of PySys test cases that use the Unit Test Harness codec as part of the JSON codec sample. You can find them in the samples\connectivity_plugin\java\JSON-Codec\tests directory of your Apama installation.

The following configuration options are available for the Unit Test Harness codec:

pluginUnderTest
    Required. Either towardsTransport or towardsHost, indicating in which direction, relative to this unitTestHarness instance, the plug-in being tested lies along the chain. Messages from the readFrom file are sent down the chain in the direction identified by pluginUnderTest, and messages received from that direction of the chain (that is, messages that were sent towards the opposite of the pluginUnderTest direction) are written to the writeTo file.

    For example, a chain might consist of the host plug-in, followed by a unitTestHarness with pluginUnderTest=towardsTransport, followed by the codec plug-in that you are testing, followed by a unitTestHarness with pluginUnderTest=towardsHost, followed by a nullTransport instance.

writeTo
    The path of a UTF-8 text file to which messages from the plug-in under test are written. If empty, no messages are written to a text file.
    Default: empty.

readFrom
    The path of a UTF-8 text file from which messages are read and sent towards the plug-in under test, or a directory containing such text files. When this plug-in is started, messages are read from the file and sent towards the plug-in under test. If a directory is specified, the same is done for any new files in that directory, including files that are written to that directory while the plug-in is running. If empty, no messages are sent by the unitTestHarness.
    Default: empty.

logOutput
    By default, the unitTestHarness writes a single-line log message to the host's log file for each message from the plug-in under test, using the same format as writeTo. This may be useful as an alternative to writeTo, or as a way to create test cases that block until the host log file contains either the final expected message or an error message. If you want to disable this behavior because the log file is becoming too large, set this to false.
    Default: true.

passThrough
    By default, any messages received by the unitTestHarness are not passed on to the next plug-in in the chain (typically the transport or the host), as usually writing or logging the messages is all that is required, and passing them on would require extra configuration in the host or transport to avoid error messages. If you want to include the host or transport in your testing, set this to true.
    Default: false.

echoBack
    If set to true, messages received from the plug-in under test are automatically sent back in the opposite direction, towards the plug-in under test. This is useful for testing the round-trip mapping behavior of a codec plug-in.
    Default: false.

The file format for readFrom and writeTo is identical, containing a metadata line followed by a payload line, repeating until the end of the file. Blank lines and lines starting with a hash (#) are ignored. The file encoding for both is always UTF-8 (which is the same as ASCII for the most common alphanumeric and punctuation characters).

A metadata line is encoded as a single-line JSON string. For example:

metadata={"aim":"purpose of this test message", "other stuff":[1, 2, 3]}

A payload line can use one of the following formats, depending on the type:

  • Any message payload except for byte[] can be encoded as a single-line JSON string. For example:

    payload_json={"a\nb":123, "c":[true]}
    

    There are some special JSON formats that can be used:

    • JSON does not allow non-string keys. A special format of a JSON object with a ".Map" key whose value is a list of two-element lists will be converted to a map. For example:

      {".Map":[[123,"abc"],[987,"zyx"]]}
      

      This would be converted to {123:"abc",987:"zyx"} if JSON allowed it. These may also be nested, for example:

      {".Map":[[{".Map":[[333,"abc"],[555,"zyx"]]},"value"]]}
      

      Any other keys in the ".Map" object will be ignored.

    • A special format of a JSON object with a ".NamedMap" key whose value is a string, together with a ".Map" key, will create an instance of the NamedMap Java class, which can be used with the EPL any type. For example:

      {".NamedMap":"MyEvent",".Map":{"i":123}}
      

      The contents of ".Map" will be given the name MyEvent, which can be used to determine the type of the converted any variable. The ".Map" value may also use the special formatting described above.

  • Although JSON can be used to represent simple string payloads, it is sometimes simpler to use payload_string format for these as it removes the need to follow JSON quoting rules. For example:

    payload_string=a " b
    

    Note that this form can only be used if there are no newlines in the string. If there are newlines, use a JSON string instead. For example:

    payload_json="my\nstring"
    
  • For binary payloads (that is, a message whose payload is a byte array), use payload_byte[], which takes a base64-encoded representation of binary data. For example:

    payload_byte[]=SGVsbG8gV29ybGQ=
    
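As an illustration, the file format above can be parsed with a short Python sketch (parse_harness_lines is a hypothetical helper; the actual harness is the Java plug-in shown in the configuration earlier, and this sketch omits the ".Map"/".NamedMap" special formats):

```python
import base64
import json

def parse_harness_lines(lines):
    """Parse the metadata/payload line format described above into a
    list of (metadata, payload) pairs. Illustrative sketch only."""
    messages, metadata = [], None
    for line in lines:
        line = line.rstrip("\n")
        if not line or line.startswith("#"):
            continue  # blank lines and comment lines are ignored
        if line.startswith("metadata="):
            # Metadata line: a single-line JSON object.
            metadata = json.loads(line[len("metadata="):])
        elif line.startswith("payload_json="):
            # Payload as single-line JSON.
            messages.append((metadata, json.loads(line[len("payload_json="):])))
        elif line.startswith("payload_string="):
            # Plain string payload, no JSON quoting rules.
            messages.append((metadata, line[len("payload_string="):]))
        elif line.startswith("payload_byte[]="):
            # Binary payload, base64-encoded.
            messages.append(
                (metadata, base64.b64decode(line[len("payload_byte[]="):])))
    return messages

sample = [
    'metadata={"aim":"purpose of this test message"}',
    'payload_byte[]=SGVsbG8gV29ybGQ=',
]
print(parse_harness_lines(sample))
```

Decoding the base64 example above yields the bytes of "Hello World", paired with the metadata object from the preceding line.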

The Diagnostic codec connectivity plug-in

The Diagnostic codec can be used to diagnose issues with connectivity plug-ins. It logs the events that go through a connectivity chain in either direction.

To reference the Diagnostic codec, an entry such as the following is required in the connectivityPlugins section of the configuration file (see also Configuration file for connectivity plug-ins):

diagnosticCodec:
  libraryName: DiagnosticCodec
  class: DiagnosticCodec

You can then add the diagnosticCodec at any point in a connectivity chain. With no further configuration, the codec logs to the correlator log file at INFO level.

An example configuration may look as follows:

startChains:
  myChain:
     - apama.eventMap
     - diagnosticCodec:
          tag: host
          output: logger
          logLevel: DEBUG
     - myCodec # the codec being inspected
     - diagnosticCodec:
          tag: transport
          output: logger
          logLevel: DEBUG
     - myTransport

The following configuration options are available:

tag: string
    If a chain has multiple diagnosticCodec instances, you can specify a tag for each instance to distinguish it from the other instances. string is the tag that is used to prefix the messages from the current instance.
    Default: empty.

output: mode
    Defines where the codec logs its output. mode can be one of the following:
    • logger - Default. The codec logs to the correlator log file at the log level that is defined with logLevel.
    • file - The codec logs to the file that is defined with fileName.

logLevel: level
    Applies when the logger mode is used. level can be any correlator log level.
    Default: INFO.

fileName: file
    Applies when the file mode is used. file is either the path to a file or one of the special strings stdout or stderr.
    Default: stdout.

When writing to the correlator log file, the Diagnostic codec replaces any non-printable ASCII characters (those with ASCII values less than 0x20, which includes tab and newline) with an underscore (_) character.

Output to files

If the mode of the output configuration option is set to file, the Diagnostic codec formats the output messages as follows:

[tag] direction: metadata / payload

where:

  • tag is optional. This is the tag that is defined in the configuration (or omitted completely if no tag is configured). If a tag is present, it is enclosed in square brackets.
  • direction is either “Towards Host” or “Towards Transport”.
  • metadata is the content of the metadata at the point in the chain where the Diagnostic codec has been placed.
  • payload is the content of the payload at the point in the chain where the Diagnostic codec has been placed.

The following is an example of an output message that is written to a file:

[host] Towards Transport: {sag.type:test.EventType, sag.channel:myChain} / {t:Isabelle,isB:true}
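As an illustration, the file-mode line format above can be reproduced with a small Python sketch (format_diagnostic_line is a hypothetical function, not the codec's actual implementation):

```python
def format_diagnostic_line(direction, metadata, payload, tag=None):
    """Format one diagnostic output line as described above:
    [tag] direction: metadata / payload, with the tag omitted
    when none is configured. Illustrative sketch only."""
    prefix = f"[{tag}] " if tag else ""
    return f"{prefix}{direction}: {metadata} / {payload}"

line = format_diagnostic_line(
    "Towards Transport",
    "{sag.type:test.EventType, sag.channel:myChain}",
    "{t:Isabelle,isB:true}",
    tag="host",
)
print(line)
```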

Output to loggers

If the mode of the output configuration option is set to logger, the Diagnostic codec formats the output messages with an additional prefix:

timestamp loglevel [threadID] - <connectivity.codecname.chainname>

where:

  • timestamp is the date and time that the output was logged.
  • loglevel is configured in the Diagnostic codec configuration.
  • threadID is the unique integer identifier of the thread that logged the message.
  • codecname is the name of the Diagnostic codec listed in the configuration (usually “diagnosticCodec”).
  • chainname is the name of the codec chain listed in the configuration.

The following is an example of an output message that is written to a log file:

2020-03-16 17:45:14.472 DEBUG [18744] - <connectivity.diagnosticCodec.myChain> [host] Towards Transport: {sag.type:test.EventType,sag.channel:myChain} / {t:Isabelle,isB:true}