Common EPL patterns in monitors
When developing EPL monitor applications it can be helpful to be familiar with common EPL patterns.
The sample code in this topic contrasts the use of a dictionary with spawning. Usually, the dictionary approach is preferred. This is because the spawning approach uses an unmatched event expression, which is vulnerable to maintenance issues if someone else loads an event listener for a pattern that you expect to have no other matches.
The events to be processed:
event Input { string value; }
event Output { string value; }
event Translation {
    string raw;
    string converted;
}
The monitor:
monitor Translator {
    dictionary < string, string > translations;
    action onload() {
        on all Translation() as t addTranslation(t);
        on all Input() as i translate(i);
    }
    action addTranslation(Translation t) {
        translations[t.raw] := t.converted;
    }
    action translate(Input i) {
        if translations.hasKey(i.value) {
            send Output( translations[i.value] ) to "output";
        }
        else { fail(i); }
    }
    action fail(Input i ) {
        print "Cannot translate: " + i.value;
    }
}
Translation using spawning processes the same events as translation using a dictionary.
The monitor:
monitor Translator {
    action onload() {
        on all Translation() as t addTranslation(t);
        on all unmatched Input() as i fail(i);
    }
    action addTranslation(Translation t) {
        spawn translation(t);
    }
    action translation(Translation t) {
        on all Input(t.raw) translate(t.converted);
    }
    action translate(string converted) {
        send Output(converted) to "output";
    }
    action fail(Input i) {
        print "Cannot translate: " + i.value;
    }
}
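To illustrate the maintenance risk of the spawning approach, the following sketch shows a hypothetical second monitor (the name InputAuditor is an assumption, not part of the original example) that someone might later inject into the same context. Because its listener matches every Input event, the unmatched listener in the spawning Translator would no longer trigger, and untranslatable input would no longer be reported.

```epl
// Hypothetical monitor added by another developer; the name is illustrative.
monitor InputAuditor {
    action onload() {
        // This listener matches every Input event. Once it is loaded in the
        // same context as the spawning Translator, the listener
        // "on all unmatched Input()" in Translator no longer triggers.
        on all Input() as i {
            print "Audit: " + i.value;
        }
    }
}
```

The dictionary version of Translator is not affected by such a monitor, because it does not rely on unmatched.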
The factory pattern creates a new monitor instance to handle each new item or request. Its essential features include:
The onload() action sets up an event listener for creation events.
There are two common forms of the factory pattern:
Canonical form
The monitor instance spawns to an action that initializes the state of the new monitor instance and creates event listeners specific to that monitor instance. The spawning monitor instance uses local variables for coassignment and passes them into the action.
It is likely that some of the data from the creation event is copied into global variables.
Alternate form
The initial monitor instance uses coassignment to global variables to set some state before spawning.
This is a “lazy” form in that it stores the complete creation event inside the monitor. Do not use this form if you are spawning a large number of monitor instances and the creation event is large but only part of its data needs to be retained.
As an exercise, consider rewriting the example in Translation using spawning, to use the alternate factory form.
The event for the canonical form:
event NewOrder {...}
The monitor:
monitor OrderProcessor {
    ...
    action onload() {
        on all NewOrder() as order spawn processNewOrder(order);
    }
    action processNewOrder(NewOrder order) {
        ...
    }
}
The event for the alternate form:
event NewOrder {...}
The monitor:
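monitor OrderProcessor {
    NewOrder order; // global variable; its value is copied into each spawned instance
    action onload() {
        // Coassign to the global variable so that the spawned instance can read it
        on all NewOrder(): order spawn processOrder();
    }
    action processOrder() {
        ...
    }
}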
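One possible answer to the exercise posed earlier is sketched below: the spawning Translator rewritten in the alternate factory form. This is a sketch rather than a definitive solution. It assumes the Input, Output and Translation events defined for the translation examples, and it coassigns the creation event to a global variable (named t here for illustration) instead of passing it to the spawned action.

```epl
monitor Translator {
    // Global variable that holds the complete creation event.
    // Its value is copied into each spawned monitor instance.
    Translation t;

    action onload() {
        on all Translation(): t spawn translation();
        on all unmatched Input() as i fail(i);
    }

    action translation() {
        // Runs in the spawned instance, which has its own copy of t
        on all Input(value=t.raw) translate();
    }

    action translate() {
        send Output(t.converted) to "output";
    }

    action fail(Input i) {
        print "Cannot translate: " + i.value;
    }
}
```

Each spawned instance retains the whole Translation event, which is acceptable here because the event is small.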
The example below demonstrates the use of quit() to terminate an event listener. This example is somewhat contrived in order to demonstrate a situation where it might be desirable to use quit(). Other methods are often more appropriate; for example, you can use die to kill a monitor instance, and you can specify and not to terminate an event listener.
The example shows a monitor that trades received orders by breaking them into smaller orders, which it might place concurrently (perhaps on several exchanges). The monitor listens for fills on these orders, and sums up the fills. (A real monitor might also send status on what the filled volume is for each child order together with the total volume filled for the order. The logic for this is not shown here.) When each order is completely filled, the monitor terminates the Trade event listener for that order.
The events:
event OrderIn {integer id;... }
event OrderOut {integer id; integer volume;... }
event Trade {integer orderOutId; integer volume;... }
The monitor:
monitor TradeOrderAsSeveralSmallerOrders {
    event PlacedOrderRecord {
        listener listener;
        integer volumeToTrade;
        integer volumeTraded;
    }
    dictionary < integer, PlacedOrderRecord > records;
    action onload() {
        on all OrderIn() as theOrder spawn tradeOrder();
    }
    action tradeOrder() {
        // some logic determining when and what volume to trade
        ...
        placeOrder( volume ); //called multiple times
        ...
    }
    action placeOrder(integer volume) {
        PlacedOrderRecord r := new PlacedOrderRecord;
        integer id := integer.incrementCounter("orderId");
        r.listener := on all Trade(orderOutId=id) as t
            processTrade(t);
        records[id] := r;
        r.volumeToTrade := volume;
        route OrderOut(id,volume,...);
    }
    action processTrade(Trade t) {
        PlacedOrderRecord r := records[t.orderOutId];
        r.volumeTraded := r.volumeTraded + t.volume;
        if (r.volumeToTrade - r.volumeTraded) <= 0 {
            r.listener.quit();
            ...
        }
        ...
    }
}
As stated earlier, for real-world solutions there is generally a better option than using quit(). For example, the exchange(s) probably also send OrderComplete events. In this case you can change the on statement as follows:
on all Trade(orderOutId=id) as t and not OrderComplete(orderOutId=id)
    processTrade(t);
Of course, you must be certain that the OrderComplete event can be received only after all trades for that order have been received.
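The other alternative mentioned earlier, die, can be sketched as follows. This is a minimal illustration under stated assumptions, not a reworking of the example above: the PlaceChildOrder event and the ChildOrderTracker monitor are hypothetical names, and Trade is trimmed to the two fields used here. Each child order gets its own monitor instance, so terminating that instance also removes its Trade listener and no listener handle needs to be stored.

```epl
// Hypothetical events for this sketch only.
event PlaceChildOrder { integer id; integer volume; }
event Trade { integer orderOutId; integer volume; }

monitor ChildOrderTracker {
    integer volumeToTrade;
    integer volumeTraded;

    action onload() {
        // Factory pattern: one monitor instance per child order
        on all PlaceChildOrder() as p spawn track(p.id, p.volume);
    }

    action track(integer id, integer volume) {
        volumeToTrade := volume;
        volumeTraded := 0;
        on all Trade(orderOutId=id) as t {
            volumeTraded := volumeTraded + t.volume;
            if volumeTraded >= volumeToTrade {
                // die terminates this monitor instance
                // together with all of its event listeners
                die;
            }
        }
    }
}
```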
The dictionary and factory patterns are often combined. This pattern achieves separation of concerns by using two monitors. The first monitor is responsible for managing global concerns, for example, it ensures that each order has a unique key. The second monitor is responsible for local concerns, for example, it manages all data associated with processing that order.
The example does the following:
The OrderFilter monitor accepts NewOrder events and checks for uniqueness of the order key.
If the key is unique, the OrderFilter monitor routes a ValidNewOrder event.
The events:
event OrderKey{...}
event NewOrder {
    OrderKey key; //You can use anything for key as long as it is unique
    ...
}
event ValidNewOrder {
    NewOrder order;
}
The monitors:
monitor OrderFilter {
    dictionary < OrderKey, NewOrder > orders;
    action onload() {
        on all NewOrder() as order validateOrder(order);
    }
    action validateOrder(NewOrder order){
        if orders.hasKey(order.key) {
            print "Duplicate order!";
            print "Original: " + orders[order.key].toString();
            print "Incoming: " + order.toString();
        }
        else {
            orders.add(order.key,order);
            route ValidNewOrder(order);
        }
    }
}
monitor OrderProcessor {
    ...
    action onload() {
        on all ValidNewOrder() as valid spawn processOrder(valid.order);
    }
    action processOrder( NewOrder order ) {
        ...
    }
}
The following pattern keeps a count of how many clients are using a particular service object, which in turn can be used to determine the lifetime of these service objects. The example subscription management mechanism is fairly sophisticated, possibly too sophisticated, but it provides the big advantage of separating the concerns by using two monitors. If you decide to change the subscription mechanism, you can do so simply by changing the ServiceManager monitor. There is no impact at all on the ServiceItem monitor.
The events:
package com.apamax.service;
event Subscribe {
    string toWhat;
    string originator;
}
event Unsubscribe {
    string fromWhat;
    string originator;
}
event CreateServiceItem {
    string what;
}
event DestroyServiceItem {
    string what;
}
The monitors:
monitor ServiceManager {
    // Maps each item to its subscribers and their subscription counts
    dictionary <string, dictionary<string, integer>> items;
    action onload() {
        on all Subscribe() as s subscribe(s);
        on all Unsubscribe() as u unsubscribe(u);
    }
    action subscribe(Subscribe s){
        dictionary < string, integer > subscriptions:= {};
        if items.hasKey(s.toWhat) {
            subscriptions :=
                items[s.toWhat];
            if subscriptions.hasKey(s.originator) {
                subscriptions[s.originator] :=
                    subscriptions[s.originator] + 1;
            }
            else {
                subscriptions[s.originator] := 1;
            }
        }
        else {
            // First subscription for this item: count the originator,
            // otherwise a later Unsubscribe from it would fail
            subscriptions[s.originator] := 1;
            items[s.toWhat] := subscriptions;
            route CreateServiceItem(s.toWhat);
        }
    }
    action unsubscribe(Unsubscribe u) {
        if items.hasKey(u.fromWhat) {
            dictionary < string, integer > subscriptions :=
                items[u.fromWhat];
            if subscriptions.hasKey(u.originator) {
                if subscriptions[u.originator] <= 1 {
                    subscriptions.remove(u.originator);
                    if subscriptions.size() = 0 {
                        // Last subscriber has gone: destroy the service item
                        items.remove(u.fromWhat);
                        route DestroyServiceItem(u.fromWhat);
                    }
                }
                else {
                    subscriptions[u.originator] :=
                        subscriptions[u.originator] - 1;
                }
            }
            else {
                print "Unsubscribe failed: no originator: " +
                    u.toString();
            }
        }
        else {
            print "Unsubscribe failed: no item: " + u.toString();
        }
    }
}
monitor ServiceItem {
    //...
    action onload() {
        on all CreateServiceItem() as c spawn createServiceItem(c);
    }
    action createServiceItem(CreateServiceItem c) {
        //...
        // Match only the destroy event for this instance's own item
        on all DestroyServiceItem(what=c.what) as d destroyServiceItem(d);
    }
    action destroyServiceItem(DestroyServiceItem d) {
        //...
        die;
    }
}
You can use the route statement to write EPL that exhibits inline (synchronous) request-response behavior. The following example shows that when you want to perform an ordered pattern of operations that contain (as one operation) a request to another monitor, the subsequent operations must wait until the requesting monitor receives the response.
The ordering of the route and on statements is not relevant. The correlator sets up the event listener before processing the routed event.
A common mistake is to place code after the on statement code block and expect that code to execute after the code in the on statement code block.
The events:
event Request { integer requestId;... }
event Response { integer requestId;... }
The monitors:
monitor Client {
    action doWork() {
        //do some processing
        ...
        integer id := integer.getUnique();
        route Request(id,... );
        on Response(requestId=id) as r {
            // continue processing
            ...
        }
        // Beware! Any code here will execute immediately
        // (before processing the response)
    }
}
monitor Server {
    action processRequests() {
        on all Request() as r {
            // evaluate response
            route Response(r.requestId);
        }
    }
}
The next example shows the canonical form for a pattern that specifies two or more synchronous requests.
The events:
event RequestA { integer requestId;... }
event ResponseA { integer requestId;... }
event RequestB { integer requestId;... }
event ResponseB { integer requestId;... }
The monitor:
monitor Client {
    action doWork() {
        //do some processing
        integer requestId := integer.getUnique();
        route RequestA(requestId,...);
        // Filter on the requestId field declared in ResponseA
        on ResponseA(requestId=requestId) as ra doWork2(ra);
    }
    action doWork2(ResponseA ra) {
        //do some more processing
        integer requestId := integer.getUnique();
        route RequestB(requestId,...);
        // Filter on the requestId field declared in ResponseB
        on ResponseB(requestId=requestId) as rb doWork3(rb);
    }
    action doWork3(ResponseB rb) {
        //do yet more processing
    }
}
A common practice is to write an echo monitor for debugging purposes. Typically, an echo monitor listens for the same events as your production monitor and tracks various behavior.
Writing an echo monitor is typically straightforward, but keep the following caveat in mind. If your production monitor uses the unmatched keyword for a certain event, and your echo monitor listens for the same event, and both monitors are in the same context, your unmatched event listener will never trigger. This is because the event listener in the echo monitor matches the event and this prevents the unmatched event listener from ever triggering. The scope of an unmatched event listener is the context that it is in.
To avoid an unmatched event listener that never triggers, specify the completed keyword in the event listener in the echo monitor. For example, suppose you have the following code in your production monitor:
on all unmatched SubscribeDepth() as subDepth {
    doSomething();
}
If you want to track SubscribeDepth events in your echo monitor, write the event expression in the echo monitor as follows:
on all completed SubscribeDepth() as subDepth {
    doSomethingElse();
}
The completed event listener in the echo monitor triggers after the correlator finishes processing the unmatched event listener in the production monitor.
If an application requires functionality to upgrade its monitor instances while still running, there are architectural patterns that must be used during application design. A monitor must contain code that enables its state to be transferred to a new version and for it to terminate upon request. If a deployed monitor does not contain such code, it is not possible to upgrade while transferring its state.
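The two requirements above, handing over state and terminating on request, can be sketched as follows. This is a simplified illustration under stated assumptions, not the approach of any shipped sample: all event names (Tick, Upgrade, State, SavedState) and the monitor name are hypothetical, and the state is handed over by routing an event within the same context rather than through persistent storage.

```epl
// Hypothetical events for this sketch only.
event Tick {}
event Upgrade {}
event State { integer count; }
event SavedState { State state; }

monitor CounterV1 {
    State state;

    action onload() {
        // Normal processing; this listener terminates when Upgrade arrives
        on all Tick() and not Upgrade() {
            state.count := state.count + 1;
        }
        // Upgrade request: hand over the state, then terminate
        on Upgrade() {
            route SavedState(state);
            die;
        }
    }
}
```

A new version of the monitor would, under the same assumptions, listen for SavedState events and continue from the received count.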
The sample application in the samples/epl/hot-redeploy/StoreState directory of the Apama installation shows how to transfer monitor state using the MemoryStore (see also Using the MemoryStore). This sample takes the following steps to upgrade monitor instances:
An Upgrade event is sent. This informs the version 1 monitor that an upgrade is happening, that it should save its state, and that its listeners should be destroyed.
After the Upgrade event, the version 2 monitor iterates the MemoryStore table and loads the state of each monitor. The version 2 monitors take over the processing of the input events from version 1.
The monitor state that is stored is limited to a small number of required variables contained in a State event that can easily be stored in the MemoryStore. Complex types (such as chunks, listeners or action references) cannot be stored in the MemoryStore and should not be included in a monitor’s state. The application monitor supplies a callback to handle the old state, which is typically done by converting the old State object to a new State object. This can be done automatically for fields of the State object that have the same type and name. Added fields or fields whose type has changed, however, need to be handled explicitly by upgrade code. The application monitor can then spawn to handle the upgraded instances. For more information, see the README.txt file and the source code files of the sample application.
This sample is deliberately simple to only show the concept of storing and retrieving a monitor instance state. An application may require more functionality. For example, you can enhance the sample as follows:
[...] Upgrade events that all monitors in any context could subscribe to.
[...] Upgrade events and allow all partially evaluated event expressions to run with the old version, but all new instances will use the new version.