Testing and tuning EPL monitors

This section provides information about testing and tuning your EPL monitors.

Optimizing EPL programs

Best practices for optimizing EPL programs include:

EPL, like languages such as Java or C#, relies on garbage collection. Intermittently, the correlator analyses the objects that have been allocated, including events, dictionaries and sequences, and allows memory used by objects that are no longer referenced to be re-used. Thus, the actual memory usage of the correlator might be temporarily above the size of all live objects. While running EPL, the correlator might wait until a listener, onload() action or stream network activation completes before performing garbage collection. Therefore, any garbage generated within a single action, listener invocation or stream network activation might not be disposed of before the action/listener/activation has completed. It is thus advisable to limit individual actions/listeners/activations to performing small pieces of work. This also aids in reducing system latency.

The cost of garbage collection increases as the number of events a monitor instance creates and references increases. If latency is a concern, it is recommended to keep this number low, dividing the working set by spawning new monitor instances if possible and appropriate. Reducing the number of object creations, including string operations that result in a new string being created, also helps to reduce the cost of garbage collection. The exact cost of garbage collection could change in future releases as product improvements are made.
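As an illustration only, the following sketch (the Order event and its fields are hypothetical) divides the working set by spawning one monitor instance per symbol, so that each instance creates and references relatively few objects:

event Order {
   string symbol;
   float quantity;
}

monitor OrderSplitter {
   sequence<Order> working;   // small, per-instance working set

   action onload() {
      // The first Order for each symbol spawns a dedicated instance; later
      // Orders for that symbol match the spawned instance's listener, so
      // they do not trigger this unmatched listener again.
      on all unmatched Order() as o {
         spawn handleSymbol(o.symbol);
      }
   }

   action handleSymbol(string sym) {
      on all Order(symbol=sym) as o {
         working.append(o);
      }
   }
}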

Best practices for writing EPL

EPL is a programming language with some special features, but like every other programming language it allows you to write poor, inefficient code. The techniques used in other languages to minimize wasted cycles can also be applied to EPL.

Basic programming optimization techniques all apply:

  • Move code out of tight loops
  • Avoid unnecessary allocation, for example, of strings (see the sketch after this list)
  • Put the most common tests first in if...else chains
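For illustration only, the following fragment (orders and customerName are hypothetical variables) applies the first two points by hoisting an invariant string concatenation out of a tight loop:

// Inefficient: rebuilds the same prefix string on every iteration.
integer i := 0;
while (i < orders.size()) {
   log "Order for " + customerName + ": " + orders[i].toString() at DEBUG;
   i := i + 1;
}

// Better: hoist the invariant concatenation out of the loop.
string prefix := "Order for " + customerName + ": ";
i := 0;
while (i < orders.size()) {
   log prefix + orders[i].toString() at DEBUG;
   i := i + 1;
}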

There is no substitute for empirical evaluation of your application’s performance. Measure performance, compare the measurements whenever you make modifications, and ensure that you are comparing like with like. Understanding the performance implications of what you write helps you avoid unnecessary costs.

You should know how fast your application needs to be.

Wildcard fields that are not relevant

Once a design has stabilized and event interfaces are well defined, it is possible to wildcard fields that do not need to be matched on in event listeners. Designating an event field as a wildcard prevents the correlator from creating an index for that field. Most importantly, a wildcard field means that the correlator does not need to traverse that index when receiving an event of that type to try to find interested event listeners (as there will not be any). This can give tangible performance benefits, particularly with large events.

Premature wildcarding is not advised but is not harmful. You can easily remove the wildcard annotation from event fields with no impact on existing code. The compiler gives an error if any code attempts to match on a field that is a wildcard.

The correlator can index up to 32 fields for each event type. If you are using an event that has more than 32 fields, you must designate the additional fields as wildcards.
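For illustration, a sketch of the syntax might look like the following (the event and field names are hypothetical); listeners can match on the ordinary fields but not on the wildcard fields:

event MarketTick {
   string symbol;                 // can be matched on in listeners
   float price;                   // can be matched on in listeners
   wildcard string exchangeText;  // never matched on, so not indexed
   wildcard float rawTimestamp;   // never matched on, so not indexed
}

// Matching on symbol is allowed; attempting to match on exchangeText
// or rawTimestamp would be a compile-time error.
on all MarketTick(symbol="APMA") as tick {
   print tick.toString();
}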

See Improving performance by ignoring some fields in matching events.

Avoiding unnecessary allocations

You should eliminate unnecessary allocations, especially when the size of an event is very large. For example:

event LargeEvent {}                   // 1000 field definitions omitted

integer i := 0;
while (i < 1000) {
   route LargeEvent(0,0,i,...);       // bad: allocates a new large event each iteration
   i := i + 1;
}

LargeEvent le := new LargeEvent();    // good: reuses a single instance
i := 0;
while (i < 1000) {
   le.foo := i;
   route le;
   i := i + 1;
}

Implementing states

When you want to write a process that passes through one or more states it is good practice to have one action per state. For example:

action inAuction() {
   on AuctionClosed() outOfAuction();
}

action outOfAuction() {
   on all Price (stock,*) as p and not InAuction() {
      on Price(stock,>p.price*1.01) and not InAuction() {
         sellStock();
      }
   }
   on InAuction() inAuction();
}

Structure of a basic test framework

Apama lends itself to automated testing because:

  • You can define test cases in event files that you feed into the correlator.
  • Apama includes a comprehensive set of command line utilities, all of which are scriptable using standard scripting languages on different platforms.
  • The correlator is deterministic when there is only the main context. When there is more than one context, each context is deterministic but the correlator as a whole is not.

If the advocated event interface pattern is employed for encapsulation, then modules can be tested in isolation (unit testing) as well as in more comprehensive integration-level tests.

A basic test case includes the following:

  • EPL files (.mon) to deploy (or references to them).
  • Input event files (.evt) to send to the correlator.
  • Reference event files (.evt) to compare to actual output.
  • Script to orchestrate execution of the test case.

You should assemble all of these files in an Apama project in Apama Plugin for Eclipse, which you can then use to launch the test case.

Each test case can reside in its own project with all relevant files local to it. The basic test process is to launch the application, send in some events, capture the output, compare it to the expected output, and, at a minimum, print the results of the test to the console or a log file.

Using event files

The following example shows how to use &TIME (Clock) events to explicitly set the correlator clock. To do this, the correlator must have been started in external clocking mode (the &TIME events give errors otherwise). Times are in seconds since the epoch (midnight, 1 January 1970 UTC).

##### Seed initial time (seconds since the Jan 1970 epoch)
&TIME(1)

##### Send in configuration of heartbeat interval to 5 seconds
SetHeartbeatInterval(5.0)

##### Advance the clock (5.5 seconds)
&TIME(6.5)

##### Correlator should have sent heartbeat with id 1
##### Acknowledge all is well
HeartbeatResponse(1,true)

Notice that the input event file has a lot of knowledge about how the module will (or should) respond. For example, the HeartbeatResponse event expects that the first HeartbeatRequest will have the ID 1. There is necessarily a close coupling between the input scripts and the implementation of the module being tested. This is another reason why as much of this information as possible should be extracted into the module’s message exchange protocol and made explicit, and perhaps enforced by one or more interface intermediaries.

A single correlator context is guaranteed to generate the same output in the same order, even when EPL timers (such as on all wait()) are employed. This is a benefit of correlator determinism, and makes regression testing, even of temporal logic, possible.

Info
The correlator’s behavior can be nondeterministic when events are sent between multiple contexts, or when plug-ins are used.

Handling runtime errors

EPL eliminates many runtime errors because of the following:

  • Strict, static typing, so there are no class cast exceptions.
  • No implicit type conversion, so there are no number format exceptions.
  • Values can only be “null” (or “empty” as it is called in EPL) if they are explicitly declared as optional, and such values can be accessed safely without risk of null pointer exceptions; a short sketch follows this list. For more information, see the description of the optional type in the API reference for EPL (ApamaDoc) and The ifpresent statement.
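For illustration only, the following sketch shows declaring and safely accessing an optional value (the Config event and its field are hypothetical):

event Config {
   optional<string> proxyHost;
}

monitor ConfigReader {
   action onload() {
      on all Config() as c {
         ifpresent c.proxyHost as host {
            log "Using proxy " + host at INFO;
         } else {
            log "No proxy configured" at INFO;
         }
      }
   }
}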

However, EPL cannot entirely eliminate runtime errors. For example, you receive a runtime error if you try to divide by zero or specify an array index that is out of bounds. Some runtime errors are obscure. For example:

mySeq.remove(mySeq.indexOf("foo"));

If foo is not in mySeq, indexOf() returns –1, which causes a runtime error.
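For illustration, the following shows two defensive alternatives, assuming mySeq is, say, a sequence<string>: check the index first, or catch the exception.

// Option 1: check the index before removing.
integer idx := mySeq.indexOf("foo");
if (idx >= 0) {
   mySeq.remove(idx);
}

// Option 2: catch the runtime exception raised by the out-of-bounds remove.
try {
   mySeq.remove(mySeq.indexOf("foo"));
} catch (com.apama.exceptions.Exception e) {
   log "Could not remove \"foo\": " + e.getMessage() at WARN;
}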

See also Exception handling.

What happens

When the correlator detects a runtime error, an exception is raised. If the exception is not caught by a catch block, one of two things happens. If the error occurs in the onload() action, in an action that has been spawned, or in an action within a stream query, the correlator terminates the monitor instance that contains the offending code, along with any listeners that were set up by that monitor instance. If the exception occurs within a listener or stream listener, the exception is logged and the listener activation is terminated, stopping the processing of that event; the monitor instance continues to process future events.

If an ondie() action is defined in a monitor, then all unhandled exceptions and runtime errors will terminate the monitor instance and all listeners and state within it.

Unhandled exceptions should still be avoided even if they do not terminate the monitor instance, since they can render the instance’s state invalid, affecting the results from future events.

Using ondie() to diagnose runtime errors

The preferred method for handling runtime errors is using a catch block; see also Exception handling.

If you are not sure where the catch block is necessary, you can specify some logging in the ondie() action to help diagnose the problem and to alert other system modules that a problem occurred. For example:

action ondie() {
   log "monitor instance terminating for " + myId;
   route InternalError("Foo");
}

In some circumstances, you can move into a suspended or safe state, or initiate damage limitation activities, such as pulling all active orders from the market. For example, Apama scenarios use the ondie() action to route an InstanceDied() event to a ScenarioService monitor. This in turn sends the event to connected clients so that the termination of the instance can be handled, and perhaps displayed, in a dashboard.

An alternative to using ondie() in this manner is to use a basic ACK, NACK, and timeout message exchange protocol so that a client is robust against its services being unavailable.

See About executing ondie() actions for information about how ondie() can optionally receive exception information if an instance dies due to an uncaught exception.

Info
If an ondie() action is defined, then all the runtime errors terminate the monitor instance and call the ondie() action, not just those from onload(), stream queries and spawned actions.

Using logging to diagnose errors

Logging is an effective means of generating diagnostic information. When writing log entries, consider the overhead of string allocation, garbage collection, and writing data to disk. Use conditional tests to reduce this overhead and minimize unnecessary logging.

The EPL log statement is a simple means of generating logging output. The EPL log statement writes to the correlator log file by default so any messages your program sends to the log file are mixed in with all other correlator logging messages. However, you can configure the correlator to send your EPL logging to a separate file. See Setting EPL log files and log levels dynamically. The logging attributes you can specify include a particular target log file and a particular log level for any number of individual packages, monitors and events.
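As a sketch only, the following shows such a conditional test (the OrderPlaced event, the debugEnabled flag and the buildDiagnosticString() action are hypothetical); the expensive log string is built only when it will actually be used:

event OrderPlaced {
   string orderId;
   float amount;
}

monitor DiagnosticExample {
   boolean debugEnabled;   // set from configuration, for example

   action onload() {
      on all OrderPlaced() as placed {
         // Only pay for the string construction when detailed logging is wanted.
         if (debugEnabled) {
            log "Order detail: " + buildDiagnosticString(placed) at DEBUG;
         }
      }
   }

   action buildDiagnosticString(OrderPlaced placed) returns string {
      return placed.toString();   // placeholder for a more expensive summary
   }
}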

When sending messages to the correlator log file, consider the following:

  • Log messages can be lost if the correlator is logging to stdout.
  • Using the correlator log is relatively expensive if there are many log statements in the critical path.
  • Anything you send to the log might be lost if the correlator log level is OFF.

See also Logging and printing.

Standard diagnostic log output

By default, the correlator outputs diagnostic information every five seconds, and sends it to the correlator log file at INFO log level. You can use this information to diagnose common problems. See Descriptions of correlator status log fields for further information.

The correlator sends this information to its log file during normal operation. While it is possible to disable this output (by setting the correlator’s log level to WARN), doing so is not advisable. In the unlikely event that you run into a problem, Apama Technical Support always ask for a copy of this log file, as the information in it is often useful for diagnosing the nature of a failure.

Capturing test output

All receivers should be started, and set to write events to a file, before any events are sent into the correlator. The files can then easily be compared to the reference output using standard operating system tools.

Other tools are also useful for checking the output. The engine_inspect correlator utility is good for verifying that the right number of monitor instances and listeners is present after each stage of a test. You can also use this utility to detect listeners and monitor instances that never terminate, or the premature exit of monitor instances.

Use the engine_receive utility to capture event output. You can specify the -f option to write received events to a file. Start multiple receivers on different channels as required.

The engine_inspect utility provides useful data for testing including the number of monitor instances, listeners, receivers, events generated and so on. Split input event files and run the engine_inspect utility after each file.

Capture the correlator log and compare to reference data. This is useful if your application logs errors or there are interesting diagnostics.

Avoiding listeners and monitor instances that never terminate

An Out of Memory condition causes the correlator to exit. This condition can be caused by listeners and monitor instances that never terminate — also referred to as listener leaks. For example, the following on statement defines event listeners that never terminate:

on all ( Foo(id=1) or all Foo(id=2) ) {      // second "all" is bogus
...
}
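For illustration, a corrected form removes the inner all, so that the sub-listeners are recreated by the outer all on each match rather than accumulating:

on all ( Foo(id=1) or Foo(id=2) ) {
   // handle the Foo event
}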

The following example spawns monitor instances that never terminate:

on all Trade() as t spawn handle();         // missing "unmatched" action
...
action handle() {
   on all Trade(symbol=t.symbol) as t {
      ...
   }
}
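For illustration, the usual fix is to make the spawning listener unmatched so that, once an instance exists for a symbol, its own listener matches that symbol’s Trade events and no further instances are spawned for it:

on all unmatched Trade() as t spawn handle();
...
action handle() {
   on all Trade(symbol=t.symbol) as t {
      ...
   }
}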

The sm (number of monitor instances) and ls (number of listeners) counts in the log file are often revealing in the case of a memory leak. A leak shows up as an increasing trend in these counts over a period of time when the intended logic of the application gives no valid reason for such growth.

Handling slow or blocked receivers

You can use correlator diagnostic output to identify slow or blocked receivers.

  • The oc value (number of events on the output queue) can grow to a maximum of 10,000. If you see it growing steadily, that probably indicates a slow receiver.
  • The tx (number of events transmitted) should always be increasing. If it is static, or not increasing as fast as it should, it probably indicates a slow receiver.

Slow receivers include:

  • Receivers that are not consuming events as quickly as the correlator is generating them.
  • Blocked receivers that are not accepting new events.

When the correlator’s output queue fills, operations that are sending events from the processing thread (or threads, if there is more than one context) are blocked. If the output queue remains filled, and the processing thread(s) remain blocked, the input queue(s) start(s) to fill. Events are never dropped.

If you specify the -x option when you start the correlator, the correlator disconnects any receiver that becomes slow. If your application produces events at too high a rate for a particular receiver, and you cannot speed the receiver up or install faster hardware, you can partition the correlator’s output event flow into separate channels so that the receiver needs to handle fewer events. Alternatively, or in addition, you can modify your application to throttle the rate at which it sends events to this receiver, or use throttling in the correlator to output events less frequently.

See also Determining whether to disconnect slow receivers.

Diagnosing infinite loops in the correlator

A correlator live lock occurs when events are recursively routed without a termination mechanism. The following example shows this in its simplest form:

on all Foo() {
   route Foo();
}

More complex forms might recurse after a connected chain of several events being routed between different monitors.
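For illustration only, the following sketch adds a termination mechanism (the depth field is hypothetical): the routed event carries a counter that bounds the recursion.

event Foo {
   integer depth;
}

monitor BoundedRouting {
   action onload() {
      on all Foo() as f {
         // Stop re-routing once a fixed depth is reached, so the loop terminates.
         if (f.depth < 10) {
            route Foo(f.depth + 1);
         }
      }
   }
}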

There are no limits on how many routed events can be queued. Consequently, depending on the nature of the bug, the correlator might run out of memory. Note that an overloaded correlator would show similar symptoms, but can be distinguished by the fact that work is still being done (events are being sent out from the correlator).

When the correlator is in an infinite loop, it quickly consumes an entire CPU and, if events are being routed as part of the loop, the correlator will eventually run out of memory. Use the following correlator diagnostics to diagnose an infinite loop:

  • rq — sum of the number of routed events on the input queues of all contexts. When the correlator is in an infinite loop, this value is either always 1 or always increasing, depending on the application.
  • iq — sum of the number of entries on the input queues of all contexts. When the correlator is in an infinite loop, this number is continuously increasing.
  • tx — number of transmitted events. This number is static when the correlator is in an infinite loop.

To identify an infinite loop in a particular context, run engine_inspect -x a few times. This lists each context along with the number of events on its input queue. Look for contexts whose input queues keep growing.

Tuning contexts

You should implement contexts whenever you want the correlator to perform concurrent processing. Work to be divided among contexts should have minimal or no interdependencies and no ordering requirements. Many applications present a natural way to partition work that is largely independent. For example, you could partition a financial application by stock symbol, by user, or by strategy.

The following topics describe common ways to optimize use of contexts.

Parallel processing for instances of an event type

A common candidate for parallel processing is an application that performs calculations for a number of events that are of the same type but have different identifiers, for example, different stock symbols from a stock market data feed. You can use either of the following strategies to implement parallel processing for this situation:

  • Create multiple public contexts. Each context listens for one identifier, operates on the events that have that identifier, and discards events that have any other identifier.
  • Have one context distribute data to multiple contexts, which are each dedicated to processing the events that have a particular identifier.

The performance of these strategies varies according to the work being done. A distributor can be a bottleneck, but there is also a cost in having every context discard the events in which it is not interested. In the following situations, the distributor strategy is likely to be more efficient:

  • There is a very large set of identifiers but a relatively low overall rate of arriving events.
  • Events must be pre-processed.
  • Events are not arriving from external sources. Instead, you must explicitly send events.

The sample code below shows the distributor strategy.

event Tick {
   string symbol;
   integer price;
}

/** In the main context, the following monitor distributes Tick events
    to other contexts. There is one context to process each unique symbol. */
monitor TickDistributor {

    /** The dictionary maps each unique Tick symbol to the (private)
        context that ultimately processes it. */
    dictionary<string, context> symbolMapping;

   action onload() {
      on all Tick() as t {
         // If the context for this symbol does not yet exist, create it.
         if(not symbolMapping.hasKey(t.symbol)) {
            context c := context("Processing-"+t.symbol);
            symbolMapping[t.symbol] := c;
            spawn processSymbol(t.symbol) to c;
         }

         // Send each Tick event to the context that handles its symbol.
         send t to symbolMapping[t.symbol];
      }
   }

    /** The following action handles Tick events with the given symbol.
        This action executes in a private context that processes all Tick
        events that have one particular symbol. */
   action processSymbol(string symbol) {
      // Because this context receives a homogeneous stream of Tick events
      // that all have the same particular symbol, there is no need to specify
      // an event listener that discriminates based on symbol.
      on all Tick() as t {
...
      }
   }
}

Parallel processing for long-running calculations

Suppose a required calculation takes a relatively long time. You can do the calculation in a context while the main context performs other operations. Or, you might want multiple contexts to concurrently perform the long calculation on different groups of the incoming events.

The following code provides an example of performing the calculation in another context.

monitor parallel {
   integer numTicks;
   action onload() {
      on all Tick() {
         numTicks:=numTicks+1;
         send NumberTicks(numTicks) to "output";
      }
      on all Calculate() as calc {
         integer atNumTicks:=numTicks;
         integer calcId:=integer.getUnique();
         spawn doCalculation(calc, calcId, context.current())
            to context("Calculation");
         on CalculationResponse(id=calcId) as resp {
            send CalculationResult(resp, atNumTicks, numTicks) to "output";
         }
      }
   }
   action doCalculation(Calculate req, integer id, context caller) {
      float value := actual_calculation_function(req);   // placeholder for the actual calculation
      send CalculationResponse(id, value) to caller;
   }
}

For each Calculate event found, the event listener executes a spawn...to statement that creates a new context. All of these contexts have the same name (Calculation) but different context IDs, and they can all run concurrently.

A Calculation context might send a CalculationResponse event to the main context before the main context sets up the CalculationResponse event listener. However, the correlator completes all of the operations that result from finding a Calculate event, including setting up the CalculationResponse event listener, before it processes the sent CalculationResponse event.

While the calculations are running, other Tick events might arrive from external components and the correlator can process them.

The order in which CalculationResponse events arrive on the main context’s input queue can differ from the order in which the contexts that generated them were created; it depends on when each calculation started and how long it took to complete. The monitor instance in the main context uses the calcId variable to distinguish the responses.