Using functional operations in EPL

The Functional EPL Library is available as a project bundle which you can add with either the apama_project tool or using Apama Plugin for Eclipse. This provides the following types which you can use in your EPL files with using statements:

  • using com.apama.functional.Fn;
  • using com.apama.functional.Functional;

You can find the API documentation for all of these types in the API reference for EPL (ApamaDoc).

Functional operators

This library provides a selection of functional operations (modeled similar to Python’s functools or itertools libraries) such as filter, map and reduce. These operate on EPL container types (sequence and dictionary) and on generators provided by this library (see also Generators). To help using these functional operators, there are also several functor actions and predicates provided within the library. A functor action can be passed to actions such as map and reduce to provide a transformation to the underlying data. It takes an argument and returns a modified version of it. A predicate is an action which takes an argument and returns a boolean to say whether a particular fact is true about that argument. They can be passed to actions such as filter to test all elements in a container.

There are two APIs for accessing the functional operators. Firstly, all of the operators are provided as static functions on the com.apama.functional.Fn type. Each of these functions takes a container (sequence, dictionary or generator) as its first argument and returns a new container with the new contents as the result, in each case using an any as the type. For example, to filter a sequence of numbers for just even numbers:

sequence<integer> evens := <sequence<integer>> Fn.filter(numbers, Fn.even);

This example also shows the use of one of the functor actions, also provided on the Fn event. You can use an action or action variable with the signature action<integer> returns boolean. Some of these are provided within the library, but you can also write your own. You can combine several of these operations into a pipeline:

integer evenSum := <integer> Fn.reduce(Fn.filter(numbers, Fn.even), Fn.sum);

This returns the sum of all even numbers within the numbers container. The reduce function takes an additional first argument of the current value of the accumulator and returns the new value of the accumulator. So in this case, the signature would be action<integer, integer> returns integer.

If you are operating on a dictionary instead of a sequence, then you can use functions with one of two signature types:

  • action<VALUETYPE> returns RETURNTYPE signatures are invoked with each value in turn (ignoring the keys).
  • action<KEYTYPE, VALUETYPE> returns RETURNTYPE signatures are passed the key and the value in turn.

The second API is using the com.apama.functional.Functional type. This wraps your container and then provides the functional operators as instance methods, each one returning a new Functional object. At the end of the chain, you can either use an operator which directly returns a value like reduce, or you can call get to return the underlying result object. For example:

sequence<integer> evens := <sequence<integer>> Functional(numbers).filter(Fn.even).get();
integer evenSum := <integer> Functional(numbers).filter(Fn.even).reduce(Fn.sum);

Functional wraps all of the operators provided as static functions on Fn. As you can see, you still use Fn to access the predicates and functor actions to use with the operators.

The following table lists the operators provided on Fn and Functional.

Operator

Description

Arguments

Returns

filter

Filters the container to only have the elements where the provided predicate is true.

Sequence, dictionary or generator.action<TYPE> returns boolean

or

action<KEY, VALUE> returns boolean

Fn: A new container of the same type as the input container.Functional: A new Functional object.

map

Applies a functor action that maps the current members of a sequence, values in a dictionary or output of a generator to new values. For dictionaries, the values are mapped and the keys are unchanged. This operator can also be used with functors that perform an operation (such as updating a data structure or field) but do not return anything, in which case map() returns an empty any instead of a sequence. However this is not possible (or useful) for generators, only sequences and dictionaries.

Sequence, dictionary or generator.action<TYPE> returns NEWTYPE

or

action<KEY, VALUE> returns NEWTYPE

Fn: A new container of the same sort as the input container, but the type returned from the functor action.Functional: A new Functional object.

mapKeys

Applies a functor action that maps the current keys of a dictionary to new keys. The values are unchanged. This operator can also be used with functors that perform an operation but do not return anything, in which case mapKeys() returns an empty any instead of a dictionary.

Dictionary.action<KEY> returns NEWKEY

or

action<KEY, VALUE> returns NEWKEY

Fn: A dictionary with the key type returned from the functor action.Functional: A new Functional object.

reduce

Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning the final result. The first call is passed a default-initialized RESULT type.

Sequence or dictionary.action<RESULT, TYPE> returns RESULT

or

action<RESULT, KEY, VALUE> returns RESULT

The result of calling the functor action across all values.

reduceFrom

Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning the final result. The first call is passed the initial value.

Sequence or dictionary. Initial value for the reduction.

action<RESULT, TYPE> returns RESULT

or

action<RESULT, KEY, VALUE> returns RESULT

The result of calling the functor action across all values.

accumulate

Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning each result in turn. The first call is passed a default-initialized RESULT type.

Sequence, dictionary or generator.action<RESULT, TYPE> returns RESULT

or

action<RESULT, KEY, VALUE> returns RESULT

Fn: A generator which iterates over the results.Functional: A new Functional object.

accumulateFrom

Repeatedly calls a functor action on each value, using the output of the functor action to update an accumulator passed to the next call and returning each result in turn. The first call is passed a default-initialized RESULT type. The first call is passed the initial value.

Sequence, dictionary or generator.Initial value for the accumulation.

action<RESULT, TYPE> returns RESULT

or

action<RESULT, KEY, VALUE> returns RESULT

Fn: A generator which iterates over the result.Functional: A new Functional object.

slice

Selects a subset of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.

Sequence or generator.Start offset (0+).

End offset (0+, or negative to count back from the end).

Fn: A sequence containing the selected elements. Functional: A new Functional object.

sliceFrom

Selects a subset from the given offset to the end of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.

Sequence.Start offset (0+).

Fn: A sequence containing the selected elements. Functional: A new Functional object.

sliceTo

Selects a subset from the start, ending at the given offset of a sequence or generator. Immediately consumes enough of the generator to create a concrete sequence.

Sequence or generator.End offset (0+, or negative to count back from the end).

Fn: A sequence containing the selected elements. Functional: A new Functional object.

stride

Selects every nth item.

Sequence.Start offset (0+).

Fn: A sequence containing the selected elements. Functional: A new Functional object.

consume

Steps a generator the given number of times, discarding the results.

Generator. The number of times to step it.

The generator stepped n times.

quantify

Runs a predicate on each item in the container and counts how many times it returns true.

Sequence or dictionary.action<TYPE> returns boolean

The number of items in the sequence or dictionary for which the predicate returns true.

anyTrue

True if a container of booleans contains at least one True.False for the empty container.

Container of booleans.

Boolean.

allTrue

True if a container of booleans contains no False.True for the empty container.

Container of booleans.

Boolean.

Info

Important:

If you get an “Unable to find type” exception when using this library, you may need to declare a dummy variable of the container type listed in the error in order for the correlator to be able to instantiate the type, for example: any dummy := new sequence<integer>;.

The operators all return an any, which contains an appropriate container type. The item type (for example, the type within a sequence<TYPE>) is inferred from the return type of the functor action being used where possible. For example, if you call Fn.map with a functor action that returns integer, you get back a sequence<integer>. If you call a functor action which returns any (like Fn.mul), then you get back a sequence<any>, even if all the values in that sequence have the same type. If you want to extract this result, you must cast to the correct type. Passing a sequence<any> to another functional operator does work without any further changes. However, if you need to have a sequence of the actual type, you must copy them from the sequence<any>. You can do this by using toType: Fn.map([<any> 123, 456], Fn.toType("integer")) // returns [123, 456].

Fn also provides some predicates to use with filter:

Predicate

Description

isNot

Inverts another predicate. For example: Fn.filter(numbers, Fn.isNot(Fn.even))

isTrue

True if a boolean is true.

even

True if an integer is even.

odd

True if an integer is odd.

negative

True if an integer, float or decimal is less than 0.

positive

True if an integer, float or decimal is greater than 0. To include 0, use Fn.isNot(Fn.negative).

whole

True if a float or decimal does not have a fractional part. Always true for integers.

gt

True if the value is above a given threshold. Must be used with Fn.partial to provide the threshold. For greater-than-or-equal, use Fn.isNot(Fn.lt).

lt

True if the value is below a given threshold.Must be used with Fn.partial to provide the threshold. For less-than-or-equal, use isNot(Fn.gt).

fieldEqual

True if a field in the object has the given value.Must be used with Fn.partial to provide the field name and value.

isEqual

True if the two values are equal.Must be used with Fn.partial to provide the value to compare against.

isEmpty

True if the any is empty, or contains a listener, string, optional, sequence or dictionary that is empty.

`Fn` also provides some functor actions to use with `map`, `reduce` and `accumulate`:
functor action Description Operand type
toType Converts items in a container to the specified type using a cast or toString conversion. any
increment Increments to the next integer. integer
sum Add up all the values. integer, float or decimal
mean Returns the mean of the values. integer, float or decimal
mul Calculates the product of the values. integer, float or decimal
concat Concatenates all strings. string
callAction Calls an instance (non-static) action on an object (such as an event or built-in type) that is supplied later, for example, by a call to map(). any
getEntry Returns a named entry such as an event field or dictionary value from an object that is supplied later, for example, by a call to map(). any
setEntry Sets the named field value on an object that is supplied later, for example, by a call to map(). any
setFields Helper that initializes one or more of the fields in an event, either by field name or by position. There is also a helper action called sendToChannel for sending an event initialized using this action to a channel and returning it ready for subsequent use setting up listeners. any
quit Quits the given listener. listener

Functional listeners

Fn provides some actions which interact with events and listeners. These allow you to use a functional style of code to also listen for events.

Example of how you might use these where you might have wanted to write:

on all Event(f="val1") as e or all Event(f="val2") as e or... { eventArrived(e); }

However, you have a variable number of possible values, and they are not in a contiguous range. With Fn, you can write:

sequence<listeners> ls := Fn.listenForAnyOf(["val1", "val2"], "com.apamax.samples.Event", "f", {}, eventArrived);
on Stop() {
       any _ := Fn.map(ls, Fn.partial(callAction, "quit", []));
}

Another common pattern is having an asynchronous process with a completed event. You have a similar issue listening to a variable number of processes. With the functional API, you can now write this:

on Completed(id=1) and Completed(id=2) and... and not wait(TIMEOUTSECS) { onCompleted(); }
on wait(TIMEOUTSECS) and not (on Completed(id=1) and Completed(id=2) and... ) { onTimeout(); }

as:

Functional(sequenceIDs).waitForAllCompleted("Completed", "id", onCompleted).onTimeout(TIMEOUTSECS, onTimeout);

Lastly, we want to receive all events up until a termination condition and then process them as a collection. Rather than accumulating them all in a container manually with multiple listeners like this:

sequence<ValueEventName> vals := new sequence<ValueEventName>;
on all ValueEventName(fields=valueEventFields) as v
and not EndEventName(fields=endEventFields) or wait(timeout) { vals.append(v); }
on EndEventName(fields=endEventFields) and not wait(timeout) { onComplete(vals); }
on wait(timeout) and not EndEventName(fields=endEventFields) { onTimeout(vals); }

You can write this instead:

Functional.getAllEvents("ValueEventName", {...}, "EndEventName", {...}, onComplete).onTimeout(TIMEOUTSECS, onTimeout);

The following table lists the event and listener actions on Fn.

Action

Description

Arguments

Returns

waitForAllCompleted

Listens for events with a field matching every value from a functional sequence, and calls a completion action when all have arrived (similar to an EPL and event expression).

Sequence of values. Event type name.

Field name.

Timeout.

action<> on success.

sequence<listener>

getAllEvents

Listens for events matching the specified fields, and then calls an action with the received events once a terminating event arrives.

Event type name.Dictionary of event fields.

Event type name.

Dictionary of event fields.

action<sequence<EventType>> on success.

sequence<listener>

listenForAnyOf

Listens for events with a field matching any value from a given sequence, and calls an action when each one arrives (similar to an EPL or event expression).

Sequence of values. Event type name.

Field name.

Additional fields.

action<Eventtype>

sequence<listener>

onTimeout

Waits for a given timeout and then if any of the specified listeners are still valid, quits them and calls an action.

sequence<listener>``action<> on timeout.

sequence<listener>

Generators

The Functional EPL Library provides a concept of generators. Generators are objects which lazily calculate an infinite list, returning one value at a time. For example, the Fn.count generator counts upwards yielding the numbers 1, 2, 3, and so on forever.

The generators returned by the EPL Functional Library are represented in the API by an event called Generator. It is also possible to use your own events as a generator, they just need an action with signature generate() returns TYPE that returns the next value.

The simplest form of a generator is a current value (sometimes known as an accumulator) and a functor action which takes the previous value and calculates the next value. To get the next value, the generate() action is called, which steps the generator and returns the next value. You can use most of the functional operators described in Functional operators with the output of a generator. This results in a new generator that lazily evaluates the function when each value is requested. To create a generator from an existing action, you can use the generator static function on Fn:

Generator g := Fn.generator(Fn.increment);
print g.generate().toString(); // returns 1
print g.generate().toString(); // returns 2
g := <Generator> Fn.filter(g, Fn.even);
print g.generate().toString(); // returns 4
print g.generate().toString(); // returns 6

There are also several static functions which create predefined generators on Fn. For example:

Generator g := Fn.count(); // increments from 0
Generator g := Fn.repeat("A"); // an infinite series of "A"

Each of these static functions also exists as a static function on Functional which returns a Functional object which can have operators called on them fluently:

integer evenSum := <integer> Functional.count().filter(Fn.even).sliceTo(10).reduce(Fn.sum); // sum of the first 10 even numbers

The following table lists the generator functions on Fn and Functional:

Generator

Arguments

Returns

generator

action<TYPE> returns TYPE

Returns the result of calling the functor action on the previous value, starting from a default-initialized TYPE.

generatorFrom

Initial value.action<TYPE> returns TYPE

Returns the result of calling the functor action on the previous value, starting from the initial value.

count

None.

A sequence of increasing integers, counting upwards from 1.

repeat

A value.

The given value repeated infinitely.

random

An integer, float or decimal.

Random values of the given type, between 0 and the given value.

cycle

A sequence.

Generates each value in the sequence in turn, going back to the first element after completing the sequence.

range

Start integer.End integer (End>start).

Number to skip each time (1+).

Returns a finite sequence, not a generator, of the numbers in the given range.

sequenceOf

A value.The number of times to repeat the value.

Returns a finite sequence, not a generator, containing the given value a given number of times.

Partially bound functions

Partial function binding is a concept that allows you to bind some of the arguments to a function, resulting in a function with fewer (or no) arguments that can be executed later when the remaining arguments are supplied.

Often it is useful to bind all arguments except the final one, which is then supplied by the values from a sequence (or other functional container). For example, the following produces a sequence of ["Hello Bob", "Hello Alice"]:

Fn.map(["Bob", "Alice"], Fn.partial(Fn.concat, "Hello "))

This can be used to specialize a callback action by providing some of the arguments in advance, similar to how capture of locals works in languages with lambdas. Without partial function evaluation, if you want to capture locals in a callback, you would have to write an event type to wrap the local you wanted to capture and provide the function as an action on it:

event Multiplier { integer factor; action func(integer i) returns integer { return factor * i; } }
...
integer factor := 5;
Fn.map(container, Multiplier(factor).func);

As an alternative, Fn provides a function to partially satisfy function arguments and return an object which can be used in the place of a functor action to later evaluate with the full arguments. For example:

action mul(integer factor, integer i) returns integer { return factor * i; }
...
Fn.map(container, Fn.partial(mul, 5));

The partial stores the first argument and then map is called with the second argument, evaluating the function once all arguments are available.

If you want to bind arguments in a different order - that is, you want to provide the last argument in the partial and then execute it with an earlier argument - then you can use placeholder variables provided by the Fn.$(n) method. So for example, given the following action:

static action replaceString(string s, string needle, string replacement) returns string

you can call this with a map operator, using a placeholder to specify that the value from map should be the first argument:

log Fn.map(["Hi Bob", "Hi Alice"],
           Fn.partial(replaceString,
                      [Fn.$(1), "Hi", "Hello"])
          ).valueToString(); // prints ["Hello Bob", "Hello Alice"]

The argument to Fn.$ specifies the nth argument to the resulting partial function. This is indexed starting at 1, so Fn.$(1) refers to the first argument, and so on. Any arguments which are provided to the resulting function which are not referred to by a placeholder are appended to all the specified arguments.

You can also store and directly execute a partially evaluated function with the exec function:

Partial p := Fn.partial(mul, 5);
p.exec(3);

You can also chain partials and stash multiple arguments using a sequence:

Fn.partial(fn, [1,2,3]).partial(4).exec(5); // executes fn(1,2,3,4,5)