Using connectivity plug-ins

Connectivity plug-ins allow plug-ins to transform and handle delivery of events.

The samples/connectivity_plugin/application/genericsendreceive directory of your Apama installation includes a simple sample which provides an easy way to get started with sending and receiving messages to or from any connectivity plug-in. For more information, see the README.txt file in the above directory and Sending and receiving events with connectivity plug-ins.

Overview of using connectivity plug-ins

Connectivity plug-ins can be written in Java or C++, and run inside the correlator process to allow messages to be sent and received to/from external systems. Individual plug-ins are combined together to form chains that define the path of a message, with the correlator host process at one end and an external system or library at the other, and with an optional sequence of message mapping transformations between them.

You can configure connectivity plug-ins and also develop applications that use them with Apama Plugin for Eclipse. To do so, you have to add an instance of the User Connectivity connectivity bundle to your project. See Adding connectivity and adapter bundles to projects or Creating and managing an Apama project from the command line for more information.

A configuration file describes both the chains and the plug-ins making up each chain. The configuration file is written using the YAML markup language, and can express structured configuration (maps, lists and simple values) for plug-ins. The default text encoding of the configuration file is UTF-8.

An example configuration may look like the following:

connectivityPlugins:
  stringCodec:
    libraryName: connectivity-string-codec
    class: StringCodec
  mapperCodec:
    libraryName: MapperCodec
    class: MapperCodec
  jsonCodec:
    libraryName: connectivity-json-codec
    class: JSONCodec
  HTTPClientTransport:
    libraryName: connectivity-http-client
    class: HTTPClient
startChains:
  weatherService:
    - apama.eventMap:
        defaultEventType: com.apamax.Weather
    - mapperCodec:
        "*":
          towardsTransport:
            mapFrom:
              - metadata.requestId: payload.id
            defaultValue:
              - metadata.http.path: /data/2.5/weather?q=Cambridge,uk
              - metadata.http.method: GET
    - jsonCodec
    - stringCodec
    - HTTPClientTransport:
        host: api.openweathermap.org

A chain is a combination of plug-ins with configuration. Every chain consists of the following:

  • Codec plug-in. Optionally, one or more codec plug-ins are responsible for applying transformations to the messages (for example, the JSON codec in the above example) to prepare them for the next plug-in in the chain.
  • Transport plug-in. One transport plug-in is responsible for sending/receiving messages to/from an external system (for example, HTTPClientTransport in the above example).
  • Host plug-in. One built-in host plug-in is responsible for sending/receiving messages to/from the correlator process that is hosting the chain. These are built-in plug-ins (which do not need to be specified in the connectivityPlugins stanza) which the correlator supports. Host plug-ins determine in which format events are passed in and out of the correlator. Thus, a chain should specify a host plug-in that is compatible with the next codec or transport element in the chain. Host plug-ins can also specify on which channel the chain receives events from the correlator, and can specify a default channel to send events in to the correlator (for example, apama.eventMap in the above example).

Each transport plug-in and codec plug-in used in the chain must also be described in the connectivityPlugins stanza. All of the plug-ins in a chain can optionally take configuration that is specified in the configuration file nested below them.

Plug-ins can pass messages in a number of different forms (strings, maps, plug-in-specific objects). Codecs can be used to translate from one form into another. For example, the JSON codec in the above example would convert the map objects from the host plug-in to strings in JSON format. Transport plug-ins and codec plug-ins written in Java and C++ may be used together in the same chain regardless of language, using strings or maps of values to represent messages passed across the language boundary.

Plug-in chains support sending events in both directions, to and from the external system:

  • An Apama application can send events to a connectivity chain in the same way as it would send them to any other receiver connected to the correlator, that is, using the send or emit keywords. Events from the EPL application are translated into the form specified by the host plug-in (the first in the chain configuration). They are then passed through each codec in turn, and then delivered to the transport. The host plug-in (apama.eventMap in the above example) by default listens for events from the application using the chain’s name as a channel name. The host plug-in can be configured to listen on a specific set of channels with the subscribeChannels configuration property.
  • Events from a connectivity chain’s transport are passed through the codecs in the reverse order and are translated by the host plug-in to Apama events which are enqueued to the Apama application on the desired channel. The channel can be specified per event, or a default channel can be configured in the host plug-in using the defaultChannel configuration property.

See Host plug-ins and configuration for more information on the above mentioned configuration properties.

Static and dynamic connectivity chains

Chains can be created statically using a YAML configuration file or dynamically at runtime. How the chains are created depends on the type of transport:

  • Some transports have a dynamic chain manager which manages chain creation in a transport-specific way. New chains are created, for example:

    • in response to external requests (for example, for each connection made to the HTTP server connectivity plug-in), or
    • when an Apama channel with a particular prefix is first used from EPL (for example, the Universal Messaging connectivity plug-in creates a chain by default when a channel beginning with “um:” is used from EPL, in a monitor.subscribe(...) method or a send... to statement). These transports always have a dynamicChainManagers section in their YAML configuration file. The connectivity chains are created dynamically by a transport chain manager plug-in, using chain definitions specified in dynamicChains. See Configuration file for connectivity plug-ins for information on how to configure dynamic chain managers and dynamic chains in a YAML configuration file.

    For more information on dynamic chain managers, see Requirements of a transport chain manager plug-in class.

  • For transports that do not provide a dynamic chain manager, chains are created either

    • statically using the startChains section of the YAML file, or
    • dynamically from EPL using ConnectivityPlugins.createDynamicChain. See Creating dynamic chains from EPL. A transport that permits user-controlled chain creation never has a dynamicChainManagers section in its YAML configuration file.

See the documentation for each transport in Standard connectivity plug-ins on how chains are created. See Configuration file for connectivity plug-ins for more details on startChains, dynamicChains and dynamicChainManagers.

Configuration file for connectivity plug-ins

A configuration file for the connectivity plug-ins is specified using the --config option when starting the correlator with the correlator executable. It is possible to specify multiple configuration files. See the description of the --config option in Starting the correlator.

A configuration file for the connectivity plug-ins is written in YAML. See also Using YAML configuration files.

A configuration file should contain a map at the top level which has keys for connectivityPlugins and some of startChains, dynamicChains and dynamicChainManagers.

  • The value of connectivityPlugins is a map which specifies how each plug-in is to be loaded. The keys name the plug-ins, and the values specify how the host loads the plug-ins. Plug-ins may be written in:

    • Java. In this case, a class key and a classpath key should exist.

      The class is name of the plug-in class, which must include the package. The plug-in’s documentation specifies the class name to be used. The class is a transport, a codec or dynamic transport chain manager. See also Requirements of a plug-in class.

      The classpath can be either a single string or a list of strings. For example:

      • Single string:

        classpath: one.jar
        
      • List of strings where each string is written on a new line with a preceding dash and space:

        classpath:
         - one.jar
         - two.jar
         - three.jar
        
      • List of strings where the strings are delimited by semicolons (;). For example:

        classpath: one.jar;two.jar;three.jar
        

      Each string can name an absolute or relative jar file or directory.

      An optional directory key specifies a directory which is where the jar files will be found (unless an absolute path is specified for a classpath element).

    • C++. In this case, a class key and a libraryName key should exist.

      The class is the base name of the class, without a package. The plug-in’s documentation specifies the class name to be used. The class is a transport, a codec or dynamic transport chain manager. See also Requirements of a plug-in class.

      The libraryName is the base filename name of the library, excluding the operating system-specific prefixes and suffixes (that is, excluding the “lib” prefix and “.so” suffix for UNIX, and the “.dll”" suffix for Windows).

      An optional directory key specifies a directory which is where the library will be found (see Deploying plug-in libraries).

    globalConfig is an optional map providing default configuration options for this plug-in, which are used by all chains/chain definitions using this plug-in that do not provide their own value for them. The globalConfig configuration can be overridden by configuration per chain.

  • Chains under startChains are created at startup. The value of startChains is a map where each key is a string that names a chain. Each value should be a list, naming the plug-ins that make up the chain. Each chain must contain one host plug-in, which is one of the built-in supported host plug-ins, optionally followed by a number of codecs, and end with a plug-in that is the transport. Configuration can optionally be specified for each plug-in, by following the plug-in name with a colon and space, and providing the configuration below it. Note that in YAML terms, the chain entry is a map rather than a string.

  • The dynamicChains map is used to provide chain definitions that are used by chain manager plug-ins or EPL code that dynamically create chain instances after the correlator has started. Each key in the map is a chain definition identifier, which is the string that will be used by the chain manager or from EPL to identify what kind of chain it wants to create. Each value should be a list, naming the plug-ins that make up the chain, similar to what you would specify in startChains. Each chain must contain one host plug-in, which is one of the built-in supported host plug-ins, optionally followed by a number of codecs, and end with a plug-in that is the transport. Configuration can optionally be specified for each plug-in, by following the plug-in name with a colon and space, and providing the configuration below it as a map value. One difference between dynamicChains and startChains is that the plug-in configurations used in dynamicChains can specify @{*varname*} variable placeholders which get replaced when a chain instance is created from the chain definition, with values provided dynamically by the chain manager plug-in or the EPL createDynamicChain call. If you are using a chain manager plug-in, see the plug-in’s documentation for information about any @{*varname*} substitutions that it supports. Note that this is unrelated to the ${*varname*} replacements that are performed statically when YAML files are loaded at startup.

  • The value of dynamicChainManagers is a map where each key is a manager name, that is, a string naming an instance of a dynamic chain manager plug-in class. Each value is a map providing the configuration for the chain manager instance (such as details for connecting to a specific external system) and the name of the transport plug-in it is associated with. The manager should have the following keys:

    • transport: Specifies the transport plug-in associated with this dynamic chain manager. This must match the key used in connectivityPlugins to load this chain manager, and also the name used in the dynamicChains definition to identify the transport plug-in at the end of each chain. This is the name used for the plug-in in the configuration file, not the name of the class that implements the plug-in.
    • managerConfig: Specifies the configuration map that will be passed to the chain manager constructor when it is created at startup. The available configuration options are defined by the plug-in author, therefore, see the plug-in’s documentation for details. If the managerConfig is invalid and the chain manager throws an exception, the correlator logs an error message and fails to start. The managerConfig usually includes details for connecting to a specific external server or system. Some chain managers may also provide some options that are set in the transport plug-in’s configuration section under dynamicChains, for example, options specific to the protocol or message format described by that chain definition. Note that there can be more than one manager instance configured for a given transport, for example, if you need to connect to several different servers of the same type. Each manager can make use of more than one chain definition, for example, if different message formats (such as XML and JSON) are being used with the same server or chain manager. In simple configurations where a transport only ever had a single manager instance and a single chain definition, it is common to use the same string for the transport name, dynamic chain definition identifier and manager name. However, there is no requirement for them to be the same.

You can use .properties files to specify values for ${*varname*} substitution variables in configuration files. See Using properties files for further information.

There are a few values which can be written into the configuration file which will be substituted at runtime. This is to aid portability of configuration files between different deployments. Specifically the following variables may be used:

Variable Description
${PARENT_DIR} The absolute normalized path of the directory
containing the properties file or YAML file currently being processed.
${APAMA_HOME} The path to the Apama installation.
${APAMA_WORK} The path to the Apama work directory.
${$} The literal $ sign.

Example:

connectivityPlugins:
  myTransport:
    directory: ${APAMA_WORK}/build
    classpath:
        - myTransport.jar
    class: org.my.Transport

startChains:
  service:
    - apama.eventMap
    - myTransport:
        apamaInstall: ${APAMA_HOME}

The following example shows a more complex configuration for a transport plug-in that uses two dynamic chain manager instances and also two different chain definitions:

connectivityPlugins:
  # a transport plug-in that uses a chain manager to instantiate
  # its chains and transport instances dynamically
  myTransport:
    directory: ${APAMA_WORK}/build
    classpath:
        - myTransport.jar
    class: org.my.MyTransportChainManager

  #... codecs defined here too

dynamicChainManagers:
  # example of multiple chain managers for the same transport;
  # an instance is created during correlator startup for each
  # manager listed here
  myTransportManager1:
    # must match transport plug-in name specified under connectivityPlugins
    transport: myTransport

    managerConfig:
      myManagerConfigOption: ${myTransport.foo}
  myTransportManager2:
    # must match transport plug-in name specified under connectivityPlugins
    transport: myTransport

    managerConfig:
      # some managers specify which chain definition
      # to use in their configuration (others decide it at runtime)
      myStaticallyConfiguredChainId: myJSONChainDefinition

dynamicChains:
  myJSONChainDefinition:
    - apama.eventMap
    - jsonCodec
    # must match transport plug-in name specified under connectivityPlugins
    - myTransport:
        myTransportChainDefOption1: @{bar}
        myTransportChainDefOption2: ${myTransport.baz}

  myXMLChainDefinition:
    - apama.eventMap
    - myXMLCodec
    - myTransport

To see a fully working example of using a dynamic chain manager plug-in, try adding the Universal Messaging connectivity plug-in to your project (see Adding the Universal Messaging connectivity plug-in to a project).

In a configuration file, you can also specify the following:

Host plug-ins and configuration

The first element in the configuration of a chain must be a host plug-in. This is a special type of plug-in that controls how the correlator interacts with the chain. The type of plug-in will determine in which form events are passed to and accepted from the chain. The host plug-in must use a compatible type with the first codec (or, if no codecs specified, the transport), otherwise errors will be reported and events will not be delivered.

Overview of host plug-ins

The following host plug-ins are supported:

  • apama.eventMap

    The eventMap plug-in translates EPL events to and from nested maps, which allows chains to convert arbitrary structured data into forms that can be automatically translated into EPL events without having to know the exact definition of the EPL event, provided the field names of the event definition match the keys in the map. See Translating EPL events using the apama.eventMap host plug-in for further information.

  • apama.eventString

    The eventString plug-in transfers events in Apama string event format, as used by the engine_send and engine_receive tools and the Apama client library.

Common configuration properties

All Apama host plug-ins take the following configuration properties:

Configuration Property

Description

subscribeChannels

Optional. Defines the channel or channels to which the chain subscribes in order to receive events from the correlator. subscribeChannels is only recommended for chains defined as startChains. Dynamic chains will have their channel subscription specified either from EPL or by the chain manager which will override any setting in the configuration file.

To send an event to a chain from EPL, use the EPL send statement with the name of a channel to which the chain has subscribed.

Type of configuration: string or list of strings.

Default: the chain name.

defaultChannel

Optional. Defines the default channel to deliver events from the transport to the correlator from the chain. Some chain managers provide a value for defaultChannel. It is not permitted to specify defaultChannel for chains used by such chain managers. It is recommended only for chains defined as startChains or which you intend to create from EPL.

Chains can also specify a channel name in the metadata of each message. A channel provided in metadata takes precedence over this configuration value.

Type of configuration: string.

Default: the default channel is an empty string (""), thus delivering the event to all public contexts.

suppressLoopback

Optional. If set to true, any events which will be received by this chain will not also be sent to contexts subscribed to the same channel. It is assumed that this receiver may send the events back into the correlator to be received by subscribed contexts. This is typically used when this chain is connected to a message bus. Any other external receivers are unaffected. Type of configuration: boolean.

Default: false.

description

Optional. A textual description which will appear in the Management interface (see also Using the Management interface).

remoteAddress

Optional. A textual address for the remote component (if any) to which this chain connects. This address will also appear in the Management interface.

Translating EPL events using the apama.eventMap host plug-in

The eventMap plug-in translates events to or from map objects, reflecting the structure of the event. Each map entry has a key which is the same as an EPL event field. The values of the map can be simple values (strings, numbers) or further maps or lists which correspond to dictionaries, nested events or sequences.

If the message’s payload does not contain all EPL fields, then by default the message is dropped and a warning is logged. The allowMissing configuration property can be set to true, in which case missing fields or fields with empty values are set to their default values.

If the message’s payload has fields that do not have corresponding EPL fields (or which are perhaps optional), then the map entries are ignored by default. An event definition can specify a com.softwareag.connectivity.ExtraFieldsDict annotation that names a dictionary field; extra values are placed in the dictionary (see Map contents used by the apama.eventMap host plug-in for more information). If needed, this can be disabled by setting the extraFields configuration property to false. The dictionary must be one of the following types:

  • dictionary<string,string> - Keys and values are coerced into strings. Lists generate the string form of sequence<string>. Maps generate the string form of dictionary<string,string>.
  • dictionary<any,any> - Values are mapped to the corresponding EPL type, or sequence<any> for lists and dictionary<any,any> for maps without names.
  • dictionary<string,any> - Keys are coerced into strings.

When events are sent from a chain to the correlator, the correlator needs to know what event type they are. This can be set by a chain plug-in (in the metadata of a message) or by setting the defaultEventType configuration property. The metadata will take precedence to specify a message’s type. Some chains will set the event type on every message, so the default event type does not need to be set in the configuration. Other chains may not be aware of event types, so the event type must be set.

Configuration Property

Description

defaultEventType

Optional. The name of the EPL type to which events that are going into the correlator are converted if no event type is specified on a message. Type of configuration: string.

Default: none - requires that the chain send messages with the event type set.

allowMissing

Optional. Defines whether missing fields or fields with empty values (null values in Java) are permitted on inbound events. If they are permitted, they are set to the EPL default for that type. Similarly, empty values in nested events, elements in sequences and key/value pairs in dictionaries are also set to their default values. There is an exception: an empty value that maps to an optional<*type*> or any in EPL is permitted even if allowMissing is false. See also the descriptions of the optional and any types in the API reference for EPL (ApamaDoc).

Type of configuration: boolean.

Default: false - this results in a WARN, and the events are dropped if there are any missing or empty fields.

extraFields

Optional. Defines whether to place map keys that do not name fields in an extraFields dictionary member that is identified with the @ExtraFieldsDict annotation (see above). Type of configuration: boolean.

Default: true.

Using reliable transports

Reliable messaging gives you the tools to write event-processing applications that are resilient against message loss (for example, due to crashes or network outages).

To make use of reliable messaging in your connectivity plug-ins, you must:

  • Configure the transports for reliable messaging. The nature of this configuration is transport-specific. Reliable messaging is only supported in chains that use the apama.eventMap host plug-in. See also Translating EPL events using the apama.eventMap host plug-in.
  • Write EPL to send and receive events via these transports and to handle acknowledgments. For detailed information, see the event descriptions for Chain, AckRequired and FlushAck in the com.softwareag.connectivity and com.softwareag.connectivity.control packages in the API reference for EPL (ApamaDoc).

Note:

No built-in transport currently supports reliable messaging, but the API is available if you want to implement your own reliable transport.

Reliable-messaging-aware transports only support at-least-once delivery, which admits the possibility of duplicate messages, especially after recovery from downtime. Your applications should be written to handle this.

In this version, reliable messaging with connectivity plug-ins is controlled exclusively from EPL. At this time, reliable messaging cannot be automatically tied-in to correlator persistence.

If a license file cannot be found, reliable messaging with connectivity plug-ins is disabled. See Running Apama without a license file.

Message identifiers

Messages going from the transport to the host contain unique message identifiers. Each identifier is stored as sag.messageId in the metadata. See Metadata values. You only need access to the message identifiers if you want to acknowledge individual events.

Where a message maps to an event type that has the MessageId annotation, the message identifier in the metadata is copied into a field on that event. You should not name a field that you expect to have a real value. See Adding predefined annotations.

The following EPL example shows how to use the MessageId annotation:

using com.softwareag.connectivity.MessageId;

@MessageId("messageIdentifier")
event MyEvent {
    string s;
    integer i;
    string messageIdentifier; // Contains the sag.messageId from the
                              // message that mapped to this event
}

Chains

In general, when receiving or sending reliably, you need to know which connectivity chain is receiving (from transport to host) or sending (from host to transport) the events. To identify that connectivity chain, you use the EPL Chain event, which provides a wrapper with helpful actions pertaining to reliability. There are two actions on the ConnectivityPlugins event that can be used to get the Chain event for the chain you want:

  • ConnectivityPlugins.getChainByChannel

    This action looks up a chain instance by a channel it is subscribed to or sending to.

  • ConnectivityPlugins.getChainById

    This action looks up a chain instance by its identifier.

See the API reference for EPL (ApamaDoc) for more information on these actions.

Reliable receiving

A transport can be configured for reliable receiving. This means the events are going from the outside world into the correlator, and you make sure that they are not lost in the case of a failure.

The EPL that receives the events is obliged to acknowledge when the events have been fully processed by the application. That is because the remote system to which a reliable transport connects typically keeps track of what messages have been acknowledged and what messages have not been acknowledged. In the event of a failure, any messages that have not been acknowledged are resent to you after reconnection/restart.

Keep in mind that “fully processed” is different from just receiving an event. It means that you have preserved the effect of that event, and done so safely enough that you will no longer need the event to be resent in the event of a failure. As an example, that might mean committing the contents of the event to a database, writing it to a file, or having sent an output event and received an acknowledgment for it.

There are often performance implications for an application that is late with acknowledgments. You should therefore acknowledge all events as soon as possible after receiving. There is no guarantee, however, that an acknowledgment will be processed immediately. For example, if you acknowledge some events in EPL and then the system goes down quite soon afterwards, the events may not have been fully acknowledged to the remote system and will therefore get redelivered.

Once the events have been fully processed by the EPL application, they can be acknowledged in either of the following ways:

  • Listen for AckRequired events from the com.softwareag.connectivity.control package, and call the ackUpTo action on them. Doing it this way means you are acknowledging potentially large batches of events at a time.
  • Use the ackUpTo action on the Chain event type to acknowledge all previously received events, up to and including a specific event of your choice. In this case, you identify the specific event with its message identifier. If the event definition has the MessageId annotation, you can obtain the message identifier from the named field.

The detailed technical reasons for choosing between the above mechanisms are given in the descriptions of the Chain and AckRequired events in the API reference for EPL (ApamaDoc).

The following EPL example shows how an application can write reliably-received events to a file. It uses a fictional plug-in named filePlugin for this purpose.

using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.AckRequired;
...

monitor.subscribe("incomingEvents"); // The chain is sending us events
                                     // on this channel
Chain chn := ConnectivityPlugins.getChainByChannel("incomingEvents",
    Direction.TOWARDS_HOST); // Get the chain itself

on all MyEvent() as e { // The events the application is interested in
    evtSequence.append(e);
}

on all AckRequired(chainId=chn.getId()) as ackRequired {
    // Periodically acknowledge all previously received events,
    // but only after safely writing their contents to a file
    filePlugin.writeAndSync(evtSequence.toString());
    evtSequence.clear();
    ackRequired.ackUpTo();
}

In all reliable receiving, you have to consider the possibility that some events that have already been acknowledged might be resent to your application, especially when recovering after a failure. Your application should be written to either eliminate duplicates or tolerate them.

Any towards-host messages that get dropped by the chain due to errors (for example, when a codec cannot translate from one form to another due to an invalid format) are treated as if they have already been acknowledged.

Reliable sending

Reliable sending is symmetrical with reliable receiving for most purposes. With reliable sending, your EPL application can ask to be notified when a remote system has safely processed your events. As before, you have to know what chain is being used for reliable sending of these events, and so you have to get the relevant Chain instance. After sending some events, you may call the flush() action of the Chain.

Once all events sent to the chain before this call have been safely stored on (or have been processed by) the remote system, your application will see an acknowledgment in the form of a FlushAck event. Your application might then respond to this, for example, by acknowledging reliably-received events that caused these events to be sent, or by recording the fact that the event has been sent in a way that means that the application will not send it again. In the event of a restart, your application should be written such that it is able to resend any events that were sent but not acknowledged in its previous incarnation.

The following is an example of EPL application using the flush() action:

using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Direction;
using com.softwareag.connectivity.Chain;
using com.softwareag.connectivity.control.FlushAck;
...

// Get the chain for the channel we are sending events to
Chain chn := ConnectivityPlugins.getChainByChannel("chanWeSendTo",
                Direction.TOWARDS_TRANSPORT);

on all wait(0.1) {
    globalInteger := globalInteger + 1;
    MyEvent e := MyEvent(globalInteger);
    send e to "chanWeSendTo";

    // All previously sent events have now been safely processed by
    // the remote system
    on FlushAck(requestId=chn.flush()) {
        log "Fully sent " + e.toString() at INFO;
    }
}

Your application should be written with the idea that it might send duplicate events in the case of a problem (for example, if your application sends out some events which are then processed by some remote system, but there is a crash before your application can see the FlushAck event).

Creating dynamic chains from EPL

You can define chains under the dynamicChains section of the configuration file which are not tied to a specific dynamicChainManager. These chains are not created on startup. Instead, multiple instances of these chains can be created on demand from EPL using the ConnectivityPlugins EPL API. There is a static method on the com.softwareag.connectivity.ConnectivityPlugins event type:

createDynamicChain(string *chainInstanceId*, sequence<string> *channels*, string *chainDefnName*, dictionary<string, string> *substitutions*, string *defaultChannelTowardsHost*) returns Chain;

Calling this method creates and starts a new instance of a chain defined under dynamicChains and returns a com.softwareag.connectivity.Chain object which can later be used to destroy the chain via the destroy() method.

The arguments to createDynamicChain are:

Argument

Description

*chainInstanceId*

The identifier to use for the new chain instance. This identifier is used for logging, and it must be unique.

*channels*

A sequence of channels to which this chain is to be subscribed in the correlator. Events sent to these channels from EPL are delivered to this chain.

*chainDefnName*

The name of a chain defined under dynamicChains in the configuration file. This must not be for a transport which has a dynamicChainManager.

*substitutions*

A map of key-value pairs to be substituted into the chain definition using @{key} syntax.

*defaultChannelTowardsHost*

The default channel to use for sending a message towards the host if no channel is specified on the message. Specify an empty string if you do not want to use a default channel. Note:

You must not specify a non-empty value for *defaultChannelTowardsHost* if the configuration property defaultChannel is also specified for the host plug-in (see also Host plug-ins and configuration). An error will occur in this case.

At the point when `createDynamicChain` returns, the chain is created and able to receive events. The chain exists until either the correlator is shut down or the `.destroy()` method is called on the `Chain` object returned by `createDynamicChain`.

Sending and receiving events with connectivity plug-ins

When the correlator starts up, any connectivity chains listed in the configuration file are loaded and started. At this point, events may be sent from EPL to the chains, through all of the codecs towards the transport.

onApplicationInitialized

While the transport is able to send events towards the host (the correlator), the correlator does not process those events immediately. This prevents problems with events that are sent from the transport to the correlator before the correlator has had event definitions injected, or the EPL to handle those events has been injected or is ready to process the events. Instead, these events are queued in the correlator.

An EPL application that sends or receives events to a transport should call the onApplicationInitialized method on the com.softwareag.connectivity.ConnectivityPlugins EPL object. This notifies the correlator that the application is ready to process events. Any events that the transport sends towards the host (correlator) before this is called are then delivered to the correlator. Events from a transport are maintained in the correct order.

Calling onApplicationInitialized notifies all codecs and transports that the host is ready by calling the hostReady method. The transport may choose not to receive events (for example, from a JMS topic) until the application is ready if doing so may have adverse effects.

Initialization:

  1. Apama recommends that after all an application’s EPL has been injected, the application should send an application-defined “start” event using a .evt file.

    Apama Plugin for Eclipse, engine_deploy and other tools all ensure that .evt files are sent in after all EPL has been injected.

  2. The monitor that handles the application-defined start event (from step 1) should use this event object to notify the correlator that the application is initialized and ready to receive messages, for example:

    on com.mycompany.myapp.Start() {
       com.softwareag.connectivity.ConnectivityPlugins.onApplicationInitialized();
       // Any other post-injection startup logic goes here too.
    }
    

Note:

For simple applications, you can add the EPL bundle Automatic onApplicationInitialized to your project (see also Adding bundles to projects). This bundle will ensure that onApplicationInitialized is called as soon as the entire application has been injected into the correlator. However, in cases where you need to wait for a MemoryStore, database or another resource to be prepared before your application is able to begin to process incoming messages, you should not use the bundle. In these cases, you should write your own start event and application logic.

To aid diagnosing problems when part of the system is not ready in a timely manner, the correlator logs this on every Status: log line (by default, every 5 seconds). For example:

Application has not called onApplicationInitialized yet - 500 events from connectivity transports will not be processed yet

If EPL has not yet called onApplicationInitialized or if a plug-in in myChain has not returned from hostReady yet, the following is logged:

Chain myChain is handling hostReady call

Calling ConnectivityPlugins.onApplicationInitialized also notifies the correlator-integrated messaging for JMS, if enabled, that it is ready to receive events. It will then implicitly perform the JMS.onApplicationInitialized() call (see Using EPL to send and receive JMS messages). You should only call ConnectivityPlugins.onApplicationInitialized() once the application is ready to receive all incoming events, either from connectivity chains or JMS.

Diagnostic codec

You can use the Diagnostic codec to view the messages being sent at any point in the connectivity chain. This is very useful for diagnosing problems, and for configuring message transformations using codecs such as the Mapper and Classifier codecs. For more information, see The Diagnostic codec connectivity plug-in for further information.

Simple sample

The samples/connectivity_plugin/application/genericsendreceive directory of your Apama installation includes a simple sample which provides an easy way to get started with sending and receiving messages to or from any connectivity plug-in.

The sample contains some simple event definitions called mypackage.MySampleInput and mypackage.MySampleOutput that can be used for input and output messages. These event definitions can be customized with additional fields as you wish.

In addition, the sample contains a monitor that subscribes to a specific transport channel and logs all events received from it, and also sends test events to a specific transport channel.

To use the sample, simply copy it into your Apama work directory or import it into Apama Plugin for Eclipse as an existing project. Then customize the channel (or channels) it is sending to/from to match the channel naming scheme specified in the documentation or configuration of the transport you are using (for example, um:MyChannelName for Universal Messaging). Finally, add and configure the connectivity plug-in you wish to use (see also Standard connectivity plug-ins). Depending on the transport and chain configuration you are using, you may also need to configure Mapper and/or Classifier codec rules in the YAML file (see also Codec Connectivity Plug-ins) to use the mypackage.MySampleInput and mypackage.MySampleOutput event definitions used by the sample.

See the README.txt file in the genericsendreceive sample directory for more details.

Deploying plug-in libraries

Every Java plug-in is loaded in a separate class loader. Static variables cannot be shared between different plug-ins, unless a class is placed on the system’s classpath. This allows different plug-ins to use different versions of Java libraries without interference.

For C++ plug-ins, note that while a library can be loaded from any directory by specifying a directory key in the connectivityPlugins section of the configuration file (see Configuration file for connectivity plug-ins), any libraries that the plug-in depends on will only be loaded from the system library path (using the LD_LIBRARY_PATH environment variable and standard locations on UNIX, or using the PATH on Windows). The Apama Command Prompt (see Setting up the environment using the Apama Command Prompt) adds $APAMA_WORK/lib to the system library path, which is the recommended place to put any libraries that plug-ins require. Any libraries that the plug-in loads will be shared across the entire process, therefore different plug-ins will need to use the same version of third-party libraries where applicable. On Windows, ensure that the class and package names of plug-ins are different across plug-ins.

In general, C++ plug-ins will give better performance than Java libraries, as there is a cost in converting objects between Java and the C++ types. In particular, avoid mixing many interleaved Java and C++ plug-ins in the same chain. If possible, put the C++ plug-ins on the host side of the chain, as the correlator host plug-ins are C++ and adjacent plug-ins of the same type are cheaper than conversions. However, the language bindings of any libraries required by a plug-in and familiarity with the programming environment should be the primary factors when deciding in which language to write a new plug-in.