Common EPL patterns in monitors

When developing EPL monitor applications it can be helpful to be familiar with common EPL patterns.

Contrasting using a dictionary with spawning

The sample code in this topic contrasts the use of a dictionary with spawning. Usually, the dictionary approach is preferred, because the spawning approach relies on an unmatched event expression, which is fragile: it silently stops working if someone else loads an event listener for a pattern that you expected to have no other matches.

Translation using a dictionary

The events to be processed:

event Input { string value; }
event Output { string value; }
event Translation {
   string raw;
   string converted;
}

The monitor:

monitor Translator {
   dictionary < string, string > translations;

   action onload() {
      on all Translation() as t addTranslation(t);
      on all Input() as i translate(i);
   }
   action addTranslation(Translation t) {
      translations[t.raw] := t.converted;
   }
   action translate(Input i) {
      if translations.hasKey(i.value) {
         send Output( translations[i.value] ) to "output";
      }
      else { fail(i); }
   }
   action fail(Input i) {
      print "Cannot translate: " + i.value;
   }
}
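
For example, sending the monitor the following events (illustrative values, shown in Apama event file syntax) exercises both paths:

Translation("hello", "bonjour")
Input("hello")
Input("goodbye")

The first Input produces Output("bonjour") on the "output" channel; the second prints "Cannot translate: goodbye".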

Translation using spawning

The events are the same as in "Translation using a dictionary" above.

The monitor:

monitor Translator {
   action onload() {
      on all Translation() as t addTranslation(t);
      on all unmatched Input() as i fail(i);
   }
   action addTranslation(Translation t) {
      spawn translation(t);
   }
   action translation(Translation t) {
      on all Input(t.raw) translate(t.converted);
   }
   action translate(string converted) {
      send Output(converted) to "output";
   }
   action fail(Input i) {
      print "Cannot translate: " + i.value;
   }
}
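
This version also illustrates the maintenance issue noted at the start of this topic. If someone later loads a monitor such as the following (hypothetical) into the same context, every Input event then matches at least one listener, so the unmatched listener never triggers and fail() is never called:

monitor AuditInputs {
   action onload() {
      // This listener matches every Input event, which silently
      // disables the unmatched listener in the Translator above.
      on all Input() as i {
         print "Audit: " + i.toString();
      }
   }
}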

Factory pattern

The factory pattern creates a new monitor instance to handle each new item/request. Its essential features include:

  • The onload() action sets up an event listener for creation events.
  • Each creation event causes a new monitor instance to be spawned.

There are two common forms of the factory pattern:

  • Canonical form

    The monitor instance spawns to an action that initializes the state of the new monitor instance and creates event listeners specific to that instance. The initial monitor instance coassigns the creation event to a local variable and passes it into the spawned action.

    It is likely that some of the data from the creation event is copied into global variables.

  • Alternate form

    The initial monitor instance uses coassignment to global variables to set some state before spawning.

    This is a “lazy” form in that it stores the complete creation event inside the monitor. You should not use this form if you are spawning a large number of monitor instances from a large creation event of which only part of the data needs to be retained.

As an exercise, consider rewriting the example in "Translation using spawning" to use the alternate factory form; one possible solution is sketched after the alternate pattern below.

Canonical factory pattern

The event:

event NewOrder {...}

The monitor:

monitor OrderProcessor {
   ...
   action onload() {
      on all NewOrder() as order spawn processNewOrder(order);
   }
   action processNewOrder(NewOrder order) {
   ...
   }
}

Alternate factory pattern

The event:

event NewOrder {...}

The monitor:

monitor OrderProcessor {
   NewOrder order; // global: set before spawning, copied into the new instance
   action onload() {
      on all NewOrder():order spawn processOrder();
   }
   action processOrder() {
   ...
   }
}
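
Returning to the earlier exercise, here is one possible sketch of the spawning Translator rewritten in the alternate factory form (same events as before):

monitor Translator {
   Translation t; // global: copied into each spawned instance

   action onload() {
      on all Translation():t spawn translation();
      on all unmatched Input() as i fail(i);
   }
   action translation() {
      // this instance's copy of t holds the creation event
      on all Input(t.raw) translate(t.converted);
   }
   action translate(string converted) {
      send Output(converted) to "output";
   }
   action fail(Input i) {
      print "Cannot translate: " + i.value;
   }
}

Note that, as the text above warns, each spawned instance retains the complete Translation event, which is acceptable here because the event is small.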

Using quit() to terminate event listeners

The example below demonstrates the use of quit() to terminate an event listener. This example is somewhat contrived in order to demonstrate a situation where it might be desirable to use quit(). Typically, other methods are more appropriate: for example, you can use die to kill a monitor instance, or you can add an and not term to an event expression to terminate an event listener.

The example shows a monitor that trades received orders by breaking them into smaller orders, which it might place concurrently (perhaps on several exchanges). The monitor listens for fills on these orders and sums up the fills. (A real monitor might also report the filled volume for each child order together with the total volume filled for the order; that logic is not shown here.) When a child order is completely filled, the monitor terminates the Trade event listener for that order.

The events:

event OrderIn {integer id;... }
event OrderOut {integer id; integer volume;... }
event Trade {integer orderOutId; integer volume;... }

The monitor:

monitor TradeOrderAsSeveralSmallerOrders {
   event PlacedOrderRecord {
      listener tradeListener;
      integer volumeToTrade;
      integer volumeTraded;
   }
   dictionary < integer, PlacedOrderRecord > records;
   action onload() {
      on all OrderIn() as theOrder spawn tradeOrder(theOrder);
   }
   action tradeOrder(OrderIn theOrder) {
      // some logic determining when and what volume to trade
   ...
      placeOrder( volume ); //called multiple times
   ...
   }
   action placeOrder(integer volume) {
      PlacedOrderRecord r := new PlacedOrderRecord;
      integer id := integer.incrementCounter("orderId");
      r.tradeListener := on all Trade(orderOutId=id) as t
         processTrade(t);
      records[id] := r;
      r.volumeToTrade := volume;
      route OrderOut(id,volume,...);
   }
   action processTrade(Trade t) {
      PlacedOrderRecord r := records[t.orderOutId];
      r.volumeTraded := r.volumeTraded + t.volume;
      if (r.volumeToTrade - r.volumeTraded) <= 0 {
         r.tradeListener.quit();
         ...
      }
      ...
   }
}

As stated earlier, for real-world solutions there is generally a better option than using quit(). For example, the exchanges probably also send OrderComplete events. In this case, you can change the on statement as follows:

on all Trade(orderOutId=id) as t and not OrderComplete(orderOutId=id)
   processTrade(t);

Of course, you must be certain that the OrderComplete event can be received only after all trades for that order have been received.
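
Alternatively, if each child order were handled by its own monitor instance, you could avoid quit() entirely and let the instance die when its order completes. The following is a minimal sketch (not part of the original example), assuming the OrderComplete event above carries an orderOutId field:

monitor TradeSingleOrder {
   integer volumeTraded;

   action onload() {
      on all OrderOut() as o spawn handleOrder(o);
   }
   action handleOrder(OrderOut o) {
      on all Trade(orderOutId=o.id) as t {
         // sum the fills for this child order
         volumeTraded := volumeTraded + t.volume;
      }
      on OrderComplete(orderOutId=o.id) {
         die; // ends this instance and, with it, all of its listeners
      }
   }
}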

Combining the dictionary and factory patterns

The dictionary and factory patterns are often combined. This pattern achieves separation of concerns by using two monitors. The first monitor is responsible for managing global concerns, for example, it ensures that each order has a unique key. The second monitor is responsible for local concerns, for example, it manages all data associated with processing that order.

The example does the following:

  1. The OrderFilter monitor accepts NewOrder events and checks the order key for uniqueness.
  2. For all orders with unique keys, the OrderFilter monitor routes a ValidNewOrder event.
  3. The OrderProcessor monitor spawns a new instance to process each ValidNewOrder event.

Testing uniqueness

The events:

event OrderKey {...}
event NewOrder {
   OrderKey key; // you can use anything as the key, as long as it is unique
   ...
}
event ValidNewOrder {
   NewOrder order;
}

The monitors:

monitor OrderFilter {
   dictionary < OrderKey, NewOrder > orders;
   action onload() {
      on all NewOrder() as order validateOrder(order);
   }
   action validateOrder(NewOrder order){
      if orders.hasKey(order.key) {
         print "Duplicate order!";
         print "Original: " + orders[order.key].toString();
         print "Incoming: " + order.toString();
      }
      else {
         orders.add(order.key,order);
         route ValidNewOrder(order);
      }
   }
}

monitor OrderProcessor {
...
   action onload() {
      on all ValidNewOrder() as valid spawn processOrder(valid.order);
   }
   action processOrder( NewOrder order ) {
   ...
   }
}

Reference counting

The following pattern keeps a count of how many clients are using a particular service object; this count can in turn determine the lifetime of the service object. The example subscription management mechanism is fairly sophisticated, possibly too sophisticated, but it has the big advantage of separating concerns between two monitors. If you decide to change the subscription mechanism, you can do so simply by changing the ServiceManager monitor; there is no impact at all on the ServiceItem monitor.

The events:

package com.apamax.service;
event Subscribe {
   string toWhat;
   string originator;
}
event Unsubscribe {
   string fromWhat;
   string originator;
}
event CreateServiceItem {
   string what;
}
event DestroyServiceItem {
   string what;
}

The monitors:

monitor ServiceManager {
   dictionary <string, dictionary<string, integer>> items;

   action onload() {
      on all Subscribe() as s subscribe(s);
      on all Unsubscribe() as u unsubscribe(u);
   }

   action subscribe(Subscribe s) {
      dictionary < string, integer > subscriptions := {};
      if items.hasKey(s.toWhat) {
         subscriptions := items[s.toWhat];
         if subscriptions.hasKey(s.originator) {
            subscriptions[s.originator] :=
               subscriptions[s.originator] + 1;
         }
         else {
            subscriptions[s.originator] := 1;
         }
      }
      else {
         // first subscription to this item: record the originator,
         // store the new dictionary, and create the service item
         subscriptions[s.originator] := 1;
         items[s.toWhat] := subscriptions;
         route CreateServiceItem(s.toWhat);
      }
   }

   action unsubscribe(Unsubscribe u) {
      if items.hasKey(u.fromWhat) {
         dictionary < string, integer > subscriptions :=
            items[u.fromWhat];
         if subscriptions.hasKey(u.originator) {
            if subscriptions[u.originator] <= 1 {
               subscriptions.remove(u.originator);
               if subscriptions.size() = 0 {
                  items.remove(u.fromWhat);
                  route DestroyServiceItem(u.fromWhat);
               }
            }
            else {
               subscriptions[u.originator] :=
                  subscriptions[u.originator] - 1;
            }
         }
         else {
            print "Unsubscribe failed: no originator: " +
               u.toString();
         }
      }
      else {
         print "Unsubscribe failed: no item: " + u.toString();
      }
   }
}

monitor ServiceItem {
   //...

   action onload() {
      on all CreateServiceItem() as c spawn createServiceItem(c);
   }

   action createServiceItem(CreateServiceItem c) {
   //...
      // only react to destroy events for this instance's own item
      on all DestroyServiceItem(what=c.what) as d destroyServiceItem(d);
   }

   action destroyServiceItem(DestroyServiceItem d) {
      //...
      die;
   }
}
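
For example (illustrative item and client names):

  1. Subscribe("FX", "clientA"): the item is new, so CreateServiceItem("FX") is routed and a ServiceItem instance is spawned.
  2. Subscribe("FX", "clientB"): the count for clientB becomes 1; no new item is created.
  3. Unsubscribe("FX", "clientA"): clientA is removed, but the item survives because clientB still has a subscription.
  4. Unsubscribe("FX", "clientB"): the last subscription is gone, so DestroyServiceItem("FX") is routed and the ServiceItem instance dies.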

Inline request-response pattern

You can use the route statement to write EPL that exhibits inline (synchronous) request-response behavior. In the following example, a monitor performs an ordered sequence of operations, one of which is a request to another monitor; the operations that follow the request must wait until the requesting monitor receives the response.

The ordering of the route and on statements is not relevant: a routed event is not processed until the correlator has finished processing the current event, so the event listener is always set up before the routed event is processed.

A common mistake is to place code after the on statement's code block and expect it to execute only after the code in that block has run.

Routing events for request-response behavior

The events:

event Request { integer requestId;... }
event Response { integer requestId;... }

The monitors:

monitor Client {
   action doWork() {
      //do some processing
      ...
      integer id := integer.getUnique();
      route Request(id,... );
      on Response(requestId=id) as r {
         // continue processing
         ...
      }
      // Beware! Any code here will execute immediately
      // (before the response is processed).
   }
}

monitor Server {
   action onload() {
      on all Request() as r {
         // evaluate response
         route Response(r.requestId);
      }
   }
}

Canonical form for synchronous requests

The next example shows the canonical form for coding a pattern of two or more consecutive synchronous requests.

The events:

event RequestA { integer requestId;... }
event ResponseA { integer requestId;... }
event RequestB { integer requestId;... }
event ResponseB { integer requestId;... }

The monitor:

monitor Client {
   action doWork() {
      //do some processing
      integer id := integer.getUnique();
      route RequestA(id,...);
      on ResponseA(requestId=id) as ra doWork2(ra);
   }
   action doWork2(ResponseA ra) {
      //do some more processing
      integer id := integer.getUnique();
      route RequestB(id,...);
      on ResponseB(requestId=id) as rb doWork3(rb);
   }
   action doWork3(ResponseB rb) {
      //do yet more processing
   }
}

Writing echo monitors for debugging

A common practice is to write an echo monitor for debugging purposes. Typically, an echo monitor listens for the same events as your production monitor and logs or otherwise tracks them.

Writing an echo monitor is typically straightforward, but keep the following caveat in mind. If your production monitor uses the unmatched keyword for a certain event, your echo monitor listens for the same event, and both monitors are in the same context, then your unmatched event listener will never trigger. The event listener in the echo monitor matches the event, which prevents the unmatched event listener from ever triggering. The scope of an unmatched event listener is the context that it is in.

To avoid an unmatched event listener that never triggers, specify the completed keyword in the event listener in the echo monitor. For example, suppose you have the following code in your production monitor:

on all unmatched SubscribeDepth() as subDepth {
   doSomething();
}

If you want to track SubscribeDepth events in your echo monitor, write the event expression in the echo monitor as follows:

on all completed SubscribeDepth() as subDepth {
   doSomethingElse();
}

The completed event listener in the echo monitor triggers after the correlator finishes processing the unmatched event listener in the production monitor.
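
Putting this together, a minimal echo monitor might look like the following sketch (SubscribeDepth is assumed to be the event used by the production monitor above):

monitor EchoMonitor {
   action onload() {
      // completed fires only after every other listener in this
      // context (including the production monitor's unmatched
      // listener) has finished processing the event
      on all completed SubscribeDepth() as subDepth {
         print "Echo: " + subDepth.toString();
      }
   }
}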

Versioning and upgrading monitors

If an application requires the ability to upgrade its monitor instances while they are still running, certain architectural patterns must be used during application design. A monitor must contain code that enables its state to be transferred to a new version and that lets it terminate upon request. If a deployed monitor does not contain such code, it cannot be upgraded with its state preserved.

The sample application in the samples/epl/hot-redeploy/StoreState directory of the Apama installation shows how to transfer monitor state using the MemoryStore (see also Using the MemoryStore). This sample takes the following steps to upgrade monitor instances:

  1. Injects version 1 of the Counter application.
  2. Processes version 1 until an upgrade is required.
  3. Stops the input events and flushes all queues to ensure that all current events have been processed.
  4. Injects version 2 of the Counter application, which sends an Upgrade event. This informs the version 1 monitor that an upgrade is happening, so it should save its state and destroy its listeners.
  5. Once version 1 has finished processing the Upgrade event, the version 2 monitor iterates over the MemoryStore table and loads the state of each monitor instance. The version 2 monitors take over the processing of the input events from version 1.
  6. Starts the input events.
  7. Processes version 2.

The monitor state that is stored is limited to a small number of required variables contained in a State event that can easily be stored in the MemoryStore. Complex types (such as chunks, listeners or action references) cannot be stored in the MemoryStore and should not be included in a monitor’s state. The application monitor supplies a callback to handle the old state, which is typically done by converting the old State object to a new State object. This can be done automatically for fields of the State object that are the same type and name. Added fields or fields whose type is changed, however, need to be handled explicitly by upgrade code. The application monitor can then spawn to handle the upgraded instances. For more information, see the README.txt file and the source code files of the sample application.
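
As a rough illustration of the shape of such state (the event and field names here are assumptions for illustration, not the sample's actual definitions), the preserved state is a plain event containing only simple types:

// Hypothetical sketch of the state an upgradable monitor preserves.
// Only simple, MemoryStore-friendly types are included; listeners,
// chunks and action references are deliberately excluded.
event State {
   string instanceId; // identifies the monitor instance being restored
   integer count;     // the application data worth carrying across versions
}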

This sample is deliberately simple, showing only the concept of storing and retrieving a monitor instance's state. An application may require more functionality. For example, you can enhance the sample as follows:

  • Spawn to other contexts. This sample only spawns instances to the main context. It could spawn to other contexts, but you need to take care if instances need to be restored to the same context. You would need some form of context store and lookup.
  • Add a dedicated channel for the Upgrade events that all monitors in any context could subscribe to.
  • Only kill the monitor factory on Upgrade events, allowing all partially evaluated event expressions to complete with the old version while all new instances use the new version.
Info
This sample only transfers global monitor instance state. It does not transfer event expressions; these will be lost. It is possible to allow partially evaluated event expressions to complete (by not explicitly killing or deleting them) but have all new event expressions created by the new version.