By default, the correlator operates in a serial manner. In a monitor, you have the option of implementing contexts for parallel processing.
During serial correlator operation, the correlator processes events in the order in which they arrive. Each external event matches zero or more listeners. The correlator executes a matching event’s associated listeners in a rigid order. The correlator completes the processing related to a particular event before it examines the next event.
For some applications, this serial behavior might not be necessary. In this case, you might be able to improve performance by implementing parallel processing. Parallel processing lets the correlator concurrently process the EPL in multiple monitor instances. To implement parallel processing, you create one or more contexts.
Info
If a license file cannot be found, the number of contexts that the correlator allows to be created is limited. See Running Apama without a license file.
Parallel processing in the correlator is quite different from the parallel processing provided by Java, C++, and other languages. These languages allow shared state, and rely on mutexes, conditions, semaphores, monitors, and so on, to enforce correct behavior. The correlator does not automatically provide shared state. Data sharing happens by sending events between contexts and by using the MemoryStore. See Using the MemoryStore. Parallel processing in the correlator is a message-passing system.
Introduction to contexts
Contexts allow EPL applications to organize work into threads that the correlator can execute concurrently.
In EPL, context is a reference type. When you create a variable of type context, or an event field of type context, you are actually creating an object that refers to a context. The context might or might not already exist. You can then use the context reference to spawn to the context or send an event to the context. When you spawn to a context, the correlator creates the context if it does not already exist.
What is inside/outside a context?
When you start a correlator it has a single main context. You can then create additional contexts. A context consists of the following:
One or more monitor instances. Except, the main context exists even if it does not contain any monitor instances.
An event input queue.
Listeners that belong to the contained monitor instances.
The correlator maintains event definitions and monitor definitions outside contexts. This lets all contexts share the same event and monitor definitions.
Instances of the same monitor can exist in multiple contexts. Each monitor instance belongs to a single context. For example, suppose you inject monitor A. Monitor A spawns within its own context (the main context) twice and spawns once to the alpha context. This creates three additional monitor instances. Two instances are in the main context and one instance is in the alpha context. These instances do not share any data, other than by means of passing events.
About context properties
A context has the following properties:
Name — A string that you specify when you create the context. This name does not need to be unique. The name is a convenient identifier that you can use in your code.
ID — The correlator assigns a unique integer.
receiveInput flag — A Boolean value that indicates whether the context can receive external input events on the default channel, which is the empty string ("").
A value of true lets the context receive external events on the default channel; this is a public context. A value of true is equivalent to a subscription to the default channel; there is no requirement for a monitor instance in this context to subscribe to the default channel.
A value of false indicates a private context that does not receive external events on the default channel. This is the default.
Note that the main context is public.
Channel subscriptions — A context is subscribed to the union of the channels each of the monitor instances in that context is subscribed to. This is a property of the monitor instances running in a context and is not accessible by means of the context reference object.
You can spawn to other contexts. When the last monitor instance in a context terminates, that context stops doing work and stops consuming resources until you spawn another monitor instance to it.
In a context, when you route an event, the event goes to the front of that context’s input queue. You can route events only within a context.
You can send an event to a particular context. When you do this, the event goes to the end of the specified context’s input queue. The correlator processes it after it processes any other events that are already on the context’s input queue. See Sending an event to a particular context.
You can use a context as part of the key for a dictionary. You can route an event that contains a context field. You cannot parse a context. Context objects are immutable reference objects.
Context lifecycle
A context has a lifecycle that starts when a spawn...to operation occurs and ends when the last monitor instance in the context terminates. This is completely independent of any context objects that refer to the context. It is possible for a context to be running when no references to it exist, and it is possible for a context object to refer to a context that is no longer running. In the latter case, spawning to a context that is not running is permissible. The correlator restarts the context as required.
Info
If a license file cannot be found, the number of contexts that the correlator allows to be created is limited. See Running Apama without a license file.
Comparison of a correlator and a context
Upon injection, each monitor’s initial instance runs in the main context. You must explicitly create additional contexts. Conceptually, a context is like a correlator but with the following differences:
All contexts share the same namespace, and thus share all monitor and event definitions that have been injected.
A monitor instance must have a context reference to pass an event to that context.
Execution of Java is allowed in only the main context.
The engine_receive tool receives events from all contexts or it can be configured to receive events from only specified channels.
The engine_send tool sends events to all public contexts or to the contexts that are subscribed to the channels it is configured to send events on.
Creating contexts
In EPL, you refer to a context by means of an object of type context. The context type is a reference type.
The recommendation is to use private contexts and have monitor instances subscribe to the channels they require events from. This gives greater flexibility over using public contexts. For information on the constructors needed to create a context, see the description of the context type in the API reference for EPL (ApamaDoc).
The name of a context does not have to be unique, and is only used for diagnostic purposes (it is recommended that context names be meaningful and distinct). Creating a new context object with the same name as another context creates a reference to a different context, not the same context. Context references are independent to the actual context where monitors run. A context continues running if there are no references to it. A reference to a context may exist even though no active monitors are running in that context. You use the context reference to spawn to the context or send an event to the context. When you spawn to a context, the correlator creates the context if it does not already exist.
When you start a correlator, it has a single main context. You can then create additional contexts. Context reference objects are lightweight and creating one only creates a stub object and allocates an ID. In other words, when you create an EPL context object, you are actually creating a context reference.
The following example creates a reference, c, to a private context whose name is test:
context c:=context("test");
For information on the methods you can call on a context, see the description of the context type in the API reference for EPL (ApamaDoc).
You can create any number of contexts. A context is a very lightweight object. Creating a context just allocates an identifier and creates a small object. Consequently, it is possible to create a thousand contexts with little performance penalty.
You can have any number of running contexts. A running context means that the context contains at least one monitor instance that has work to do. The more CPU cores you have, the more contexts it is practical to be running at a given time. The performance of multiple contexts running concurrently should scale approximately according to the number of CPU cores available on the host.
Because the cost of each context is low, it is possible to divide applications into the finest level of parallelism possible and let the correlator balance running those contexts across all CPU cores. This is true even if that means creating very many contexts.
Using channels to communicate between contexts
Contexts can subscribe to channels, using the monitor.subscribe(channelName) operation. When a monitor executes monitor.subscribe(channelName), it causes the context it is running in to be subscribed to that channel. The subscription’s lifetime is tied to the lifetime of the monitor instance that executes subscribe(). The subscription is active until that monitor instance terminates or executes monitor.unsubscribe(channelName).
Subscriptions are reference counted. That is, if one monitor instance subscribes twice to the same channel then it needs to unsubscribe twice from that channel. If two monitor instances each subscribe once to the same channel then the subscription is active while either monitor instance exists or until both monitor instances unsubscribe from that channel.
When a context is subscribed to a channel it receives all events sent on that channel. This includes:
Events sent to the correlator from
An IAF adapter
engine_send
Another correlator connected with engine_connect and using parallel mode
Clients
Universal Messaging
Events sent from EPL using the send...to statement
Events sent from EPL plug-ins to a specific channel
It does not include events emitted with the emit...to statement. Even if the target of an emit...to statement is a channel that the context is subscribed to, an event sent by the emit statement goes only to external receivers and not to any contexts.
By using a channel for each stream of data an application may be interested in, an application can control which streams of data it receives through execution of the appropriate monitor.subscribe(channelName) and monitor.unsubscribe(channelName) operations. The correlator can efficiently distribute events within the correlator to multiple contexts, plug-ins or receivers subscribed to channels. If further scale-out is required, using channels allows some application components to be deployed to correlator processes running on other hosts, which are connected using the engine_connect correlator tool or Universal Messaging. See Tuning correlator performance.
Obtaining context references
To obtain a reference to the context that a piece of code is running in, call the context.current() method. This is a static method that returns a context object that is a reference to the current context. The current context is the context that contains the EPL that calls this method.
For a monitor instance to interact with the EPL by means of a context object in another context, the monitor instance must have a reference to that context. A monitor instance can obtain a reference to another context in only the following ways:
By creating the context.
By receiving a context reference, which must be of type context. A monitor instance can receive this reference by means of a routed or sent event, or a spawn operation.
For example:
on all Calculate() as calc {
integer calcId:=integer.getUnique();
spawn doCalculation(calc, calcId, context.current())
to context("Calculation");
do something
}
action doCalculation(Calculate req, integer id, context caller) {
do something
send CalculationResponse(id, value) to caller;
}
If a monitor instance that creates a context does not send a context reference outside itself, and does not subscribe to any channels, no other context can send events to that context, except by means of EPL plug-ins. This affords some degree of privacy for the context.
A context object (a context reference) does not do anything. It is simply the target of the following:
spawn ActionIdentifier([ArgumentList]) to ContextExpression;
In a monitor, you can spawn to a context. The format for doing this is as follows:
spawn ActionIdentifier([ArgumentList]) to ContextExpression;
Replace ContextExpression with any valid EPL expression that is of the context type. Typically, this is the name of a context variable. It is possible to spawn to only a context; it is not possible to spawn to a channel.
This statement asynchronously creates a new monitor instance in the target context. The correlator can immediately create the new monitor instance and begin processing it. The correlator does not need to finish processing the monitor instance that spawned to the context before it starts processing the spawned instance. The correlator might create the spawned monitor instance before it finishes processing the action that spawned the new instance. Or, the correlator might create the spawned monitor instance some time after it completes processing the action that spawned the new instance. The order is unpredictable. For example:
Unlike the regular spawn operation, the correlator runs the new monitor instance in the specified context. The correlator concurrently processes the new monitor instance and the instance that spawned it.
A context processes spawn operations and events in the order in which they arrive. For example, suppose a monitor contains the following statements:
spawn action1() to ctx;
send e1 to ctx;
spawn action2() to ctx;
send e2 to ctx;
The ctx context processes this in the following order: action1(), e1, action2(), e2.
Channels and contexts
Contexts can subscribe to particular channels to receive events delivered to those channels from adapters and from other contexts. See Channels and input events and Subscribing to channels. Contexts that are public, that is, they were created with a true flag in the context constructor, have a permanent subscription to the default channel. The name of the default channel is the empty string.
Contexts can send events to channels without knowledge of whether the event is required by contexts, clients, adapters, or some combination. When an event is sent from a context to a channel the event is received by all contexts subscribed to that channel and by all external receivers that are listening on that channel. See Generating events with the send statement.
Channels are useful for:
Identifying service monitors. If many monitors need to send events to a service monitor you can use a well known name (which can appear in EPL as a string literal or string constant) as a channel name. The service monitor (and only the service monitor) should subscribe to the channel and other monitors send events to that channel. When a request-response event protocol is required the sender can specify a channel to which it is subscribed, or a context to send the response to.
Applications that have different contexts that consume different streams of data can use channels to send the data to the intended contexts, even if many contexts require the same data stream or one context requires multiple data streams.
Different components of an application can be de-coupled by using an event protocol that sends events to channels for each interaction point between components. This allows adapters to be replaced with monitors that simulate those adapters for testing, and makes it easy to scale an application across several hosts by running different parts on different correlators and then connecting them.
Sending an event to a channel
In a monitor, you can send an event to a channel by using either
A string value that identifies the channel name.
A com.apama.Channel type that either names a channel or holds a context reference.
The format for sending an event to a particular context is as follows:
send EventExpression to ChannelExpression;
Replace EventExpression with any valid EPL expression that is of an event type.
Replace ChannelExpression with any valid EPL expression that is of the string or com.apama.Channel type. Typically, this is a string value.
This statement asynchronously sends an event to everything subscribed to the specified channel. Subscribers can include:
Contexts.
Receivers connected to external components by means of Apama’s messaging, JMS or Universal Messaging.
EPL plug-ins that have subscribed an EventHandler object.
For each target subscribed to a channel, the event goes to the back of the context’s input queue.
In a target context, the correlator can immediately process the sent event. The correlator does not need to finish executing the action that sends the event before it processes the sent event in a target context. The correlator might process the sent event before it finishes executing the action that sent the event. Or, the correlator might process the sent event some time after it completes executing the action that sent the event. The order is unpredictable. The order in which the target contexts receive the sent event is also unpredictable. For example:
action analyse(string symbol) {
spawn submon(symbol) to context(symbol);
log "Listening for "+symbol;
on all com.apama.marketdata.Tick(symbol=symbol) as tick {
send tick to symbol;
}
on com.apama.marketdata.Finished() {
send com.apama.marketdata.Finished() to symbol;
}
}
action submon(string symbol) {
monitor.subscribe(symbol);...
}
It is possible for a send...to operation to block the sending context from further processing if the input queue of any target (context, receiver or plug-in) is full. Either an event that you send to a particular target arrives on the target’s input queue or the sending context waits for room on the target’s input queue.
If you send an event to a channel that has no subscribers, the correlator discards the event because there are no listeners for it. This is not an error.
In a monitor, you can send an event to a particular context, as described here, or you can send an event to a sequence of contexts, described in the next topic. The format for sending an event to a particular context is as follows:
send EventExpression to Expression;
or:
enqueue EventExpression to ContextExpression;
Info
The enqueue...to statement will be deprecated in a future release. Use the send...to statement. Both statements perform the same operation.
Replace EventExpression with any valid EPL expression that is of an event type. You cannot specify a string representation of an event. For example, you cannot send &TIME pseudo-ticks.
Replace Expression, in the first format, with any valid EPL expression that is of the context type or with a com.apama.Channel object that contains a context. See Sending events to com.apama.Channel objects.
Replace ContextExpression with any valid EPL expression that is of the context type. This can be the name of a context variable or a method that returns a context. This cannot be a com.apama.Channel object that contains a context.
This statement asynchronously sends an event to the specified context. The event goes to the back of the context’s input queue.
In the target context, the correlator can immediately process the sent event. The correlator does not need to finish executing the action that sent the event before it processes the sent event in the target context. The correlator might process the sent event before it finishes executing the action that sent the event. Or, the correlator might process the sent event some time after it completes executing the action that sent the event. The order is unpredictable. The order in which the target contexts receive the sent event is also unpredictable. For example:
action analyse(string symbol) {
context c:=context(symbol);
spawn submon(symbol) to c;
log "Listening for "+symbol;
on all com.apama.marketdata.Tick(symbol=symbol) as tick {
send tick to c;
}
on com.apama.marketdata.Finished() {
send com.apama.marketdata.Finished() to c;
}
}
action submon(string symbol) {
...
}
The send...to and enqueue...to statements do not place the event on the special enqueued events queue. Instead, they put the event on the end of the target context’s input queue. Consequently, it is possible for a send...to or enqueue...to operation to block the sending context from further processing if the input queue of the target context is full. Either an event that you send to a particular context arrives on the target context’s input queue or the sending context waits for room on the target context’s input queue.
If you send an event to a context that does not contain any monitor instances, the correlator discards the event because there are no listeners for it.
In some situations, for example when you change a single-context application to use parallel processing, you might want to explicitly send an event to only the context that contains the monitor instance that contains the send statement. To send an event to only this context specify:
send eventExpression to context.current()
You must set a valid value to a context variable before you send an event to the context. You cannot send an event to a context that you have declared but has not been set to a valid value. For example, the following code causes the correlator to terminate the monitor instance:
monitor m {
context c;
action onload()
{
send A() to c;
}
}
In a monitor, you can send an event to a sequence of contexts. The format for doing this is as follows:
send EventExpression to ContextSequenceExpression;
or:
enqueue EventExpression to ContextSequenceExpression;
Info
The enqueue...to statement will be deprecated in a future release. Use the send...to statement. Both statements perform the same operation.
Replace EventExpression with any valid EPL expression that is an event. You cannot specify a string representation of an event.
Replace ContextSequenceExpression with any valid EPL expression that resolves to sequence<context>. You cannot specify a sequence that contains com.apama.Channel objects.
Each statement asynchronously sends a copy of an event to each context in the specified sequence. The event goes to the back of the input queue of each context.
In each target context, the correlator can immediately process the sent event. The correlator does not need to finish executing the action that sent the event (in the source context) before it processes the sent events in the target contexts. The correlator might process a sent event before it finishes executing the action that sent the event. Or, the correlator might process a sent event some time after it completes executing the action that sent the event. The order is unpredictable, depending on the relative execution speeds of the contexts.
The following example uses the sequence type:
action analyse(string symbol) {
context c1:=context(symbol + "-1");
context c2:=context(symbol + "-2");
context c3:=context(symbol + "-3");
spawn submon(symbol) to c1;
spawn submon(symbol) to c2;
spawn submon(symbol) to c3;
sequence <context> ctxs := [ c1, c2, c3 ];
log "Listening for "+symbol;
on all com.apama.marketdata.Tick(symbol=symbol) as tick {
send tick to ctxs;
}
on com.apama.marketdata.Finished() {
send com.apama.marketdata.Finished() to ctxs;
}
}
action submon(string symbol) {
...
}
The following example uses the values() method on a dictionary of contexts to obtain a sequence of contexts:
action analyse(string symbol) {
context c1:=context(symbol + "-1");
context c2:=context(symbol + "-2");
context c3:=context(symbol + "-3");
spawn submon(symbol) to c1;
spawn submon(symbol) to c2;
spawn submon(symbol) to c3;
dictionary <string, context>
ctxs := [ "c1": c1, "c2": c2, "c3": c3 ];
log "Listening for "+symbol;
on all com.apama.marketdata.Tick(symbol=symbol) as tick {
send tick to ctxs.values();
}
on com.apama.marketdata.Finished() {
send com.apama.marketdata.Finished() to ctxs.values();
}
}
action submon(string symbol) {
...
}
The send...to and enqueue...to statements do not place the event on the special enqueued events queue. Instead, they put the event on the end of the input queue of each target context. Consequently, it is possible for a send...to or enqueue...to operation to block the sending context from further processing if the input queue of a target context is full. The sending context does not continue beyond a send...to or enqueue...to statement until the event has been placed on the input queues of all target contexts.
If one of the contexts in the sequence does not contain any monitor instances the correlator ignores the sent event in that context because there are no listeners for it.
If one of the contexts in the sequence does not have a valid value before you send an event to it then the correlator terminates the monitor instance.
Consider the following two code fragments:
for c in mySequence {
send myEvent to c;
}
send myEvent to mySequence;
Execution of each of these fragments is typically equivalent. However, you cannot rely on equivalence. When the correlator executes the first fragment, it always delivers the event to the contexts according to their order in the sequence. When the correlator executes the second fragment it can deliver the event to contexts in any order. For example, if a context’s input queue is full this can affect the order in which the correlator delivers the event to the contexts.
Apama provides a number of applications that illustrate the use of contexts. These examples are in the samples\epl\contexts directory and in the samples\epl\concurrency-theory directory.
Information for using these examples is given in the topics below.
Simple sample implementation of contexts
In your Apama installation directory, in the samples\epl\contexts directory, there are two versions of a simple application. One version implements serial processing and the other implements parallel processing. Open the analyse-parallel.mon and analyse-serial.mon files from the Input directory in Apama Plugin for Eclipse to compare the implementations.
The sample uses the PySys testing framework. To execute the sample, use pysys run. The script runs the serial application and then the parallel version.
On a 2.4GHz Quad core Intel Q6600 machine, the serial implementation completes in about 63 seconds, while the parallel implementation completes in about 17 seconds. For an equivalent dual-core processor, you can expect the parallel implementation to complete in about 30 seconds.
Look at serial-results.evt and parallel-results.evt to compare the results. While the per-symbol output for each implementation is identical, the ordering of sent events for different symbols is different. Also, in the parallel implementation, there is more variation in the time taken to process all events for one symbol. The sample uses eight worker contexts. Each context is doing much the same work, but on different segments of the data. While it is not required, an application that has eight contexts typically working most of the time benefits from running on an 8-core host. You can expect an 8-core processor to run the sample parallel implementation more than seven times faster than it runs the serial implementation.
Running samples of common concurrency problems
Sample applications in the samples\epl\concurrency-theory directory illustrate a few common concurrency problems. There are three implementations of a simple deposit bank:
Race — implements Get and Set events, and corresponding Response events, so that a teller can find the value of an account, perform some modification and then set the new account value.
Deadlock — lets tellers lock an account.
Compareswap — is similar to the race implementation but it does not rely on locking and it does not compute values based on out-of-date information.
Change to the samples/epl/concurrency-theory directory of your Apama installation.
Invoke the following:
pysys run --mode=mode
where mode can be Race, Deadlock, CompareSwap or ALL, according to which sample monitors you want the test to run. The subsequent topics describe each sample.
The script starts a correlator on the default port (15903). Consequently, you should not have a correlator already running on the default port. If you do, the script causes the application to be injected into the running correlator and it also shuts the correlator down when the sample execution is complete. The script creates an event file in the Output directory (which it creates). The event file has the name of the sample with an .evt file suffix (for example, Race.evt, Deadlock.evt or CompareSwap.evt).
About the samples of concurrency problems
The sample of concurrency problems try to implement a simple deposit bank. The customer-visible part of the bank consists of a number of tellers, who have the ability to transfer money from one account to another. In an effort to scale well, the bank is implemented with each teller running in a separate context, which lets all tellers work concurrently. Of course, the simple work of the tellers does not require or even justify this, but the purpose of these samples is to show potential bugs, not to be a practical system. Similarly, no security checks are enforced.
Because data cannot be shared between contexts, the application requires a separate monitor that acts as the bank’s database. The tellers send requests to the bank’s database and receive responses from the database. There is also a simple mechanism to initialize the state of the bank database (SetupAccount event) and for tellers to discover the context in which the database is running. The communication between the bank and the tellers typically needs to get or set an account’s value. The tellers perform the actual arithmetic on a bank account’s value. Each implementation (race, deadlock, and compareswap) differs mainly in the way the tellers and database interact with each other.
Customer interactions with tellers are the same across all implementations. The customer sends a TransferMoney event, specifying which teller to use. It is assumed that customers know the names of tellers, the from and to account, and the amount to transfer. The customer receives a TransferMoneyComplete event when the transfer is complete.
The state of the bank’s accounts can be inspected by sending a SendBalances event to the correlator, which causes the correlator to log and send the balances.
To expose the problems, there are calls to the spinSleep action at key places in the implementations. If the correlator receives an ExposeRaces event, the spinSleep action suspends work by the specified teller for the specified time. This simulates tellers working at different rates, and means that difficult to reproduce conflicts are easier to identify. While this is useful for exposing bugs, it is not suitable for general-purpose sleeps because it consumes CPU time while sleeping and does not let other work in that context get done. This strategy is useful for exposing problems only when you know exactly where to place the sleeps.
Each implementation has its own transfer-sample_name.evt file, which the script sends as each bug is exposed with a different set of input data.
About the race sample
The race sample is in Bank-race.mon. It implements Get and Set events, and corresponding Response events. A teller can find the value of an account, perform some modification and then set the new account value. To take money from one account, the protocol is as follows:
Send a Get event to obtain the current value of the account.
Wait for a GetResponse event that contains the current value.
Compute the new account value.
Send a Set event to set the new account value.
Wait for a SetResponse event.
This works well when a single transfer occurs at a time. However, there is a bug because between the time that teller 1 obtains an account value and the time that teller 1 sets the new account value, teller 2 can obtain the account value, compute a new value, and set a new account value. The following time line demonstrates this:
Time
Teller 1
Teller 2
Bank Database
0 (setup)
Transfer 50 from A to B
A: 100 B: 100 C: 100
Get A, Get B
A=100, B=100
Sleep 1 second
0.5
Transfer 25 from B to C
Get B, Get C
B=100, C=100
newB=75, newC=125
Set B, Set C
A: 100, B: 75, C: 125
1.0
newA=50, newB = 150
Set A, Set B
A: 50, B: 150, C: 125
B’s account should have 100 + 50 – 25 = 125. But it ends up with 150 because teller 1 overwrites teller 2’s value for B’s account (75). Teller 1 based its calculation on values that were out of date at the point they were sent to the database.
About the deadlock sample
While EPL does not provide any mutual exclusion locking primitives, you can implement something similar in a monitor. The deadlock sample’s bank implements a locking mechanism. Tellers can send a Lock event for an account, and the database returns a LockResponse event when the account is locked. If another teller tries to lock the same account, the correlator queues the request until it processes an Unlock event to unlock the account. Note that the locking is fair; the correlator allocates locks in the order in which they are requested.
The deadlock implementation does no checking. For example, it does not check that the unlock event comes from the teller that locked an account, nor that a teller holds a lock for an account before performing an operation on that account. (A robust application would of course perform such checking.)
The deadlock sample fixes the problem shown in the race sample where a value was overwritten by a value that resulted from computation on out-of-date values. If you replicate the race pattern of events, teller 2 would wait to lock B’s account until teller 1 had finished with it. (This assumes all tellers follow the correct protocols. A robust implementation would perform checks to ensure that was the case).
However, even when all tellers follow the locking protocol correctly, there is a different problem. If teller 1 locks account A and teller 2 locks account B, and teller 1 tries to lock account B and teller 2 tries to lock account A, then each teller waits for the other teller to release a lock. The following timeline shows this:
Time
Teller 1
Teller 2
Bank Database
0
Transfer 50 from A to B
A: 100 B: 100 C: 100
Lock A
A: Locked by t1
Sleep 1 second
0.5
Transfer 25 from B to A
Lock B
A: locked by t1 B: locked by t2
Lock A
A: locked by t1, t2 waitingB: locked by t2
(waiting for LockResponse(A))
Lock B
A: locked by t1, t2 waitingB: locked by t2, t1 waiting
1.0
(waiting for LockResponse(B))
At this point, neither teller can make any further progress.
One solution to this (not implemented here) is to implement a timeout. If a lock request is outstanding for more than some threshold, the correlator abandons the lock. When this happens, the tellers would wait a random amount of time and try again. The random wait should prevent the retries from overlapping, if not on the first retry, then on a subsequent retry. However, such a mechanism invariably performs poorly in the (hopefully rare) case that a lock times out.
Alternatively, you can prevent deadlock by defining priority orders for locks. For example, you can specify that A must always be locked before B. Applying this priority order to all transactions would prevent deadlock.
About the compareswap sample
This compareswap sample is more like the race sample. The protocol between tellers and the database consists of Get and Set events, except the Set event is a CompareSet event, which contains an expected old value. If the old value does not match the database account value, then the teller retries the operation — getting a new value and re-computing the account value.
This has the advantage that it does not rely on locking (so does not suffer from deadlock) and does not result in values computed from out of date data being set in the database.
The only disadvantage is that under some circumstances (the same as for the race sample), the tellers need to re-try a calculation. However, unlike the timeout on locking, tellers know about this as soon as they receive an event back from the database, and no timeouts are involved.
This strategy is the recommended way to share state between different contexts. Note that while it guarantees progress is made by at least one context, an interaction between the database and a single context can take an unbounded amount of time, as other contexts can require the context to re-try its transaction. A further refinement would be to use a generation counter that the correlator increments on every successful Set event. This detects the difference between the database’s value being unchanged and the database’s value being changed back to a previous value. While such a difference might not matter in many situations, it might when you are computing interest.
Info
Due to the requirement to retry, the compareswap implementation is slightly different from the race implementation. One account is modified at a time; the teller transfers money from the fromAccount, and then adds it to the toAccount.
Time
Teller 1
Teller 2
Bank Database
0 (setup)
Transfer 50 from A to B
A: 100 B: 100 C: 100
Get A
A=100
newA=50
A: 50, B: 100, C:100
Set A success
Get B
B = 100
Sleep 1
0.5
Transfer 25 from B to C
Get B
B=100
newB=75
Set B (old=100)
A: 100, B: 75, C: 100
Set B success
Get C
C=100
newC=125
Set C (old=100)
A: 50, B: 75, C: 125
Set C success
1.0
newB = 150
Set B (old=100)
A: 50, B: 75, C: 125
Set B FAILED
Get B
B = 75
newB = 125
Set B (old=75)
A: 50, B: 125, C: 125
Set B success
Contexts and correlator determinism
Creating one or more contexts makes the correlator non-deterministic. In other words, injecting the same monitor can produce different results if the monitor contains statements that spawn to contexts.
For example, suppose an application creates two contexts, spawns to each of them, and each context runs code that calls integer.getUnique(). The assignment of unique integers to contexts is not deterministic; if you re-run the code, each context might receive an integer that is different from the integer it received during the previous run. Other behavior that can be non-deterministic in a parallel processing application includes the following:
The assignment of particular IDs to particular contexts
The order in which contexts send events
The order in which contexts spawn to other contexts
How contexts affect other parts of your Apama application
When you implement contexts in an EPL application, an understanding of how contexts affect other parts of your Apama application is required.
The topics below provide information to help you understand the behavior.
About input logs and parallel processing
Applications that implement parallel processing might have non-deterministic behavior. While you can inject a parallel application into a correlator that you started with the --inputLog option, you cannot expect to use that input log to exactly duplicate correlator execution.
For applications that use multiple contexts or that send events, just re-sending the events and EPL sent to the correlator is insufficient to reproduce the same output and state. The timing of which context ran which send, emit, enqueue...to or other operation is important. Operations that can affect the state of other contexts or the sent events are non-deterministic when run in parallel.
Deadlock avoidance when parallel processing
Parallel processing in the correlator uses a message passing system. Each context has a fixed-size input queue for events (messages). A deadlock is possible when all of the following conditions are true:
Context 1 is enqueuing an event to context 2.
Context 2 is enqueuing an event to context 1.
The input queues for context 1 and context 2 are both full.
In this situation, each context is blocked from further processing until the queue of the other context is no longer full. Neither context can process the next event on its input queue. Such a deadlock is not limited to two contexts but can occur with any number of contexts enqueuing events to each other.
The correlator avoids such a deadlock by detecting the potential for it to occur and then expanding input queues as needed. Also, the correlator logs a warning that a potential deadlock was detected. The correlator expands input queues only when not doing so causes a deadlock. The correlator does not expand input queues when one or more contexts are blocked from further processing while one or more contexts are processing as usual. However, it is still possible to create applications that result in out of memory errors or other kinds of deadlocks. Out of memory errors can result from requiring excessive expansion of input queues through the deadlock avoidance mechanism, or other means, such as creating a very large sequence.
Clock ticks when parallel processing
Since all contexts receive clock ticks, timers work in all contexts. However, it is possible for some contexts to run behind others. That is, a timer in a particular monitor for which there are monitor instances in multiple contexts might fire at different points in real time. In each context, the timer can process the series of clock ticks at a speed that is different from the other contexts.
A context that is running a monitor instance in a very long running loop might not remove entries from its input queue for a long time. If a context has a full input queue the clock tick distributer thread does not block. Instead, the correlator quashes clock ticks onto the end of the context’s input queue. This means that the correlator unpacks the clock tick event when the context input queue either drains or accepts a new event. There is no perceptible difference between normally received clock ticks and quashed clock ticks.
Using EPL plug-ins in parallel processing applications
The standard MemoryStore and Time Format plug-ins are thread safe, which means that you can use them in parallel applications. The MemoryStore can be quite helpful in a parallel application and is very efficient when used simultaneously by multiple contexts.