In stream queries, you can specify aggregate functions in the select clause. An aggregate function calculates a single value across all items currently in the window. EPL provides a number of commonly used aggregate functions. If a supplied aggregate function does not meet your needs, you can define a custom aggregate function.
EPL provides built-in aggregate functions in the com.apama.aggregates package. All of these functions are available for either bounded or unbounded use. See the com.apama.aggregates package in the API reference for EPL (ApamaDoc) for detailed information on each built-in aggregate function.
To use a built-in aggregate function in a stream query, you must do one of the following:
Specify the full name of the aggregate function. For example:
select com.apama.aggregates.sum(x)
For each aggregate function you want to use in your code, add a using statement. This lets you specify aggregate function names without specifying the package name. For example:
using com.apama.aggregates.mean;
using com.apama.aggregates.stddev;
...
...select MeanSD( mean(s), stddev(s) );
Insert the using statement after the optional package declaration and before any other declarations in the .mon file.
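As a rough illustration of where the using statements sit in a complete file, the sketch below places them after a package declaration and before the event and monitor declarations. The package name, the Reading and MeanSD event types, their field names, and the 60-second window are all hypothetical and chosen only for this example.

```epl
// Optional package declaration comes first (hypothetical package name).
package com.example.stats;

// using statements follow the package declaration and precede
// every other declaration in the .mon file.
using com.apama.aggregates.mean;
using com.apama.aggregates.stddev;

// Hypothetical event types for this illustration.
event Reading {
    float value;
}

event MeanSD {
    float average;
    float deviation;
}

monitor ReadingStats {
    action onload() {
        MeanSD stats;
        // Because of the using statements, mean() and stddev() can be
        // written without the com.apama.aggregates prefix.
        from r in all Reading() within 60.0
            select MeanSD(mean(r.value), stddev(r.value)) : stats {
            log "mean=" + stats.average.toString()
                + " sd=" + stats.deviation.toString();
        }
    }
}
```

Without the two using statements, the same query would have to name com.apama.aggregates.mean and com.apama.aggregates.stddev in full.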
If none of the supplied aggregate functions meets your needs, you can define a custom aggregate function and use it in the select clause of a stream query.
You define a custom aggregate function in a .mon file, outside any event or monitor definition. The scope of the aggregate function is the package in which you declare it. To use a custom aggregate function from a monitor in another package, specify the fully qualified name of the aggregate function, for example:
from a in all A() select com.myCorporation.custom.myCustomAggregate(a)
Specify bounded when you are defining a custom aggregate function that works only with a bounded window; a stream query that uses the function cannot specify retain all. Specify unbounded when you are defining a custom aggregate function that works only with an unbounded window; a stream query that uses the function must specify retain all. Specify neither bounded nor unbounded when you are defining a custom aggregate function that works with either kind of window.
The name of a custom aggregate function must be unique within a package; you cannot overload it or define an event or monitor with the same name as an aggregate function.
The list of formal parameters consists of zero or more comma-separated type/name pairs. Each pair indicates the type and the name of an argument that you are passing to the aggregate function. For example, (float price, integer quantity).
The return type must be an EPL type. This is the type of the value that your aggregate function returns.
The body of a custom aggregate function can contain fields that are specific to one instance of the custom aggregate function and actions to operate on the state.
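To show how the qualifier, name, parameter list, return type, and body fit together, here is a minimal sketch of an unbounded aggregate function that tracks the largest value seen so far. The names highWater, Reading, and PeakMonitor are hypothetical, and the sketch assumes the declaration form aggregate, optional qualifier, name, parameter list, returns clause, and body; check it against the EPL reference for your version.

```epl
// Hypothetical event type used only for this illustration.
event Reading {
    float value;
}

// Declared outside any event or monitor.
// "unbounded" means a query that uses this function must specify retain all.
aggregate unbounded highWater(float x) returns float {
    // Fields hold the state of one instance of the aggregate function.
    float maxSoFar;
    boolean seenAny;

    // Runs once for each new instance; sets the starting state explicitly.
    action init() {
        maxSoFar := 0.0;
        seenAny := false;
    }

    // Runs once for each item added; parameters match the signature.
    action add(float x) {
        if (not seenAny) or (x > maxSoFar) then {
            maxSoFar := x;
            seenAny := true;
        }
    }

    // Returns the current aggregate value. There is no remove() action
    // because items never leave an unbounded window.
    action value() returns float {
        return maxSoFar;
    }
}

monitor PeakMonitor {
    action onload() {
        float peak;
        from r in all Reading() retain all select highWater(r.value) : peak {
            log "Highest reading so far: " + peak.toString();
        }
    }
}
```

A running maximum suits the unbounded qualifier because it cannot be updated correctly when an item leaves the window without keeping every item, which is exactly the case that retain all rules out.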
Actions
In a custom aggregate function, the init(), add(), remove() and value() actions are special. They define how stream queries interact with custom aggregate functions.
init() — If a custom aggregate function defines an init() action, it must take no arguments and must not return a value. The correlator executes the init() action once for each new aggregate function instance it creates in a stream query.
add() — A custom aggregate function must define an add() action. The add() action must take the same ordered set of arguments that are specified in the custom aggregate function signature. That is, the names, types, and order of the arguments must all be the same. The correlator executes the add() action once for each item added to the set of items that the aggregate function is operating on.
remove() — A bounded aggregate function must define a remove() action. An unbounded aggregate function must not define a remove() action. If you do not specify either bounded or unbounded, the remove() action is optional. The remove() action must take the same ordered set of arguments as the add() action, followed by an argument of the type returned by add(), if any, and must not return a value. The correlator executes the remove() action once for each item that leaves the set of items that the aggregate function is operating on. The value that remove() is called with is the same value that add() was called with.
value() — All custom aggregate functions must define a value() action. The value() action must take no arguments and its return type must match the return type in the aggregate function signature. In a stream query, the correlator executes the value() action once per batch per group, and returns the current aggregate value to the query.
Custom aggregate functions can declare other actions, including actions that are executed by the above named actions. A custom aggregate function cannot contain a field whose name is onBeginRecovery, onConcludeRecovery, init, add, value, or remove, even if, for example, the custom aggregate function does not define a remove() action.
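The following sketch puts the four special actions together in a bounded aggregate function that computes a quantity-weighted mean price. It reuses the (float price, integer quantity) parameter list mentioned earlier; the names weightedMean, Trade, and WeightedMeanMonitor, and the window size of 10, are hypothetical. In this sketch add() returns no value, so remove() takes exactly the same arguments as add().

```epl
// Hypothetical event type used only for this illustration.
event Trade {
    string symbol;
    float price;
    integer quantity;
}

// "bounded" means remove() is required and queries cannot use retain all.
aggregate bounded weightedMean(float price, integer quantity) returns float {
    float weightedSum;     // sum of price * quantity for items in the window
    integer totalQuantity; // sum of quantity for items in the window

    // Optional: runs once for each new aggregate function instance.
    action init() {
        weightedSum := 0.0;
        totalQuantity := 0;
    }

    // Runs once for each item that enters the window.
    action add(float price, integer quantity) {
        weightedSum := weightedSum + price * quantity.toFloat();
        totalQuantity := totalQuantity + quantity;
    }

    // Runs once for each item that leaves the window, with the same
    // values that add() received for that item.
    action remove(float price, integer quantity) {
        weightedSum := weightedSum - price * quantity.toFloat();
        totalQuantity := totalQuantity - quantity;
    }

    // Return type matches the return type in the signature.
    action value() returns float {
        if totalQuantity = 0 then {
            return 0.0; // assumption: report 0.0 for an empty window
        }
        return weightedSum / totalQuantity.toFloat();
    }
}

monitor WeightedMeanMonitor {
    action onload() {
        float result;
        // retain 10 is a bounded window, as the bounded qualifier requires.
        from t in all Trade() retain 10
            select weightedMean(t.price, t.quantity) : result {
            log "Weighted mean over last 10 trades: " + result.toString();
        }
    }
}
```

Keeping running totals lets add() and remove() do constant work per item. The trade-off is that repeated floating-point addition and subtraction can accumulate rounding error over a long-running query.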
Fields
In the body of a custom aggregate function, you can define fields that are specific to the custom aggregate instance they are in.