Working with streams and stream queries

EPL lets you create two kinds of queries:

  • Self-contained queries are processing elements that communicate with other self-contained queries, and with their environment, by receiving and sending events. Self-contained queries are designed to be multithreaded and to scale across machines. A self-contained query is sometimes referred to as an Apama query. This kind of query is defined in a .qry file, which cannot contain a monitor. See Defining queries.
  • Stream queries operate on streams of items to generate more valuable streams that contain derived items. Stream queries are defined in monitors. The following topics provide information about stream queries.

In stream queries, derived items can be of any EPL type. You can use standard relational operations, such as filters, joins, aggregation, and projection, to generate items. For example, you can define a query that converts a stream of raw tick data into a stream of volume-weighted average price (VWAP) items.

Stream-based language elements allow operations that refine events to be expressed more clearly and concisely than when using procedural language constructs such as event listeners. In particular, applications that need to calculate one value based on multiple items from an input stream are simpler and more efficient when written with stream queries.

Apama provides sample code that uses streams and stream queries in the samples\epl directory of your Apama installation directory.

Introduction to streams and stream networks

A stream query is part of a stream network. A stream network starts with one or more stream source templates (see Creating streams from event templates). A stream source template collects matching events received by the monitor instance and places them as items in a stream. Stream queries (see Defining stream queries) take existing streams (a stream created by a stream source template or by another stream query) and generate added-value streams that contain derived items. Finally, stream listeners (see Using output from streams) bring items out of the stream network and into procedural code. In a given stream network, upstream elements feed into downstream elements to generate derived items.

When a monitor instance receives an event that matches a stream source template, the correlator activates the stream network. The passage of time can also cause the correlator to activate a stream network. If, for example, a stream query operates on the items received within the last 5.0 seconds, then 5.0 seconds after an item arrives the correlator will again activate the stream network (see Adding window definitions to from and join clauses).

In a given stream network activation, not all stream queries and not all stream listeners necessarily receive items. Which queries and stream listeners receive items depends on the definitions of the stream queries and stream listeners. However, in a given stream network activation, the correlator passes items through all queries and stream listeners in the network that receive items. A query or stream listener that receives an item is considered to be activated. Only when processing of all activated queries and stream listeners is complete does the correlator process the next event on the context’s input queue.

In a given stream network activation, various queries can produce multiple items on their output streams. The items in a particular stream during a particular stream network activation are called a lot. If a stream query or stream listener receives a lot that contains multiple items, it processes all items as part of a single stream network activation (see Working with lots that contain multiple items, and Coassigning to sequences in stream listeners).

The items in a lot are always ordered, and the lots themselves are always ordered.
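As a minimal sketch of how these pieces fit together, the following monitor builds a small stream network from one stream source template, one stream query, and one stream listener. The Tick event definition, the monitor name, and the log message are assumptions made for illustration only.

```epl
// Hypothetical event type, defined here only so that the sketch is self-contained.
event Tick {
    string symbol;
    float price;
    float volume;
}

monitor StreamNetworkSketch {
    action onload() {
        // Stream source template: places matching Tick events in a stream.
        stream<Tick> ticks := all Tick(symbol="APMA");

        // Stream query: derives a stream of prices from the stream of ticks.
        stream<float> prices := from t in ticks select t.price;

        // Stream listener: brings each derived item into procedural code.
        from prices as p {
            log "APMA price: " + p.toString() at INFO;
        }
    }
}
```

Here ticks is the upstream element, prices is downstream of it, and the listener is the final element of the network.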

Defining streams

You can use a stream variable to reference a stream. A stream variable declaration has the following form:

stream<type> name

Replace type with the type of the items in the stream. This can be any Apama type.

Replace name with an identifier for the stream. For example:

stream<Tick> ticks;

A stream variable can be a field in an event. However, you cannot route, enqueue, or send an event that contains a stream variable field.

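There are two ways to create a stream:

  • From an event template, by using a stream source template. See Creating streams from event templates.
  • From a stream query. See Defining stream queries.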

To obtain a reference to an existing stream, you must assign from or clone another stream value.

An inert stream never generates any output. There are a number of ways to create an inert stream including, but not limited to, the following:

  • Calling new on a stream type or a type that contains a stream
  • Declaring a global variable of stream type, or a type that contains a stream
  • Spawning a monitor instance that contains a stream value
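The following sketch illustrates the first two cases in the list above and shows a stream variable being given a live stream afterwards. The Tick event and all variable names are hypothetical, and the sketch assumes that new stream<Tick> is the way to call new on a stream type.

```epl
// Hypothetical event type used only for this illustration.
event Tick {
    string symbol;
    float price;
}

monitor InertStreamSketch {
    // A global variable of stream type refers to an inert stream until it is assigned.
    stream<Tick> ticks;

    action onload() {
        // Calling new on a stream type also yields an inert stream.
        stream<Tick> placeholder := new stream<Tick>;

        // Assigning a stream source template makes the variable refer to a live stream.
        ticks := all Tick(symbol="APMA");

        from ticks as t {
            log "Received tick at price " + t.price.toString() at INFO;
        }
    }
}
```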

Creating streams from event templates

A stream can be created from an event template using the all keyword, including all any() to listen for events of all types. This is referred to as a stream source template.

Example:

stream<Tick> ticks := all Tick(symbol="APMA");

This creates a stream that contains all subsequent Tick events that have the symbol APMA. You can use any single event template this way. However, you must specify the all keyword, and you cannot use operators such as and or followed-by to combine several event templates. See also Stream network lifetime.

Terminating streams

If a stream goes out of scope, it continues to exist until the monitor instance terminates or the stream is explicitly terminated in some fashion. Streams are not garbage-collected. This means it is possible to leak streams, thereby consuming memory and potentially performing unnecessary computation, if you do not explicitly terminate streams.

To terminate a stream, call the quit() method on a stream variable that refers to the stream you want to terminate. For example:

stream<A> foo := all A();
...
foo.quit();

This might also terminate connected streams. See Stream network lifetime. It is also possible to terminate connected streams by quitting a stream listener.

Using output from streams

A stream listener passes output items from a stream to procedural code. You use a from statement to create a stream listener. The from statement has two forms.

The first form of the from statement creates a stream listener that takes items from an existing stream. For example:

from sA as a {
   /* Code here executes whenever an item is available from sA. */
}

The second form of the from statement contains a stream query definition, which creates a new stream query. The stream listener takes items from the output stream of the query. For example:

from a in sA select a as a {
   /* Code here executes whenever the query produces output. */
}

The syntax for the first form of the from statement is as follows:

[listener:= ] from streamExpr coassignment block

Syntax Element

Description

listener

Optional. You can specify a listener variable to refer to the stream listener that the from statement creates. You can declare a new listener variable or use an existing listener variable.

streamExpr

Specifies any expression of type stream except a stream query. This can be, for example, a stream variable or a stream source template. If you want to specify a stream query, use the other form of the from statement.

coassignment

You must coassign the stream output into a variable. You can either use the as operator to implicitly declare the variable in the scope of the following statement or the : assignment operator to coassign to a local or global variable of the same type as the stream output that has already been declared. For details about the characters you can specify, see Identifiers.

The output from a stream is referred to as a lot. Like an auction lot, a stream output lot can contain one or more items. If the stream output is a lot that contains more than one item, the from statement coassigns each item, in turn, to the variable. See Working with lots that contain multiple items.

A from statement cannot specify multiple coassignments.

block

Specifies a block of EPL statements, enclosed in braces. The from statement coassigns each stream output item to the specified variable and executes the block once for each output item. If the stream output is a lot that contains more than one item, and you want to execute the block just once for the lot rather than once for each item in the lot, coassign the result to a sequence. See Coassigning to sequences in stream listeners.

The syntax for the second form of the from statement is as follows:

[listener:= ] StreamQueryDefinition coassignment block

Syntax Element

Description

listener

Optional. You can specify a listener variable to refer to the stream listener that the from statement creates. You can declare a new listener variable or use an existing listener variable.

StreamQueryDefinition

Specifies a stream query. See Defining stream queries.

coassignment

You must coassign the stream output into a variable. You can either use the as operator to implicitly declare the variable in the scope of the following statement or the : assignment operator to coassign to a local or global variable of the same type as the stream output that has already been declared. For details about the characters you can specify, see Identifiers.

If the query outputs lots that contain more than one item, the from statement coassigns each item in the lot, in turn, to the variable. See Working with lots that contain multiple items. A from statement cannot specify multiple coassignments.

block

Specifies a block of EPL statements, enclosed in braces. The from statement coassigns each stream output item to the specified variable and executes the block once for each output item. If the stream output is a lot that contains more than one item, and you want to execute the block just once for the lot rather than once for each item in the lot, coassign the result to a sequence. See Coassigning to sequences in stream listeners.
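The two forms of the from statement and the two coassignment operators can be sketched side by side as follows. The Tick event, the lastPrice variable, and the log messages are assumptions made for illustration.

```epl
// Hypothetical event type used only for this illustration.
event Tick {
    string symbol;
    float price;
}

monitor CoassignmentSketch {
    // Existing variable that the ':' operator coassigns to below.
    float lastPrice;

    action onload() {
        stream<Tick> ticks := all Tick(symbol="APMA");

        // First form: takes items from an existing stream.
        // The 'as' operator implicitly declares t for use in the block.
        from ticks as t {
            log "Tick for " + t.symbol at INFO;
        }

        // Second form: contains a stream query definition.
        // The ':' operator coassigns to the already declared variable lastPrice.
        from t in ticks select t.price : lastPrice {
            log "Last price is now " + lastPrice.toString() at INFO;
        }
    }
}
```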

Listener variables and streams

As with an event listener, you can assign a stream listener to a listener variable. A stream listener exists until one of the following happens:

  • The monitor instance that contains the stream listener is terminated.
  • The stream or streams the listener refers to are terminated.

If you do not want to wait for one of the above to occur, you can stop a stream listener by calling the quit() method on a listener variable that refers to it. Note that in many cases this will also terminate the stream that is feeding the stream listener. See Stream network lifetime.
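As a sketch of this technique, the following monitor keeps a reference to a stream listener and stops it after a fixed time. The Tick event, the 60.0 second delay, and the variable names are hypothetical.

```epl
// Hypothetical event type used only for this illustration.
event Tick {
    string symbol;
    float price;
}

monitor ListenerQuitSketch {
    action onload() {
        // Keep a reference to the stream listener in a listener variable.
        listener priceListener :=
            from t in all Tick(symbol="APMA") select t.price as p {
                log "Price: " + p.toString() at INFO;
            }

        // After 60 seconds, stop the stream listener explicitly.
        on wait(60.0) {
            priceListener.quit();
        }
    }
}
```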

Coassigning to sequences in stream listeners

Unlike an event listener, which is triggered by individual events, a stream query might generate multiple items for each external or routed event. This is usually due to a batched window (a window that is updated after every p seconds or after every m items arrive) or to a join operation on two streams. In this case, the correlator executes the stream listener action multiple times, once for each generated item.

In a stream query definition, a window defines the set of items from the input stream that the query operates on. See Adding window definitions to from and join clauses.

To execute the stream listener action only once, and coassign all generated items at once, specify a stream listener that coassigns to a sequence variable. The sequence must contain items of the same type as the stream. For example:

sequence<A> seqA;
from batchedEvents: seqA {
   /* seqA contains all events that arrive in this batch */
}
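A fuller sketch of the same idea, assuming a hypothetical event A and a batched size-based window as the source of the multi-item lots:

```epl
// Hypothetical event type used only for this illustration.
event A {
    integer id;
}

monitor SequenceCoassignSketch {
    action onload() {
        // Batched window: the window contents change only after every third item,
        // so the query's output lots can contain several items.
        stream<A> batchedEvents := from a in all A() retain 3 every 3 select a;

        // The sequence's item type must match the stream's item type.
        sequence<A> seqA;

        // The block executes once per lot, with all items of the lot in seqA.
        from batchedEvents : seqA {
            log "Batch of " + seqA.size().toString() + " events" at INFO;
        }
    }
}
```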

Defining stream queries

A stream query operates on one or two streams to transform their contents into a single output stream. A stream query definition declares an identifier for the items in the stream so that the item can be referred to by the operators in the stream query. Here is a simple stream query definition:

stream<integer> ints := from a in sA select a.i;

When the correlator executes a statement that contains a stream query definition, the correlator creates a new stream query. Each stream query has an output stream (the type of which might differ from that of the input stream).

A stream query definition is an expression that evaluates to a stream value. The value is a reference to the output stream of the generated query.

Following is an example of a simple stream query in a stream listener:

from a in sA select a.b as b {
   doSomethingWith(b);
}

The following table describes the user-defined parts of this stream listener. It is important to understand the distinctive role each one serves.

Syntax Element

Description

a

This is an identifier that represents the current item in the stream being queried. See Specifying input streams in from clauses.

sA

This variable represents the stream being queried.

a.b

This expression describes what each query result looks like. In this example, the query produces outputs from the b field of the events in the stream.

b

This is the variable that you coassign the query results to so that the correlator can use the query result in the stream listener’s code block.

Linking stream queries together

A stream query definition is an expression and its result is a stream. Consequently, with one exception described below, you can use a stream query definition anywhere that you can use a stream value. For example, you can assign the resulting value to a stream variable:

stream<float> values := from a in sA select a.value;

Alternatively, you can use a stream query definition as the return value from an action. For example:

action createPriceStream (stream<Tick> ticks) returns stream<float> {
   return from t in ticks select t.price;
}

Another option is to embed a stream query within another stream query. For example:

from p in (from t in ticks where t.price > threshold select t)
within period
select wavg(p.price,p.volume) as vwap {
   processVwap(vwap);
}

You can use stream variables to link stream queries together, as detailed in the next section.

The exception is that you cannot use a stream query immediately after the from keyword in the first form of the from statement. For example, the following is not a valid statement:

from from t in ticks select t.price as tickPrice {
   print tickPrice.toString();
}

Instead, use the second form of the from statement and specify a stream variable or a stream source template. The following example specifies a stream variable:

from t in ticks select t.price as tickPrice {
   print tickPrice.toString();
}

For more information on the different forms of the from statement, see Using output from streams.

Simple example of a stream network

Sometimes a single from statement is all that is required to achieve your goal. For example, to obtain a volume-weighted average price (VWAP) for a stock, you can add the following from statement to a monitor:

from t in all Tick(symbol="APMA")
   within period
   select wavg(t.price,t.volume) as vwap {
      processNewVwap(vwap);
}

Often, however, you want to use the output from one query as the input to another query. For example:

spreads :=
   from a in all com.apama.demo.marketdata.Depth(symbol=order.Instrument_1)
      retain 1
   from b in all com.apama.demo.marketdata.Depth(symbol=order.Instrument_2)
      retain 1
   select (a.midPrices[0] - b.midPrices[0]);
stream<MeanSd> meanSds :=
   from s in spreads within 20.0 select MeanSd( mean(s), stddev(s) );
stream<integer> comparison :=
   from s in spreads from m in meanSds select
      compareSpreadAndBands(s, m.mean, m.sd, order.Std_Dev_Multiplier);
stream<integer> prevComparison :=
   from c in comparison retain 1 select rstream c;
from c in comparison from p in prevComparison
   where c != p select c as instruction {
      if state = WAIT_FOR_SPREAD and instruction = HOLD {
         monitorState();
      }
      if state = MONITOR and instruction != HOLD {
         waitForOrders(instruction);
      }
}

When queries are connected like this, the set of connected queries is referred to as a stream network.

A stream network is strictly within a monitor instance. Routing an event takes that event entirely out of the stream network since the event would not be received in the same network activation even if it is received by the same monitor. Spawning a monitor makes any stream variables point to inert streams, so it is not possible to refer to a stream network from a different monitor instance.

Stream query definition syntax

A stream query definition contains several elements, some of which are optional and some of which are required. These elements, and their constituent parts, are described in the following sections. The elements appear in a stream query in this order:

FromClause [ FromClause | JoinClause ] [ WhereClause ] ProjectionDefinition

Element

Required or Optional

Description

FromClause

Required

Specifies the input stream for the query. See Specifying input streams in from clauses. A from clause can also specify which items from the input stream the query should operate on. See Adding window definitions to from and join clauses.

If a second from clause appears, the correlator performs a cross-join to combine items from the two streams. See Defining cross-joins with two from clauses.

JoinClause

Optional

Specifies a second stream for the query to operate on. The correlator performs an equi-join to combine items from the two streams. See Defining equi-joins with the join clause. A join clause can also specify which items from the input stream the query should operate on. See Adding window definitions to from and join clauses.

WhereClause

Optional

Applies a filtering criterion to the items in the window or the items produced by the join operation. See Filtering items before projection.

ProjectionDefinition

Required

Defines how the query generates output items. See Generating query results.
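To show where each element appears in practice, the following sketch annotates a query that uses all four kinds of clause. The Order, Tick, and Match events and their fields are hypothetical, and the join clause is written on the assumption that it takes the form join item in stream on expr equals expr, which is described in Defining equi-joins with the join clause.

```epl
// Hypothetical event types used only for this illustration.
event Order {
    string symbol;
    float limitPrice;
}

event Tick {
    string symbol;
    float price;
}

event Match {
    string symbol;
    float price;
}

monitor QueryClausesSketch {
    action onload() {
        stream<Order> orders := all Order();
        stream<Tick> ticks := all Tick();

        // FromClause with a window definition.
        from o in orders retain 10
            // JoinClause: equi-join on the symbol field, with its own window.
            join t in ticks retain 1 on o.symbol equals t.symbol
            // WhereClause: filters the items produced by the join.
            where t.price <= o.limitPrice
            // ProjectionDefinition, followed by the listener's coassignment.
            select Match(o.symbol, t.price) as m {
                log "Match: " + m.toString() at INFO;
            }
    }
}
```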

Identifier scope in stream queries

Consider the following code fragment:

integer a;
stream<float> prices := from a in ticks select a.price;

In this example, the a in the query refers to the current Tick item in the stream and not to the a integer variable. In a stream query, you can use an identifier that you have not previously declared. If there is a variable in a containing scope that has the same name as an identifier in the query, then for expressions in the query the identifier in the query hides the variable in the containing scope.

Following is another example of how scope works with stream queries:

integer a := 42;
from a in ticks select a.price as p {
   print a.toString(); // Prints "42" rather than one of the ticks.
}

The previous code fragment illustrates that identifiers in the listener action can have the same names as identifiers in the stream query. While this is not good practice, it is important to recognize that the listener action is not part of the stream query. Consequently, an identifier in a stream query is out-of-scope in the stream query’s listener action.

Stream query processing flow

Each element of the stream query operates on the output of the previous part. To correctly define stream queries, it can be helpful to understand that items flow through the query and the correlator processes the parts of the query in the order shown in the following figure. In the figure, the dashed outlines indicate optional elements.

Illustration of how items flow through the query

As items arrive on the input stream(s) and time elapses, the window definition for each stream identifies which items from that stream the query should be processing at any given moment. This includes partitioning, if it is specified. See Adding window definitions to from and join clauses.

In queries with two input streams, the correlator combines items from the two streams by means of a cross-join operation (a second from clause) or an equi-join operation (a join clause). See Joining two streams.

The where clause, if there is one, filters items. See Filtering items before projection.

The projection definition defines how the query generates output items. This includes the select clause, which has appeared in examples such as Simple example of a stream network. See Generating query results.

Specifying input streams in from clauses

In a stream query, each from clause specifies a stream that the query is operating on. The syntax of the from clause is as follows:

from itemIdentifier in streamExpr [WindowDefinition]

Syntax Element

Description

itemIdentifier

Specify an identifier that you want to use to represent the current item in the stream you are querying. You use this identifier in subsequent clauses in the query. For details about the characters you can specify, see Identifiers. The type of the identifier is the same as the type of the items that are in the stream you are querying.

There is no link between an item identifier in a query and a variable that you might define elsewhere in your code. In other words, it is okay for an in-scope variable to have the same name as an item identifier in a query. Inside the query, the item identifier hides that variable. See the second example below.

streamExpr

Specify an expression that returns a stream type. This is the stream that you want to query.

WindowDefinition

Define which portion of the stream to query. See Adding window definitions to from and join clauses.

Examples

The query below generates a stream of float items. The item identifier is a. The stream variable, ticks, refers to a stream of Tick events. The select clause specifies that each query result item contains only the price value from the Tick event. Details about the select clause are in Generating query results.

stream<float> prices := from a in ticks select a.price;

The all keyword followed by an event template is an expression of type stream referred to as a stream source template. Consequently, you can use this in a from clause. For example, you can modify the previous example to use the stream source template directly within the stream query:

stream<float> prices :=
   from a in all Tick(symbol="APMA") select a.price;

Notes

A stream query is an expression of type stream and so anywhere that you can specify a stream expression you can use a stream query in its place. (There is one exception to this. See Linking stream queries together.) This means you can nest stream queries to create a compound stream query. For example, consider the following non-nested stream queries:

stream<A> sA := all A();

stream<float> derived :=
   from a in sA retain 2 select mean(a.x);

stream<B> sB :=
   from a in derived within 10.0 select B(stddev(a));

An equivalent way to write this is as follows:

stream<B> sB :=
   from b in
      from a in all A() retain 2 select mean(a.x)
   within 10.0
   select B(stddev(b));

The compiler generates the same stream network in both cases, so the performance is exactly the same. However, nesting stream queries beyond one level can make the compound stream query hard to understand.

To define a query that operates on two streams, specify two consecutive from clauses or specify a from clause followed by a join clause. See Joining two streams.

Adding window definitions to from and join clauses

The items flowing through a stream are ordered. In any given activation, there are zero or more items that are current. By default, the stream query operates on those current items.

Alternatively, a window may be defined. Window definitions specify which items the query should operate on in each activation, based on (but not limited to) the following:

  • The items within a given time period
  • A maximum number of items
  • The content of the items

As the window contents change, the items in the query projection will also change: new items will be inserted and old ones removed. The output from a query is a stream of items.

If the projection is an aggregate projection then the query output is the result of evaluation of the select clause when the window contents change. See Aggregating items in projections.

If the projection is a simple, non-aggregate projection, the default output is the insert stream (or istream for short) of new projected items. Alternatively, if the rstream keyword is specified in the select clause, the output is the remove stream (or rstream) of items that have become obsolete. See also Obtaining the query’s remove stream (rstream).

Window definition syntax

There are a number of different formats and keywords that you can use to define a window on a stream. Following are the alternatives you can choose from. See the subsequent topics for details.

[partition by partitionByExpr[, partitionByExpr]...]

(
within windowDurationExpr [every batchPeriodExpr]
   [retain windowSizeExpr] [with unique keyExpr]

| retain windowSizeExpr [every batchSizeExpr] [with unique keyExpr]
)

| retain all

Every window definition specifies retain, within or both.

Syntax Element

Description

partitionByExpr

Optionally specifies an EPL expression that should involve the input item in some way and that returns a comparable type. A partition by clause effectively creates a separate window for each encountered distinct value of partitionByExpr.

windowDurationExpr

Specifies a float expression that indicates a duration of a number of seconds. The window contains the items received within the last windowDurationExpr seconds. See Defining time-based windows.

batchPeriodExpr

Specifies a float expression that indicates an interval period of a number of seconds. The window updates its contents every batchPeriodExpr seconds. See Defining batched windows.

windowSizeExpr

Specifies an integer expression that indicates the number of items you want to retain in the window. The window contains the most recent windowSizeExpr items. See Defining size-based windows.

keyExpr

Specifies an EPL expression that must contain at least one reference to the input item and must return a comparable type. See Comparable types. If you add a with unique clause and more than one item in the window has the same value for the key identified by keyExpr, only the most recently received of those items is considered to be in the window. See Defining content-dependent windows.

batchSizeExpr

Specifies an integer expression that indicates a number of items. The window updates its contents after every batchSizeExpr items that match the query are found. See Defining batched windows.
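The following sketch shows two of the optional keywords in the syntax above. The Tick event and the log messages are hypothetical, and the comments describe the behavior only as far as it follows from the descriptions in the table.

```epl
// Hypothetical event type used only for this illustration.
event Tick {
    string symbol;
    float price;
}

monitor WindowKeywordsSketch {
    action onload() {
        stream<Tick> ticks := all Tick();

        // partition by: a separate two-item window for each distinct symbol.
        // The remove stream reports ticks that have left their symbol's window.
        from t in ticks partition by t.symbol retain 2 select rstream t as old {
            log "No longer in its window: " + old.toString() at INFO;
        }

        // with unique: among the ticks of the last 60 seconds, only the most
        // recent tick for each symbol is considered to be in the window.
        from t in ticks within 60.0 with unique t.symbol select count() as n {
            log "Ticks currently in the window: " + n.toString() at INFO;
        }
    }
}
```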

Omitting the window definition

The window definition is optional in a stream query. If you do not specify any window then, for any given activation of the stream query, the stream query operates on only the items that are current for that activation. Typically, this is a single event. However, if the source for this query is, for example, a stream query with a batched window, then the items in each batch will be processed together as in the following example:

stream<A> sA := from a in all A() retain 4 every 4 select a;
from a in sA select count() as c {... }

The second query receives batches of four A events and will generate a single aggregate value for each batch. For more details see Stream queries that generate lots.

Retaining all items

The simplest window is one that contains all items that have ever been in the stream. The corresponding window definition is retain all. Conceptually, once an item enters a retain all window, it remains in the window indefinitely (or until the stream query is terminated). The following query evaluates the running mean of all items that have ever been in the values stream:

stream<decimal> means := from v in values retain all select mean(v);

The retain all clause specifies an unbounded window. Unbounded windows have restrictions on their use:

  • You cannot have a partitioned or batched unbounded window.
  • You cannot perform a join operation on an unbounded window.
  • You cannot specify an unbounded window when you use rstream in the select clause of a query.

When you use a custom (user-defined) aggregate function in a query that contains an unbounded window, you cannot also use a bounded aggregate function. You should also be aware that, if you use a badly implemented custom aggregate function in a query that contains an unbounded window, then this can result in uncontrolled memory usage. See Defining custom aggregate functions.

Defining time-based windows

In a time-based window, the items are held in the window for a specific duration. The syntax for defining a time-based window is:

within windowDurationExpr

Replace windowDurationExpr with an expression that returns the number of seconds that items should remain in the window as a float value. For example, the following query calculates the sum of all items that arrived in a stream of float values during the last 1.5 seconds:

stream<float> sums := from v in values within 1.5 select sum(v);

The following diagram illustrates how this works in practice.

Illustration of time-based windows

Each column represents a time when the query window contents change, whereas each row represents the arrival and lifetime of an event. As an event arrives in the window, it appears in bold purple. At each given time, the current window contents are indicated by the items enclosed by boxes. Bold purple items are new and lighter purple items are old items still in the window. The numbers at the bottom give the contents of the stream of insertions to and removals from the window in the case where each value is being selected independently, or when the aggregate sum of the values in the set of items in the window is being calculated. The query before the diagram corresponds to the aggregate projection line. The queries shown here are:

  • Simple istream projection:

    from v in values within 1.5 select v
    
  • Simple rstream projection:

    from v in values within 1.5 select rstream v
    
  • Aggregate projection:

    from v in values within 1.5 select sum(v)
    

In a simple, non-aggregate projection, when an event arrives in the window, it appears in the istream of the projection. It remains for 1.5 seconds, at which point it appears on the rstream of the projection. The aggregate projection behaves differently. Whenever an item arrives in or is removed from the window, a new sum appears on the istream of the aggregate projection.
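To observe the three behaviors side by side, the three queries can be attached to the same stream in one monitor, as in the following sketch. The Reading event, the monitor name, and the log messages are assumptions made for illustration.

```epl
// Hypothetical event type that carries the float values to be queried.
event Reading {
    float value;
}

monitor TimeWindowSketch {
    action onload() {
        stream<float> values := from r in all Reading() select r.value;

        // Simple istream projection: reports each value as it enters the window.
        from v in values within 1.5 select v as inserted {
            log "Entered window: " + inserted.toString() at INFO;
        }

        // Simple rstream projection: reports each value as it leaves the window.
        from v in values within 1.5 select rstream v as removed {
            log "Left window: " + removed.toString() at INFO;
        }

        // Aggregate projection: reports a new sum when the window contents change.
        from v in values within 1.5 select sum(v) as total {
            log "Sum over last 1.5 seconds: " + total.toString() at INFO;
        }
    }
}
```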

Defining size-based windows

In addition to time-based windows, you can define windows that contain only a certain number of items. In a size-based window, as each new item arrives, it is added to the window. After the number of items in the window reaches the window size limit specified in the query, the arrival of a new item causes the removal of the oldest item from the window.

The syntax for defining a size-based window is as follows:

retain windowSizeExpr

Replace windowSizeExpr with an expression that returns how many items you want to retain in the window as an integer value. For example, the following query calculates the sum of the number fields of the last two items in a stream of events:

stream<float> sums := from v in values retain 2 select sum(v.number);

The following diagram, which uses the same notation as the previous section, illustrates how this works in practice.

Illustration of size-based windows

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from v in values retain 2 select v.number
    
  • Simple rstream projection:

    from v in values retain 2 select rstream v.number
    
  • Aggregate projection:

    from v in values retain 2 select sum(v.number)
    

When an event arrives in the window, it appears in the istream of a simple, non-aggregate projection. The first item remains in the window when a second item arrives. When a third item arrives, the first item is no longer in the window and it appears on the rstream of the simple, non-aggregate projection. Likewise, when the fourth item arrives in the window, it appears in the istream and the second item appears on the rstream of the simple projection, and so on. The behavior of the aggregate projection is that whenever an item arrives in or is removed from the window, a new sum appears on the istream of the aggregate projection.
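The behavior described above can be tried out with a sketch such as the following. The Value event, with a float field called number to match the queries above, is a hypothetical definition, as are the monitor name and the log messages.

```epl
// Hypothetical event type with the number field used by the queries above.
event Value {
    float number;
}

monitor SizeWindowSketch {
    action onload() {
        stream<Value> values := all Value();

        // Aggregate projection over the two most recent items.
        from v in values retain 2 select sum(v.number) as total {
            log "Sum of the last two items: " + total.toString() at INFO;
        }

        // Remove stream: reports each item's number as the item leaves the window.
        from v in values retain 2 select rstream v.number as removed {
            log "Left the window: " + removed.toString() at INFO;
        }
    }
}
```

If Value events with the numbers 1.0, 2.0, 3.0, and 4.0 arrive one at a time, the description above implies successive sums of 1.0, 3.0, 5.0, and 7.0, with 1.0 leaving the window when the third event arrives and 2.0 leaving when the fourth arrives.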

Combining time-based and size-based windows

Sometimes you might want to focus on the last n items received in the last d seconds. To define a window that retains items based on both time and size, use the following format in the from clause:

within windowDurationExpr retain windowSizeExpr

The within keyword and expression must be first and the retain keyword and expression must be second. As with separate size-based and time-based windows, replace windowDurationExpr with an expression that returns a number of seconds, d, as a float value. Replace windowSizeExpr with an expression that indicates how many items you want to retain in the window, n, as an integer value. The window contains the last n items received in the last d seconds. If no items were received in the last d seconds, the window is empty. For example:

from v in values within 2.5 retain 2 select sum(v);

The following diagram, which uses the same notation as the previous section, illustrates how this works in practice.

Illustration of size-based and time-based windows

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from v in values within 2.5 retain 2 select v
    
  • Simple rstream projection:

    from v in values within 2.5 retain 2 select rstream v
    
  • Aggregate projection:

    from v in values within 2.5 retain 2 select sum(v);
    

The important point to note in this example is that some items drop out of the window before the 2.5 second period has passed. When e2 arrives, e0 and e1 are already in the window. Even though e0 has been there for only 2 seconds, it is removed because e1 and e2 are now the two most recent items received in the last 2.5 seconds.
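
As a rough illustration of the rule "the last n items received in the last d seconds", the following Python sketch (not EPL) computes the window content at a given time. The function name window_contents is hypothetical, and the arrival times of e0, e1 and e2 are assumptions chosen so that e0 is 2 seconds old when e2 arrives.

```python
def window_contents(arrivals, now, duration, size):
    """Return the values in a `within duration retain size` window at time `now`.

    arrivals is a list of (arrival_time, value) pairs in arrival order.
    """
    # Time condition: received at or before `now`, and younger than `duration`.
    recent = [v for (t, v) in arrivals if t <= now and now - t < duration]
    # Size condition: keep only the newest `size` of those items.
    return recent[-size:] if size > 0 else []

# Assumed arrival times: e0 at 0.0, e1 at 1.0, e2 at 2.0.
arrivals = [(0.0, "e0"), (1.0, "e1"), (2.0, "e2")]
at_1 = window_contents(arrivals, 1.0, 2.5, 2)    # e0 and e1
at_2 = window_contents(arrivals, 2.0, 2.5, 2)    # e0 pushed out by size
at_3_5 = window_contents(arrivals, 3.5, 2.5, 2)  # e1 has expired by time
at_5 = window_contents(arrivals, 5.0, 2.5, 2)    # nothing in the last 2.5 s
print(at_1, at_2, at_3_5, at_5)
```

At time 2.0, e0 is still within the 2.5 second duration but is excluded by the size limit, which is the case the paragraph above highlights.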

Defining batched windows

The default behavior is that the contents of a window change upon the arrival of each item. The every keyword can be used to control when the contents of the window change: it causes the items to be added to the window in batches. Time-based windows can be controlled to update only every p seconds, and size-based windows can be controlled to update only after every m events.

The syntax for a batched window is one of the following:

within windowDurationExpr every batchPeriodExpr
| retain windowSizeExpr every batchSizeExpr
| within windowDurationExpr every batchPeriodExpr retain windowSizeExpr

Here, windowDurationExpr and windowSizeExpr retain their meaning from the previous sections. The batchPeriodExpr is an expression that returns the time, p, between updates as a float value. The batchSizeExpr is an expression that returns the number of events between updates, m, as an integer value.

When you specify within followed by every followed by retain, the every keyword always indicates a number of seconds. That is, the window updates its content every p seconds.

If no items have arrived or expired since the previous window update, the window content is unchanged and consequently the query does not execute. The correlator executes the query only when the window content changes.

Here is an example of a stream query that defines a batched, time-based window. The correlator creates the query at t=0.0.

from v in values within 1.5 every 1.0 select sum(v)

The following diagram illustrates how this works in practice.

Illustration of batched windows

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from v in values within 1.5 every 1.0 select v
    
  • Simple rstream projection:

    from v in values within 1.5 every 1.0 select rstream v
    
  • Aggregate projection:

    from v in values within 1.5 every 1.0 select sum(v)
    

The important thing to note about the behavior of these queries is that the window content changes only every second. Nothing appears on any insert or remove stream between those points. This means that the items 10.0, 20.0 and 40.0 are not in the window at the moment they arrive, but are kept until the next multiple of 1.0 second. Item lifetimes are calculated from the item arrival time, not the point at which the batching allows the item into the window. Consequently, the lifetime of the items in the window is also affected by the batching. In these examples, you can see that the items that were delayed entering the window are only in the window for one second because they were already 0.5 seconds old at the point they entered the window. For contrast, the item with the value 30.0 remains in the window for 2.0 seconds because after 1.5 seconds the batching has not occurred, and so the window cannot change until the next multiple of 1.0 second.

In the examples given here, the batch period is smaller than the duration of the window. If the batch period is larger than the duration of the window, then some items can never enter the window, if they would have already expired by the time the next batch arrives in the window.
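
The interaction between batching and item lifetime can be sketched as follows. This is a Python model, not EPL. The function name batched_window is hypothetical, and the arrival times are assumptions (the values 10.0, 20.0 and 40.0 arrive between batch points, and 30.0 arrives exactly on one). The model also assumes that an item arriving exactly on a batch point enters the window at that point.

```python
def batched_window(arrivals, duration, period, until):
    """Return [(time, values in window)] at each multiple of `period` up to `until`.

    arrivals is a list of (arrival_time, value) pairs.
    """
    snapshots = []
    k = 1
    while k * period <= until:
        now = k * period
        # Lifetime counts from arrival time, so an item is present only while
        # its age is below `duration`, however long batching delayed its entry.
        window = [v for (t, v) in arrivals if t <= now and now - t < duration]
        snapshots.append((now, window))
        k += 1
    return snapshots

# Models `within 1.5 every 1.0` with assumed arrival times.
arrivals = [(0.5, 10.0), (1.5, 20.0), (2.0, 30.0), (2.5, 40.0)]
snapshots = batched_window(arrivals, duration=1.5, period=1.0, until=4.0)

# Batch period larger than the window duration: the item expires at 0.75,
# before the first batch point at 1.0, so it never enters the window.
never_enters = batched_window([(0.25, 99.0)], duration=0.5, period=1.0, until=2.0)
print(snapshots, never_enters)
```

In this model, 10.0 is in the window from 1.0 to 2.0 (one second), while 30.0 is in the window from 2.0 to 4.0 (two seconds).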

Batched size-based windows behave similarly to batched time-based windows, except that the batch criterion is the arrival of a number of items. In that case, items always arrive in the window in multiples of the batch size.

Batched windows produce multiple items at one time. A single group of items flowing between queries together is called a lot. A lot can contain one item or several items. A batched window is one way of producing a lot that contains several items.

Partitioning streams

The partition by clause splits a stream into partitions, based on one or more key values. The subsequent window operators are applied to the partitioned stream; the behavior is as if the window operators had been applied separately to each partition. The result of using partition by followed by a window operator is referred to as a partitioned window. You use a query with a partitioned window to retain particular items for each partition specified by the partition by clause.

Partitioning is introduced with the following syntax:

partition by partitionByExpr[, partitionByExpr]...

The partition by clause precedes other window operators, so a complete query would be:

from a in sA partition by a.x retain 2 select sum(a.y);

Each partitionByExpr is an expression that should contain at least one reference to the input item and must return a comparable type. See Comparable types. Some examples are in the following table. Assume that each partition by clause in the table starts with the following:

from a in all A()...

Definition

Description

partition by a.x

Partition on a single primitive type field of the input event. This is likely to be the most common case.

partition by a

Partition on an event’s field values. The events that have identical values for all fields are in the same partition. For example:

from a in all A()
   partition by a retain 2 select a;

Given the following input events:

A(1,1)
A(1,2)
A(1,1)

The first and third events are in the same partition, the second is not. In this case, the event type A must itself be a comparable type.

partition by 1

This is a valid partition expression, but it is not recommended. A partition expression should reference the input item in some way.

partition by f(a)

This is a valid partition expression if f() is a function that returns an appropriate type.

partition by a.x*globaldict[a.y]

Another valid partition expression.

Example:
from t in all Tick()
   partition by t.symbol retain 1
   select rstream t;

This query creates a separate partition for each new stock symbol it finds. Each partition contains the most recent Tick event for that symbol. The query output, for each encountered symbol, is the previous Tick event for that symbol. Note that it is possible for this query to consume a large quantity of memory.

Partitions and aggregate functions

The partition by clause creates several partitions within the window. However, a stream query has other parts in addition to the window. The other parts include the projection and optional join or where elements. These other parts of the query operate on a single window that contains all items from all partitions.

Likewise, when you partition a stream, any specified aggregate functions aggregate over all partitions. If you want to generate separate aggregate values for different groups of events, then you must specify a group by clause. See Grouping output items. A common use case is to specify matching partition by and group by clauses.

Consider the following stream query:

from a in all A() partition by a.x retain 2 select sum(a.y);

The window definition is retain 2, and this is partitioned by a.x, where x is the first field in A. There is one retain 2 partition for each value of x. Suppose this stream query receives the following input events:

A(1,1)
A(1,2)
A(2,1)
A(2,2)
A(1,3)
A(2,3)

After these events have all arrived, one partition contains A(1,2) and A(1,3) while a second partition contains A(2,2) and A(2,3). However, the parts of the query following the window definition operate on the collection of all items in all partitions. In this example, the sum() aggregate function generates 10. It does not generate a lot that contains two values of 5. Now consider the following query:

from t in all Tick()
   partition by t.symbol retain 10
   group by t.symbol
   select mean(t.price)

This query returns one mean value per symbol, which is the mean of the last 10 ticks for that symbol. If you do not want all means for all symbols in one lot, you might prefer to spawn monitors so that you have an instance of the following query for each symbol:

from t in all Tick(symbol=X)
   retain 10
   select mean(t.price)

If you do want the averages for all the symbols in the same stream, then you can specify the group key in the select clause in order to later differentiate between the output events, as in the following example:

from t in all Tick()
   partition by t.symbol retain 10
   group by t.symbol
   select Output(t.symbol, mean(t.price))

As you can see, the partition by clause is often used in conjunction with the group by clause.
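
The difference between aggregating over all partitions and aggregating per group can be checked with a small model. The following Python sketch (not EPL) replays the six A(x, y) events from the example above through a retain 2 window partitioned by x. The function name partitioned_retain is hypothetical, and the per-group result only imitates what a matching group by a.x clause would compute.

```python
from collections import defaultdict, deque

def partitioned_retain(events, key, size):
    """Keep the newest `size` events for each partition key."""
    partitions = defaultdict(lambda: deque(maxlen=size))
    for event in events:
        partitions[key(event)].append(event)
    return partitions

# The events A(1,1), A(1,2), A(2,1), A(2,2), A(1,3), A(2,3) as (x, y) tuples.
events = [(1, 1), (1, 2), (2, 1), (2, 2), (1, 3), (2, 3)]
parts = partitioned_retain(events, key=lambda e: e[0], size=2)

# Without group by: one sum over the items of all partitions together.
total = sum(y for part in parts.values() for (_, y) in part)

# With a matching group by: one sum for each key.
per_group = {x: sum(y for (_, y) in part) for x, part in parts.items()}
print(total, per_group)
```

The single total is 10, whereas grouping by the partition key gives 5 for each value of x.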

Tip:

In EPL, it is common to use spawn in a monitor to create separate monitor instances. For example, each monitor instance might process a separate stock symbol. Spawning separate monitor instances might be preferable to using a single monitor instance that specifies partition by in a stream query so that it, for example, processes all stock symbols. Spawning separate monitor instances can be more efficient because your application processes only the subset of symbols that are of interest. Also, the subset of symbols of interest can change through the day. Appropriate monitor instances and queries can be created as required.

See also IEEE special values in stream query expressions.

Using multiple partition by expressions

To partition a window according to multiple criteria, you can insert multiple, comma-separated expressions. For example, you can refine a previous query to produce values for different volume bands as follows:

from t in all Tick()
   partition by t.symbol, t.volume.floor()/100 retain 1
   select rstream t;

In this example, the correlator applies retain 1 to each set of ticks that share both the same symbol and the same volume (to within 100). As a result, an item is output only when a replacement tick arrives for an existing symbol in an existing volume band.
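
A compound partition key can be modelled as a tuple. The following Python sketch (not EPL) imitates the query above. The function name previous_ticks and the Tick layout (symbol, volume, price) are assumptions made for illustration.

```python
import math

def previous_ticks(ticks):
    """Model `partition by symbol, volume.floor()/100 retain 1 select rstream`.

    ticks is a list of (symbol, volume, price) tuples in arrival order.
    """
    latest = {}   # (symbol, volume band) -> newest tick in that partition
    removed = []
    for symbol, volume, price in ticks:
        key = (symbol, math.floor(volume) // 100)  # integer division gives the band
        if key in latest:
            # The replaced tick leaves its partition and appears on the rstream.
            removed.append(latest[key])
        latest[key] = (symbol, volume, price)
    return removed

ticks = [
    ("ACME", 150.0, 10.0),  # band 1
    ("ACME", 250.0, 10.5),  # band 2, a new partition, so no output
    ("ACME", 199.0, 11.0),  # band 1 again, replaces the first tick
    ("XYZ", 150.0, 5.0),    # different symbol, a new partition
]
removed = previous_ticks(ticks)
print(removed)
```

Only the third tick produces output, because it is the only one that replaces an existing tick with the same symbol in the same volume band.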

Partitioning time-based windows

If a window is purely time-based, then there is no benefit to partitioning the window. For example, consider the following two queries:

from t in all Tick() within 1.0...
from t in all Tick() partition by t.symbol within 1.0...

The first query outputs every Tick received in the last second. The second query organizes the stream of Tick events by their symbols, then gives you each one that arrived in the last second. This is still every Tick received in the last second. The correlator ignores a partition by statement if it is used only with a within window.

If your window includes a retain clause as well as a within clause then it can be helpful to use partition by, likewise if there is a with clause. See Defining content-dependent windows. For example:

from t in all Tick() partition by t.symbol within 10.0 retain 5...

This window will contain at most 5 Tick events for each different symbol received within the last 10 seconds.

Defining content-dependent windows

The contents of the window can also depend on the content of individual items in the stream. Currently, the only content-dependent window operator is the with unique clause, which limits the window to containing only the most recent item for each key value. The with unique clause can be added to a within or a retain window by following it with:

with unique keyExpr

The keyExpr follows the same rules as a partition key expression. That is, it is an expression that should contain at least one reference to the input item and must return a comparable type. See Comparable types.

When you add a with unique clause and more than one item in the window has the same value for the key identified by keyExpr, only the most recently received of those items is considered to be in the window. It is important to note that the with unique clause processing happens after the rest of the window processing. Consider the following query:

from p in pairs retain 3 with unique p.letter select sum(p.number)

If the most recent two events have the same letter, there will be only two events over which the sum is calculated. This is illustrated in the following diagram:

Illustration of content-dependent windows

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from p in pairs retain 3 with unique p.letter select p
    
  • Simple rstream projection:

    from p in pairs retain 3 with unique p.letter select rstream p
    
  • Aggregate projection:

    from p in pairs retain 3 with unique p.letter select sum(p.number)
    

As you can see, when the last three items received all have a unique letter, the query behaves like a retain 3 window. When the last three items received do not all have a unique letter, the duplicate that arrived first is removed from the window. In this example, the arrival of c,5 causes the removal of c,3 even though it was one of the last three items received. In other words, the with unique clause can cause an item to be removed from the window and the sum earlier than it would otherwise be removed.
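
Because with unique is applied after the rest of the window processing, it can be modelled as a second pass over the retained items. The following is a minimal Python sketch (not EPL). The function name retain_with_unique is hypothetical, the input sequence is an assumption, and the model assumes a window size of at least 1.

```python
def retain_with_unique(items, size, key):
    """Apply `retain size` first, then keep only the newest item for each key."""
    window = items[-size:]                 # assumes size >= 1
    newest = {}
    for index, item in enumerate(window):
        newest[key(item)] = index          # later items overwrite earlier ones
    return [item for index, item in enumerate(window)
            if newest[key(item)] == index]

# Assumed input: (letter, number) pairs in arrival order.
pairs = [("a", 1), ("b", 2), ("c", 3), ("d", 4), ("c", 5)]
first_three = retain_with_unique(pairs[:3], 3, key=lambda p: p[0])
after_c5 = retain_with_unique(pairs, 3, key=lambda p: p[0])
print(first_three, sum(n for _, n in first_three))
print(after_c5, sum(n for _, n in after_c5))
```

After c,5 arrives, the last three items are c,3, d,4 and c,5. The uniqueness pass drops c,3, so the sum is taken over two items only.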

The difference between a partitioned window and a window that is using a with unique clause can be described as:

  • using partition by gives you the last three values for each key, and
  • using with unique gives you one value of each key, from the last three.

You can combine both partition by and with unique if you are using different key expressions in each clause.

Note that you cannot specify within followed by retain followed by with unique.

See also IEEE special values in stream query expressions.

Joining two streams

When a stream query operates over two input streams, it is referred to as a join operation. There are two forms of join operation available in EPL:

  • A cross-join joins every event from one stream’s window with every event in the other stream’s window.
  • An equi-join joins events only when they have matching keys.

Each form takes two input streams and produces a single output stream of combined items.

Join operations, particularly cross-joins, can create many more output events than input events, not just the same or fewer.

Defining cross-joins with two from clauses

A cross-join is defined with two from clauses, one for each stream, optionally including window definitions. A simple example of this is:

from p1 in leftPairs retain 2
   from p2 in rightPairs retain 2
   select sum(p1.num * p2.num);

This is illustrated in the following diagram, whose notation differs from the previous diagrams. Here, for each time point there are two columns, one for each side of the join. The first column, with purple events, represents the items from the first from clause and the second column, with cyan events, represents the items from the second from clause. Events in bold arrived during this activation of the stream query and the boxes enclose the windows for each side. As in the previous diagrams, the output is given for each of the three kinds of projections.

Illustration of cross-joins

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from p1 in leftPairs retain 2
       from p2 in rightPairs retain 2
       select p1.num * p2.num
    
  • Simple rstream projection:

    from p1 in leftPairs retain 2
       from p2 in rightPairs retain 2
       select rstream p1.num * p2.num
    
  • Aggregate projection:

    from p1 in leftPairs retain 2
       from p2 in rightPairs retain 2
       select sum(p1.num * p2.num);
    

As shown in the diagram, in a cross-join whenever an item arrives in a window, it is joined to every item in the other window to produce a separate output item for each combination.
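
The pairing rule for the simple istream projection can be sketched as follows. This is a Python model, not EPL. The function name cross_join_istream and the input sequence are assumptions, and the model handles one arrival at a time rather than lots that contain several items.

```python
from collections import deque

def cross_join_istream(events, size):
    """Model two `retain size` windows joined by a cross-join.

    events is a list of (side, num) with side "L" or "R".
    Returns, for each arrival, the list of p1.num * p2.num values it produces.
    """
    left, right = deque(maxlen=size), deque(maxlen=size)
    lots = []
    for side, num in events:
        if side == "L":
            left.append(num)
            # The new left item is joined to every item in the right window.
            lots.append([num * r for r in right])
        else:
            right.append(num)
            # The new right item is joined to every item in the left window.
            lots.append([l * num for l in left])
    return lots

events = [("L", 1), ("R", 2), ("L", 3), ("R", 4), ("L", 5)]
lots = cross_join_istream(events, 2)
print(lots)
```

The first arrival produces nothing because the other window is still empty. Once both windows hold two items, every arrival produces two output items.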

Because the number of output items is the product of the sizes of the two windows, cross-joins are normally used when at least one side of the join is one of the following:

  • A window of size 1.
  • A stream where you have omitted the window definition.

If both sides of the join omit the window definition, then for output to occur an item must arrive on each stream during the same activation of the query.

A more concrete example can be seen here:

spreads :=
      from a in all com.apama.demo.marketdata.Depth(symbol=order.Instrument_1)
        retain 1
      from b in all com.apama.demo.marketdata.Depth(symbol=order.Instrument_2)
        retain 1
      select (a.midPrices[0] - b.midPrices[0]);

This query generates the spread between the latest prices for the two identified stocks. In each from clause, the window contains one item. Whenever a new item arrives in one window, the query executes the calculation defined in the select clause and outputs the result.

To generate a running mean and a standard deviation for this spread value, you can define the following query:

stream<MeanSD> averages := from s in spreads within 20.0
   select MeanSD(mean(s),stddev(s));

Then, to obtain all three current values for the spread, the mean and the standard deviation, you can perform a join between the spreads stream and the averages stream:

stream<SpreadMeanSD> all := from s in spreads
   from a in averages
   select SpreadMeanSD(s, a.mean, a.stddev);

This query outputs a result only when there is an item currently in both spreads and averages.

In a cross-join, you cannot specify more than two from clauses.

CAUTION:

Be aware that cross-joins have the potential to generate a great quantity of output. It is preferable to use cross-joins only where the window size/duration of any window involved in the cross-join is small. For example, putting 8000 events through a 100x100 cross-join produces 1.6 million output events. You cannot specify a cross-join in a query that contains an unbounded window.

Defining equi-joins with the join clause

An equi-join has a key expression for each of the two streams that are being joined. Two items are joined into an output item only if the values of their key expressions are equal. The full syntax for an equi-join, consisting of a from clause followed by a join clause, is:

from itemIdentifier1 in streamExpr1 [windowDefinition1]
   join itemIdentifier2 in streamExpr2 [windowDefinition2]
   on joinKeyExpr1 equals joinKeyExpr2

As with the partition and unique key expressions, each join key expression must return a comparable type (see Comparable types). Also, joinKeyExpr1 must include a reference to itemIdentifier1 and joinKeyExpr2 must include a reference to itemIdentifier2. A join key expression cannot refer to the item from the other stream. An example of an equi-join is:

from p1 in leftPairs retain 2
   join p2 in rightPairs retain 2
   on p1.letter equals p2.letter
   select sum(p1.num * p2.num);

This is illustrated in the following diagram:

Illustration of equi-joins

The query before the diagram corresponds to the aggregate projection. The three queries shown here are:

  • Simple istream projection:

    from p1 in leftPairs retain 2
       join p2 in rightPairs retain 2
       on p1.letter equals p2.letter
       select p1.num * p2.num
    
  • Simple rstream projection:

    from p1 in leftPairs retain 2
       join p2 in rightPairs retain 2
       on p1.letter equals p2.letter
       select rstream p1.num * p2.num
    
  • Aggregate projection:

    from p1 in leftPairs retain 2
       join p2 in rightPairs retain 2
       on p1.letter equals p2.letter
       select sum(p1.num * p2.num);
    

This diagram shows the input that was used in the cross-join example, but with the join changed to be an equi-join. As you can see, only the items with matching letters appear in the output. The first event on the right side of the join has the same letter as the event on the left, so an output is produced as before. When the second event arrives on the left, however, no output is produced, because the letter does not match the other side. When a b event arrives on the right side of the join, that is joined with the b event on the left.

Finally, at the end of the diagram you can see that the join is empty because none of the events on the left match any of the events on the right.

Here is a more concrete example of an equi-join:

from r in priceRequest
   join p in prices partition by p.symbol retain 1
   on r.symbol equals p.symbol
   select p.price

For each new stock price request, this query generates the latest price for that stock/symbol. In an equi-join, whenever an item enters a window on one side, the correlator evaluates the join condition to determine if the item matches any of the items in the window on the other side. The correlator joins and outputs each matching pair when it finds one.
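
The price request example can be modelled with a dictionary that holds the latest price for each symbol, which plays the role of the partition by p.symbol retain 1 window. The following is a Python sketch, not EPL. The function name latest_price_join and the event layout are assumptions, and the model ignores the case where a request and a price arrive in the same activation.

```python
def latest_price_join(events):
    """Model the equi-join of price requests with the latest price per symbol.

    events is a list of ("price", symbol, price) or ("request", symbol) tuples.
    Returns the prices output by the join, in order.
    """
    latest = {}   # symbol -> most recent price
    output = []
    for event in events:
        if event[0] == "price":
            latest[event[1]] = event[2]
        elif event[1] in latest:
            # Join condition: r.symbol equals p.symbol.
            output.append(latest[event[1]])
    return output

events = [
    ("price", "ACME", 10.0),
    ("price", "XYZ", 5.0),
    ("request", "ACME"),
    ("price", "ACME", 11.0),
    ("request", "ACME"),
    ("request", "NONE"),   # no price is held for this symbol, so no output
]
prices = latest_price_join(events)
print(prices)
```

A request for a symbol with no retained price produces no output, because there is no item on the other side with a matching key.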

Typically, you want to create a derived event that is a function of the events on both sides of the join operation. Here is another example:

from latest in latestSensorReadings
   join average in averageSensorReadings
   on latest.sensorId equals average.sensorId
   select SensorAlert(latest.sensorId, latest.value, average.mean) as alert{
      send alert to "output";
}

This query joins a stream of the most recent readings from all the sensors with a stream of averages of the same readings over some period. When a new reading appears it causes an event on the stream of averages at the same time. This causes them to be joined to create an alert that contains both the latest value and the latest average, which is then sent.

See also IEEE special values in stream query expressions.

Filtering items before projection

In a stream query, after the window definition and any join clause, you can optionally specify a where clause to filter the items produced by the window or join. The where clause specifies an arbitrary EPL expression and can filter items based on any criteria available to EPL. The syntax of the where clause is as follows:

where booleanExpr

Replace booleanExpr with a Boolean expression. This expression is referred to as the where predicate. Only those items for which the where predicate evaluates to true are passed by the filter. For example:

from t in ticks retain 100
   where t.price*t.volume>threshold
   select mean(t.price)

To calculate the mean price, this query operates on only the items whose value (t.price * t.volume) is greater than the specified threshold.

Performance

The filtering performed by the where clause happens after any window, with or join operations. In some cases, it is possible to rephrase the query to improve operational efficiency. For example:

from t in ticks within 60.0
   where t.price*t.volume>threshold
   select mean(t.price)

This query maintains a window of Tick items. Now consider this revision:

from p in
   (from t in ticks where t.price*t.volume>threshold select t.price)
   within 60.0
   select mean(p)

In the first example, the within window contains all Tick events received in the last minute. In the second example, the where clause is before the window definition so the filtering happens before items enter the window. Consequently, the window contains only float items for which the where predicate is true. These types of optimization are of particular benefit in queries that include both a where clause and a join operation (equi-join or cross-join). However, care must be taken when refactoring queries, particularly when size-based windows are involved. For example, consider the two queries below:

from t in ticks retain 100 where t.price*t.volume>threshold
   select mean(t.price)

from p in
   (from t in ticks where t.price*t.volume>threshold select t.price)
   retain 100 select mean(p)

These queries are not equivalent. The first query generates the mean of a subset of the last 100 items, namely those items for which the where predicate evaluated to true. The second query generates the mean of the last 100 items for which the where predicate evaluated to true.
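
The difference is easy to see with a small model. The following Python sketch (not EPL) applies a filter either after or before a size-based window. The function names are hypothetical, the window size is reduced to 2, and the predicate is simplified to a test on the price alone.

```python
def mean(values):
    """Arithmetic mean, or None when there are no values."""
    return sum(values) / len(values) if values else None

def filter_after_window(prices, size, keep):
    """Window first (`retain size`), then apply the where predicate."""
    window = prices[-size:]
    return mean([p for p in window if keep(p)])

def filter_before_window(prices, size, keep):
    """Apply the where predicate first, then window the items that passed."""
    passed = [p for p in prices if keep(p)]
    return mean(passed[-size:])

prices = [10.0, 1.0, 2.0, 20.0]
above_five = lambda p: p > 5.0
after = filter_after_window(prices, 2, above_five)    # window [2.0, 20.0] -> [20.0]
before = filter_before_window(prices, 2, above_five)  # passed [10.0, 20.0]
print(after, before)
```

Filtering after the window averages only the one item in the last two that passes the predicate, while filtering before the window averages the last two items that pass.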

Generating query results

The last component of a stream query is the required projection definition, which specifies how to generate items for the query’s output stream. A projection definition has the following syntax:

[group by groupByExpr[, groupByExpr]...] [having havingExpr]
  select [rstream] selectExpr

Syntax Element

Description

groupByExpr

Each groupByExpr is an expression that returns a value of a comparable type. These expressions form the group key, which determines which group each output item is a part of. Any aggregate functions in the having or select expression operate over each group separately. See Grouping output items.

havingExpr

The havingExpr expression filters output items. See Filtering items in projections.

selectExpr

The value you specify for selectExpr defines the items that are the result of the query. The correlator evaluates selectExpr to generate each item that appears in the query’s output stream. The type of selectExpr identifies the type of the query’s output stream.

A projection can be one of the following kinds:

  • A simple projection does not specify any aggregate functions, nor does it specify a group by or having clause. A simple projection can be a simple istream projection or a simple rstream projection.

  • An aggregate projection specifies at least one aggregate function across the having and select expressions.

    You can specify a group by clause as part of an aggregate projection. If there is a group by clause, the group key must be one or more expressions that take the input event and return a value of a comparable type.

    You cannot specify rstream in an aggregate projection.

The following table describes the kinds of expressions that can appear in the select expression for each type of projection. In more complex expressions, the rules apply similarly to each sub-expression within that expression.

Kind of Expression

Valid in Projections

Description

Example

Non-item expression

Simple and aggregate

An external variable, constant, or method call. It does not refer to any of the input items.

select currentTime;

Item expression

Simple

A reference to the input item or a non-aggregate expression that contains at least one reference to the input item.

select a.i; select sqrt(a.x)*5.0/a.y

Group key expression

Aggregate

An expression that returns one of the group keys can also occur in the projection.

group by a.i/10 select (a.i/10)*mean(a.x);

Aggregate function expression

Aggregate

An expression that contains at least one aggregate function. Arguments to the aggregate function can include item expressions.

select mean(a.i);

Info
An expression might not be syntactically equivalent to a group by expression even though it might appear to be equivalent. For example, if the group by expression is a.i*10, you cannot specify 10*a.i as an equivalent expression. An equivalent group by expression must contain the exact sub-expression specified in the group by clause.

Aggregating items in projections

An aggregate function calculates a single value over a window. If a select expression contains any aggregate functions, then references to the input item can appear only in the arguments to those aggregate functions. Any EPL expression can appear in the arguments to an aggregate function, except another aggregate function. EPL provides several built-in aggregate functions and you can define additional ones. See Defining custom aggregate functions and Built-in aggregate functions.

Grouping output items

In a projection, when you do not specify a group by clause, any aggregate functions operate on all values in the window. This is true even if you partitioned the window. To group the items in the window into one or more separate groups and to calculate an aggregate value for each group of items, use the group by clause. The syntax of the group by clause is as follows:

group by groupByExpr[, groupByExpr]...

Each groupByExpr is an expression that returns a value of a comparable type. See Comparable types.

These expressions form the group key, which determines which group each output item is a part of. Any aggregate functions in the select expression operate over each group separately.

In an aggregate projection, you can refer to any group key expressions anywhere in the select expression. However, you can refer to a query input item only in an aggregate function argument. For example:

from t in all Tick() within 30.0
   group by t.symbol select TickAverage(t.symbol, mean(t.price));

Whenever a lot arrives, this query updates one or more groups. Every group that is updated outputs a TickAverage event, and all TickAverage events are in the same lot. Each TickAverage event contains the symbol and the average price for that symbol over the last thirty seconds. If a group is not updated, it does not output a TickAverage event.
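
The rule that only updated groups produce output can be sketched as follows. This is a Python model, not EPL. The function name group_means_for_lot is hypothetical, and the model considers only arriving items: it ignores groups that are updated because an item expired from the window.

```python
from collections import defaultdict

def group_means_for_lot(window, lot):
    """Return {symbol: mean price} for the groups touched by the arriving lot.

    window holds the (symbol, price) items already in the window;
    lot holds the items that arrive together.
    """
    by_symbol = defaultdict(list)
    for symbol, price in window + lot:
        by_symbol[symbol].append(price)
    updated = sorted({symbol for symbol, _ in lot})
    return {s: sum(by_symbol[s]) / len(by_symbol[s]) for s in updated}

window = [("A", 10.0), ("B", 4.0)]
result = group_means_for_lot(window, [("A", 20.0)])
print(result)
```

Group B is not updated by the lot, so the result contains a mean only for A.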

You typically use a group by clause in a stream query in conjunction with a partition by clause. In the following example, the window contains up to 10 events for each stock symbol. The aggregate projection calculates the average price separately for each symbol and each average is based on up to 10 events:

from t in ticks partition by t.symbol retain 10
   group by t.symbol select mean(t.price);

Obtaining the query’s remove stream (rstream)

For each query, there are items that have been added to the window in a given query activation and items that have been removed (they were previously in the window, but are no longer in the window). By default, a simple, non-aggregate projection returns the items that have been added to the window. This is the insert stream (istream). To obtain the items that have been removed from the window, add the rstream keyword to the select clause.

For aggregate projections, obtaining the remove stream is not meaningful, and therefore the rstream keyword is not allowed in aggregate projections.

For examples of specifying rstream, see Defining time-based windows, Defining size-based windows, Defining cross-joins with two from clauses and Defining equi-joins with the join clause.

When you specify retain all, you cannot specify rstream.

Filtering items in projections

In a stream query, as part of an aggregate projection definition, you can optionally specify a having clause to filter the items produced by the projection. The having clause specifies an arbitrary EPL expression and can filter items based on any criteria available to EPL. The syntax of the having clause is as follows:

having booleanExpr

Replace booleanExpr with a Boolean expression. This expression is referred to as the having predicate. The having predicate is evaluated for each lot that arrives. When the having predicate evaluates to false, the projection does not generate output.

Unlike the where clause, the having clause

  • Is part of the projection
  • Filters the output of the projection rather than what comes into the projection
  • Cannot refer to individual items
  • Can refer only to the group key or aggregates

A having clause can only be in an aggregate projection; it cannot be in a simple projection. Each aggregate projection must contain at least one aggregate in a having clause or in the select clause. Values for aggregates, whether in having expressions or select expressions, are always calculated over the same window(s). See Grouping output items.

For example:

from t in all Temperature() within 60.0
   having count() > 10
   select mean(t.value)

This query calculates a rolling average of temperatures over the last minute. In this stream query, the having clause permits the average to be output only when it is a reliable measure. The count() aggregate function ensures that there are sufficient measurements (more than 10) in the previous 60 seconds to compensate for any noise or one-off errors in the readings.

Because the having clause filters only the output of the projection, the aggregate state is still maintained in the background, and the average can be output the very moment the measurements pass the reliability criterion. In the previous example, this means that as soon as more than ten items are in the window, the average of all values in the last minute is output.
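
The contrast between the where clause and the having clause can be sketched in a single query. This is an illustration only: the Temperature event definition, the monitor name, and the -100.0 plausibility threshold are assumptions, not part of the product samples.

```epl
// Hypothetical event type, used only for this sketch.
event Temperature {
   float value;
}

monitor ReliableAverageSketch {
   action onload() {
      from t in all Temperature() within 60.0
         where t.value > -100.0   // input filter: refers to an individual item
         having count() > 10      // output filter: refers only to an aggregate
         select mean(t.value) as average {
            print "reliable average: " + average.toString();
      }
   }
}
```

Here the where clause discards implausible readings before they reach the projection, while the having clause suppresses output until more than ten readings contribute to the average.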

Filtering grouped aggregate projections

If you specify the group by clause, the having clause operates separately on each group, just as the select clause operates separately on each group. For example, the following code changes the previous code so that it outputs a reliable rolling average for each zone:

from t in all Temperature() within 60.0
   group by t.zone
   having count() > 10
   select ZoneAverage(t.zone, mean(t.value))

Just as a distinct mean is output for each group (each zone), the criteria of the having expression are applied separately to each group. A rolling average for a zone is output only when count() > 10 is true for that zone.

Performance

It is possible for the stream network to avoid some calculations in a select clause when the having clause evaluates to false. Since maintaining aggregates can be expensive, this can be a useful optimization. When you know that a having clause can often evaluate to false, you can obtain better performance by specifying a having clause in the stream query as opposed to specifying a query like this:

from t in all Ticks(symbol="APMA") within 60.0 * 10.0
    select MeanStddev(mean(t.value), stddev(t.value)) as avg_sd {
        if(shouldOutput()) {
            send avg_sd to "output";
        }
    }

This query computes a rolling average and standard deviation of a stock's values over the last ten minutes, and sends them to a dashboard or a similar consumer. Optionally, the output feed that sends out the rolling average and standard deviation can be turned off, and this is indicated by the return value of the shouldOutput() action. However, even when the output is turned off, Ticks events still come in and the stream network still calculates the rolling average and standard deviation.

You can rewrite the code such that turning off the output terminates the query and turning on the output restarts the query. This option loses the state of the window and introduces a 10-minute lag before accurate output is available. A better option is to add a having clause so that turning off the output removes the performance penalty without losing state. For example:

from t in all Ticks(symbol="APMA") within 60.0 * 10.0
    having shouldOutput()
    select MeanStddev(mean(t.value), stddev(t.value)) as avg_sd {
        send avg_sd to "output";
    }

The mean() and stddev() aggregates continue to accumulate state when shouldOutput() returns false, but they do not fully calculate the rolling average and standard deviation for each incoming item.
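
One way the surrounding monitor might look is sketched below. The event definitions, the SetOutput control event, and the outputEnabled variable are assumptions made for illustration; only the shape of the query comes from the example above.

```epl
// Hypothetical event types, used only for this sketch.
event Ticks { string symbol; float value; }
event MeanStddev { float average; float deviation; }
event SetOutput { boolean enabled; }

monitor GatedStatisticsSketch {
   // Monitor global variable: the having clause sees its current value
   // each time the query is activated.
   boolean outputEnabled := true;

   action shouldOutput() returns boolean {
      return outputEnabled;
   }

   action onload() {
      // Turn the output feed on or off without restarting the query.
      on all SetOutput() as s {
         outputEnabled := s.enabled;
      }

      from t in all Ticks(symbol="APMA") within 60.0 * 10.0
         having shouldOutput()
         select MeanStddev(mean(t.value), stddev(t.value)) as avg_sd {
            send avg_sd to "output";
      }
   }
}
```

Because the window keeps its state while the output is off, sending SetOutput(true) would be expected to resume output with a full ten-minute history rather than an empty window.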

IEEE special values in stream query expressions

The following information about IEEE special values applies to the following expressions:

  • The key expression in a with unique clause
  • A partition by expression
  • The expressions that define the conditions in a join clause
  • A group by expression

If one of these expressions is a decimal or float value, or a container that involves a decimal or float value, and the decimal or float value is an IEEE special value, then the following applies:

  • NaN — This value is illegal as all or part of an expression and terminates the monitor instance.
  • Positive/negative infinity — These values are legal and all positive infinities are treated as equal as are all negative infinities.
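
If a key might be NaN, one defensive option is to filter such items out in an upstream query so that they never reach the key expression. The following is a sketch under stated assumptions: the Sample event type and its field names are hypothetical.

```epl
// Hypothetical event type, used only for this sketch.
event Sample {
   float bucket;   // used as the group key; might be NaN in bad input
   float value;
}

monitor NaNGuardSketch {
   action onload() {
      // Drop items whose key is NaN before the grouped query sees them.
      stream<Sample> clean := from s in all Sample()
         where not s.bucket.isNaN()
         select s;

      from s in clean within 60.0
         group by s.bucket
         select mean(s.value) as m {
            print m.toString();
      }
   }
}
```

Infinite keys need no such guard, because they are legal and compare as equal to other infinities of the same sign.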

Defining custom aggregate functions

EPL provides a number of commonly used aggregate functions that you can specify in the select clause of a query. See Aggregating items in projections. If none of these functions perform the operation you need, you can define a custom aggregate function. The format for defining a custom aggregate function is as follows:

aggregate [bounded|unbounded] aggregateName ([arglist])
   returns retType { aggregateBody }

Element

Description

bounded | unbounded

Specify bounded when you are defining a custom aggregate function that will work with only a bounded window. That is, the query cannot specify retain all. Specify unbounded when you are defining a custom aggregate function that will work with only an unbounded window. That is, the query must specify retain all.

Do not specify either bounded or unbounded when you are defining a custom aggregate function that will work with either a bounded or an unbounded window.

If you do not specify bounded, you must define the custom aggregate function so that it can handle a window that never removes items. The function should not consume memory per item in the window.

aggregateName

Specify a name for your aggregate function. This is the name you will specify when you call the function in a select clause. For details about the characters you can specify, see Identifiers.

arglist

Optionally, specify one or more comma-separated type/name pairs. Each pair indicates the type and the name of an argument that you are passing to the function. For example, (float price, integer quantity).

retType

Specify any EPL type. This is the type of the value that your function returns.

aggregateBody

The body of a custom aggregate function is similar to an event body. It can contain fields that are specific to one instance of the custom aggregate function and actions to operate on the state. The init(), add(), remove() and value() actions are special. They define how stream queries interact with custom aggregate functions.

You define custom aggregate functions outside of an event or a monitor, and the function's scope is the package in which you declare it. To use custom aggregate functions in other packages, specify the function's fully-qualified name, for example:

from a in all A() select com.myCorporation.custom.myCustomAggregate(a)

Alternatively, you can specify a using statement. For example, suppose you define the myCustomAggregate() function in the com.myCorporation.custom package. To use that function inside another package, insert a statement such as the following in the file that contains the monitor in which you want to use the function:

using com.myCorporation.custom.myCustomAggregate;

Insert the using statement after the optional package declaration, but before any other declarations. You can then simply specify the function name. For example:

from a in all A() select myCustomAggregate(a)

Be sure to inject the file that contains the function definition before you inject the files that contain monitors that use the function.

See also Names.
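
The two-file arrangement described above might look like the following sketch. The com.myCorporation.app package, the body of myCustomAggregate(), and the definition of event A are assumptions made for illustration. The first file defines the function and must be injected first:

```epl
// File 1: defines the custom aggregate function in its own package.
package com.myCorporation.custom;

// Hypothetical event type, used only for this sketch.
event A {
   float x;
}

// Illustrative body: a running total of A.x over a bounded window.
aggregate bounded myCustomAggregate(A a) returns float {
   float total;
   action add(A a)    { total := total + a.x; }
   action remove(A a) { total := total - a.x; }
   action value() returns float { return total; }
}
```

The second file uses the function from a different package:

```epl
// File 2: the using statements follow the package declaration and
// precede all other declarations.
package com.myCorporation.app;

using com.myCorporation.custom.A;
using com.myCorporation.custom.myCustomAggregate;

monitor UseAggregateSketch {
   action onload() {
      from a in all A() retain 10 select myCustomAggregate(a) as total {
         print total.toString();
      }
   }
}
```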

Example of defining a custom aggregate function

The following example shows the definition of a custom aggregate function that returns the weighted standard deviation of the input values.

aggregate bounded wstddev( float x, float w ) returns float {
   // 1st argument is the value, 2nd is the weight.
   float s0;
   float s1;
   float s2;
   action add( float x, float w ) {
      if (w != 0.0) {
         s0 := s0 + w;
         s1 := s1 + w*x;
         s2 := s2 + w*x*x;
      }
   }
   action remove( float x, float w ) {
      if (w != 0.0) {
         s0 := s0 - w;
         s1 := s1 - w*x;
         s2 := s2 - w*x*x;
      }
   }
   action value() returns float {
      if (s0 != 0.0) { return ((s2 - s1*s1/s0)/s0).sqrt(); }
      else { return float.NAN; }
   }
}
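
A query might call this function as sketched below. The Tick event type and its fields are assumptions made for illustration. Because wstddev() is declared as bounded, the query uses a bounded window (within 60.0) rather than retain all.

```epl
// Hypothetical event type, used only for this sketch.
event Tick { string symbol; float price; float volume; }

monitor WeightedDeviationSketch {
   action onload() {
      from t in all Tick(symbol="APMA") within 60.0
         select wstddev(t.price, t.volume) as sd {
            // value() returns NaN when the total weight is zero.
            if (not sd.isNaN()) {
               print "volume-weighted stddev: " + sd.toString();
            }
      }
   }
}
```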

Defining actions in custom aggregate functions

Certain actions in a custom aggregate function have special meanings, and you must define them as follows:

  • init() — The init() action is optional. If a custom aggregate function defines an init() action, it must take no arguments and must not return a value. The correlator executes the init() action once for each new aggregate function instance it creates in a stream query.
  • add() — A custom aggregate function must define an add() action. The add() action must take the same ordered set of arguments that are specified in the custom aggregate function signature. That is, the names, types, and order of the arguments must all be the same. The correlator executes the add() action once for each item added to the set of items that the aggregate function is operating on.
  • remove() — A bounded aggregate function must define a remove() action. An unbounded aggregate function must not define a remove() action. If you do not specify either bounded or unbounded, the remove() action is optional. The remove() action must take the same ordered set of arguments as the add() action and must not return a value. The correlator executes the remove() action once for each item that leaves the set of items that the aggregate function is operating on. The value that remove() is called with is the same value that add() was called with.
  • value() — All custom aggregate functions must define a value() action. The value() action must take no arguments and its return type must match the return type in the aggregate function signature. The correlator executes the value() action once per lot per aggregate function instance and returns the current aggregate value to the query.

Custom aggregate functions can declare other actions, including actions that are executed by the above named actions. A custom aggregate function cannot contain a field whose name is onBeginRecovery, onConcludeRecovery, init, add, value, or remove, even if, for example, the custom aggregate function does not define a remove() action.
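
To illustrate these rules for the unbounded case, here is a sketch of a function that defines init(), add() and value() but, as required for an unbounded function, no remove() action. The runningMax name and the Reading event type are hypothetical.

```epl
// Hypothetical event type, used only for this sketch.
event Reading { float value; }

// Tracks the largest value seen so far. It keeps constant state,
// so it does not consume memory per item in the window.
aggregate unbounded runningMax(float x) returns float {
   float best;
   boolean seen;

   action init() {
      best := 0.0;
      seen := false;
   }
   action add(float x) {
      if (not seen or x > best) {
         best := x;
         seen := true;
      }
   }
   action value() returns float {
      return best;
   }
}

monitor RunningMaxSketch {
   action onload() {
      // An unbounded function requires an unbounded window: retain all.
      from r in all Reading() retain all select runningMax(r.value) as m {
         print "maximum so far: " + m.toString();
      }
   }
}
```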

Overloading in custom aggregate functions

As with event types, the names of custom aggregate functions must be unique. Unlike the built-in aggregate functions, there is no overloading, so it is not possible to declare two aggregate functions with the same name and different parameters or two aggregate functions with different bounded and unbounded specifiers and the same name. For example:

aggregate unbounded max( float value) returns float {...}
aggregate bounded max( float value) returns float {...}
   // Error! You cannot use the same function name.

aggregate unbounded maxu( float value) returns float {...}
aggregate bounded maxb( float value) returns float {...}
   // Both of these declarations are correct. They have different names.

In contrast, the built-in bounded and unbounded aggregate functions are overloaded.

Distinguishing duplicate values in custom aggregate functions

Each item in a stream is considered to be unique. However, when duplicate values appear in the set of items that a custom aggregate function operates on, it is not possible for the function to identify the particular instance of the value. If your implementation requires being able to distinguish between instances of duplicate values, you can accomplish this by extending the signatures of the function’s add() and remove() actions.

For example, you might see the following set of float values in a stream:

1.0   2.0   3.0   4.0   3.0   2.0   1.0

Each occurrence of a particular value in the stream represents an individual value, separate from any other occurrences of that value. But when a query presents these values to a custom aggregate function (by means of the add() and remove() actions), the value alone is not enough to identify the particular occurrence that this value represents.

To distinguish one occurrence from another, extend the action signatures as follows:

  • The add() action can return a value, which can be of any type.
  • If the add() action does return a value, then the remove() action must accept, as its last argument in addition to its standard arguments, an argument of the same type as that returned by the add() action.

When an item is added to the aggregate, the value returned by the add() action is stored with the item. When that item is removed from the aggregate, the same value will be passed to the remove() action. Thus, it is possible to distinguish between items with duplicate values by comparing the additional data that is passed to the remove() action.

The following example shows an aggregate function that returns the entire window contents, in order, as a sequence:

aggregate windowOf(float f) returns sequence<float> {
    dictionary<integer,float> d;
    integer i;

    action init() { d.clear(); i := 0; }
    action add(float f) returns integer {
        i := i+1;
        d[i] := f;
        return i;
    }
    action remove(float f, integer k) { d.remove(k); }
    action value() returns sequence<float> { return d.values(); }
}

Working with lots that contain multiple items

Each time a stream query or stream listener is activated, it might be processing more than one item at a time. Each simultaneously processed group of items is referred to as a lot. Like an auction lot, a lot can contain just one item or it can contain a number of items. Stream listeners can be activated once per item or once per lot. Stream queries try to process each item in a lot as if it arrived separately. See Behavior of stream queries with lots for a discussion of cases where this is not possible.

When a lot contains multiple items, all items in the lot appear in the output stream at the same time. However, the correlator preserves the order in which the stream query generated the items in the lot. When that output stream is the input stream for another stream query, the subsequent query uses the preserved order, if necessary, to determine how to process the items.

Stream queries that generate lots

To generate a lot that contains multiple items, a stream query must specify a simple projection or an aggregate projection that contains a group by clause. The stream query must also either receive lots that contain multiple items or must contain one of the following:

  • A batched window.
  • A timed window with the rstream keyword (this must be a simple projection, and not an aggregate projection).
  • A join of either type.

A query with a non-grouped aggregate projection never generates multiple items. It generates a single item or nothing.

A timed window with the rstream keyword can generate lots because multiple items can have the same timestamp. In a timed window, when items with the same timestamp expire, they all leave the window at the same time. However, the correlator still maintains the order in which the items were generated or received.

Behavior of stream queries with lots

This topic provides advanced information about how queries process lots that they receive on their input streams. The information here requires a thorough understanding of streams, queries, and the information about lots presented so far.

To understand how stream queries behave when receiving lots that contain more than one item, consider the window content of the query before the lot is input and the window content of the query after the lot is input. The difference between these two states determines the output of the query. For example, consider the following queries:

// event A { float x; }
stream<A>     sA := from a in all A() retain 3 every 3 select a;
stream<float> sB := from a in sA select a.x;
stream<float> sC := from a in sA select sum(a.x);

The following diagram shows the lot output by each stream on each activation of the query.

Illustration of the lot output

As can be seen, in the queries that contain aggregate functions, the aggregate expressions (and projections) are evaluated, at most, once per query activation. All queries, with the exception of those containing a group by clause, behave in this way.
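
To observe this behavior directly, the three queries can be wrapped in a monitor with listeners attached to sB and sC. This is a sketch only; the monitor name and the printed labels are assumptions.

```epl
event A { float x; }

monitor LotSketch {
   action onload() {
      stream<A>     sA := from a in all A() retain 3 every 3 select a;
      stream<float> sB := from a in sA select a.x;
      stream<float> sC := from a in sA select sum(a.x);

      // Simple projection: one output item per item in the incoming lot.
      from b in sB select b as x {
         print "sB item: " + x.toString();
      }
      // Aggregate projection: evaluated at most once per activation.
      from c in sC select c as total {
         print "sC item: " + total.toString();
      }
   }
}
```

If A(1.0), A(2.0) and A(3.0) are sent, sA would be expected to produce nothing until the third event and then output all three in one lot. The sB listener should then run once for each of the three values, while the sC listener should run once, with the sum of the lot.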

Size-based windows and lots

When a size-based window is processing a lot that contains more than one item, all of the items are processed in the window before any of the rest of the stream query is processed. None of the intermediate states are visible to the query. This means that in the following query:

from a in sA retain 3 select sum(a.i);

if the window contains the events A(1), A(2) and A(3) and a lot containing both A(4) and A(5) arrives, those two events displace A(1) and A(2) in a single step. The intermediate window state A(2), A(3), A(4) never exists. This matters most when the lot contains more items than fit in the window. For example, if five more events arrive in a single lot, the three events currently in the window fall out, the last three events of the lot enter the window, and the first two events of the lot disappear without ever having been in the window.

This behavior means that care must be taken with fixed-size windows when events might be processed in lots.
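
The oversized-lot case can be sketched as follows. The upstream batching query exists only to deliver events to the retain 3 window five at a time, and the field i is declared as a float here purely as an assumption for the illustration.

```epl
// Hypothetical definition of A, used only for this sketch.
event A { float i; }

monitor OversizedLotSketch {
   action onload() {
      // Batches incoming events so that they arrive downstream in lots of five.
      stream<A> sA := from a in all A() retain 5 every 5 select a;

      // Only the last three items of each lot ever enter this window.
      from a in sA retain 3 select sum(a.i) as total {
         print "sum over window: " + total.toString();
      }
   }
}
```

Following the description above, if A(1.0) through A(5.0) are sent, the window would be expected to hold only A(3.0), A(4.0) and A(5.0) after the lot arrives, so the listener should report 12.0. A(1.0) and A(2.0) never contribute to any sum.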

Join operations and lots

The principle of updating the state of a query in a single operation without the intermediate state being visible is most relevant for join operations. The two diagrams that follow illustrate how a cross-join behaves when several events arrive in a single lot.

In the diagrams, the items on the left side of the join are represented by the numbered items that come in from the left side, and the items on the right side of the join are represented by the lettered items that come in from the top. Each square in the grid can be a joined event. In both diagrams, the results of the join before the lot arrives are mostly highlighted in blue. The items joined after the lot arrives are mostly highlighted in teal. The relevant stream query in both examples is:

from a in sA retain 3
   from b in sB retain 3
   select C(a, b);

The complete set of values in the table represents all of the combinations of items from sA and items from sB that could possibly be generated by the join when considering alternative ways of ordering the sA and sB items arriving in the lot. In general, there is no particular ordering of the sA and sB items that is superior (more meaningful) than all other orderings. Thus, when considering the transitions, there is no preferred path from the initial window content to the final window content. Hence, it is considered that the correct output for the join is achieved by taking the difference between the initial window content and the final window content, ignoring any intermediate states.

First diagram showing joined events

In the first diagram, there are nine joined events before the lot arrives. These are represented by the seven blue squares and the two orange squares. Two items, 4 and 5, arrive on sA and displace items 1 and 2. Also, one item, d, arrives on sB and displaces item a. The result is nine joined events after the lot arrives, of which two were there before (represented by the two orange squares) and seven are new (represented by the teal squares). A non-aggregating query that outputs the istream (as given above) would return the seven new items (shown in teal). If, instead, the query was selecting the rstream, then it would return the seven items that are no longer a result of the join (shown in blue).

Second diagram showing joined events

In the second example, there are again nine joined events before the lot arrives. These are represented by the nine blue squares. Four items, 4, 5, 6, and 7, arrive on sA and displace items 1, 2, and 3. Because this is a retain 3 window, item 4, as the oldest item in the lot, never makes it into the window. Also, items d, e, f, and g arrive on sB and displace items a, b, and c. Again, because it is a retain 3 window, item d never appears in the window. After the lot arrives, the result is nine new joined events, which are represented by the teal squares.

Since there are no joined events that are present both before the lot arrives and after the lot arrives, all nine events that were previously the result of the join would be returned by a query selecting the remove stream of this join. The nine new events are output by the query that selects the insert stream. No events containing either 4 or d are ever visible as a result of the query, even though both values were present on one of the inputs.

Grouped projections and lots

Suppose that a query that contains a group by clause processes a lot that contains several items. The query generates new projected items for the groups where the state of the group after the lot is input differs from the state of the group before the lot is input.

Stream network lifetime

After you create a stream or stream listener, it exists until one of the following happens:

  • You explicitly terminate it.
  • The monitor that contains the stream or stream listener terminates.
  • You terminate another stream or stream listener in the same stream network and that causes the stream or stream listener to terminate.

A stream or stream listener is explicitly terminated by calling the quit() method on a variable that refers to it. Hence, to explicitly terminate a stream or stream listener, you must retain a reference to it. You can also terminate a stream or stream listener by terminating a related stream or stream listener in the same stream network (as detailed below).

You can create a stream or stream listener that is not referenced by any variable and cannot be terminated by quitting any other streams or stream listeners in the stream network. If this is unintentional, then we refer to it as a stream or stream listener leak. This situation is similar to an event listener leak (see Avoiding listeners and monitor instances that never terminate). Here is an example:

action createStreamListener() returns listener {
   stream <A> sA := all A();
   // Error: the query below was meant to use sA. Instead, it creates a
   // second stream source template and leaves sA unreferenced.
   listener l := from a in all A() select a.x as x { print x.toString(); }
   return l;
}

Although executing the code returns a listener variable that refers to the created stream listener, it inadvertently creates an unreferenced stream (the local variable sA did refer to this stream but is no longer in scope).
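
For comparison, a version of the action without the leak might look like this sketch, in which the query consumes sA so that the stream is connected to the returned stream listener:

```epl
action createStreamListener() returns listener {
   stream<A> sA := all A();
   // The query now uses sA, so no unreferenced stream is left behind.
   listener l := from a in sA select a.x as x { print x.toString(); }
   return l;
}
```

A caller that keeps the returned listener variable can later call quit() on it to tear down this part of the stream network.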

Calling quit() on a stream or stream listener in a stream network typically has side effects. A side effect can be one of the following:

  • Termination of additional streams, stream queries, stream listeners, or stream event expressions.
  • Disconnection between the terminated element and another element.

When determining which queries to terminate, the correlator uses the following rule: when, due to another stream or query terminating, a query can no longer generate any output, it is also terminated. For example, the following diagram shows a stream network with two stream source templates generating input events for five queries, eventually connected to two stream listeners. There are four stream variables pointing to the streams in the network.

Diagram showing a stream network

Suppose you call quit() on either r6 or r7 (the stream variables on the right). The correlator terminates the whole of the branch from Query D down. This is because, whichever stream you quit, nothing can be generated by anything connected to those streams. stream 4, however, is also feeding Query C, which can still generate output. Therefore, the rest of the network, including Query B and both stream event expressions, remains active.

If you subsequently call quit() on r5, this will terminate the stream listener and Query C, which will then terminate stream 3 and stream 4, since they are not connected to any other queries, and also stream 1, stream 2 and both stream source templates.

After their streams are terminated, the stream variables become dummy references. Subsequent attempts to create a query using those streams are ignored (the result is an inert stream).

Disconnection vs termination

In the example above, quitting r6 disconnects Query D from stream 4. Because stream 4 has other stream queries using it, this disconnection does not terminate stream 4 immediately. Streams terminate when all the queries using them have disconnected.

If you were instead to call quit() on r4, this would terminate everything on the right side of the diagram, no matter how many queries are using stream 4. However, the stream would just be disconnected from Query C. Whether this terminates Query C depends on the state of the join in Query C. If it is joining a size-based window from stream 4, the items in the window would remain to be joined against new items in stream 3. If it was a time-based window, then Query C would remain until everything in the window had been discarded. At that point, since nothing can ever be added to that side of a join, Query C terminates, causing the rest of the network to also be terminated.

Rules for termination of stream networks

The complete set of rules for when a part of a stream network is terminated is:

  • Stream listeners:
    • quit() is called on a listener variable pointing at that stream listener.
    • The stream the listener is connected to is terminated.
  • Streams:
    • quit() is called on a stream variable pointing at that stream.
    • The stream query generating the stream is terminated.
    • All the stream queries using the stream are terminated.
  • Stream queries:
    • The stream the query generates is terminated.

    • All of the streams the query uses are terminated and either the query does not define a window or it defines a within or within...every window and there are no live items in the window.

      A live item is an item whose expiration (the item falls out of the window) can cause query output. For example, if the only items in a timed window fail to satisfy a where clause in the window definition, then those items cannot change query output when they expire.

      If none of the items in the window are live, the query terminates when all items have fallen out of the window. However, the query might terminate earlier if the correlator can determine that none of the items are live and that all streams that the query uses have terminated. Regardless of when such a query quits, there are no observable effects except in two situations:

      • The query is the only thing keeping the monitor active. That is, when the query terminates, then the monitor’s ondie() action is called.
      • Calculation of the size of the window has one or more side effects.
  • Stream source templates:
    • The stream the stream source template generates is terminated.
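
The rules can be traced through a small network. The following is a sketch only: the event types and the monitor name are hypothetical, and the comments describe what the rules above imply rather than observed output.

```epl
// Hypothetical event types, used only for this sketch.
event A { float x; }
event Stop {}

monitor QuitSketch {
   action onload() {
      stream<A>     sA := all A();                      // stream source template
      stream<float> xs := from a in sA select a.x;      // stream query
      listener l := from x in xs select x as v {        // stream listener
         print v.toString();
      }

      on Stop() {
         // Quitting xs terminates the listener connected to it and the
         // query that generates it. With no queries left using sA, sA
         // and its stream source template terminate as well.
         xs.quit();
      }
   }
}
```

Calling l.quit() instead would start the same cascade from the listener end of the network.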

Using dynamic expressions in stream queries

The expressions in stream queries can contain variables and action calls from EPL. Unlike parameters to event templates, the correlator evaluates these expressions each time the query is used and not just when it is created. This allows the behavior of the query to be altered during program execution.

Behavior of static and dynamic expressions in stream queries

A static expression is an expression that refers to only static elements. Static elements are:

  • Constants (defined with the constant keyword)

  • Literal values, for example:

    from a in all A() within 20.0 select sum(a.i);
    
  • Primitive types that are local variables, for example:

    integer width := 10;
    from a in all A() retain width select sum(a.i);
    

The correlator can fully evaluate static expressions when it creates the stream query.

A dynamic expression is an expression that refers to one or more dynamic elements. In a query, the value of a dynamic expression can change throughout the lifetime of that query. Consequently, the correlator must re-evaluate each dynamic expression at appropriate points in the execution of the query.

Dynamic elements are:

  • Any reference type
  • Any monitor global variable
  • Where the stream query is created by an action on an event, the members of that event
  • Any action, method or plug-in call

The correlator fully evaluates an event template in a stream source template when the correlator creates the query. For example, consider the following two queries:

from a in all A(id=currentMatch) select a;
from a in all A() where a.id = currentMatch select a;

During execution, if currentMatch is a global variable, a change to the value of currentMatch affects the behavior of the second query, but it does not affect the behavior of the first query.
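
The difference can be sketched in a monitor that changes currentMatch while both queries are running. The A and SetMatch event types and the monitor name are assumptions made for illustration.

```epl
// Hypothetical event types, used only for this sketch.
event A { integer id; }
event SetMatch { integer id; }

monitor DynamicWhereSketch {
   integer currentMatch := 1;   // monitor global variable: a dynamic element

   action onload() {
      // Event template: currentMatch is evaluated once, at creation.
      from a in all A(id=currentMatch) select a as viaTemplate {
         print "template query: " + viaTemplate.toString();
      }
      // where clause: currentMatch is re-evaluated while the query runs.
      from a in all A() where a.id = currentMatch select a as viaWhere {
         print "where query: " + viaWhere.toString();
      }

      on all SetMatch() as m {
         currentMatch := m.id;
      }
   }
}
```

After a SetMatch(2) event, an A(2) event would be expected to appear only in the output of the second query, while the first query continues to match A(1).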

When to avoid dynamic expressions in stream queries

Where possible, use static expressions in preference to dynamic expressions. This allows the compiler to optimize the query to improve performance. For example, consider the following query:

stream<float> vwaps := from t in ticks
   within vwapPeriod
   select wavg(t.price,t.volume);

When vwapPeriod is a monitor global variable whose value does not change, then it is preferable to copy the value to a local variable first. For example:

float period := vwapPeriod;
stream<float> vwaps := from t in ticks
   within period
   select wavg(t.price,t.volume);

Similarly, if it is known that a given action call always returns the same value, then it is preferable to copy the result to a local variable and use this in place of the action call. For example:

float period := getVwapPeriod(symbol);
stream<float> vwaps := from t in ticks
   within period
   select wavg(t.price,t.volume);

Ordering and side effects in stream queries

To determine when it is safe to use dynamic expressions in stream queries, it is important to understand that:

  • In a query, the order in which the correlator executes the action calls is not defined. Although the order is not defined, the correlator always executes the action calls in the same order for a particular Apama release.
  • When processing each item passed to the query, if an action call with a given set of arguments appears multiple times within a stream query, then the number of times the correlator executes the action is not specified. It might be equal to or less than the number of times that the action call appears within the query. However, this number is always the same for a particular release.
  • In a stream network, the order in which the correlator executes the queries is not defined except for when the output of a query forms the input to a second query. In this case, the correlator always executes the first query before the second. Again, in a particular release, the execution order is always the same.

Because of these points, it is best to avoid actions with side effects in expressions executed in stream queries. Such actions can make a program more difficult to understand and debug. Instead, execute any such actions in stream listeners.

A method or expression that produces a value has a side effect if it modifies something or interacts with something outside the program. This includes, but is not limited to:

  • Modifying a global variable
  • Changing the value of an argument
  • Calling plug-in methods
  • Routing, enqueuing, emitting or sending an event
  • Calling another action that has side effects
  • Setting up event listeners or new streams

Understanding when the correlator evaluates particular expressions

All expressions in a stream query can contain dynamic elements. To understand the behavior of a query that specifies dynamic elements, it is necessary to know under what circumstances the correlator re-evaluates an expression and uses the result in the query.

Using dynamic expressions in windows

A window definition can contain some or all of the following:

  • A partition key expression
  • The window duration, size or both duration and size
  • An every batch period or size
  • The key for a with unique clause

The following table shows when the correlator evaluates each of these:

Window Definition

Description

retain n

The correlator evaluates n every time an item arrives on the stream. The correlator uses the new value of n to calculate what should be in the window.

retain n every m

The correlator stores incoming items until the current value of m is satisfied. When m is satisfied, the correlator evaluates both n and m. The correlator uses the new value of n to calculate what should be in the window, including the stored items. Because m is evaluated only after it has been satisfied, meeting that condition is always based on the old value of m.

within d

The correlator evaluates d every time an item arrives on the stream and every time an item is due to be removed from the window. The correlator uses the new value of d to calculate what should be in the window.

within d every p

The correlator stores incoming items until p seconds have elapsed. When p seconds have elapsed, the correlator evaluates p and d, but only if there are items in the window or stored items waiting to enter it. The correlator uses the new value of d to calculate what should be in the window, including the stored items. The correlator uses the new value of p to determine the next time the window can change. If there are no items in the window or waiting to enter the window then, for efficiency, the correlator does not evaluate p. Because p is evaluated only after the current period has elapsed, the period that has just elapsed is always based on the old value of p.

…retain n

If a within or within every window definition also specifies retain, the correlator evaluates n whenever the window content can change. The correlator uses the new value of n to calculate what should be in the window. If the window definition specifies every, the window content can change only when p is satisfied.

Otherwise, the window content can change when an item arrives on the stream and when an item is due to be removed from the window.

partition by k1[, k2]…

If the window definition specifies a timed every p clause, the correlator evaluates each partition expression when p seconds have elapsed. Otherwise, the correlator evaluates each key expression when an item arrives on the stream. The correlator uses the new value of each key expression to calculate what should be in each partition.

with unique w

The correlator evaluates w once for each item whenever that item is about to enter the window. If there is an every clause, an item can enter the window only when m or p is satisfied. Otherwise, an item can enter the window when it arrives on the stream.
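
As an illustration of the retain n row, the following sketch uses a monitor variable as the window size. The Temperature and SetWindowSize event types are hypothetical, and the sketch assumes that SetWindowSize always carries a positive size. According to the table, a new value of n takes effect the next time an item arrives on the stream.

```epl
// Hypothetical event types, assumed for illustration only.
event Temperature { string sensorId; float temperature; }
event SetWindowSize { integer size; }

monitor DynamicRetain {
   integer n := 3;
   action onload() {
      // n is a dynamic element: it is re-evaluated for each arriving item.
      from t in all Temperature() retain n
         select mean(t.temperature) as m {
            print "Mean over up to " + n.toString() + " readings: " +
               m.toString();
         }
      // Changing n does not rebuild the window immediately. The query
      // picks up the new value when the next Temperature item arrives.
      on all SetWindowSize() as s {
         n := s.size;
      }
   }
}
```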

Using dynamic expressions in equi-joins

The format of a query that contains an equi-join is as follows:

from x in s1 join y in s2 on j1 equals j2...

Suppose that j1 and j2 are dynamic expressions that return the left and right join keys for each input item. The correlator evaluates these expressions once for each input item when it enters the window. This is regardless of how many items are joined from the other side.
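
The following sketch shows one possible equi-join of this form. The Bid and Ask event types are hypothetical. Each key expression (b.symbol and a.symbol) is evaluated once for an item when that item enters its window, however many items it is then joined with.

```epl
// Hypothetical event types, assumed for illustration only.
event Bid { string symbol; float price; }
event Ask { string symbol; float price; }

monitor SpreadMonitor {
   action onload() {
      stream<Bid> bids := all Bid();
      stream<Ask> asks := all Ask();
      // Keep the most recent bid and ask for each symbol, and join
      // them where the symbols are equal.
      from b in bids partition by b.symbol retain 1
         join a in asks partition by a.symbol retain 1
         on b.symbol equals a.symbol
         select a.price - b.price as spread {
            print "Spread: " + spread.toString();
         }
   }
}
```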

Using dynamic expressions in where predicates

The correlator evaluates the predicate in a where clause once for each item. This happens as soon as a join operation produces an item, or if there is no join operation, as soon as an item enters a window.

Using dynamic expressions in projections

In a simple projection, the correlator evaluates the select expression once for each item. The correlator evaluates the select expression as soon as a join operation produces an item, or if there is no join operation, as soon as an item enters a window.

In a simple projection, regardless of whether the select clause specifies the rstream keyword, the correlator evaluates expressions in the projection when the items would be present on the insert stream and the results are stored until needed for the remove stream.

In an aggregate projection, the correlator evaluates expressions in the projection when the items would be present on the insert stream.

If an aggregate projection contains a group by clause the correlator evaluates the group key once for each item. This happens as soon as a join operation produces an item, or if there is no join operation, as soon as an item enters a window.

The correlator evaluates aggregate and grouped expressions in two stages. The correlator evaluates arguments to aggregate functions once for each item as soon as it is produced by a join or if there is no join, as soon as it arrives in the window. The correlator evaluates the rest of the aggregate expression once for each lot.
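
To make the two stages concrete, consider the following sketch. The Tick event type and the fee and scale variables are assumptions for illustration. Following the description above, the argument to sum() is evaluated once for each item when it arrives in the window, so it uses the value of fee at that moment. The division by scale is evaluated once for each lot.

```epl
// Hypothetical event type, assumed for illustration only.
event Tick { string symbol; float price; float volume; }

monitor TwoStageAggregate {
   float fee := 0.01;     // used inside the aggregate function argument
   float scale := 100.0;  // used outside the aggregate function
   action onload() {
      // Stage 1: t.price * t.volume * fee is evaluated once per item.
      // Stage 2: the division by scale is evaluated once per lot.
      from t in all Tick(symbol="ACME") within 60.0
         select sum(t.price * t.volume * fee) / scale as v {
            print v.toString();
         }
   }
}
```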

Examples of using dynamic expressions in stream queries

Following are some examples of using dynamic elements in stream queries. These examples are simplified, for brevity.

Example of altering query window size or period

The following code fragment shows part of a monitor that accepts requests from external entities to generate the volume-weighted average price (VWAP) for a given symbol. After you create a monitor like this, an external entity can, at any time, change the parameters that control the period over which the monitor calculates the VWAP, the output frequency of the VWAP events, or both.

monitor VwapMonitor {
   VwapRequestParams params;
   action onload() {
      on all VwapRequest() as v spawn monitorVwap(v);
         // Simplified. Assumes no duplicate requests.
   }
   action monitorVwap(VwapRequest v) {
      params := v.params;
      from t in all Ticks(symbol=v.symbol)
         within params.duration
         every params.period
         select Vwap(t.symbol,wavg(t.price,t.volume)) as vwap {
            route vwap;
         }
      on all VwapRequestUpdate(symbol=v.symbol) as u {
         params := u.params;
      }
   }
}

When accumulating the raw tick data to generate the VWAP price, no prescience is involved. There is no anticipation that the window size is to be increased. Changing the within duration to a larger value causes the window duration to increase but does not recover historic events. Hence the effective sample duration over which the monitor calculates the VWAP will, over time (as new tick items arrive), extend from the smaller setting to the larger setting. When switching from a larger within duration to a smaller one, the change takes effect immediately. The correlator discards the items that are no longer in the within duration.

Example of altering a threshold

The following code fragment shows part of a monitor that accepts requests from external entities to monitor the value of the trades for a given symbol. After you create a monitor like this, an external entity can, at any time, change the thresholds at which the monitor recognizes the trade as a high value trade.

monitor CountHighValueTicks {
   float threshold;
   action onload() {
      // Simplified. Assumes no duplicate requests.
      on all CountHighValueTicksRequest() as r {
         spawn monitorHighValueTicks(r);
      }
   }
   action monitorHighValueTicks(CountHighValueTicksRequest r) {
      threshold := r.threshold;
      stream<Tick> filtered := from t in all Tick(symbol=r.symbol)
                               where t.price*t.volume > threshold
                               select t;
      from t in filtered within 60.0 every 60.0 select count() as c {
         print "Count of high value trades in previous minute: " +
            c.toString();
      }
      on all CountHighValueTicksRequestUpdate(symbol=r.symbol) as u {
         threshold := u.threshold;
      }
   }
}

This example uses two queries. The first query filters out any ticks with values below the threshold. The second query accumulates the high-value ticks received in the last minute and outputs the count of high-value ticks in that period. This could have been written as a single query with the filtering performed after the window operation. For example:

from t in all Tick(symbol=r.symbol) within 60.0 every 60.0
   where t.price*t.volume > threshold select count();

However, this query’s window contains all of the low value ticks received in the last 60 seconds, as well as the high value ticks. This is not an optimal use of memory resources. Hence the two query approach is preferred.

Alternatively, you can specify an embedded query to amalgamate the two queries into a single statement:

from t in
   (from t2 in ticks where t2.price*t2.volume > threshold select t2 )
   within 60.0 every 60.0
   select count() as c {... }

The parentheses around the embedded query are optional.

Example of looking up values in a dictionary

The following statement shows a query that calculates the current value of a basket of stocks based on the most recent prices for those stocks. When using dictionaries in this way, be careful to ensure that all values used as keys are in the dictionary. A missing key value causes a runtime error and the correlator terminates the monitor instance. In the example, it is assumed that the prices stream was filtered to contain prices for only the stocks in the basket.

stream<float> basketPrices :=
   from p in prices
   partition by p.symbol
   retain 1
   select sum( p.price * basketVolume[p.symbol] );

Example of actions and methods in dynamic expressions

Actions and methods can be considered to be dynamic elements. There are various reasons why you might want to use actions and methods in queries:

  • To reuse a common complex expression that appears in several queries within a monitor. In this case, it might be preferable to implement the expression as an action.

  • To call a method that is implemented in a plug-in.

  • To add protection to expressions that, if unprotected, might cause runtime errors. For example:

    stream<float> basketPrices :=
       from p in prices
       partition by p.symbol
       retain 1
       select sum( p.price * getBasketVolume(p.symbol) );
       ...
    // Returns the basket volume for the symbol, or 0.0 if the symbol
    // is not in the basket, so that a missing key cannot cause a runtime error.
    action getBasketVolume( string symbol) returns float {
       if ( basketVolume.hasKey(symbol) ) {
          return basketVolume[symbol];
       } else {
          return 0.0;
       }
    }
    

Troubleshooting and stream query coding guidelines

This section provides high-level guidelines for writing stream query applications that implement best practices.

Prefer on statements to from statements

Do not use streams unnecessarily. If an event expression in an on statement meets your needs, use it. Take advantage of mixing code elements for listeners and event expressions, stream processing, and responsive program actions, all in the same monitor.
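
For instance, when all you need is to react to individual events that match a condition, an event listener is sufficient. The following is a minimal sketch; the Tick event type and the price threshold are assumptions for illustration.

```epl
// Hypothetical event type, assumed for illustration only.
event Tick { string symbol; float price; float volume; }

monitor PreferOn {
   action onload() {
      // An event listener is enough here: no window, join or
      // aggregate is needed.
      on all Tick(symbol="ACME", price > 100.0) as t {
         print "High price: " + t.price.toString();
      }
      // The equivalent stream query adds a stream network for no benefit:
      // from t in all Tick(symbol="ACME")
      //    where t.price > 100.0 select t as t2 { ... }
   }
}
```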

Know when to spawn and when to partition

As a rule, you should listen for only those events or streams that you are interested in now. Apama applications typically define monitors that spawn to handle a new situation, for example, to automatically manage the trading of a new large order. Each monitor instance is usually interested in only one particular substream of a larger stream, for example, Tick events for a particular stock rather than all Tick events.

Consequently, the common pattern is to create a new monitor instance and for that instance to set up stream queries that process the events of interest, for example, to calculate the average price. This is more efficient than defining a monitor that processes all events (for example, all Tick events for all stocks), generates added-value items and then forwards these items to client monitors. However, there are situations when the latter approach is required. You should decide which solution approach is best in which circumstances.

Filter early to minimize resource usage

To minimize processing and memory overhead, it is preferable to filter streams as early as possible in the processing chain or network. Filtering early can reduce the number of items processed or retained in memory and can also reduce the size of the items held. If possible, filter items right at the beginning of the query chain, that is, in the event template.

For example, it is preferable to rewrite this query:

from l in all LargeEvent()
   within largeWindowPeriod
   where l.key = key
   select mean(l.value);

If the key is static, rewrite it this way:

from l in all LargeEvent(key=key)
   within largeWindowPeriod
   select mean(l.value);

If the key is dynamic, rewrite it this way:

from v in
   from l in all LargeEvent()
      where l.key = key select l.value
   within largeWindowPeriod select mean(v);

In the static case, the correlator filters the large event before the event gets to the window. In the dynamic case, the embedded query filters the event before the event gets to the window in the enclosing query. Because the select clause of the embedded query specifies only l.value, the correlator discards the rest of the event. There is no need to bring the whole event into the window.

Avoid duplication of stream source template expressions

When you are maintaining code, you might add a stream query whose streamExpr is an event template that is already used in a query elsewhere in the same monitor. However, duplicated stream source template expressions do not always produce the behavior you want. Consider the following two code fragments:

stream<float> means := from t in all Temperature()
   within 10.0
   select mean(t.temperature);
from t in all Temperature()
   from m in means select t.temperature-m as d {
      print "Difference from mean is " + d.toString();
   }

The first fragment behaves differently than this fragment:

stream<Temperature> temperatures := all Temperature();
stream<float> means := from t in temperatures
   within 10.0
   select mean(t.temperature);
from t in temperatures
   from m in means
   select t.temperature-m as d {
      print "Difference from mean is " + d.toString();
   }

Of the two code fragments above, the second one has the desired behavior. The first example creates two event listeners, one for each all Temperature() clause. Each listener matches each incoming Temperature event, but the listeners trigger independently, one after the other. This means that there is no time when the second query has an item in each of its source streams. Consequently, the cross-join never produces any output.

In the second example, there is a single Temperature event listener that places matching events in the temperatures stream. The temperatures stream is the source stream for two queries. Now both source streams of the last query contain items at the same time and the query generates output.

Avoid using large windows where possible

In Apama, all data being processed is held in memory, including data within stream windows. If you specify query windows that contain a large number of items or hold items for a long period of time, the memory that the application uses necessarily increases.

A memory requirement that is more than the memory available to the application causes paging to occur, which can decrease application throughput. Where possible, consider reducing the size of any stream query windows by doing one or more of the following:

  • Filter items to reduce the number or size of the items in the window.
  • Use a complex event expression to achieve the same result.
  • Use retain all instead of specifying a within clause. See In some cases prefer retain all to a timed window for details.

In some cases prefer retain all to a timed window

When you specify retain all in a stream query, the correlator does not retain the items indefinitely. The correlator processes each new item when it arrives (for example, it might execute an aggregate function) and then discards it. Consequently, queries that specify retain all use less memory than queries that define time-based or size-based windows.

A situation that typically tempts you to define a time-based window is when you want to calculate some aggregate values for a session. For example, a session could run from the start of a day to the end of a day, or an incoming event, such as one that places an order in an automated trading system, could initiate a session that requires aggregated values.

After the session begins, interest in the aggregated values usually continues until the session ends, for example, at the end of the day or when the full volume of the placed order has been traded. In situations such as these, use a retain all window instead of a within window that spans the session.
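
A session of this kind might be sketched as follows. The NewOrder, Fill and OrderComplete event types are hypothetical. The query uses retain all for the length of the session, and the stream listener is quit when the session ends.

```epl
// Hypothetical event types, assumed for illustration only.
event NewOrder { string orderId; }
event Fill { string orderId; float price; float qty; }
event OrderComplete { string orderId; }

monitor OrderSession {
   action onload() {
      // Simplified. Assumes no duplicate order identifiers.
      on all NewOrder() as o spawn trackOrder(o.orderId);
   }
   action trackOrder(string orderId) {
      // retain all: the aggregate is updated for each new Fill, so
      // there is no need for a timed window that spans the session.
      listener l := from f in all Fill(orderId=orderId) retain all
         select wavg(f.price, f.qty) as avgPrice {
            print "Average fill price so far: " + avgPrice.toString();
         }
      // End of session: quit the stream listener.
      on OrderComplete(orderId=orderId) {
         l.quit();
      }
   }
}
```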

Prefer equi-joins to cross-joins

In a query using an equi-join, the items from the two input sets are joined based on equality of key values. The identification of matching items is very efficient.

Cross-joins have no key expressions to evaluate, so calculating a cross-join is more efficient than calculating an equi-join. However, a cross-join is less preferable than an equi-join if it produces unwanted items that must subsequently be filtered out.

Be aware that time-based windows can empty

Consider the query below:

from s in all Shipment(destination="SPQ")
   within 604800.0
   select sum(s.qty)/count()

After creation of the query, suppose that several shipments are sent in the first week and no shipments are sent in the second week. The value of the count() aggregate function drops to zero, which results in an attempt to divide by zero. This terminates the monitor instance.
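
One way to guard against this, sketched below under stated assumptions, is to project the two aggregate values separately and perform the division in the stream listener only when the count is greater than zero. The Shipment fields and the ShipmentTotals event type are hypothetical, and qty is assumed to be an integer.

```epl
// Hypothetical event types, assumed for illustration only.
event Shipment { string destination; integer qty; }
event ShipmentTotals { integer total; integer n; }

monitor AverageShipment {
   action onload() {
      from s in all Shipment(destination="SPQ")
         within 604800.0
         select ShipmentTotals(sum(s.qty), count()) as totals {
            // Divide only when the window is not empty.
            if totals.n > 0 {
               print "Average quantity: " +
                  (totals.total / totals.n).toString();
            } else {
               print "No shipments in the last week";
            }
         }
   }
}
```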

Be aware that fixed-size windows can overflow

Consider the following example:

stream<Temperature> batchedTemperatures :=
   from t in all Temperature(sensorId="S001")
   within 60.0 every 60.0 select t;
from t in batchedTemperatures
   retain 5
   select count() as c { print c.toString(); }

During execution of the first query, suppose that more than 5 matching events are found within one minute. The query outputs all of the matching events as a single lot. A lot that contains more than 5 items overflows the retain window in the second query. All but the most recent five items are lost. Calculations operate on only the most recent 5 items.

Note that you are unlikely to need the query combination shown in the code example above.

Beware of accidental stream leaks

Just as it is possible to leak event listeners, it is also possible to leak streams. Suppose that you create a stream but you do not specify the stream as input to any query. This stream still remains in existence, keeps a monitor instance alive, and consumes resources so it is considered to be a stream leak. A stream leak causes memory to be used and not freed. It can also cause unnecessary computation to occur.

A stream leak can happen if you create a stream that you want to use later on in your code. To be able to use this stream, you must assign it to a stream variable that is in scope in the location where you want to use the stream. If the stream variable goes out of scope or you assign another stream to that variable, the original stream still exists within the monitor instance’s internal stream network, but it is no longer accessible. For example:

  • The stream variable that references the stream goes out of scope:

    action streamLeakExample1(string s) {
       stream<float> prices :=
          from t in all Tick(symbol=s) select t.price;
          ... // If the elided code does not use the stream
    }         // a leak occurs when the prices variable goes out of scope.
    
  • You overwrite the stream variable that refers to an unused stream:

    action streamLeakExample2(sequence<string> symbols) {
       string s;
       stream<float> prices;
       for s in symbols {
          prices := from t in all Tick(symbol=s) select t.price;
          ... // If the elided code does not use the prices stream
              // a leak occurs when you overwrite prices.
       }
    }
    

Any code that creates a stream leak is erroneous. Code that repeatedly creates unused, inaccessible streams quickly uses up machine resources. To avoid leaking streams:

  • Avoid creating streams you do not intend to use immediately.
  • Quit a stream before the variable referring to it goes out of scope.
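
The second point might look like the following sketch, which quits an unused stream before its variable goes out of scope. The Tick event type, the watchPrices action and its wanted parameter are hypothetical names used only for illustration.

```epl
// Hypothetical event type, assumed for illustration only.
event Tick { string symbol; float price; float volume; }

monitor NoStreamLeak {
   action onload() {
      watchPrices("ACME", false);
   }
   action watchPrices(string s, boolean wanted) {
      stream<float> prices := from t in all Tick(symbol=s) select t.price;
      if wanted {
         // The stream is used as input to a query, so it is not leaked.
         from p in prices retain 10 select mean(p) as m {
            print m.toString();
         }
      } else {
         // The stream is not used as input to any query, so quit it
         // explicitly before the prices variable goes out of scope.
         prices.quit();
      }
   }
}
```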