Scaling applications using multiple contexts or correlators
Scaling up Apama
Apama provides services for real-time matching on streams of events against hundreds of different applications concurrently. This level of capacity is made possible by the advanced matching algorithms developed for Apama’s correlator component and the scalability features of the correlator platform.
Should it prove necessary, capacity can further be increased by using multiple correlators on multiple hosts. To facilitate such multi-process deployments, Apama provides features to enable connecting components to pass events between them. It is recommended that each correlator is run on a separate host, to assist in the configuration of scaled-up topologies. However, it is possible to run multiple correlators on a single host.
Note:
This topic focuses on scaling Apama for applications written in EPL. Java plug-ins can be used if invocation of Java code is required on multiple threads, either directly from EPL or by registering an event handler. See Using EPL plug-ins written in Java. Knowledge of aspects of EPL is assumed, specifically monitors, spawning, listeners and channels. Definitions of these terms can be found in Getting started with Apama EPL.
The core event processing and matching service offered by Apama is provided by one or more correlator processes. In a simple deployment, Apama comprises a single correlator connected directly to at least one input event feed and output event receiver. Although this arrangement is suitable for a wide variety of applications (the actual size depending on the hardware in use, networking, and other available resources), for some high-end applications it may be necessary to scale up Apama by deploying multiple correlator processes on multiple hosts to partition the workload across several machines.
Partitioning strategies
Using the patterns and tools described in this guide, it is possible to configure the arrangement of multiple contexts within a single correlator, or of multiple correlators within Apama (the engine topology). It is important to understand that the appropriate engine topology for an application depends firmly on the partitioning strategy to be employed. In turn, the partitioning strategy is determined by the nature of the application itself: the event rate that must be supported, the number of contexts and spawned monitors expected, and the inter-dependencies between monitors and events. The following examples illustrate this.
Consider an application that monitors for changes in the values of named stocks and emits an event should a stock of interest fall below a certain value. The stocks to watch for and the prices on which to notify could be set up by initialization events, which cause monitors that contain the relevant details to be spawned. In this example, the need for partitioning arises from a very high event rate (perhaps hundreds of thousands of stock ticks per second), which is too high a rate for a single context to serially process.
A suitable partitioning scheme here might be to split the event stream such that different stock symbols are sent on different channels, perhaps by using connectivity configuration to map the stock symbol from incoming messages to the channel metadata field. In the correlator, all monitors interested in events for a given symbol would need to set up listeners in a context where a monitor has subscribed to that symbol. To achieve good scaling, the application is arranged so that each context is subscribed to only one symbol. A separate context per symbol would be created, and the monitor spawns a new monitor instance to each context. In each context, the monitor instance would execute monitor.subscribe(stockSymbol); where stockSymbol would have a value such as "XOM" or "WMT" corresponding to the stock symbol it is interested in. This application will scale well, as each event stream (for the different stock symbols) can run in parallel on the same host; this is referred to as scale-up.
Listeners in each context would listen for events matching a pattern, such as on all Tick(symbol="XOM", price < 90.0).
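The stock watch scheme described above could be sketched in EPL roughly as follows. This is a minimal illustration, not a reference implementation: the Tick and PriceAlert event definitions, the StockWatch monitor name, the hard-coded symbol list and threshold, and the "alerts" output channel are all assumptions made for the example. In a real application the symbols and prices would be supplied by initialization events, as the text describes.

```
// Hypothetical event types, defined here only to make the sketch complete.
event Tick {
    string symbol;
    float price;
}

event PriceAlert {
    string symbol;
    float price;
}

monitor StockWatch {
    action onload() {
        // Assumption: symbols are hard-coded to keep the sketch short.
        sequence<string> symbols := ["XOM", "WMT"];
        string stockSymbol;
        for stockSymbol in symbols {
            // One new context per symbol, so that each symbol's
            // event stream can be processed in parallel.
            spawn watch(stockSymbol, 90.0) to context(stockSymbol);
        }
    }

    // Runs in the context created for this symbol.
    action watch(string stockSymbol, float threshold) {
        // Receive only events delivered on this symbol's channel.
        monitor.subscribe(stockSymbol);
        on all Tick(symbol=stockSymbol, price < threshold) as t {
            // "alerts" is an assumed output channel name.
            send PriceAlert(t.symbol, t.price) to "alerts";
        }
    }
}
```

Because each context subscribes to exactly one channel, a tick for "XOM" is never delivered to the context handling "WMT", which is what allows the contexts to proceed independently.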
If the number of stock symbols is very large and the amount of processing for each stock symbol is large, then it may be necessary to run correlators on more than one host to use more hardware resources than are available in a single machine. This is referred to as scale-out. To achieve scale-out, connections per channel need to be made between the Apama components; see Setting up connections between correlators in a YAML configuration file.
Now consider a portfolio monitoring application that monitors portfolios of stocks, emitting an event whenever the value of a portfolio (calculated as the sum of stock price * volume held) changes by a given percentage. A single spawned monitor manages each portfolio, and any stock can be added to or removed from a portfolio at any time by sending suitable events.
This application potentially calls for significant processing with each stock tick, as values of all portfolios containing that stock must be re-calculated. If the number of portfolios being monitored grows very large, it may not be possible for a single context to perform the necessary recalculations for each stock tick, thus requiring the application to be partitioned across multiple contexts.
Unlike the stock watch example, it is not possible to achieve scaling to larger numbers of portfolios by splitting the event stream. Each portfolio can contain multiple stocks, and stocks can be dynamically added and removed, thus one event may be required by multiple contexts. In this case, a suitable partitioning scheme is to partition the monitor instances across contexts (as with stock watch) but to duplicate as well as split the event stream to each correlator.
For this scenario, each monitor instance is spawned to a new context and subscribes to the channels (stock symbols in this application) for which it requires data. Note that while the previous example would scale very well, this one will not scale as well. In particular, if one monitor instance requires data from all or the majority of the channels, then it can become a bottleneck. However, there may be multiple such monitor instances running in parallel if they are running in separate contexts.
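A rough EPL sketch of this arrangement is shown below, under several assumptions: the event types, the PortfolioWatch monitor name, the portfolio names, the 5% threshold, and the "portfolio-control" and "alerts" channels are all hypothetical. It is intended only to show how one context per portfolio can subscribe to, and unsubscribe from, symbol channels as holdings change.

```
// Hypothetical event types, for illustration only.
event Tick { string symbol; float price; }
event AddStock { string portfolio; string symbol; float volume; }
event RemoveStock { string portfolio; string symbol; }
event PortfolioChanged { string portfolio; float value; }

monitor PortfolioWatch {
    dictionary<string, float> volumes; // symbol -> volume held
    dictionary<string, float> prices;  // symbol -> latest price seen
    float reported;                    // value at the last notification

    action onload() {
        // Assumption: portfolio names are hard-coded for brevity.
        sequence<string> portfolios := ["pension", "growth"];
        string name;
        for name in portfolios {
            // One context per portfolio; 0.05 means a 5% change.
            spawn manage(name, 0.05) to context(name);
        }
    }

    action manage(string name, float fraction) {
        // Assumed channel carrying AddStock and RemoveStock events.
        monitor.subscribe("portfolio-control");

        on all AddStock(portfolio=name) as a {
            if not volumes.hasKey(a.symbol) then {
                // Start receiving ticks for the newly held symbol.
                monitor.subscribe(a.symbol);
            }
            volumes[a.symbol] := a.volume;
            recalculate(name, fraction);
        }

        on all RemoveStock(portfolio=name) as r {
            if volumes.hasKey(r.symbol) then {
                volumes.remove(r.symbol);
                monitor.unsubscribe(r.symbol);
                recalculate(name, fraction);
            }
        }

        // Ticks arrive only on the symbol channels subscribed to above.
        on all Tick() as t {
            if volumes.hasKey(t.symbol) then {
                prices[t.symbol] := t.price;
                recalculate(name, fraction);
            }
        }
    }

    // Recompute sum(price * volume) and notify on a large enough change.
    action recalculate(string name, float fraction) {
        float total := 0.0;
        string s;
        for s in volumes.keys() {
            if prices.hasKey(s) then {
                total := total + prices[s] * volumes[s];
            }
        }
        boolean changed := false;
        if reported = 0.0 then {
            changed := total != 0.0;
        } else {
            changed := ((total - reported) / reported).abs() >= fraction;
        }
        if changed then {
            reported := total;
            send PortfolioChanged(name, total) to "alerts";
        }
    }
}
```

If two portfolios hold the same stock, both contexts subscribe to that symbol's channel and each receives its own copy of every tick. This is the duplication of the event stream that limits how well the scheme scales.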
Similar to the stock watch application, the portfolio monitoring application may require scale-out across multiple hosts rather than just multiple correlators.
In summary, the partitioning strategy can be thought of as a formula for splitting and duplicating monitors and/or events between correlators while preserving the correct behavior of the application. In some circumstances, it may be necessary to re-write monitors that work correctly on a single correlator to allow them to be partitioned across correlators, as the following section describes.
Engine topologies
Once the partitioning strategy has been defined, in terms of which events and monitors go to which correlators, it is necessary to translate this into an engine topology. This is achieved by connecting source and target correlators on separate channels, such that events sent by a source correlator on a specific channel find their way to the correct contexts in the target correlator. A set of two or more correlators connected in this way is known as a correlator pipeline, as shown in the following image. This figure represents an example topology for a high-end application – the majority of applications use a single correlator only, or have far simpler topologies.
In this image, a correlator can perform the function of each of the 7 nodes (generator, worker, watcher, tallier). Each target correlator performs some processing before passing the results to a second worker correlator (worker3, worker4) in the form of events, sent on the channels as marked on the diagram. tallier collates the results from these correlators for forwarding to any registered receivers. A final correlator, watcher, monitors the events emitted by generator on chan1 and chan2 and emits events (possibly containing status information or statistical analysis of the incoming event stream) to any registered receivers.
To deploy an application on a topology like that shown above requires separating the processing performed into a number of self-contained chunks. In the previous figure, it is assumed that the core processing can be serialized into three chunks, with the first two chunks split across two correlators each (worker1/2 and worker3/4 respectively) and the third chunk residing on a single correlator (tallier). Intermediate results from each stage of processing are passed to the next stage as sent events, which contexts in the connected correlators receive by subscribing to the appropriate channels.
To realize this application structure requires coding each chunk of processing as one or more separate monitors, which send intermediate results as an event of known type on a pre-determined channel. These monitors can then be loaded onto the appropriate correlator. An existing application that grows beyond the capacity of a single correlator may therefore need to be re-written as a number of (smaller) monitors, to allow the application processing to be partitioned into separate chunks in the manner described above.
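As an illustration, one such chunk might look like the following EPL sketch for a first-stage worker. The Reading and PartialResult event types, the Worker1 monitor name, the placeholder calculation, and the output channel name "chan3" are assumptions; only chan1 is named in the text, and the actual channel names would be those marked on the diagram.

```
// Hypothetical event types: the input from generator and the
// intermediate result passed to the next stage.
event Reading {
    string key;
    float value;
}

event PartialResult {
    string key;
    float value;
}

monitor Worker1 {
    action onload() {
        // Receive the events that generator sends on chan1.
        monitor.subscribe("chan1");

        on all Reading() as r {
            // Placeholder for this stage's real processing.
            float result := r.value * 2.0;
            // "chan3" is an assumed name for the channel that the
            // second-stage worker subscribes to.
            send PartialResult(r.key, result) to "chan3";
        }
    }
}
```

The next stage would follow the same shape: subscribe to the channel on which PartialResult events arrive, process them, and send its own result events on the channel that tallier listens to.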