The Functional EPL Library is available as a project bundle which you can add with either the apama_project tool or using Apama Plugin for Eclipse. This provides the following types which you can use in your EPL files with using statements:
This library provides a selection of functional operations (modeled similar to Python’s functools or itertools libraries) such as filter, map and reduce. These operate on EPL container types (sequence and dictionary) and on generators provided by this library (see also Generators). To help using these functional operators, there are also several functor actions and predicates provided within the library. A functor action can be passed to actions such as map and reduce to provide a transformation to the underlying data. It takes an argument and returns a modified version of it. A predicate is an action which takes an argument and returns a boolean to say whether a particular fact is true about that argument. They can be passed to actions such as filter to test all elements in a container.
There are two APIs for accessing the functional operators. Firstly, all of the operators are provided as static functions on the com.apama.functional.Fn type. Each of these functions takes a container (sequence, dictionary or generator) as its first argument and returns a new container with the new contents as the result, in each case using an any as the type. For example, to filter a sequence of numbers for just even numbers:
This example also shows the use of one of the functor actions, also provided on the Fn event. You can use an action or action variable with the signature action<integer> returns boolean. Some of these are provided within the library, but you can also write your own. You can combine several of these operations into a pipeline:
This returns the sum of all even numbers within the numbers container. The reduce function takes an additional first argument of the current value of the accumulator and returns the new value of the accumulator. So in this case, the signature would be action<integer, integer> returns integer.
If you are operating on a dictionary instead of a sequence, then you can use functions with one of two signature types:
action<VALUETYPE> returns RETURNTYPE signatures are invoked with each value in turn (ignoring the keys).
action<KEYTYPE, VALUETYPE> returns RETURNTYPE signatures are passed the key and the value in turn.
The second API is using the com.apama.functional.Functional type. This wraps your container and then provides the functional operators as instance methods, each one returning a new Functional object. At the end of the chain, you can either use an operator which directly returns a value like reduce, or you can call get to return the underlying result object. For example:
Functional wraps all of the operators provided as static functions on Fn. As you can see, you still use Fn to access the predicates and functor actions to use with the operators.
The following table lists the operators provided on Fn and Functional.
Operator
Description
Arguments
Returns
filter
Filters the container to only have the elements where the provided predicate is true.
Sequence, dictionary or generator.action<TYPE> returns boolean
or
action<KEY, VALUE> returns boolean
Fn: A new container of the same type as the input container.Functional: A new Functional object.
map
Applies a functor action that maps the current members of a sequence, values in a dictionary or output of a generator to new values. For dictionaries, the values are mapped and the keys are unchanged. This operator can also be used with functors that perform an operation (such as updating a data structure or field) but do not return anything, in which case map() returns an empty any instead of a sequence. However this is not possible (or useful) for generators, only sequences and dictionaries.
Sequence, dictionary or generator.action<TYPE> returns NEWTYPE
or
action<KEY, VALUE> returns NEWTYPE
Fn: A new container of the same sort as the input container, but the type returned from the functor action.Functional: A new Functional object.
mapKeys
Applies a functor action that maps the current keys of a dictionary to new keys. The values are unchanged. This operator can also be used with functors that perform an operation but do not return anything, in which case mapKeys() returns an empty any instead of a dictionary.
Dictionary.action<KEY> returns NEWKEY
or
action<KEY, VALUE> returns NEWKEY
Fn: A dictionary with the key type returned from the functor action.Functional: A new Functional object.
reduce
Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning the final result. The first call is passed a default-initialized RESULT type.
Sequence or dictionary.action<RESULT, TYPE> returns RESULT
or
action<RESULT, KEY, VALUE> returns RESULT
The result of calling the functor action across all values.
reduceFrom
Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning the final result. The first call is passed the initial value.
Sequence or dictionary. Initial value for the reduction.
action<RESULT, TYPE> returns RESULT
or
action<RESULT, KEY, VALUE> returns RESULT
The result of calling the functor action across all values.
accumulate
Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning each result in turn. The first call is passed a default-initialized RESULT type.
Sequence, dictionary or generator.action<RESULT, TYPE> returns RESULT
or
action<RESULT, KEY, VALUE> returns RESULT
Fn: A generator which iterates over the results.Functional: A new Functional object.
accumulateFrom
Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning each result in turn. The first call is passed a default-initialized RESULT type. The first call is passed the initial value.
Sequence, dictionary or generator.Initial value for the accumulation.
action<RESULT, TYPE> returns RESULT
or
action<RESULT, KEY, VALUE> returns RESULT
Fn: A generator which iterates over the result.Functional: A new Functional object.
slice
Selects a subset of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.
Sequence or generator.Start offset (0+).
End offset (0+, or negative to count back from the end).
Fn: A sequence containing the selected elements. Functional: A new Functional object.
sliceFrom
Selects a subset from the given offset to the end of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.
Sequence.Start offset (0+).
Fn: A sequence containing the selected elements. Functional: A new Functional object.
sliceTo
Selects a subset from the start, ending at the given offset of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.
Sequence or generator.End offset (0+, or negative to count back from the end).
Fn: A sequence containing the selected elements. Functional: A new Functional object.
stride
Selects every nth item.
Sequence.Start offset (0+).
Fn: A sequence containing the selected elements. Functional: A new Functional object.
consume
Steps a generator the given number of times, discarding the results.
Generator. The number of times to step it.
The generator stepped n times.
quantify
Runs a predicate on each item in the container and counts how many times it returns true.
Sequence or dictionary.action<TYPE> returns boolean
The number of items in the sequence or dictionary for which the predicate returns true.
anyTrue
True if a container of booleans contains at least one True.False for the empty container.
Container of booleans.
Boolean.
allTrue
True if a container of booleans contains no False.True for the empty container.
Container of booleans.
Boolean.
Info
Important:
If you get an “Unable to find type” exception when using this library, you may need to declare a dummy variable of the container type listed in the error in order for the correlator to be able to instantiate the type, for example: any dummy := new sequence<integer>;.
The operators all return an any, which contains an appropriate container type. The item type (for example, the type within a sequence<TYPE>) is inferred from the return type of the functor action being used where possible. For example, if you call Fn.map with a functor action that returns integer, you get back a sequence<integer>. If you call a functor action which returns any (like Fn.mul), then you get back a sequence<any>, even if all the values in that sequence have the same type. If you want to extract this result, you must cast to the correct type. Passing a sequence<any> to another functional operator does work without any further changes. However, if you need to have a sequence of the actual type, you must copy them from the sequence<any>. You can do this by using toType: Fn.map([<any> 123, 456], Fn.toType("integer")) // returns [123, 456].
Fn also provides some predicates to use with filter:
Predicate
Description
isNot
Inverts another predicate. For example: Fn.filter(numbers, Fn.isNot(Fn.even))
isTrue
True if a boolean is true.
even
True if an integer is even.
odd
True if an integer is odd.
negative
True if an integer, float or decimal is less than 0.
positive
True if an integer, float or decimal is greater than 0. To include 0, use Fn.isNot(Fn.negative).
whole
True if a float or decimal does not have a fractional part. Always true for integers.
gt
True if the value is above a given threshold. Must be used with Fn.partial to provide the threshold. For greater-than-or-equal, use Fn.isNot(Fn.lt).
lt
True if the value is below a given threshold.Must be used with Fn.partial to provide the threshold. For less-than-or-equal, use isNot(Fn.gt).
fieldEqual
True if a field in the object has the given value.Must be used with Fn.partial to provide the field name and value.
isEqual
True if the two values are equal.Must be used with Fn.partial to provide the value to compare against.
isEmpty
True if the any is empty, or contains a listener, string, optional, sequence or dictionary that is empty.
`Fn` also provides some functor actions to use with `map`, `reduce` and `accumulate`:
functor action
Description
Operand type
toType
Converts items in a container to the specified type using a cast or toString conversion.
any
increment
Increments to the next integer.
integer
sum
Add up all the values.
integer, float or decimal
mean
Returns the mean of the values.
integer, float or decimal
mul
Calculates the product of the values.
integer, float or decimal
concat
Concatenates all strings.
string
callAction
Calls an instance (non-static) action on an object (such as an event or built-in type) that is supplied later, for example, by a call to map().
any
getEntry
Returns a named entry such as an event field or dictionary value from an object that is supplied later, for example, by a call to map().
any
setEntry
Sets the named field value on an object that is supplied later, for example, by a call to map().
any
setFields
Helper that initializes one or more of the fields in an event, either by field name or by position. There is also a helper action called sendToChannel for sending an event initialized using this action to a channel and returning it ready for subsequent use setting up listeners.
any
quit
Quits the given listener.
listener
Functional listeners
Fn provides some actions which interact with events and listeners. These allow you to use a functional style of code to also listen for events.
Example of how you might use these where you might have wanted to write:
on all Event(f="val1") as e or all Event(f="val2") as e or... { eventArrived(e); }
However, you have a variable number of possible values, and they are not in a contiguous range. With Fn, you can write:
sequence<listeners> ls := Fn.listenForAnyOf(["val1", "val2"], "com.apamax.samples.Event", "f", {}, eventArrived);
on Stop() {
any _ := Fn.map(ls, Fn.partial(callAction, "quit", []));
}
Another common pattern is having an asynchronous process with a completed event. You have a similar issue listening to a variable number of processes. With the functional API, you can now write this:
on Completed(id=1) and Completed(id=2) and... and not wait(TIMEOUTSECS) { onCompleted(); }
on wait(TIMEOUTSECS) and not (on Completed(id=1) and Completed(id=2) and... ) { onTimeout(); }
Lastly, we want to receive all events up until a termination condition and then process them as a collection. Rather than accumulating them all in a container manually with multiple listeners like this:
sequence<ValueEventName> vals := new sequence<ValueEventName>;
on all ValueEventName(fields=valueEventFields) as v
and not EndEventName(fields=endEventFields) or wait(timeout) { vals.append(v); }
on EndEventName(fields=endEventFields) and not wait(timeout) { onComplete(vals); }
on wait(timeout) and not EndEventName(fields=endEventFields) { onTimeout(vals); }
The following table lists the event and listener actions on Fn.
Action
Description
Arguments
Returns
waitForAllCompleted
Listens for events with a field matching every value from a functional sequence, and calls a completion action when all have arrived (similar to an EPL and event expression).
Sequence of values. Event type name.
Field name.
Timeout.
action<> on success.
sequence<listener>
getAllEvents
Listens for events matching the specified fields, and then calls an action with the received events once a terminating event arrives.
Event type name.Dictionary of event fields.
Event type name.
Dictionary of event fields.
action<sequence<EventType>> on success.
sequence<listener>
listenForAnyOf
Listens for events with a field matching any value from a given sequence, and calls an action when each one arrives (similar to an EPL or event expression).
Sequence of values. Event type name.
Field name.
Additional fields.
action<Eventtype>
sequence<listener>
onTimeout
Waits for a given timeout and then if any of the specified listeners are still valid, quits them and calls an action.
sequence<listener>``action<> on timeout.
sequence<listener>
Generators
The Functional EPL Library provides a concept of generators. Generators are objects which lazily calculate an infinite list, returning one value at a time. For example, the Fn.count generator counts upwards yielding the numbers 1, 2, 3, and so on forever.
The generators returned by the EPL Functional Library are represented in the API by an event called Generator. It is also possible to use your own events as a generator, they just need an action with signature generate() returns TYPE that returns the next value.
The simplest form of a generator is a current value (sometimes known as an accumulator) and a functor action which takes the previous value and calculates the next value. To get the next value, the generate() action is called, which steps the generator and returns the next value. You can use most of the functional operators described in Functional operators with the output of a generator. This results in a new generator that lazily evaluates the function when each value is requested. To create a generator from an existing action, you can use the generator static function on Fn:
There are also several static functions which create predefined generators on Fn. For example:
Generator g := Fn.count(); // increments from 0
Generator g := Fn.repeat("A"); // an infinite series of "A"
Each of these static functions also exists as a static function on Functional which returns a Functional object which can have operators called on them fluently:
integer evenSum := <integer> Functional.count().filter(Fn.even).sliceTo(10).reduce(Fn.sum); // sum of the first 10 even numbers
The following table lists the generator functions on Fn and Functional:
Generator
Arguments
Returns
generator
action<TYPE> returns TYPE
Returns the result of calling the functor action on the previous value, starting from a default-initialized TYPE.
generatorFrom
Initial value.action<TYPE> returns TYPE
Returns the result of calling the functor action on the previous value, starting from the initial value.
count
None.
A sequence of increasing integers, counting upwards from 1.
repeat
A value.
The given value repeated infinitely.
random
An integer, float or decimal.
Random values of the given type, between 0 and the given value.
cycle
A sequence.
Generates each value in the sequence in turn, going back to the first element after completing the sequence.
range
Start integer.End integer (End>start).
Number to skip each time (1+).
Returns a finite sequence, not a generator, of the numbers in the given range.
sequenceOf
A value.The number of times to repeat the value.
Returns a finite sequence, not a generator, containing the given value a given number of times.
Partially bound functions
Partial function binding is a concept that allows you to bind some of the arguments to a function, resulting in a function with fewer (or no) arguments that can be executed later when the remaining arguments are supplied.
Often it is useful to bind all arguments except the final one, which is then supplied by the values from a sequence (or other functional container). For example, the following produces a sequence of ["Hello Bob", "Hello Alice"]:
This can be used to specialize a callback action by providing some of the arguments in advance, similar to how capture of locals works in languages with lambdas. Without partial function evaluation, if you want to capture locals in a callback, you would have to write an event type to wrap the local you wanted to capture and provide the function as an action on it:
As an alternative, Fn provides a function to partially satisfy function arguments and return an object which can be used in the place of a functor action to later evaluate with the full arguments. For example:
The partial stores the first argument and then map is called with the second argument, evaluating the function once all arguments are available.
If you want to bind arguments in a different order - that is, you want to provide the last argument in the partial and then execute it with an earlier argument - then you can use placeholder variables provided by the Fn.$(n) method. So for example, given the following action:
The argument to Fn.$ specifies the nth argument to the resulting partial function. This is indexed starting at 1, so Fn.$(1) refers to the first argument, and so on. Any arguments which are provided to the resulting function which are not referred to by a placeholder are appended to all the specified arguments.
You can also store and directly execute a partially evaluated function with the exec function:
Partial p := Fn.partial(mul, 5);
p.exec(3);
You can also chain partials and stash multiple arguments using a sequence: