Using EPL plug-ins

In EPL programs (monitors), you can use the standard EPL plug-ins provided with Apama, as well as EPL plug-ins that you define yourself. An EPL plug-in consists of an appropriately formatted library of C++ functions, which can be called from within EPL code. In the case of a plug-in written in Java, the Java classes that are called from an application's EPL code are contained in a JAR file. The correlator does not need to be modified to enable or to integrate with a plug-in; plug-in loading is transparent and occurs dynamically when required.

To write custom EPL plug-ins, see Developing EPL plug-ins.

When using a plug-in, you can call the functions it contains directly from EPL, passing EPL variables and literals as parameters, and getting return values that can be manipulated.

Overhead of using plug-ins

The overhead when using EPL plug-ins is very small.

However, you do need to ensure that you do not block the correlator for a long period of time. For example, you do not want to use a plug-in for doing extensive, synchronous, time-consuming calculations.

If you need to perform a time-consuming operation, use asynchronous processing in the plug-in to perform the calculation on a background thread, and then deliver the result via an event to the monitor instance that requested it. For example, your plug-in method might return an ID, and the monitor then listens for an event carrying that ID. When the calculation is complete, the plug-in sends an event with that ID to the context from which it was originally called:

integer id := plugin.compute(inputs);
on ComputeResult(id=id) as cr {
     // process cr.result
}

When to use plug-ins

Custom plug-ins can be written in C++, Java or Python. A custom plug-in is a suitable solution in the following situations:

  • You have an in-house or third-party library of (possibly complex) functions or classes that you want to re-use.
  • The operations you need to perform are more easily or efficiently performed in another language than in EPL. For example, you need to use data structures that are not easily represented in EPL.
Info
If your concern is purely performance, then the compiled EPL runtime (available on Linux; see also the --runtime option in Starting the correlator) may be sufficient, and in some cases can produce better results than other languages, including C++ and Java.

When not to use plug-ins

In general, when you can efficiently write the desired operation in EPL, an all-EPL solution is preferable to one that involves custom-developed plug-ins. Apama customers who experience problems with correlator stability when using custom-developed plug-ins will be asked by Cumulocity product support to remove the plug-in and reproduce the problem prior to being offered further technical help. Cumulocity product support lifts this restriction only if the plug-ins have certification from Apama product management.

Using the TimeFormat event library

The TimeFormat event library uses the Time Format plug-in.

The TimeFormat event provides routines to obtain the current time and convert to or from string representations of time.

Internally, the correlator expresses time as seconds since the Unix epoch (1 Jan 1970, midnight UTC). UTC has no daylight savings applied and each day consists of 86,400 seconds. Leap seconds are effectively treated as double-length seconds, so that the number of seconds in the day is constant. This is the form of currentTime and is convenient for performing arithmetic, such as differences between times. For more information on this variable, see currentTime.

To convert from string form to float form, use a parseTime method. To convert from float form to string form, use a format method. Both take a format string that describes the string form of the time. For more information, see Format specification for the TimeFormat functions.

The parseTime method is available on the TimeFormat event directly. Alternatively, you can pre-compile a pattern and then parse using the compiled form. A CompiledPattern object is obtained from the TimeFormat event using one of the compilePattern methods (depending on which time zone the pattern should use by default). The CompiledPattern object can be stored in a monitor variable, in an event instance or in a local variable, and used by listeners. Re-using a CompiledPattern is more efficient than calling one of the TimeFormat.parseTime methods, as the format string only needs to be read and compiled once. Calling parseTime on the TimeFormat event is equivalent to generating a CompiledPattern from the same format string and calling parseTime on that object. It is also possible to create multiple CompiledPattern events if your application needs to use several different formats for time.

For example, the following will behave the same:

TimeFormat.parseTime(pattern, time);
TimeFormat.compilePattern(pattern).parseTime(time);
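
For repeated parsing, re-using the compiled form avoids recompiling the format string on every call. A minimal sketch, assuming the com.apama.correlator.timeformat package (the Tick event, its timestamp field and the pattern itself are illustrative):

using com.apama.correlator.timeformat.TimeFormat;
using com.apama.correlator.timeformat.CompiledPattern;

event Tick {
   string timestamp;
}

monitor TickParser {
   // compiled once, reused for every matching event
   CompiledPattern pattern;

   action onload() {
      pattern := TimeFormat.compilePattern("yyyy-MM-dd HH:mm:ss");
      on all Tick() as t {
         float when := pattern.parseTime(t.timestamp);
         log "Tick at " + when.toString() at INFO;
      }
   }
}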

There are also functions to obtain the current system time. Which one to use depends on whether your purpose is to measure the time interval between two events on the same machine (getMicroTime()), or to identify when an event occurred in human terms or across machines (getSystemTime() or currentTime).

  • getSystemTime() provides an absolute time. Its purpose is to give an indication of the wall clock time rather than to measure time intervals. You would see a jump in the values if someone changed the time (for example, manually in the operating system or to synchronize with an NTP server). These jumps would be a problem if you were trying to measure the latency of some operation. If you use getMicroTime() instead, such a change to the operating system time has no effect.
  • getMicroTime() provides a high precision time, which is suitable for high precision relative times. The absolute value of getMicroTime() depends on the host operating system. Its main purpose is for measuring small durations of time. Unless you need such high precision, it is probably better to use currentTime or getSystemTime() instead.
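
For example, a minimal latency-measurement sketch using getMicroTime() (the operation being timed is a placeholder):

float start := TimeFormat.getMicroTime();
// ... the operation being measured ...
float elapsed := TimeFormat.getMicroTime() - start;
log "Operation took " + elapsed.toString() + " seconds" at INFO;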

The following table gives a brief overview of the different time functions:

| | getMicroTime() | getSystemTime() | currentTime |
| --- | --- | --- | --- |
| Epoch | Undefined / determined by the operating system | Unix epoch (midnight 1 January 1970 UTC) | Unix epoch (midnight 1 January 1970 UTC) |
| Affected by the host operating system’s automatic adjustment for daylight savings | No | No | No |
| Affected by a change to the host operating system’s time zone | No | No | No |
| Affected by a manual change to the host operating system’s time | No | Yes | Yes |
| Affected by the host operating system’s automatic time synchronization | No | Yes | Yes |
| Represents a unique point in time | No | Yes, except for leap seconds | Yes, except for leap seconds |
| Precision | Microseconds | Milliseconds | Dependent on the correlator clock rate * |
| Remains constant while processing a single event | No | No | Yes |
| Use for date/time formatting | No | Yes | Yes |
Info
All of the above are floating point values representing seconds since their epoch.

* See the documentation for the --frequency correlator command-line option (in Starting the correlator) and the timerFrequency YAML configuration option (in YAML configuration file for the correlator).

Patterns with textual elements operate in English by default, but produce output and expect input in another language if one has been set in the environment. For example, under Linux, if the correlator is running with the LC_ALL environment variable set to "fr_FR", the format "EEEE dd MMMM yyyy G" produces and expects "jeudi 01 janvier 1970 ap. J.-C." for time 0.0.

When you use the TimeFormat event library, you can use the TZ environment variable to select a particular time zone to be used by the event library. Specify the value in either of the following formats:

Continent/City
Ocean/Archipelago

For example: TZ=Europe/London. The alternative shortened format will not work correctly. For example, TZ=GB will not be recognized. If you specify something like this, Coordinated Universal Time (UTC) is used instead.

Info
For a list of time zones, see Supported time zones. When passing the time zone to one of the TimeFormat functions, you can use the constants in the TimeZone namespace in the API reference for EPL (ApamaDoc).

TimeFormat format functions

The format functions convert the time parameter to the local time and return that time in the format you specify.
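
For example, a minimal sketch (the pattern is illustrative, and the output depends on the correlator's local time zone):

// format the correlator's current time as a local timestamp string
string now := TimeFormat.format(currentTime, "yyyy-MM-dd HH:mm:ss");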

For usage information, see the API reference for EPL (ApamaDoc).

TimeFormat parse functions

The parse functions parse the value contained by the timeDate parameter according to the format passed in the format parameter or wrapped by the CompiledPattern.

All functions return the result as a float of seconds since the epoch.

For usage information, see the API reference for EPL (ApamaDoc).

Notes

For all parse functions:

  • If the timeDate parameter specifies only a time, the date is assumed to be 1 January 1970 in the appropriate time zone. If the timeDate parameter specifies only a date, the time is assumed to be the midnight that starts that day in the appropriate time zone. Adding the two results together as seconds therefore gives the combined date and time (see the sketch after this list).
  • If the timeDate string specifies a time zone, and there is a matching z, Z, v, or V in the format string, the time zone specified in the timeDate string takes precedence over any other ways of specifying the time zone. For example, when you call the parseUTC() or parseTimeWithTimeZone() function, and you specify a time zone or offset in the timeDate string, the time zone or offset specification in the timeDate string overrides the time zone you specify as a parameter to the parseTimeWithTimeZone() function and the normal interpretation of times and dates as UTC by the parseUTC() function.
  • Parsing behavior is undefined if the format string includes duplicate elements such as "MM yyyy MMMM", has missing elements such as "MM", or it includes potentially contradictory elements and is given contradictory input, for example, "Tuesday 3 January 1970" (it was actually a Saturday).
  • Dates before 1970 are represented by negative numbers.
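
For example, a date-only string and a time-only string can be parsed separately and summed. A small sketch using parseUTC so that the results are time-zone independent (the values themselves are illustrative):

// midnight starting 10 July 1996 UTC
float datePart := TimeFormat.parseUTC("yyyy.MM.dd", "1996.07.10");
// 15:08:56 on 1 January 1970 UTC
float timePart := TimeFormat.parseUTC("HH:mm:ss", "15:08:56");
// 10 July 1996, 15:08:56 UTC
float combined := datePart + timePart;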

Example

The following example returns 837007736:

TimeFormat.parseTime("yyyy.MM.dd G 'at' HH:mm:ss", "1996.07.10 AD at 15:08:56")

See also Midnight and noon.

The following examples both parse the timeDate string as having a time zone of UTC+0900:

TimeFormat.parseTimeWithTimeZone("DD.MM.YY Z", "01.01.70 +0900", "UTC");
TimeFormat.parseUTC("DD.MM.YY Z", "01.01.70 +0900");

In the first example, the +0900 specification in the timeDate string overrides the UTC specification for the time zone name parameter. In the second example, the +0900 specification in the timeDate string overrides the UTC specified by calling the parseUTC() function.

TimeFormat utility functions

The utility functions provide methods for manipulating date/time values. These include, for a given date/time, finding the time of the previous midnight, how many seconds have passed since that midnight, the number of days that have passed since the beginning of the epoch, and the offset in seconds of a time zone from UTC.

For usage information, see the API reference for EPL (ApamaDoc).

Format specification for the TimeFormat functions

The format and parse functions make use of the SimpleDateFormat class provided in the International Components for Unicode libraries. SimpleDateFormat is a class for formatting and parsing dates in a language-independent manner.

Pattern letters in format strings

The TimeFormat functions use the SimpleDateFormat class to transform between a string that contains a time and/or date and a normalized representation of that time and/or date. In this case, the normalized representation is the number of seconds since the epoch.

For the operation to succeed, it is important to define the format string so that it exactly represents the format of the time and/or date you provide as a string in the timeDate parameter to a parse function, or expect to be returned from a format function. You specify the format as a time pattern. In this pattern, all ASCII letters are reserved as pattern letters.

The number of pattern letters determines the format as follows:

  • For pattern letters that represent text
    • If you specify four or more letters, the SimpleDateFormat class transforms the full form. For example, EEEE formats/parses Monday.
    • If you specify fewer than four letters, the SimpleDateFormat class transforms the short or abbreviated form if it exists. For example, E, EE, and EEE each formats/parses Mon.
  • For pattern letters that represent numbers
    • Specify the minimum number of digits.
    • If necessary, SimpleDateFormat prepends zeros to shorter numbers to equal the number of digits you specify. For example, m formats/parses 6, mm formats/parses 06.
    • Year is handled specially. If the count of y is 2, the year is truncated to 2 digits. For example, yyyy formats/parses 1997, while yy formats/parses 97.
    • Unlike other fields, fractional seconds are padded on the right with zeros.
  • For pattern letters that can represent text or numbers
    • If you specify three or more letters, the SimpleDateFormat class transforms text. For example, MMM formats/parses Jan, while MMMM formats/parses January.
    • If you specify one or two letters, the SimpleDateFormat class transforms a number. For example, M formats/parses 1, and MM formats/parses 01.

The following table provides the meaning of each letter you can specify in a pattern. After the table, there are a number of combined examples.

Descriptions of pattern letters in format strings:

| Symbol | Meaning | Presentation | Example | Sample Result |
| --- | --- | --- | --- | --- |
| G | Era designator | Text | G | AD or BC |
| y (lowercase) | Year | Number | yy, yyyy | 96, 1996 |
| Y (uppercase) | Year for indicating which week of the year. Use with the w symbol. See “Week in year” later in this table. | Number | See example for “Week in year”. | |
| u | Extended year | Number | uuuu | 5769 |
| M | Month in year | Text or Number | M, MM, MMM, MMMM | 9, 09, Sep, September |
| d | Day in month | Number | d, dd, dd | 7, 07, 25 |
| h | Hour in AM or PM (1-12) | Number | hh | 05 |
| H | Hour in day (0-23). See also Midnight and noon. | Number | H, HH, HH | 0, 05, 14 |
| m | Minute in hour. See also Midnight and noon. | Number | m, mm, mm | 3, 03, 55 |
| s | Second in minute | Number | s, ss, ss | 5, 05, 59 |
| S | Fractional second | Number | S, SS, SSS | 2, 20, 200 |
| E | Day of week | Text | E, EE, EEE, EEEE | Fri, Fri, Fri, Friday |
| e | Day of week (1-7). This is locale dependent. Typically, Monday is 1. | Number | e | 4 |
| D | Day in year | Number | D, DD, DDD, DDD | 7, 07, 007, 123 |
| F | Day of particular week in month (1-7). Use with W (uppercase) for week in month. See “Week in month” later in this table. | Number | See example for “Week in month”. | |
| w (lowercase) | Week in year. Use with uppercase Y. See the note on week numbering after this table. | Number | "'Week' w YYYY" (uppercase Y); for contrast, "'Week' w yyyy" (lowercase y) | For December 31st, 2008, a Wednesday (US locale): "Week 1 2009"; "Week 1 2008" |
| W (uppercase) | Week in month. The values are calculated similarly to “Week in year” (w). Weeks numbered with a minus sign (such as -2 or -1) and 0 precede the first week; weeks numbered 2, 3 and so on follow it. | Number | "'Day' F 'of Week' W" | "Day 2 of Week 3" |
| a | AM/PM marker | Text | a | AM or PM |
| k | Hour in day (1-24) | Number | k, kk, kk | 1, 01, 24 |
| K | Hour in AM/PM (0-11) | Number | K, KK, KK | 0, 07, 11 |
| z (lowercase) | Time zone | Text | z, zz, zzz; zzzz | GMT+05:30; Pacific Standard Time |
| Z (uppercase) | Time zone (RFC 822) | Number | Z | -0800 |
| v (lowercase) | Generic time zone | Text | v; vvvv | PT (assuming US locale); Pacific Time |
| V (uppercase) | Short time zone ID | Text | V | gblon |
| VV | Long time zone ID | Text | VV | Europe/London |
| VVV | Time zone exemplar city | Text | VVV | London |
| VVVV | Time zone location text | Text | VVVV | United Kingdom Time |
| O (uppercase) | Localized GMT time zone | Text | O; OOOO | GMT-8; GMT-8:00 |
| X (uppercase) | ISO8601 time zone with Z | Text | X; XX; XXX; XXXX; XXXXX | -08, +0530, Z; -0800, Z; -08:00, Z; -0800, -075258, Z; -08:00, -07:52:58, Z |
| x (lowercase) | ISO8601 time zone without Z | Text | x; xx; xxx; xxxx; xxxxx | -08, +0530; -0800; -08:00; -0800, -075258; -08:00, -07:52:58 |
| g | Julian day | Number | g | 2451334 |
| A | Milliseconds in day | Number | A | 69540000 |
| ' | Escape for text | Delimiter | "'Week' w YYYY" | "Week 1 2009" |
| '' | Single quote | Literal | "KK 'o''clock'" | "11 o'clock" |

Note on week numbering (w): The first week of a month/year is defined as the earliest seven-day period beginning on the specified “first day of the week” and containing at least the specified “minimal number of days in the first week” of that month/year. The first day of the week and the minimum days in the first week (1 to 7) depend on the calendar in use, which in turn depends on the locale resource data. For example, January 1, 1998 was a Thursday. If the first day of the week is a Monday and the minimum days in a week is 4 (these are the values reflecting ISO 8601 and many national standards), then week 1 of 1998 starts on December 29, 1997, and ends on January 4, 1998. However, if the first day of the week is a Sunday and the minimum number of days in a week is 4, then week 1 of 1998 starts on January 4, 1998, and ends on January 10, 1998; the first three days of 1998 are then part of week 53 of 1997.

Any character in the format pattern that is not in the range of ['a'..'z'] or ['A'..'Z'] is treated as quoted text. For example, the following characters can be in a timeDate string without being enclosed in quotation marks:

  • :
  • .
  • ,
  • #
  • @

A pattern that contains an invalid pattern letter results in a -1 return value.

The following table gives examples that assume the US locale:

| Format pattern | Suitable timeDate string |
| --- | --- |
| yyyy.MM.dd G 'at' HH:mm:ss z | 1996.07.10 AD at 15:08:56 GMT+08:30 |
| EEE, MMM d, ''yy | Wed, July 10, '96 |
| h:mm a | 12:08 PM |
| hh 'o''clock' a, zzzz | 12 o'clock PM, Pacific Daylight Time |
| K:mm a, z | 0:00 PM, GMT+07:30 |
| yyyyy.MMMMM.dd GGG hh:mm aaa | 1996.July.10 AD 12:08 PM |

When parsing a date string using the abbreviated year pattern (y or yy), SimpleDateFormat (and hence all parse functions) must interpret the abbreviated year relative to some century. It does this by adjusting dates to be within 79 years before and 19 years after the time the SimpleDateFormat instance is created. For example, using a pattern of MM/dd/yy and a SimpleDateFormat instance created on Jan 1, 1997, the string 01/11/12 would be interpreted as Jan 11, 2012 while the string 05/04/64 would be interpreted as May 4, 1964. During parsing, only strings consisting of exactly two digits, as defined by Unicode::isDigit(), will be parsed into the default century. Any other numeric string, such as a one digit string, a three or more digit string, or a two digit string that is not all digits (for example, -1), is interpreted literally. So 01/02/3 or 01/02/003 are parsed, using the same pattern, as Jan 2, 3 A.D. Likewise, 01/02/-3 is parsed as Jan 2, 4 B.C. Behavior is undefined if you specify a two-digit date that might be either twenty years in the future or eighty years in the past.

If the year pattern has more than two y characters, the year is interpreted literally, regardless of the number of digits. So using the pattern MM/dd/yyyy, 01/11/12 parses to Jan 11, 12 A.D.

When numeric fields abut one another directly, with no intervening delimiter characters, they constitute a run of abutting numeric fields. Such runs are parsed specially. For example, the format HHmmss parses the input text 123456 to 12:34:56, parses the input text 12345 to 1:23:45, and fails to parse 1234. In other words, the leftmost field of the run is flexible, while the others keep a fixed width. If the parse fails anywhere in the run, then the leftmost field is shortened by one character, and the entire run is parsed again. This is repeated until either the parse succeeds or the leftmost field is one character in length. If the parse still fails at that point, the parse of the run fails.
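
For example, a quick check of the abutting-field behavior described above, using parseUTC so that the result does not depend on the local time zone:

// "123456" with format "HHmmss" parses as 12:34:56 UTC on 1 Jan 1970,
// which is 12*3600 + 34*60 + 56 = 45296.0 seconds since the epoch
float t := TimeFormat.parseUTC("HHmmss", "123456");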

For time zones that have no names, SimpleDateFormat uses strings GMT+hours:minutes or GMT-hours:minutes.

The calendar defines what is the first day of the week, the first week of the year, whether hours are zero based or not (0 vs. 12 or 24), and the time zone. There is one common number format to handle all the numbers; the digit count is handled programmatically according to the pattern.

Midnight and noon

The format "HH:mm" parses "24:00" as midnight that ends the day. Given the formal "hh:mm a", both "00:00 am" and "12:00 am" parse as the midnight that begins the day. Note that "00:00 pm" and "12:00 pm" are both midday.

Supported time zones

The TimeFormat event library supports the following time zones. We recommend use of the “Area/City” time zones. Constants to provide ease of access can be found in the TimeZone namespace in the API reference for EPL (ApamaDoc).

  • ACT
  • AET
  • AGT
  • ART
  • AST
  • Africa/Abidjan
  • Africa/Accra
  • Africa/Addis_Ababa
  • Africa/Algiers
  • Africa/Asmara
  • Africa/Asmera
  • Africa/Bamako
  • Africa/Bangui
  • Africa/Banjul
  • Africa/Bissau
  • Africa/Blantyre
  • Africa/Brazzaville
  • Africa/Bujumbura
  • Africa/Cairo
  • Africa/Casablanca
  • Africa/Ceuta
  • Africa/Conakry
  • Africa/Dakar
  • Africa/Dar_es_Salaam
  • Africa/Djibouti
  • Africa/Douala
  • Africa/El_Aaiun
  • Africa/Freetown
  • Africa/Gaborone
  • Africa/Harare
  • Africa/Johannesburg
  • Africa/Juba
  • Africa/Kampala
  • Africa/Khartoum
  • Africa/Kigali
  • Africa/Kinshasa
  • Africa/Lagos
  • Africa/Libreville
  • Africa/Lome
  • Africa/Luanda
  • Africa/Lubumbashi
  • Africa/Lusaka
  • Africa/Malabo
  • Africa/Maputo
  • Africa/Maseru
  • Africa/Mbabane
  • Africa/Mogadishu
  • Africa/Monrovia
  • Africa/Nairobi
  • Africa/Ndjamena
  • Africa/Niamey
  • Africa/Nouakchott
  • Africa/Ouagadougou
  • Africa/Porto-Novo
  • Africa/Sao_Tome
  • Africa/Timbuktu
  • Africa/Tripoli
  • Africa/Tunis
  • Africa/Windhoek
  • America/Adak
  • America/Anchorage
  • America/Anguilla
  • America/Antigua
  • America/Araguaina
  • America/Argentina/Buenos_Aires
  • America/Argentina/Catamarca
  • America/Argentina/ComodRivadavia
  • America/Argentina/Cordoba
  • America/Argentina/Jujuy
  • America/Argentina/La_Rioja
  • America/Argentina/Mendoza
  • America/Argentina/Rio_Gallegos
  • America/Argentina/Salta
  • America/Argentina/San_Juan
  • America/Argentina/San_Luis
  • America/Argentina/Tucuman
  • America/Argentina/Ushuaia
  • America/Aruba
  • America/Asuncion
  • America/Atikokan
  • America/Atka
  • America/Bahia
  • America/Bahia_Banderas
  • America/Barbados
  • America/Belem
  • America/Belize
  • America/Blanc-Sablon
  • America/Boa_Vista
  • America/Bogota
  • America/Boise
  • America/Buenos_Aires
  • America/Cambridge_Bay
  • America/Campo_Grande
  • America/Cancun
  • America/Caracas
  • America/Catamarca
  • America/Cayenne
  • America/Cayman
  • America/Chicago
  • America/Chihuahua
  • America/Ciudad_Juarez
  • America/Coral_Harbour
  • America/Cordoba
  • America/Costa_Rica
  • America/Creston
  • America/Cuiaba
  • America/Curacao
  • America/Danmarkshavn
  • America/Dawson
  • America/Dawson_Creek
  • America/Denver
  • America/Detroit
  • America/Dominica
  • America/Edmonton
  • America/Eirunepe
  • America/El_Salvador
  • America/Ensenada
  • America/Fort_Nelson
  • America/Fort_Wayne
  • America/Fortaleza
  • America/Glace_Bay
  • America/Godthab
  • America/Goose_Bay
  • America/Grand_Turk
  • America/Grenada
  • America/Guadeloupe
  • America/Guatemala
  • America/Guayaquil
  • America/Guyana
  • America/Halifax
  • America/Havana
  • America/Hermosillo
  • America/Indiana/Indianapolis
  • America/Indiana/Knox
  • America/Indiana/Marengo
  • America/Indiana/Petersburg
  • America/Indiana/Tell_City
  • America/Indiana/Vevay
  • America/Indiana/Vincennes
  • America/Indiana/Winamac
  • America/Indianapolis
  • America/Inuvik
  • America/Iqaluit
  • America/Jamaica
  • America/Jujuy
  • America/Juneau
  • America/Kentucky/Louisville
  • America/Kentucky/Monticello
  • America/Knox_IN
  • America/Kralendijk
  • America/La_Paz
  • America/Lima
  • America/Los_Angeles
  • America/Louisville
  • America/Lower_Princes
  • America/Maceio
  • America/Managua
  • America/Manaus
  • America/Marigot
  • America/Martinique
  • America/Matamoros
  • America/Mazatlan
  • America/Mendoza
  • America/Menominee
  • America/Merida
  • America/Metlakatla
  • America/Mexico_City
  • America/Miquelon
  • America/Moncton
  • America/Monterrey
  • America/Montevideo
  • America/Montreal
  • America/Montserrat
  • America/Nassau
  • America/New_York
  • America/Nipigon
  • America/Nome
  • America/Noronha
  • America/North_Dakota/Beulah
  • America/North_Dakota/Center
  • America/North_Dakota/New_Salem
  • America/Nuuk
  • America/Ojinaga
  • America/Panama
  • America/Pangnirtung
  • America/Paramaribo
  • America/Phoenix
  • America/Port-au-Prince
  • America/Port_of_Spain
  • America/Porto_Acre
  • America/Porto_Velho
  • America/Puerto_Rico
  • America/Punta_Arenas
  • America/Rainy_River
  • America/Rankin_Inlet
  • America/Recife
  • America/Regina
  • America/Resolute
  • America/Rio_Branco
  • America/Rosario
  • America/Santa_Isabel
  • America/Santarem
  • America/Santiago
  • America/Santo_Domingo
  • America/Sao_Paulo
  • America/Scoresbysund
  • America/Shiprock
  • America/Sitka
  • America/St_Barthelemy
  • America/St_Johns
  • America/St_Kitts
  • America/St_Lucia
  • America/St_Thomas
  • America/St_Vincent
  • America/Swift_Current
  • America/Tegucigalpa
  • America/Thule
  • America/Thunder_Bay
  • America/Tijuana
  • America/Toronto
  • America/Tortola
  • America/Vancouver
  • America/Virgin
  • America/Whitehorse
  • America/Winnipeg
  • America/Yakutat
  • America/Yellowknife
  • Antarctica/Casey
  • Antarctica/Davis
  • Antarctica/DumontDUrville
  • Antarctica/Macquarie
  • Antarctica/Mawson
  • Antarctica/McMurdo
  • Antarctica/Palmer
  • Antarctica/Rothera
  • Antarctica/South_Pole
  • Antarctica/Syowa
  • Antarctica/Troll
  • Antarctica/Vostok
  • Arctic/Longyearbyen
  • Asia/Aden
  • Asia/Almaty
  • Asia/Amman
  • Asia/Anadyr
  • Asia/Aqtau
  • Asia/Aqtobe
  • Asia/Ashgabat
  • Asia/Ashkhabad
  • Asia/Atyrau
  • Asia/Baghdad
  • Asia/Bahrain
  • Asia/Baku
  • Asia/Bangkok
  • Asia/Barnaul
  • Asia/Beirut
  • Asia/Bishkek
  • Asia/Brunei
  • Asia/Calcutta
  • Asia/Chita
  • Asia/Choibalsan
  • Asia/Chongqing
  • Asia/Chungking
  • Asia/Colombo
  • Asia/Dacca
  • Asia/Damascus
  • Asia/Dhaka
  • Asia/Dili
  • Asia/Dubai
  • Asia/Dushanbe
  • Asia/Famagusta
  • Asia/Gaza
  • Asia/Harbin
  • Asia/Hebron
  • Asia/Ho_Chi_Minh
  • Asia/Hong_Kong
  • Asia/Hovd
  • Asia/Irkutsk
  • Asia/Istanbul
  • Asia/Jakarta
  • Asia/Jayapura
  • Asia/Jerusalem
  • Asia/Kabul
  • Asia/Kamchatka
  • Asia/Karachi
  • Asia/Kashgar
  • Asia/Kathmandu
  • Asia/Katmandu
  • Asia/Khandyga
  • Asia/Kolkata
  • Asia/Krasnoyarsk
  • Asia/Kuala_Lumpur
  • Asia/Kuching
  • Asia/Kuwait
  • Asia/Macao
  • Asia/Macau
  • Asia/Magadan
  • Asia/Makassar
  • Asia/Manila
  • Asia/Muscat
  • Asia/Nicosia
  • Asia/Novokuznetsk
  • Asia/Novosibirsk
  • Asia/Omsk
  • Asia/Oral
  • Asia/Phnom_Penh
  • Asia/Pontianak
  • Asia/Pyongyang
  • Asia/Qatar
  • Asia/Qostanay
  • Asia/Qyzylorda
  • Asia/Rangoon
  • Asia/Riyadh
  • Asia/Saigon
  • Asia/Sakhalin
  • Asia/Samarkand
  • Asia/Seoul
  • Asia/Shanghai
  • Asia/Singapore
  • Asia/Srednekolymsk
  • Asia/Taipei
  • Asia/Tashkent
  • Asia/Tbilisi
  • Asia/Tehran
  • Asia/Tel_Aviv
  • Asia/Thimbu
  • Asia/Thimphu
  • Asia/Tokyo
  • Asia/Tomsk
  • Asia/Ujung_Pandang
  • Asia/Ulaanbaatar
  • Asia/Ulan_Bator
  • Asia/Urumqi
  • Asia/Ust-Nera
  • Asia/Vientiane
  • Asia/Vladivostok
  • Asia/Yakutsk
  • Asia/Yangon
  • Asia/Yekaterinburg
  • Asia/Yerevan
  • Atlantic/Azores
  • Atlantic/Bermuda
  • Atlantic/Canary
  • Atlantic/Cape_Verde
  • Atlantic/Faeroe
  • Atlantic/Faroe
  • Atlantic/Jan_Mayen
  • Atlantic/Madeira
  • Atlantic/Reykjavik
  • Atlantic/South_Georgia
  • Atlantic/St_Helena
  • Atlantic/Stanley
  • Australia/ACT
  • Australia/Adelaide
  • Australia/Brisbane
  • Australia/Broken_Hill
  • Australia/Canberra
  • Australia/Currie
  • Australia/Darwin
  • Australia/Eucla
  • Australia/Hobart
  • Australia/LHI
  • Australia/Lindeman
  • Australia/Lord_Howe
  • Australia/Melbourne
  • Australia/NSW
  • Australia/North
  • Australia/Perth
  • Australia/Queensland
  • Australia/South
  • Australia/Sydney
  • Australia/Tasmania
  • Australia/Victoria
  • Australia/West
  • Australia/Yancowinna
  • BET
  • BST
  • Brazil/Acre
  • Brazil/DeNoronha
  • Brazil/East
  • Brazil/West
  • CAT
  • CET
  • CNT
  • CST
  • CST6CDT
  • CTT
  • Canada/Atlantic
  • Canada/Central
  • Canada/East-Saskatchewan
  • Canada/Eastern
  • Canada/Mountain
  • Canada/Newfoundland
  • Canada/Pacific
  • Canada/Saskatchewan
  • Canada/Yukon
  • Chile/Continental
  • Chile/EasterIsland
  • Cuba
  • EAT
  • ECT
  • EET
  • EST
  • EST5EDT
  • Egypt
  • Eire
  • Etc/GMT
  • Etc/GMT+0
  • Etc/GMT+1
  • Etc/GMT+10
  • Etc/GMT+11
  • Etc/GMT+12
  • Etc/GMT+2
  • Etc/GMT+3
  • Etc/GMT+4
  • Etc/GMT+5
  • Etc/GMT+6
  • Etc/GMT+7
  • Etc/GMT+8
  • Etc/GMT+9
  • Etc/GMT-0
  • Etc/GMT-1
  • Etc/GMT-10
  • Etc/GMT-11
  • Etc/GMT-12
  • Etc/GMT-13
  • Etc/GMT-14
  • Etc/GMT-2
  • Etc/GMT-3
  • Etc/GMT-4
  • Etc/GMT-5
  • Etc/GMT-6
  • Etc/GMT-7
  • Etc/GMT-8
  • Etc/GMT-9
  • Etc/GMT0
  • Etc/Greenwich
  • Etc/UCT
  • Etc/UTC
  • Etc/Universal
  • Etc/Zulu
  • Europe/Amsterdam
  • Europe/Andorra
  • Europe/Astrakhan
  • Europe/Athens
  • Europe/Belfast
  • Europe/Belgrade
  • Europe/Berlin
  • Europe/Bratislava
  • Europe/Brussels
  • Europe/Bucharest
  • Europe/Budapest
  • Europe/Busingen
  • Europe/Chisinau
  • Europe/Copenhagen
  • Europe/Dublin
  • Europe/Gibraltar
  • Europe/Guernsey
  • Europe/Helsinki
  • Europe/Isle_of_Man
  • Europe/Istanbul
  • Europe/Jersey
  • Europe/Kaliningrad
  • Europe/Kiev
  • Europe/Kirov
  • Europe/Kyiv
  • Europe/Lisbon
  • Europe/Ljubljana
  • Europe/London
  • Europe/Luxembourg
  • Europe/Madrid
  • Europe/Malta
  • Europe/Mariehamn
  • Europe/Minsk
  • Europe/Monaco
  • Europe/Moscow
  • Europe/Nicosia
  • Europe/Oslo
  • Europe/Paris
  • Europe/Podgorica
  • Europe/Prague
  • Europe/Riga
  • Europe/Rome
  • Europe/Samara
  • Europe/San_Marino
  • Europe/Sarajevo
  • Europe/Saratov
  • Europe/Simferopol
  • Europe/Skopje
  • Europe/Sofia
  • Europe/Stockholm
  • Europe/Tallinn
  • Europe/Tirane
  • Europe/Tiraspol
  • Europe/Ulyanovsk
  • Europe/Uzhgorod
  • Europe/Vaduz
  • Europe/Vatican
  • Europe/Vienna
  • Europe/Vilnius
  • Europe/Volgograd
  • Europe/Warsaw
  • Europe/Zagreb
  • Europe/Zaporozhye
  • Europe/Zurich
  • Factory
  • GB
  • GB-Eire
  • GMT
  • GMT+0
  • GMT-0
  • GMT0
  • Greenwich
  • HST
  • Hongkong
  • IET
  • IST
  • Iceland
  • Indian/Antananarivo
  • Indian/Chagos
  • Indian/Christmas
  • Indian/Cocos
  • Indian/Comoro
  • Indian/Kerguelen
  • Indian/Mahe
  • Indian/Maldives
  • Indian/Mauritius
  • Indian/Mayotte
  • Indian/Reunion
  • Iran
  • Israel
  • JST
  • Jamaica
  • Japan
  • Kwajalein
  • Libya
  • MET
  • MIT
  • MST
  • MST7MDT
  • Mexico/BajaNorte
  • Mexico/BajaSur
  • Mexico/General
  • NET
  • NST
  • NZ
  • NZ-CHAT
  • Navajo
  • PLT
  • PNT
  • PRC
  • PRT
  • PST
  • PST8PDT
  • Pacific/Apia
  • Pacific/Auckland
  • Pacific/Bougainville
  • Pacific/Chatham
  • Pacific/Chuuk
  • Pacific/Easter
  • Pacific/Efate
  • Pacific/Enderbury
  • Pacific/Fakaofo
  • Pacific/Fiji
  • Pacific/Funafuti
  • Pacific/Galapagos
  • Pacific/Gambier
  • Pacific/Guadalcanal
  • Pacific/Guam
  • Pacific/Honolulu
  • Pacific/Johnston
  • Pacific/Kanton
  • Pacific/Kiritimati
  • Pacific/Kosrae
  • Pacific/Kwajalein
  • Pacific/Majuro
  • Pacific/Marquesas
  • Pacific/Midway
  • Pacific/Nauru
  • Pacific/Niue
  • Pacific/Norfolk
  • Pacific/Noumea
  • Pacific/Pago_Pago
  • Pacific/Palau
  • Pacific/Pitcairn
  • Pacific/Pohnpei
  • Pacific/Ponape
  • Pacific/Port_Moresby
  • Pacific/Rarotonga
  • Pacific/Saipan
  • Pacific/Samoa
  • Pacific/Tahiti
  • Pacific/Tarawa
  • Pacific/Tongatapu
  • Pacific/Truk
  • Pacific/Wake
  • Pacific/Wallis
  • Pacific/Yap
  • Poland
  • Portugal
  • ROC
  • ROK
  • SST
  • Singapore
  • SystemV/AST4
  • SystemV/AST4ADT
  • SystemV/CST6
  • SystemV/CST6CDT
  • SystemV/EST5
  • SystemV/EST5EDT
  • SystemV/HST10
  • SystemV/MST7
  • SystemV/MST7MDT
  • SystemV/PST8
  • SystemV/PST8PDT
  • SystemV/YST9
  • SystemV/YST9YDT
  • Turkey
  • UCT
  • US/Alaska
  • US/Aleutian
  • US/Arizona
  • US/Central
  • US/East-Indiana
  • US/Eastern
  • US/Hawaii
  • US/Indiana-Starke
  • US/Michigan
  • US/Mountain
  • US/Pacific
  • US/Pacific-New
  • US/Samoa
  • UTC
  • Universal
  • VST
  • W-SU
  • WET
  • Zulu

Using the MemoryStore

The MemoryStore provides an in-memory, table-based, data storage abstraction within the correlator. All EPL code running in the correlator in any context can access the data stored by the MemoryStore. In other words, all EPL monitors running in the correlator have access to the same data.

The Apama MemoryStore can also be used in a distributed fashion to provide access to data stored in a MemoryStore to applications running in a cluster of multiple correlators. For more information on the distributed MemoryStore, see Using the distributed MemoryStore.

Info
The distributed MemoryStore is deprecated and will be removed in a future release.

The MemoryStore can also store data on disk to make it persistent, and copy persistent data back into memory. However, the MemoryStore is primarily intended to provide all monitors in the correlator with in-memory access to the same data.

Use the MemoryStore to share data among monitors in the correlator or to persist data on disk. In the following situations, the standard Apama ADBC (Apama Database Connector) adapter is likely to be a better option than the MemoryStore:

  • You want to interoperate directly with data users other than Apama.
  • You need access to more data than can fit in memory.
  • You need to key on more than one field.
  • You want to join tables.

See also Using the MemoryStore when persistence is enabled.

See also The Database Connector IAF adapter (ADBC).

For details about the event types that provide the MemoryStore interface, see the API reference for EPL (ApamaDoc).

Introduction to using the MemoryStore

Data that the MemoryStore stores must be one of the following types: boolean, decimal, float, integer or string.

To use the MemoryStore, you need to add the MemoryStore bundle to your Apama project (see Adding the MemoryStore bundle to your project). This lets you create instances of MemoryStore events and then call actions on those events. Available actions include the following:

  • Creating stores that contain tables
  • Defining the schema for the rows in a table
  • Creating tables and associating a schema with each table
  • Storing, retrieving, updating, and committing rows of data
  • Copying tables to disk to make the data persistent
  • Making stored data available in DataViews for use by dashboards

You can use the MemoryStore in parallel applications. You can also use it in a persistent monitor in a persistence-enabled correlator. See Using the MemoryStore when persistence is enabled.

For information on using the MemoryStore in a distributed fashion, see Using the distributed MemoryStore.

Overview of MemoryStore events

The MemoryStore defines the following events in the com.apama.memorystore package. Most of these events contain action fields that serve as the MemoryStore interface.

  • Storage — The event type that provides the interface for creating stores.
  • Store — A Store event represents a container for a uniquely named collection of tables.
  • Table — A Table event represents a table in a store. A table is a collection of rows. Each table has a unique name within the store. A table resides in memory and you can store it on disk if you want to.
  • Schema — A Schema event specifies a set of fields and the type of each field. Each Schema event represents the schema for one or more tables. Each table is associated with one schema. All rows in that table match the table’s schema.
  • Row — A Row event represents a row in a table. A row is an ordered and typed set of named fields that match the schema associated with the table that the row belongs to. Each row is associated with a string that acts as its key within the table. You can change the values of the fields in a row.
  • Iterator — Provides the ability to manipulate each row of a table in turn.
  • Finished — The MemoryStore enqueues a Finished event when processing of an asynchronous action is complete.
  • RowChanged — The RowChanged event is used only in a distributed store. In a distributed store, the RowChanged event is sent to all applications that have subscribed to a specific table whenever changes to data in a row in that table have been successfully committed. This behavior is optional and is supported by some, but not all, third-party distributed cache providers.

For details about these events, see the information for MemoryStore in the API reference for EPL (ApamaDoc).

Adding the MemoryStore bundle to your project

To use the MemoryStore, you need only add the MemoryStore bundle to your project. The description below explains how to add the bundle using Apama Plugin for Eclipse, but you can also add it using the apama_project command-line tool as described in Creating and managing an Apama project from the command line.

Info
To use the distributed MemoryStore, you add the Distributed MemoryStore adapter instead. The procedure for this is different and is described in Adding distributed MemoryStore support to a project.

Adding the MemoryStore bundle to your project makes the MemoryStore.mon file available to the monitors in your project. When you run your project, Apama Plugin for Eclipse automatically injects MemoryStore.mon. If you want to examine this file, it is in the monitors/data_storage directory of your Apama installation directory. MemoryStore.mon is the interface between the monitors in your application and the MemoryStore plug-in. Your application creates events of the types defined in that file and calls actions on those events to use the MemoryStore’s facilities. There is never any need to import or call the plug-in directly.

Info
If you use the engine_inject tool to manually inject your EPL, instead of using Apama Plugin for Eclipse, and you want to expose MemoryStore tables to dashboards, you need to inject the file MemoryStoreScenarioImpl.mon, which is in the monitors/data_storage directory, and the files ManagementImpl.mon and Management.mon, which are in the monitors directory.

To add the MemoryStore bundle

  1. In Apama Plugin for Eclipse, open the project in the Apama Developer perspective.

  2. In the Project Explorer, right-click the EPL Bundles node and select Add Bundle.

  3. In the Add Bundle dialog, select The MemoryStore bundle and click OK.

Steps for using the MemoryStore

To use the MemoryStore, you must first add The MemoryStore bundle to your project, unless you are using the distributed MemoryStore. (If you are using the distributed MemoryStore, instead of adding The MemoryStore bundle, you need to add the Distributed MemoryStore adapter. For more information on this, see Adding distributed MemoryStore support to a project.)

After you have added the MemoryStore bundle, you write EPL that does the following:

  1. Prepare and then open a store that will contain one or more tables.
  2. Define the data schema for the rows that will belong to the table.
  3. Prepare and then open a table in a store.
  4. For applications that will access data in a distributed store, if the underlying third-party distributed cache provider supports notifications, optionally subscribe to the table in order to receive notifications when data has changed. For further information, see Notifications.
  5. Get a new or existing row from the table.
  6. Modify the row.
  7. Commit the modified row to the table.
  8. Repeat the three previous steps as often as needed.
  9. Optionally, use an iterator to step through all rows in the table.
  10. Optionally, store the in-memory table on disk.
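
The sections below cover each step in detail. As a condensed, end-to-end sketch of the non-distributed flow (the store, table, schema and key names are all illustrative):

using com.apama.memorystore.Storage;
using com.apama.memorystore.Store;
using com.apama.memorystore.Table;
using com.apama.memorystore.Row;
using com.apama.memorystore.Schema;
using com.apama.memorystore.Finished;

monitor MemoryStoreSketch {
   Store store;
   Table tbl;

   action onload() {
      // step 1: prepare, then open, a store
      integer id := Storage.prepareInMemory("example-store");
      on Finished(id, *, *) as f onStorePrepared(f);
   }

   action onStorePrepared(Finished f) {
      if not f.success { log "Store preparation failed: " + f.status; die; }
      store := Storage.open("example-store");

      // steps 2 and 3: define a schema, then prepare and open a table
      Schema schema := new Schema;
      schema.fields := ["count"];
      schema.types := ["integer"];
      integer id := store.prepare("counters", schema);
      on Finished(id, *, *) as f onTablePrepared(f);
   }

   action onTablePrepared(Finished f) {
      if not f.success { log "Table preparation failed: " + f.status; die; }
      tbl := store.open("counters");

      // steps 5 to 7: get a row, modify it, commit it
      Row row := tbl.get("my-key");
      row.setInteger("count", row.getInteger("count") + 1);
      if not row.tryCommit() {
         // another monitor changed the row first; see the retry patterns
         // in "Using transactions to manipulate rows"
         log "Commit conflict; the row must be re-read and retried" at INFO;
      }
   }
}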

Preparing and opening stores

The Storage event is used to prepare and open a store to which you can add tables. Storage events define actions that do the following:

  • Request preparation of a store.
  • Open a store that has been prepared.

Storage events contain no data. All Storage events are alike and exist only to provide the interface for preparing and opening stores. All actions on the Storage event are static; there is no need to create an instance of a Storage event.

If you do not require on-disk persistence, you can prepare a store in memory. If you do require on-disk persistence, you can specify the file that contains (or that you want to contain) the store. Depending on the action you call to open the store, the MemoryStore does one of the following:

  • Opens the store for read-write access.
  • Opens the store for read-only access.
  • Opens the store for read-write access, creating the store if it does not already exist.

Preparation of stores is asynchronous. Actions that prepare stores return an ID immediately. When the MemoryStore completes preparation, it enqueues a Finished event that contains this ID. You should define an event listener for this Finished event. The Finished event indicates whether or not preparation was successful.

You can open a store only after receiving a Finished event that indicates successful preparation.

For example, the following code fragment declares a Store type variable. It then calls the static prepareOrCreate() action on the Storage event and saves the returned ID in an integer variable. The name of the new store is storename, and the store will be made persistent by saving it in the example.dat file. Finally, this code fragment declares an event listener for a Finished event whose ID matches the ID returned by the preparation request.

using com.apama.memorystore.Storage;
using com.apama.memorystore.Store;
using com.apama.memorystore.Finished;

monitor Test {
   Store store;

   action onload() {
      integer id := Storage.prepareOrCreate("storename", "/tmp/example.dat");
      on Finished(id, *, *) as f
         onStorePrepared(f);
      ...
  }
}

After a store has been successfully prepared, you can open it:

action onStorePrepared(Finished f) {
   if not f.success { log "Whoops"; die; }
   store := Storage.open("storename");
   ...
}

All subsequent examples assume that the appropriate using statements have been added.

Any monitor instance can open a store after that store has been successfully prepared. However, monitor A has no information about whether or not monitor B has prepared a particular store.

Therefore, each monitor should prepare any store it needs, and then prepare any tables it needs within that store. There is no way to pass Store or Table events from one monitor to another. Multiple monitors can prepare and open the same store or table at the same time.

There are several different actions available for preparing a store:

  • Storage.prepareInMemory(string name) returns integer prepares an in-memory store with the name you specify. All tables are empty when prepared for the first time. Persistence requests are ignored and immediately return a successful Finished event.
  • Storage.prepare(string name, string filename) returns integer does the same thing as Storage.prepareInMemory and also associates that store with the database file you specify. If there is data in the database file, the MemoryStore loads the store with the data from the file when you prepare a table. Persistence requests write changes back to the file. The specified file must exist.
  • Storage.prepareOrCreate(string name, string filename) returns integer does the same thing as Storage.prepare() except that it creates the file if it does not already exist.
  • Storage.prepareReadOnly(string name, string filename) returns integer does the same thing as Storage.prepare and also opens the database file you specify for read-only access. The MemoryStore loads the store with data from the file when you prepare the table. Persistence requests are refused and return a failure Finished event.
  • Storage.prepareCorrelatorPersistent(string name) returns integer prepares a store that the correlator automatically persists. Each time the correlator takes a snapshot, the snapshot includes any correlator-persistent stores along with the contents of those stores.
  • Storage.prepareDistributed(string name) returns integer prepares a distributed store which will be available to applications running in a cluster of correlators. The name argument is a unique identifier that specifies the name of a configured distributed store. For information on adding a distributed store to a project, see Adding a distributed store to a project.

Suppose a monitor instance calls one of the Storage.prepare() actions and the action is successful. Now suppose another monitor instance calls the same Storage.prepare() variant with the same store name and, if applicable, the same filename as the previously successful call. The second call does nothing and indicates success immediately. However, if a monitor instance makes a Storage.prepare() call that specifies the same store name as a previously successful prepare() call, the call fails immediately if at least one of the following is different from the successful call:

  • The variant of the prepare() action called
  • The specified file name or store name (if applicable)

For example, suppose a monitor made the following successful call:

Storage.prepare("foo", "/tmp/foo.dat")

After this call, the only prepare call that can successfully prepare the same store is

Storage.prepare("foo", "/tmp/foo.dat")

The following calls would all fail:

Storage.prepareInMemory("foo")
Storage.prepareOrCreate("foo", "/tmp/foo.dat")
Storage.prepareReadOnly("foo", "/tmp/foo.dat")
Storage.prepare("foo", "/tmp/bar.dat")

If a monitor makes a call to prepare() that matches a prepare action that is in progress, the result is the same as the result of the prepare that is in progress.

Description of row structures

Schemas

A schema consists of an ordered list of the names and types of fields that define the structure of a row. For example, the following schema consists of one field whose name is times_run and whose type is integer:

Schema schema := new Schema;
schema.fields := ["times_run"];
schema.types := ["integer"];

A valid schema can be created from an event type using schemaFromAny(event). Types that are not supported in the event are converted to string types.
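
For example, a sketch of deriving a schema from an event type (the event definition is illustrative):

event Position {
   string symbol;
   float price;
   integer volume;
}

// fields become ["symbol", "price", "volume"] with matching types
Schema schema := Schema.schemaFromAny(new Position);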

The Schema event has additional members that indicate how to publish the table. See Exposing in-memory or persistent data as DataViews.

The schema does not include the row’s key. The key is always a string and it does not have a name. Each row in a table is associated with a key that is unique within the table. The key provides a handle for obtaining a particular row. The row does not contain the key.

Two schemas match when they list the same set of field names and types in the same order and choose the same options for exposing DataViews.

Some distributed MemoryStore drivers (such as TCStore) support getting and setting extra fields that are present in only some individual rows, and are not named in the schema. For stores supporting this feature, it is even possible to specify an empty list of schema fields and access all fields as extra fields if desired. When getting extra fields, it is important to be aware that getting a field that does not exist will result in an exception, so it is usually necessary to add exception handling code, or to check which keys are present in the row (using Row.getKeys()) before attempting to access them. It is also possible to use the Row.getAll or Row.toDictionary actions to get all fields including those named in the schema and any extra fields that are present. Note that RowChanged notifications are not supported for extra fields.
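
For example, a sketch of defensively reading an extra field, assuming a driver that supports this feature ("note" is an illustrative extra field name):

Row row := tbl.get("some-key");
// guard against the exception thrown when the extra field is absent
if row.getKeys().indexOf("note") >= 0 {
   string note := row.getString("note");
   log "note = " + note at INFO;
}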

Tables

Table events define actions that do the following:

  • Retrieve a row by key. The returned object is a Row event.
  • Remove a row by key
  • Remove all rows
  • Obtain a sequence of keys for all rows in the table
  • Obtain an iterator to iterate over the rows in the table
  • Determine if any row in the table has a particular key
  • Store on disk the changes to the in-memory table
  • Subscribe (and unsubscribe) to a table to be notified when a row has changed. (Note, this is only supported for tables in a distributed store, and only if the underlying provider supports this feature.)
  • Modify a row by key
  • Modify all rows
  • Obtain the position in a schema of a specified field.
  • Obtain the name of the table
  • Obtain the name of the store that contains the table

For details about these Table event actions, see the information for MemoryStore in the API reference for EPL (ApamaDoc).

Retrieval of a row from a table by key always succeeds (although retrieving a row from a table in a distributed store can throw an exception). If the row already exists, the MemoryStore returns a Row event that provides a local copy of the row. The content of this Row event does not change if another user modifies the in-memory version of the row in the table. If the row does not already exist, the MemoryStore populates a Row event with default values and returns that with field values as follows:

  • boolean types are false
  • decimal types are 0.0d
  • float types are 0.0
  • integer types are 0
  • string types are empty ("")

Rows

Row events define actions that do the following:

  • Getters and setters. These actions modify only the local copy (your Row event) and not the in-memory version of the row. The in-memory version of the row is available to all monitors. If another user of the table retrieves the same row, that user receives a Row event that contains a copy of the in-memory version of the row; that user does not receive a copy of your modified, local version of the row:
    • Get and set boolean, decimal, float, integer, and string fields by name.
    • Generic get and set field by name actions which use the any type. These throw an exception if the underlying types do not match the expected field type.
    • Get and set all fields. These expect a prototype event whose fields and types match that of the table schema. An exception is thrown if the schemas do not match.
  • Commit a modified Row event. That is, you modify your local Row event, and commit the changes, which updates the shared row in the table. This makes the update available to all monitors.
  • Get the value of a row’s key.
  • Determine whether a row was present in the table when the local copy was provided.
  • Obtain the name of the table the row is in.
  • Obtain the name of the store the row’s table is in.

The Row.commit() action modifies only the in-memory copy of the row, so it is a synchronous, non-blocking operation. Note that in a distributed store, Row.commit() writes the value to the distributed store, which may be a fast, local operation or may involve writing data to one or more remote nodes.

If any other user of the table modifies the in-memory row between the time you obtain a Row event that represents that row and the time you try to commit your changes, the Row.commit() action fails and the monitor instance that called Row.commit() dies. Therefore, if you are sharing the table with other users or using a distributed store, you should call Row.tryCommit() instead of Row.commit(). If it fails, you must retry the commit operation by retrieving the row again (that is, obtaining a new Row event that contains the latest content of the in-memory row), reapplying the changes, and then calling the Row.tryCommit() action. This ensures that you always make changes that are consistent and atomic within the shared version of the row.

However, it is not possible to make atomicity guarantees across rows or tables.

Preparing and opening tables

After you have an open store, you can add one or more tables to that store. You call actions on Store events to create tables. Store events define actions that do the following:

  • Prepare a table. You specify a table name and a schema or supply an event or type name to use as the name and schema. This call is asynchronous. The MemoryStore sends a Finished event that indicates success or failure. If the table does not exist, the MemoryStore creates an empty table.
  • Open a table that has been prepared
  • Store on disk the in-memory changes to tables.

If the store that contains the table is persistent and the table exists on disk then the on-disk schema must match the schema that you specify when you call the action to prepare the table. The schemas must also match if the table is a distributed table that already exists in a distributed store. If the schemas do not match, the Finished event that the MemoryStore sends includes an error message.

Info
A persistent table can be an on-disk table or a table in a correlator-persistent store.

If a monitor instance calls Store.prepare() with the same table name and schema as those of a previously successful Store.prepare() call, the call does nothing and indicates success immediately. If a monitor instance calls Store.prepare() and specifies the same table name but the schema does not exactly match, that call fails immediately. If a monitor makes a call to Store.prepare() that matches a preparation that is in progress, the result is the same as the result of the preparation that is in progress.

If the table you want to prepare is persistent and it has not yet been loaded into memory then the MemoryStore loads the table’s on-disk data into memory in its entirety. The MemoryStore sends the Finished event when loading the table is complete.

To use a table that is in memory, you must retrieve a handle to it from the store that contains it. Obtaining a handle to a prepared (loaded) table is a synchronous action that completes immediately and does not block. The calling monitor instance dies if you try to obtain a handle to a table that is not prepared or that is in the process of being prepared.

For example:

integer id := store.prepare("tablename", schema);
on Finished(id, *, *) as f onTablePrepared(f);

action onTablePrepared(Finished f) {
   if not f.success { log "Whoops"; die; }
   Table tbl := store.open("tablename");
   ...
}
Info
The term “table” is a reserved keyword. Consequently, you should not use “table” as a variable name.

Preparation of a table can fail for a number of reasons including, but not limited to, the following:

  • You call prepare() on an existing table and the schema of that table and the schema specified in the prepare() call do not match.
  • You call prepare() on an existing in-memory table and the exposePersistentView setting is true for the schema you specify in the prepare() call.
  • You call prepare() on a table that does not exist and the store has been opened read-only.
  • You call prepare() on a table that does not exist in a persistent store and the attempt to create a new table in the persistent store fails, perhaps because the disk is full.
  • The on-disk version of the table is corrupt in some way.
  • You set exposePersistentView on a table in a correlator-persistent store.
  • You set exposeMemoryView or exposePersistentView to true for a distributed store.
  • The third-party distributed store implementation throws an exception for some reason such as unrecoverable network failure.

Using transactions to manipulate rows

In a monitor, any changes you make to Row events are local until you commit those changes. In other words, your changes modify only the local Row events that represent rows in the table, not the rows themselves. After you commit the changes you have made to your Row events, the updated rows are available to all monitors in the correlator and, if you are using a distributed store, to all other members of the distributed cluster.

Info
When you modify a Row event and you want to update the actual row with your changes, you must commit your changes. It does not matter whether or not the table is in a correlator-persistent store.

The Row event defines the following actions for committing changes:

  • Row.commit() — Tries to commit changes to Row events to the table. If nothing else modified the row in the table since you obtained the Row event that represents that row, the MemoryStore commits the changes and returns. The update is available to all monitors. If the row in the table has been modified, an exception is thrown, leaving the table unchanged.
  • Row.tryCommit() — Behaves like commit() except that it does not throw an exception upon failure. If the row in the table has been modified, this action returns false and leaves the table unchanged. If this action is successful, it returns true.
  • Row.tryCommitOrUpdate() — Behaves like tryCommit() except that when it returns false, it also updates your local Row event to reflect the current state of the actual row in the table. In other words, if the row in the table has been modified, this action does the following:
    • Leaves the actual row in the table unchanged.
    • Updates the local Row event that represents this row to reflect the current state of the table. Any local, uncommitted modifications are lost.
    • Returns false.
  • Row.forceCommit() — Commits the local Row event to the table even if the row in the table has been modified after you obtained the Row event.

Determining which commit action to call

If you are certain that you are the only user of a table and if it is okay for your monitor instance to be killed if you are wrong, you can use commit().

If you want to use a simple loop like the one below, or if you intend to give up if your attempt to commit fails, then use tryCommit().

boolean done := false;
while not done {
   Row row := tbl.get("foo");
   row.setInteger("a",123);
   done := row.tryCommit();
}

However, the loop above calls tbl.get() every time around. If you think there might be a high collision rate, it is worth optimizing to the following, more efficient design:

Row row := tbl.get("foo");
boolean done := false;
while not done {
   row.setInteger("a",123);
   done := row.tryCommit();
   if not done { row.update(); }
}

The tryCommitOrUpdate() action makes the example above a little simpler and considerably more efficient:

Row row := tbl.get("foo");
boolean done := false;
while not done {
   row.setInteger("a",123);
   done := row.tryCommitOrUpdate();
}

Alternatively, there is a packaged form of that loop that you might find more convenient:

action doSomeStuff(Row row) {
   row.setInteger("a",123);
}
tbl.mutate("foo", doSomeStuff);

The above example is equivalent to the previous one, both in behavior and performance. Which to use is a matter of context, style and personal preference.

If you want to simply overwrite the whole Row rather than updating the row based on the current value, then using forceCommit() would be more appropriate. It commits the local Row content to the table even if it was modified after you obtained the Row event:

Row row := tbl.get("foo");
row.setInteger("a", 123);
row.forceCommit();

Creating and removing rows

To create a row in a table, call the get() or add() action on the table to which you want to add the row. The action declaration for the get() action is as follows:

action get(string key) returns Row

The Table.get() action returns a Row event that represents the row in the table that has the specified key. If there is no row with the specified key, this action returns a Row event that represents a row that contains default values. In that case, a call to the Row.inTable() action returns false. For example:

boolean done := false;
integer n := -1;
while not done {
   Row row := tbl.get("example-row");
   n := row.getInteger("times_run");
   row.setInteger("times_run", n+1);
   done := row.tryCommit();
}
send Result(
   "This example has been run " + n.toString() + " time(s) before")
   to "output";

The add() action does the same as the get() action, except that it does not check whether the row to be added already exists in the table until commit() is called; add() itself therefore never throws an exception. If you are sure that the row does not yet exist, use add(), as it is faster than get().
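For instance, a minimal sketch (assuming a key "new-row" that is not yet in the table):

// add() skips the existence check until commit time
Row row := tbl.add("new-row");
row.setInteger("a", 1);
// use tryCommit() in case another context created the row after all
if not row.tryCommit() {
   // fall back to get() and retry
}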

To remove a row from a table, call the Table.remove() action on the table that contains the row. The action declaration is as follows:

action remove(string key)

The Table.remove() action removes the row with the specified key from the table. If the row does not exist, this action does nothing.

It is also possible to remove a row transactionally, by calling Table.get() and then Row.remove() and Row.commit(). This strategy lets you check the row’s state before removal. The Row.commit() action fails if the shared, in-memory row has been updated since the Table.get() action.

In some circumstances, using Row.remove() is essential to guarantee correctness. For example, when decrementing a usage counter in the row and removing the row when the count reaches zero. Otherwise, another correlator context might re-increment the count between it reaching zero and the row being removed.
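As an illustrative sketch of that pattern (the counter field name count is an assumption, not part of the MemoryStore API):

boolean done := false;
while not done {
   Row row := tbl.get("usage-counter");
   integer n := row.getInteger("count") - 1;
   if n > 0 {
      row.setInteger("count", n);
   } else {
      // the removal is part of the same transactional commit
      row.remove();
   }
   done := row.tryCommit();
}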

Iterating over the rows in a table

Iterators have operations to step through the table and determine when the end has been reached. Provided an iterator has not reached the end of the table, you can obtain the key of the row it is pointing at.

Iterator events define actions that do the following:

  • Step through the rows in a table.
  • Determine when the last row has been reached.
  • Obtain the key of the row that the iterator is at. The iterator must not be at the end of the table for this action to be successful.
  • Obtain a Row event to represent the row that the iterator is at.

The following sample code reads table content:

Iterator i := tbl.begin();
while not i.done() {
   Row row := i.getRow();
   if row.inTable() {
      // Put code here to read the row in the way you want.
   }
   i.step();
}

The following sample code modifies table content:

Iterator i := tbl.begin();
while not i.done() {
   Row row := i.getRow();
   boolean done := false;
   while row.inTable() and not done {
      // Put code here to modify the row in the way you want.
      done := row.tryCommitOrUpdate();
   }
   i.step();
}

Iterating through a table is always safe, regardless of what other threads are doing. However, if another context adds or removes a row while you are iterating in your context, it is undefined whether your iterator will see that row.

Furthermore, it is possible for another context to remove a row while your iterator is pointing at it. If this happens, a subsequent Iterator.getRow() returns a Row event that represents a row for which Row.inTable() is false.

If an EPL action loops, the correlator cannot perform garbage collection within that loop. (See Optimizing EPL programs.) Performing intricate manipulations on many rows of a large table could therefore create so many transitory objects that the correlator runs out of memory. If this becomes a problem, you can divide very large tasks into smaller pieces, each of which is performed in response to a routed event. This gives the correlator an opportunity to collect garbage between delivering successive events.
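The following is one possible sketch of that approach; the ProcessBatch event, the batch size of 1000, and the monitor name are illustrative assumptions, not part of the MemoryStore API:

// hypothetical event used to resume iteration between batches
event ProcessBatch {}

monitor BatchWorker {
   // assumes using declarations for the com.apama.memorystore events
   Iterator i;

   action processInBatches(Table tbl) {
      i := tbl.begin();
      on all ProcessBatch() {
         integer n := 0;
         while not i.done() and n < 1000 {
            Row row := i.getRow();
            if row.inTable() {
               // put per-row processing here
            }
            i.step();
            n := n + 1;
         }
         if not i.done() {
            // routing gives the correlator a chance to collect garbage
            route ProcessBatch();
         }
      }
      route ProcessBatch(); // kick off the first batch
   }
}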

Requesting persistence

After changing a MemoryStore table, you can call the Table.persist() action to store the changes on disk. Note that you can call persist() only on tables in an on-disk store; you cannot call persist() on tables in correlator-persistent, in-memory, or distributed stores. The correlator automatically persists correlator-persistent stores and their contents at the same time as the rest of the correlator runtime state. Updating a table on disk is an asynchronous action. The MemoryStore sends a Finished event to indicate success or failure of this action. The persistent form of the database that contains the tables is transactional. Consequently, if there is a hardware failure, either all of the grouped changes are made or none of them are made.

Following is an example of storing a table on disk:

integer id := tbl.persist();
on Finished(id,*,*) as f onPersisted(f);

action onPersisted(Finished f) {
   if not f.success { log "Whoops"; die; }
   log "All OK" at INFO;
}

When you update a table, the MemoryStore copies only the changes to the on-disk table.

To improve performance, the MemoryStore might group persistence requests from multiple users of a particular store. This means that calling persist() many times in rapid succession is efficient, but this does not affect correctness. If the MemoryStore indicates success, you can be certain that the state at the time of the persist() call (or at the time of some later persist() call) is on disk.

You can call the Store.backup() action to backup the on-disk form of a store while it is open for use by the correlator. This is an asynchronous action that immediately returns an ID. The MemoryStore sends a Finished event that contains this ID to indicate success or failure of this action. Be sure to define an event listener for this event.
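For example, a minimal sketch of a backup with its listener:

integer id := store.backup();
on Finished(id,*,*) as f {
   if not f.success { log "Backup failed: " + f.status at ERROR; }
   else { log "Backup complete" at INFO; }
}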

Exposing in-memory or persistent data as DataViews

You can expose committed in-memory data or committed persistent data as DataViews for use by Scenario Service clients such as dashboards (see Scenario Service API). Note, however, that this is not supported for distributed stores. The Schema event defines the following fields for this purpose:

  • exposeMemoryView — When this field is true, the MemoryStore makes the rows in the in-memory table associated with this schema available to Apama’s Scenario Service. That is, the MemoryStore creates DataViews that contain this data.
  • exposePersistentView — When this field is true, the MemoryStore makes the rows in the on-disk table associated with this schema available to Apama’s Scenario Service. That is, the MemoryStore creates DataViews that contain this data. You cannot expose a persistent view of a table in a correlator-persistent store.
  • memoryViewDisplayName — Specifies the display name for the exposed DataView created from the in-memory table.
  • memoryViewDescription — Specifies the description for the exposed DataView created from the in-memory table.
  • persistentViewDisplayName — Specifies the display name for the exposed DataView created from the on-disk table.
  • persistentViewDescription — Specifies the description for the exposed DataView created from the on-disk table.

The MemoryStore exposes in-memory changes after successfully committing them to the table. The MemoryStore exposes on-disk changes after the transaction that contains the changes is committed.

The exposeMemoryView and exposePersistentView fields have an impact on the time it takes to prepare a table for the first time. When a table is prepared, the rows that are loaded from disk need to be reflected to the Scenario Service.

If you prepare the same table multiple times, the display names and descriptions must match or the MemoryStore rejects the contradicting request.

When a display name or description field is blank (an empty string), the MemoryStore chooses the display name or the description for the exposed DataView. You can specify a non-empty string for one or more fields to override the default. Leave the display name and description fields blank when you are not exposing the corresponding DataView.

The fields of the exposed views are the same as those of the table, in the same order as they are defined in the table schema. The key is not part of the exposed views. Each row in the table forms a single item in the exposed view.
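For example, a sketch of a schema that exposes the in-memory table as a DataView; the field names, types, and display strings shown are illustrative:

Schema schema := new Schema;
schema.fields := ["symbol", "price"];
schema.types := ["string", "float"];
schema.exposeMemoryView := true;
schema.memoryViewDisplayName := "Live prices";
schema.memoryViewDescription := "Prices keyed by symbol";
integer id := store.prepare("prices", schema);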

See Making application data available to clients. See also Building and Using Apama Dashboards.

Monitoring status for the MemoryStore

The MemoryStore provides status values via the user status mechanism. It provides the following metrics:

  • memStoreCommitTimeEwmaLongMillis — A longer-term exponentially-weighted moving average (EWMA) of the time in milliseconds taken to commit to the MemoryStore (in-memory store and on-disk persistence).
  • memStoreCommitTimeEwmaShortMillis — A quickly-evolving exponentially-weighted moving average (EWMA) of the time in milliseconds taken to commit to the MemoryStore (in-memory store and on-disk persistence).
  • memStoreCommitTimeMaxInLastHour — The maximum commit latency in milliseconds since the start of the last 1 hour measurement period for the MemoryStore (in-memory store and on-disk persistence).
  • distMemStoreCommitTimeEwmaLongMillis — A longer-term exponentially-weighted moving average (EWMA) of the time in milliseconds taken to commit to the distributed MemoryStore.
  • distMemStoreCommitTimeEwmaShortMillis — A quickly-evolving exponentially-weighted moving average (EWMA) of the time in milliseconds taken to commit to the distributed MemoryStore.
  • distMemStoreCommitTimeMaxInLastHour — The maximum commit latency in milliseconds since the start of the last 1 hour measurement period for the distributed MemoryStore.

The above MaxInLastHour values (for both the MemoryStore and the distributed MemoryStore) are updated if either of the following conditions is true:

  • The latency of the current commit transaction is greater than the existing maximum.
  • The existing maximum value was set more than 1 hour ago.

For more information about monitor status information published by the correlator, see Managing and monitoring over REST and Watching correlator runtime status.

Restrictions affecting MemoryStore disk files

At any one time, only one correlator should be accessing a particular MemoryStore disk file.

To minimize the risk of data corruption in the event of a system failure, keep MemoryStore files on your local disk and not on a remote file server.

Do not create hard or symbolic links to MemoryStore files. Linking to the directory that contains a MemoryStore file is not a problem.

Using the distributed MemoryStore

Info
The distributed MemoryStore, TCStore and BigMemory features are deprecated and will be removed in a future release.

With a distributed MemoryStore, you can access data shared among Apama applications running in separate correlators. Distributed stores make use of distributed caching software from a variety of third-party vendors.

The topics below describe typical use cases for the distributed MemoryStore, how to add and configure distributed stores, and how to write drivers for integrating with third-party caching software.

Overview of the distributed MemoryStore

The MemoryStore supports several types of stores as described in Using the MemoryStore. In addition to those stores that are local to a single Apama process, Apama also supports a distributed store in which data can be accessed by applications running in multiple correlators. You prepare a distributed store with a prepareDistributed call on the Storage interface. When this sends a Finished event with success set to true, the Store can be opened, and Table objects created.
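For example, a sketch of that flow, assuming a store configured with the name "MyStore" in the Spring configuration:

integer id := Storage.prepareDistributed("MyStore");
on Finished(id,*,*) as f {
   if not f.success { log "Prepare failed: " + f.status at ERROR; die; }
   Store store := Storage.open("MyStore");
   // prepare and open tables here
}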

A distributed store makes use of Terracotta’s TCStore or BigMemory Max, or a third-party distributed cache or datagrid technology that stores the data (table contents) in memory across a number of processes (nodes), typically across a number of machines. The collection of nodes is termed a cluster.

Advantages

Arranging a number of nodes into a cluster provides the following advantages:

  • It is possible to store more data than would fit on one node.
  • As the data is in memory, a distributed store is typically faster than persisting the store contents to disk.
  • Every piece of data is typically stored on more than one node, so the failure of any one node should not cause the loss of any committed data.
  • If a node fails, other nodes can access any of the data without waiting to “recover” or reload the entire datastore. Note, however, that it may take time to detect that the failed node is down.
  • The number of correlators can be changed at runtime, allowing the processing capacity of the system to be increased.
  • Different providers can be used, allowing a single Apama application to integrate with different distributed caches. However, each provider must have a driver. Apama provides a Service Programming Interface (SPI) with which you can write a custom driver (see Creating a distributed MemoryStore driver for more information).
  • Data is accessible to multiple correlators. If they distribute workload appropriately, more processing capacity can use the same shared store of data. A distributed store is a building block for such a system, not a complete solution in itself.
  • Applications can be notified of changes to data in the store (see Notifications for more information).

Disadvantages

A distributed store has the following disadvantages compared with the other types of store:

  • A network request may be required to get or commit any Row. This is slower than the in-process local-memory get and commit requests made against local stores.
  • The network request may fail because either more than one node has failed or there is a network failure such that the correlator cannot contact other nodes in the cluster.
  • Multiple accesses to a single row cause contention and do not scale (and are slower than an in-memory store).
  • It is not permitted to expose DataViews with a distributed store. A distributed store may contain a very large number of entries, which would not be practical to expose as DataViews (as it requires storing a copy of the entire table in the dashboards/Scenario Service client).

Use cases

Based on the advantages and disadvantages of distributed stores, the typical use cases for using them are:

  • More data needs to be stored than will fit on any single node.
  • Elastic (changing) processing capacity is required.
  • A highly available system needs continuous access to data, even if some nodes fail, and with minimal recovery time.
  • High throughput is required across a large number of different rows, with only a small amount of contention for a single row.

The typical use cases where a distributed store is not suitable are:

  • Very low latency (sub-millisecond) access to data.
  • Very high throughput (more than 10,000 requests per second) to a single row. The distributed store only scales out well if different rows are being accessed.

Supported providers

Apama includes drivers for connecting to Terracotta’s TCStore and BigMemory Max, which provide unlimited in-memory data management across distributed servers. See TCStore (Terracotta) driver details and BigMemory Max driver details for more information.

Apama also provides an interface to integrate with third-party distributed caching software that provides compare-and-swap operations for adding, updating, and removing data (for example, software that provides methods similar to the putIfAbsent, replace, and remove operations on java.util.concurrent.ConcurrentMap).

For other distributed cache providers, you need to write a driver using the Apama Service Provider Interface (SPI) to serve as a bridge between the MemoryStore and the caching software. For information on creating a driver, see Creating a distributed MemoryStore driver.

Configuration

In order to use a distributed MemoryStore, a set of configuration files must be created in your project and provided to the correlator. These configuration files typically come in pairs: a storeName-spring.xml file and a storeName-spring.properties file. Multiple pairs of files can be created and can make use of more than one distributed cache provider. See Configuring a distributed store.

Distributed store transactional and data safety guarantees

The commit() action on a Row object from a distributed store by default behaves similarly to an in-memory store’s Row object, in that the commit succeeds only if there have been no commits to the Row object since the most recent get() or update() of the Row object.

However, providers can be configured differently. For example, if you are using TCStore or BigMemory Max and the .properties file sets useCompareAndSwap to false, then the commit will always succeed, even if another monitor committed a different value for that entry.

Unlike in-memory stores, for Row objects from a distributed store, a Table.get() or Row.update() may return an older value, that is, a previously committed value, even if a more recent commit has completed. This is because a distributed store may perform caching of data. After some undefined time, the get() should be eventually consistent; a later get() or update() of the Row object should retrieve the latest value. Typically, a commit of a Row object where the get() has not retrieved the latest value will flush any local cache of the value, thus the first commit will fail, but a subsequent update and commit will succeed.

Again, providers can be configured differently. For the BigMemory Max driver, setting the terracottaConfiguration.consistency property to STRONG will ensure that after a commit(), a get() on any node will retrieve the latest version. This STRONG consistency mode is more expensive than EVENTUAL consistency.

An example: Monitor1 gets and modifies a row and sends an EPL event to Monitor2 which in response to the event gets and updates the row. In the table below, the event has “overtaken” the change to the row; the effects of changing the row and sending the event are observed in the reverse order (the event is seen before the change to the row).

| Time  | Monitor1 (on node 1)                    | Monitor2 (on node 2)                                  |
|-------|-----------------------------------------|-------------------------------------------------------|
| 1     | Table.get("row1") = "abc"               |                                                       |
| 1.2   | Change row to be "abcdef"               | Table.get("row1") = "abc" (cached locally)            |
| 1.3   | Row.commit("row1" as "abcdef") succeeds |                                                       |
| 1.301 | Send event to node 2                    |                                                       |
| 1.302 |                                         | Receive event from node 1                             |
| 1.303 |                                         | Table.get("row1") = "abc" (from local cache)          |
| 1.4   |                                         | Update row to be "abcghi"                             |
| 1.5   |                                         | Row.commit("row1" as "abcghi") fails (not last value) |
| 1.6   |                                         | Row.update() = "abcdef"                               |
| 1.7   |                                         | Update row to be "abcdefghi"                          |
| 1.8   |                                         | Row.commit("row1" as "abcdefghi") succeeds            |

At 1.303, an in-memory cache (when two contexts are communicating in the same process) would be guaranteed to retrieve the latest value, "abcdef", but a distributed store may cache values locally. The commit is guaranteed to fail when a stale value is read, as it does not rely on cached values for checking whether the row is up to date or not.

Different providers may have other differences in behavior. In particular, they may differ in whether or not they use referential checking, that is, if one client reads a row, and meanwhile the row is modified but then modified back to the first value, the client with the old Row object may or may not succeed in performing a commit(). Some providers require the row to not have changed since the row was read (even if then changed back to the value at the point it was read), while others will only compare the contents of the row.

Also avoid relying on comparisons based on the float values NaN and -0.0. Some providers may treat NaN as not equal to any value, including a NaN, which will result in the commit() method never being able to complete. Providers may differ in whether 0.0 and -0.0 are treated as the same value or not. Consider this to be undefined behavior.

Configuring a distributed store

Adding distributed MemoryStore support to a project

If you want to configure a distributed store, you first have to add Apama’s Distributed MemoryStore adapter bundle to an Apama project.

The description below explains how to add the bundle using Apama Plugin for Eclipse, but you can also add it using the apama_project command-line tool as described in Creating and managing an Apama project from the command line.

To add the Distributed MemoryStore adapter bundle to a project

  1. In the Project Explorer, right-click the Connectivity and Adapters node and select Add Connectivity and Adapters.

  2. Select Distributed MemoryStore (Supports using a distributed cache from MemoryStore) from the list of available adapters.

  3. Click OK.

    The adapter bundle is added to the project’s Connectivity and Adapters node and the adapter instance is opened in the Distributed MemoryStore editor. The editor is initially blank and the Distributed Stores panel contains no distributed stores.

Adding a distributed store to a project

After the Distributed MemoryStore adapter bundle has been added, you can add a distributed store to the project.

To add a distributed store to a project

  1. In the Distributed MemoryStore editor’s Distributed Stores panel, click the Plus icon (Add Store) button. The Distributed MemoryStore Configuration Wizard appears.

  2. In the Distributed MemoryStore Configuration Wizard, specify the following:

    1. From the Store provider drop-down list, select the third-party cache provider.

      If you are using a driver supplied by Apama, select TCStore (Terracotta) or BigMemory Max from the drop-down list. Otherwise select Others from the drop-down list.

      For queries, select Apama Queries (using TCStore) from the drop-down list.

      Info
      Apama queries are deprecated and will be removed in a future release.
    2. In the Store name field, specify the name of the store as it will be known in the configuration files and EPL code. The name must be unique and cannot contain spaces.

      When you have selected Apama Queries (using TCStore), the store name ApamaQueriesStore is automatically provided; this name cannot be changed in the wizard.

    Info
    Support for using BigMemory Max for queries is deprecated and will be removed in a future release. It is recommended that you now use Terracotta’s TCStore for queries by selecting Apama Queries (using TCStore) as the store provider. However, if you still want to use BigMemory Max for queries, select BigMemory Max as the store provider and specify “ApamaQueriesStore” as the store name.
  3. Click Finish.

    The name of the store is added to the Distributed Stores panel in the editor, and the resources for the store are added to the project. The default configuration settings for the store are displayed in the editor.

Configuring a distributed store

After a distributed store has been added to the project, you can configure it.

You can configure frequently-used settings for a distributed store in the Distributed MemoryStore editor. These settings are those in the .properties file. For other settings, you need to edit the .xml file directly.

To configure a distributed store
  1. Specify any provider-specific configuration options in either the Standard properties section of the editor or, for TCStore, in the TCStore sections of the editor.

  2. In the Classpath section, specify the names of the required provider-specific .jar files. This is only required if Others was used as the store type (that is, if you are not using TCStore or BigMemory Max).

    1. Click the Plus (Add Classpath) button. A new line is added to the location list.

    2. In the new line, specify the name of the .jar file. When you specify the path to a .jar file, you should use substitution values rather than a full path name (for example, use ${installDir.mystore}/lib/my.jar).

  3. In the Custom property substitution variables section, specify the name and values of additional substitution variables (${varname}), if any, used by the distributed cache. The .properties file contains substitution variables that are used by the .xml configuration file.

    You can define your own property substitution variables here, which will be written to the .properties file when you save. You can then edit the .xml file (see below) to use your own property substitution variables wherever you wish.

    1. Click the Plus (Add Variable) button. A new line is added to the list of substitution variables.

    2. In the new line, specify the name and value of the substitution variable you want to add.

  4. (If needed.) In the Configuration files section, you can access the Spring .xml and .properties files. Click on the file name links to open them in the appropriate editor.

    For more information on specifying property values, see Configuration files for distributed stores.

Launching a project that uses a distributed store

When you add the Distributed MemoryStore adapter bundle to an Apama project, the launch configuration is automatically updated to set the --distMemStoreConfig startup option.

In Apama Plugin for Eclipse, the maximum Java heap size and off-heap storage can be set in the Correlator Configuration dialog of the Run Configurations dialog. See Adding a correlator for more information on that dialog.

Interacting with a distributed store

Once prepared, a distributed store behaves much like other MemoryStore Store objects as described in Using the MemoryStore. However, be aware of the following differences:

  • The schema for tables in a distributed store is not allowed to expose DataViews.
  • A distributed store (as opposed to other, non-distributed stores) supports notifications. For more information, see Notifications.
  • Exceptions. In an in-memory store, only the Row.commit() action can throw exceptions. However, in a distributed store, most actions can throw exceptions. An exception represents a runtime error that can be caught with a try... catch statement. This allows developers to choose what corrective action to take (such as logging, sending alerts, taking corrective steps, retrying later, or other actions). If no try... catch block is used with these actions and an exception is thrown, the monitor instance will be terminated, the ondie() action will be called if one exists, and the monitor instance will lose all state and listeners. Exceptions can be thrown because of errors raised by third-party distributed cache providers. To discover what errors could be thrown because of third-party integration, refer to the documentation for the third-party provider in use. For more information on exceptions, see Exception handling; a sketch of catching such an exception follows this list. The following are some of the actions that can throw exceptions:
    • Table.get()
    • Table.begin()
    • Iterator.step()
    • Row.commit()
    • Row.update()
  • Performance differences. See Overview of the distributed MemoryStore for the advantages and disadvantages of using a distributed store as compared to an in-memory store.
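For example, a sketch of guarding a Table.get() call on a distributed store:

try {
   Row row := tbl.get("example-row");
   // use the row here
} catch (com.apama.exceptions.Exception e) {
   log "Distributed store access failed: " + e.getMessage() at ERROR;
   // for example, retry later or send an alert
}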

Notifications

Distributed store Table objects may support the subscribeRowChanged() and unsubscribe() actions. If a context subscribes to a table, RowChanged events are sent to that context. Subscriptions are reference-counted per context, so multiple subscriptions to the same table in the same context result in only one RowChanged event being sent for every change. Monitors should unsubscribe when they terminate (for example, in the ondie() action) to avoid leaking subscriptions.

The store factory bean property rowChangedOldValueRequired (see also Standard configuration properties for distributed stores) indicates whether subscribers receive previous values in RowChanged notification events for updated rows. When this property is set to true and the RowChanged.changeType field is set to UPDATE the RowChanged.oldFieldValues field is populated.

Notifications can impact performance, so they are not recommended for tables in which a large number of changes are occurring. While TCStore and BigMemory Max support notifications, they do not support populating the old value in RowChanged.changeType = UPDATE events.

Within a cluster of correlators, if a table has subscriptions to RowChanged notifications, then all correlators must subscribe to RowChanged notifications for that table, even if some correlators do not consume the events. This ensures that all nodes receive all events correctly.

Support for notifications is optional, but if the driver does not support notifications, calls to Table.subscribeRowChanged() and Table.unsubscribe() will throw OperationNotSupportedException errors.
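For example, a sketch that guards the subscription; the no-argument form follows the action names used in this section, so check the driver's ApamaDoc for the exact signature:

try {
   tbl.subscribeRowChanged();
   on all RowChanged() as rc {
      // react to the change here; unsubscribe in ondie() to avoid leaks
   }
} catch (com.apama.exceptions.Exception e) {
   log "Notifications not supported: " + e.getMessage() at WARN;
}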

There is no support for RowChanged notifications for any extra (non-schema) fields, where the driver supports such fields. Only fields named in the schema are included in a RowChanged notification.

Some drivers do not provide RowChanged notifications for the rows removed by a Table.clear() operation. This behavior is driver-specific, therefore consult the driver documentation for more details.

Some drivers, such as TCStore, support sending a MissedRowChanges notification event in situations where an unknown number of RowChanged events may have been dropped, for example, due to a failure or high load.

Neither the TCStore driver nor the BigMemory Max driver set the old values with a RowChanged event. The old values sequence will always be empty for both of these drivers.

The BigMemory Max driver sets new values in the RowChanged event, but the TCStore driver does not. Therefore, if you need the new values in TCStore, you need to explicitly get them (using Table.get or similar).

See also the description of com.apama.memorystore.RowChanged in the API reference for EPL (ApamaDoc).

Configuration files for distributed stores

The configuration for a distributed store consists of a set of .xml and .properties files. Each distributed store in a project has the following files:

  • storeName-spring.xml
  • storeName-spring.properties

A distributed store is configured using a bean element in the Spring XML configuration file. The bean element has the following attributes:

  • id – The unique name for this distributed store, which must match the name used in calls to Storage.prepareDistributed() and Storage.open() in EPL.
  • class – The name of the StoreFactory implementation used by this distributed store.

When the correlator is started with the --distMemStoreConfig dir option (see also Starting the correlator), it loads all XML files matching *-spring.xml in the specified configuration directory, and also all *-spring.properties files in the same directory. (Note, the correlator does not start unless the specified directory contains at least one configuration file.)
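For example (the directory name shown is illustrative):

correlator --distMemStoreConfig $APAMA_WORK/config/distmemstore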

Info
When the correlator is started, any properties that are specified with the --config file or -Dkey=value option take precedence and override the properties defined in a storeName-spring.properties file. An INFO message is then logged for all Spring properties that are being ignored.

When using Apama Plugin for Eclipse, these files are generated automatically. New storeName-spring.xml and storeName-spring.properties files are created when a store is added to a project. The most commonly used settings can be changed at any time using the Distributed MemoryStore editor (which rewrites the .properties file whenever the configuration is changed). In addition, the storeName-spring.xml files can be edited manually in Apama Plugin for Eclipse to customize more advanced configuration aspects. To edit the XML file, open the Distributed MemoryStore editor and in the Configuration files section, click the name of the file to open it in the appropriate editor. Once the editor for an XML file has been opened, you can switch between different views using the Design and Source tabs at the bottom of the editor window.

Some property values usually need to be changed when a development and testing configuration is deployed to a different environment such as one for production use. For more information on modifying property values when moving from a test environment to a production environment, see Changing bean property values when deploying projects.

Making use of substitution variables is the best way to maintain different bean property values in different environments, as you can use the same XML file, with a different .properties file for each environment. For more details on using substitution variables to specify configuration properties, see Substitution variables.

XML configuration file format

The configuration files for a distributed store use the Spring XML file format, which provides an open-source framework for flexibly wiring together the different parts of an application, each of which is represented by a bean. Each bean is configured with an associated set of properties, and has a unique identifier which can be specified using the id attribute.

It is not necessary to have a detailed knowledge of Spring to configure a distributed store, but you may wish to explore the Spring 3.0.5 documentation to obtain a deeper understanding of what is going on and to leverage some of the more advanced functionality that Spring provides.

The Apama distributed MemoryStore configuration loads any bean that extends the Apama AbstractStoreFactory class.

Setting bean property values

Most bean properties have primitive values (such as string, number, boolean) which are set like this:

<property name="propName" value="my value"/>

However, it is also possible to have properties that reference other beans, such as a configuration bean defined by the third-party distributed cache provider. These property values can be set by specifying the id of a top-level bean as in the following example (where it is assumed that myConfig is the id of a bean defined somewhere in the file):

<property name="someConfigProperty" ref="myConfig"/>

Any top-level bean may be referenced in this way, that is, any bean that is a child of the <beans> element and not nested inside another bean. Referencing a bean that is defined in a different configuration file is supported.

Instead of referencing a shared bean, it is also possible to configure a bean property by creating an inner configuration bean nested inside the property value like this:

<property name="terracottaConfiguration">
  <bean class="net.sf.ehcache.config.TerracottaConfiguration">
    <property name="consistency" value="STRONG"/>
  </bean>
</property>

Note, advanced users may want to exploit Spring’s property inheritance by using the parent attribute on an inner bean to inherit most properties from a standard top-level bean while overriding some specific subset of properties or by type-based auto-wiring.

You can use the Spring syntax for compound property names to set the value of a property held by another property. For example, to set a property stringProp on a bean held by the property beanProp, use the following:

<property name="beanProp.stringProp" value="myValue"/>

Or, to set the value of the key myKey in a property that holds a Map called mapProp, use the following:

<property name="mapProp[myKey]" value="myValue"/>

Substitution variables

Substitution variables in the form ${varname} can be used to specify bean property values. Instead of specifying bean property values directly in an XML configuration file, you use ${varname} substitution variables in the XML file and specify the values of those variables in a .properties file inside the configuration directory. This makes it possible to edit the variable values in Apama Plugin for Eclipse and to use different values during deployment to a production environment using the Apama Ant macros.

Although .properties and storeName-spring.xml files often have similar names, there is no explicit link between them, so any properties file can define properties for use by any storeName-spring.xml file. Although in some cases it may be useful to share a single substitution variable across multiple XML files, this is not normally the desired behavior, and therefore the recommendation is that all properties follow the naming convention ${varname.storeName}.

In addition to the standard substitution variables shared by most drivers, you can add your own substitution variables for important or frequently changed properties specific to the driver for the cache integrated with your application. This is especially important when changing from a development environment to a production environment.

It is also possible to provide property values at runtime as Java system properties, such as specifying -J-Dvarname=value on the correlator command line.

The special variables ${APAMA_HOME} and ${APAMA_WORK} are always available.

Substitution variables are evaluated recursively. So a substitution variable can refer to another substitution variable, for example, classpath=${installDir}/foo.jar.
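For example, a sketch following the ${varname.storeName} convention; the store name MyStore, the property name, and the host:port values are illustrative. The storeName-spring.properties file defines the value:

servers.MyStore=host1:9410,host2:9410

and the storeName-spring.xml file references it:

<property name="servers" value="${servers.MyStore}"/>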

Standard configuration properties for distributed stores

The following standard properties are supported by Apama distributed cache drivers. These properties should be supported by customer-developed implementations as well.

  • clusterName — This is a required property. It is a provider-specific string that is used to group together distributed store nodes that communicate with each other and share data. Store objects with the same clusterName value should operate as a single cluster, sharing data between them, whereas stores with different clusterName values should operate independently if possible. For providers such as Terracotta/TCStore that use other configuration properties to indicate which components to connect to, the clusterName is just a display name used in log messages with no impact on behavior.

    For BigMemory Max, the clusterName is a comma-separated list of host:port pairs that identify the servers in the Terracotta Server Array.

    Care should be taken to ensure that different clusters, and thus clusterName values, are used for development/testing and production environments, as serious errors would be introduced if the production and testing nodes were able to communicate with each other. For the Terracotta/TCStore driver, this is not a concern as it is only a display name. The BigMemory Max driver makes it easy to avoid this pitfall since it requires a list of host:port pairs. However, if you are using another driver, then for this reason, as well as whatever firewalls may exist between development/testing and production, the recommendation is to explicitly add a suffix such as _testing or _production to the clusterName to indicate clearly to which environment it belongs.

  • logLevel — This is an optional property. The default is provider-specific, but is typically the same as the correlator log level. The logLevel property is an Apama log level string (compatible with com.apama.util.Logger) such as ERROR, WARN, INFO or DEBUG, which is used to set the log level for the provider if possible (some providers write to the main correlator log file, through log4j or the Apama Logger, but others may write to a separate file).

    If not specified, the default log level is determined by the author of the driver, based on the criteria of avoiding the correlator log or stdout being filled with third-party distributed store messages while logging a small number of the most important messages.

  • backupCopies — This is an optional property. The default is 1. The backupCopies property specifies the number of additional redundant nodes that should hold a backup copy of each key/value data element. The minimum value for this property is 0 (indicating no redundancy, that is, all data is held by a single node).

    Note that some providers may allow customizing the backup count on a per-table basis, in which case this property specifies an overridable default value for tables that do not specify it explicitly.

    For BigMemory Max, this setting has no effect. The number of backup copies is determined by the Terracotta Server Array configuration, which is separate from the Apama configuration.

    This property is not used by the TCStore driver.

  • initialMinClusterSize — This is an optional property. The initialMinClusterSize property specifies the minimum number of nodes a cluster must have before the Finished event is sent in response to a call to prepareDistributed. This provides a way to make sure that a cluster is fully ready for correlator nodes to request and process data.

    The default is 1, which specifies that a Finished event is sent without waiting for additional nodes when preparing the distributed store.

    This property is not used by the TCStore driver.

  • rowChangedOldValueRequired — This property indicates whether the old value is required when there is a notification that a row has changed. The default is true. If set to false, the value of oldFieldValues is empty for RowChanged.changeType.UPDATE events.

    If set to true, the previous value is available. This cannot be set to true for TCStore or BigMemory Max.

  • enforceApamaSchema — This is an optional property. The enforceApamaSchema property sets whether Apama creates a special table to record the schema of each table and uses it to prevent opening a table with a different Apama schema to the previously used schema.

    The default for most drivers (except TCStore) is to enable this checking to provide early warning of unexpected version mismatch issues. If a mismatch is detected, you should manually delete all tables used by Apama from the distributed store, typically using the delete command from the file system or tools provided by the distributed store. Note that this check provides no protection against non-Apama clients adding data that does not match the expected schema.

If the standard properties were set, the bean configuration would look like:
<bean id="MyStore" class="com.foobar.MyStoreFactory">
  <property name="clusterName" value="host1:port1, host2:port2"/>
  <property name="logLevel" value="WARN"/>
  <property name="backupCopies" value="1"/>
  <property name="initialMinClusterSize" value="2"/>
  <property name="rowChangedOldValueRequired" value="true"/>
  <property name="enforceApamaSchema" value="true"/>
</bean>

TCStore (Terracotta) driver details

Info
The TCStore driver is deprecated and will be removed in a future release.

Apama provides a MemoryStore driver for Terracotta. This driver uses the TCStore API to allow Apama to read and write records in TCStore datasets, which may also be read and written by other Apama correlators and non-Apama components such as custom clients written against the TCStore API.

The TCStore client libraries are installed automatically with the Apama server. So there is no need to explicitly select them at installation time, or to specify their location in the driver configuration.

Info

Apama’s TCStore driver only supports clustered mode. Currently, there is no support for embedded TCStore, for performing searches from Apama, or for the Ehcache API.

BigMemory Max is the recommended driver for caching use cases (see also BigMemory Max driver details). For applications that do not involve caching (such as store and “system of record” use cases), TCStore is the recommended MemoryStore driver.

Using TCStore from the Apama MemoryStore

The following table maps Apama MemoryStore terminology to TCStore classes; this may be useful when referring to the Terracotta documentation (see https://docs.webmethods.io/on-premises/terracotta):

| Apama MemoryStore Event API Name | TCStore Class  |
|----------------------------------|----------------|
| Store                            | DatasetManager |
| Table                            | Dataset        |
| Row                              | Record         |
| Field value of a Row             | Cell           |

Apama schemas and TCStore

Apama applications using the MemoryStore always specify a schema indicating the names and types of the fields (cell definitions) when preparing a table for use.

TCStore does not have a concept of schemas, as records in a given table are not required to have the same cell definitions. Row fields (cells) named in the schema during table preparation can always be accessed from Apama. It is not necessary for every field named in the schema to exist in a given row (record). Apama simply returns a default value (for example, an empty value, 0 or false) when reading any field listed in the schema that is not present in a given row. It is also possible to access extra fields that are present in an individual row (record) but not listed in the schema, using any of the getXXX actions or toDictionary(). If a getXXX action is called with a key that is not present in the schema and is also not present as an extra field on the row, it will throw an exception. You can call getKeys() to see a list of all the fields (schema and non-schema) accessible.

Apama can therefore make use of non-homogeneous data written to TCStore by other clients. If there is no strictly defined schema for a dataset, it is possible to not specify any fields in the schema, subject to the above restrictions. It is also possible for different Apama applications or different versions of the same application to interact with a single TCStore table using different schemas. When developing Apama applications, remember that not all rows in a table necessarily contain the same fields, and ensure that all clients use the same types for each field.

Apama can also dynamically add and remove additional non-schema fields. Any set method on a Row for a new field will create a new field on the row. Non-schema fields can be removed either with the removeNonSchema method or by setting the field to an empty any object.
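For example, a sketch (assuming a field named spread that is not in the schema):

Row row := tbl.get("example-row");
// creates a non-schema cell on this record
row.setFloat("spread", 0.01);
// lists all accessible fields, schema and non-schema
log row.getKeys().toString() at INFO;
// removes the non-schema cell again
row.removeNonSchema("spread");
row.forceCommit();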

Info
All clients using a Dataset should agree on using the same types for a given cell name. The behavior when multiple clients read and write a row field (cell) with the same field name but different types is undefined, and may change in future releases of Apama.

Apama can only access cells of the Java types Long (maps to integer in EPL), Double (maps to float in EPL), String (for string or decimal), and Boolean. The decimal type is not supported by TCStore, so the driver converts to or from a string automatically for decimal fields defined in the schema. For non-schema fields, the EPL application needs to convert between string and decimal manually.
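For example, a sketch of the manual conversion for a non-schema decimal field (the field name limit is illustrative):

decimal limit := 10.5d;
row.setString("limit", limit.toString());
// ... later, when reading it back:
decimal readBack := decimal.parse(row.getString("limit"));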

Configuring the TCStore driver

You can create configuration files for the TCStore driver using Apama Plugin for Eclipse. The only required property is the list of Terracotta servers to connect to.

The following table lists the properties that can be configured in the storeName-spring.xml file for the TCStore driver. The most commonly used properties also have ${varname.storeName} substitution variables in the storeName-spring.properties file to make it easy to change the values by editing the properties file manually or using the Distributed MemoryStore editor in Apama Plugin for Eclipse.

  • servers — Type: string. A comma-separated list of host1:tsa-port1,host2:tsa-port2,... values for each server from one of the stripes in the cluster, that is, all the servers defined in the Terracotta server configuration. Servers from other stripes are found automatically. A terracotta:// URI prefix can optionally be added to the first host:port, but is not required. At least one server must be specified. If high availability is required, multiple servers should be specified. See the Terracotta documentation for detailed information about cluster architecture.

  • useCompareAndSwap — Type: boolean. Specifies whether to provide safe atomic updating of rows by multiple clients. This means that a commit only succeeds if the value last seen by the client before it made the change matches the current value at the time of the commit. The default is true, which is the recommended setting for most use cases. Compare and swap can be disabled if each TCStore client (and thread) is known to have exclusive access to a given row, or if “best effort” rather than fully reliable behavior is required, which is likely to result in improved performance. If compare and swap is disabled, MemoryStore commit() operations do not fail as a result of writes made by other clients.

    Info
    Important: For correct behavior of Apama queries, you must leave the useCompareAndSwap property in its default (true) setting. Apama queries are deprecated and will be removed in a future release.

  • classpath — Type: string. Specifies the path of the driver. This property should not be changed from its default value.

  • connectionTimeoutSecs — Type: integer. Specifies the timeout used when establishing the initial connection to the Terracotta server while preparing the store. The default is 0, which uses the standard timeout supplied by TCStore. See the Terracotta documentation on withConnectionTimeout for further details.

  • logLevel — Type: string. Specifies the verbosity of log messages from the Terracotta client. It does not affect the verbosity of the Apama TCStore driver. The default is to remain unset, which uses the same log level for Terracotta as the correlator’s main log level.

  • clusterName — Type: string. Not recommended for use. Specifies a display name that is logged at startup. This setting has no impact on behavior, and does not have to match the name of the connected Terracotta cluster. The default is hardcoded: Terracotta.

  • enforceApamaSchema — Type: string. Not recommended for use. Specifies whether Apama should attempt to detect use of conflicting schemas by multiple Apama clients using the same Terracotta cluster. The default is false, as for typical TCStore use cases this checking is not useful.

  • securityRootDirectory — Type: string. Specifies the path to the security root directory, if using TLS. For more information, see the Terracotta documentation.

The following properties are configured in the same files, but are used only when Apama creates a new dataset (table) as a result of trying to open a dataset that does not yet exist. These settings have no effect if the dataset was already created, whether by Apama or another TCStore client.

These settings apply to all datasets created by a given Apama store. If it is necessary to create datasets with different settings on the same Terracotta cluster, you have to configure separate Apama stores for each group of datasets needing the same configuration, or use another tool or client to create the datasets. To change the dataset settings after creation, you have to delete any data directories and restart the cluster. See the Terracotta documentation for more information.

The dataset creation configuration properties are:

  • offheapResourceName — Type: string. Specifies the name of a resource defined in the Terracotta server configuration to be used for off-heap storage of the dataset contents. This is a required setting when creating datasets from Apama.

  • enableDatasetPersistence — Type: boolean. Specifies whether dataset contents are persisted on disk. The default is false, indicating that dataset contents should be held only in memory.

  • persistenceDataRootName — Type: string. Specifies the logical name of a data directory defined in the Terracotta server configuration. Note that this is a logical name; it is not a directory path. This property only has an effect if enableDatasetPersistence is true.

The following standard configuration properties are not used by this driver and are either ignored or rejected if set:
  • backupCopies
  • initialMinClusterSize
  • clusterName
  • rowChangedOldValueRequired (cannot be set to true for this driver).
    Info
    TCStore currently sends notifications for rows removed by a Table.clear() operation, but this may change in a future release.
Info
The standard enforceApamaSchema property is set to false by default for TCStore, which is the recommended setting in almost all cases.

BigMemory Max driver details

Info
Support for using BigMemory Max is deprecated and will be removed in a future release. Apama queries are deprecated and will be removed in a future release.

Using BigMemory Max from the Apama MemoryStore

For reference, the following table maps Apama MemoryStore terminology to BigMemory Max classes; this may be useful when referring to the BigMemory Max documentation:

| MemoryStore Event Object | BigMemory Max Class |
|--------------------------|---------------------|
| Store                    | CacheManager        |
| Table                    | Cache               |
| Row                      | Element             |

By default, a distributed MemoryStore Store uses the BigMemory Max default cache manager. To specify the use of a different cache manager, specify the name property on the configuration bean. For example:

<property name="configuration.name" value="myCacheManager"/>

In a cluster, if one correlator calls subscribeRowChanged() for a given MemoryStore table, then all correlators in that cluster that modify the entries in that table must also call subscribeRowChanged() on that table even if they do not consume the events.

Iterating over a table may require pulling the entire table into memory. It may fail if the table is being modified.

If accessing a BigMemory Max table from Apama and non-Apama applications, clients will need the correct cache configuration (available from the Terracotta Management Console) and the appropriate Apama classes available on their classpath (provided in the distmemstore and ap-distmemstore-bigmemory.jar files) in order to access the cache.

Configuring the BigMemory Max driver

You can create configuration files for BigMemory Max when using Apama in Apama Plugin for Eclipse. The providerDir property should be set to the location of the BigMemory Max installation whose client libraries will be used, which is typically in the same location as Apama.

The driver for BigMemory Max is configured as follows:

  • You can set BigMemory Max driver properties (described in the table below) in the storeName-spring.xml configuration file. Alternatively, you can specify many of these properties in an ehcache.xml configuration file and then specify the path for that file in the storeName-spring.xml configuration file using the ehcacheConfigFile property. If this is done, many of the properties in the storeName-spring.xml configuration file will be ignored; the settings derived from the ehcache.xml file will be used instead.
  • Use the storeName-spring.properties file to set configuration properties for the BigMemory Max driver.
  • Using off-heap storage requires setting -XX:MaxDirectMemorySize=<size>. Specify this in the command line for starting the correlator as -J-XX:MaxDirectMemorySize=<size>. The BigMemory Max documentation provides recommendations for specifying the value of this property. When you add a correlator to a correlator launch configuration in Apama Plugin for Eclipse, you can select the Maximum Java off-heap storage in MB option. See Correlator arguments.
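
    For example, to allow up to 1 GB of off-heap storage (the value shown is illustrative):

    correlator -J-XX:MaxDirectMemorySize=1g ...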

When using the BigMemory Max driver, all correlators accessing the same data in a BigMemory Max cluster must have the same configuration.

Driver bean properties with equivalents in ehcache.xml

The following properties can be set either in the driver properties, or using an ehcache.xml configuration file. See the BigMemory Max documentation at https://docs.webmethods.io/on-premises/bigmemory-max for more details. For more information on the Ehcache types mentioned below, see the Ehcache Javadoc and search for the required type such as CacheConfiguration.

Property Name

Description

cacheConfiguration

Type: CacheConfiguration Ehcache CacheConfiguration bean, shared by all caches (Tables). Typically used as a compound bean name, for example, cacheConfiguration.overflowToOffHeap.

cacheConfiguration.eternal

Type: boolean Disables expiration (removing old, unused values) of entries if true. Set to true in the default storeName-spring.xml configuration file.

cacheConfiguration.maxEntriesLocalHeap

Type: int The maximum number of entries held in the local heap for each table.

This is the maxEntriesLocalHeap entry in the .properties file.

cacheConfiguration.overflowToOffHeap

Type: boolean Whether to use off-heap storage. For scenarios where data is fast changing and being written from multiple correlators, the cache may perform better if this is disabled.

This is the cacheConfiguration.overflowToOffHeap entry in the .properties file.

cacheDecoratorFactory

Type: String Name of a class to use as a cacheDecoratorFactory. The named class must be on the classpath and must implement Ehcache’s CacheDecoratorFactory interface.

cacheDecoratorFactoryProperties

Type: Properties Properties to pass to a cacheDecoratorFactory. Allows use of the same class for many caches.

clusterName

Type: String Comma-separated list of host:port identifiers for the servers, or a tc-config.xml file name. Best practice is to list all Terracotta Server Array (TSA) nodes.
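
For example (the host names and ports are illustrative):

<property name="clusterName" value="host1:9510,host2:9510"/>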

configuration

Type: Configuration Ehcache Configuration bean. Typically used as a compound bean name, for example, configuration.monitoring.

configuration.name

Type: String By default, the BigMemory Max default cache manager is used. Use this property to specify the use of a different cache manager.

maxMBLocalOffHeap

Type: long Number of MB of local off-heap data. Total across all tables, per correlator process.

pinning

Type: String Either an attribute value of "inCache" (the default) or "localMemory", or a <null/> XML element (that is, <property name="pinning"><null/></property>). Pinning prevents eviction if the cache size exceeds the configured maximum size. Recommended if the cache is being used as a system of record.

terracottaConfiguration

Type: TerracottaConfiguration Ehcache TerracottaConfiguration bean. Typically used as a compound bean name, for example, terracottaConfiguration.consistency.

terracottaConfiguration.clustered

Type: boolean Whether to use a TSA. Set to true in the default storeName-spring.xml configuration file.

terracottaConfiguration.consistency

Type: String Either 'STRONG' or 'EVENTUAL'. STRONG gives MemoryStore-like guarantees; EVENTUAL is faster, but reads may return stale values.

This is the terracottaConfiguration.consistency entry in the .properties file.

terracottaConfiguration.localCacheEnabled

Type: boolean Whether to cache entries in the correlator process. Set to true in the default storeName-spring.xml configuration file.

terracottaConfiguration.synchronousWrites

Type: boolean If true, then data is guaranteed to be out of process by the time a Row.commit() action completes. Disabling this can increase speed.

This is the terracottaConfiguration.synchronousWrites entry in the .properties file.

Driver bean properties with no equivalents in ehcache.xml

You can set the following BigMemory Max driver properties in the storeName-spring.xml configuration file, but not in the ehcache.xml configuration file as they modify how the driver accesses the BigMemory Max cache.

Property Name

Description

backupCopies

Type: int Ignored; not supported by this driver. The number of backups is governed by the TSA topology, which is described in the BigMemory Max documentation (see https://docs.webmethods.io/on-premises/bigmemory-max) and used to configure the TSA nodes.

converterConfig.default

Type: RowKeyValueConverter bean Specifies the default converter to be used for all tables which do not explicitly specify a converter. A converter is a bean of type RowKeyValueConverter. It is used to convert non-Apama format Ehcache key/value pairs to Apama format key/value pairs. If not specified, then Apama’s default converter is used which assumes that the key/value pairs in Ehcache are in Apama format. For example:

<property name="converterConfig.default" ref="MyDefaultConverter"/>

For more information about RowKeyValueConverter, which is in the com.apama.correlator.memstore package, see the API reference for Java (Javadoc) and the samples in the samples\distmemstore\bigmemory\converters directory of your Apama installation.

converterConfig.byTable

Type: Map(String, RowKeyValueConverter) Per-table converter configuration. If a converter is not specified for a table, the default converter specified by the converterConfig.default property is used. For example:

<property name="converterConfig.byTable">
 <map>
   <entry key="SensorTable">
     <bean class="SensorDataConverter" />
   </entry>
 </map>
</property>

ehcacheConfigFile

Type: String Path to an ehcache.xml configuration file.

Info
If this is specified, any other properties listed in this table will be ignored.
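
For example (the path is illustrative):

<property name="ehcacheConfigFile" value="config/ehcache.xml"/>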

exposeSearchAttributes

Type: boolean Enable exposing search attributes. If true, then the MemoryStore schema columns are exposed as BigMemory Max search attributes and are indexed, so that other clients of BigMemory Max can perform searches on the data set. If exposeSearchAttributesSet is non-empty, then only the named columns are exposed as BigMemory Max search attributes. See notes below about non-Apama applications accessing the data in a BigMemory Max cluster.

exposeSearchAttributesSet

Type: Set(String) Limits the set of columns in each table that should be exposed as search attributes. Entries are in the form tableName.columnName. If empty, all schema columns are exposed as search attributes. There is an incremental cost per column that is exposed, so for performance, only expose the columns which need to be used in searches.

For example, to expose only the Surname and FirstName columns of myTable:

<property name="exposeSearchAttributes"
  value="true"/>
<property name="exposeSearchAttributesSet">
 <set>
   <value>myTable.Surname</value>
   <value>myTable.FirstName</value>
 </set>
</property>

initialMinClusterSize

Type: int The minimum cluster size (number of correlators) that must be connected before the prepare operation can complete.

logLevel

Type: String The log level.

rowChangedOldValueRequired

Type: boolean Whether to expose old values in rowChanged events. Must be set to false.

Info
BigMemory Max currently sends notifications for rows removed by a Table.clear() operation, but this may change in a future release.

useCompareAndSwap

Type: boolean Whether to use compare and swap (CaS) operations or plain put/remove operations. Note that some versions of BigMemory Max support CaS operations only in STRONG consistency mode.

Info

Important:

For correct behavior of Apama queries, you must leave the useCompareAndSwap property in its default (true) setting. Apama queries are deprecated and will be removed in a future release.

useCompareAndSwapMap

Type: Map(String, Boolean) Per-table (cache) configuration for whether to use CaS or put/remove.
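
For example, to disable CaS operations for a single table (the table name is illustrative):

<property name="useCompareAndSwapMap">
 <map>
   <entry key="MyTable" value="false"/>
 </map>
</property>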

Migrating from BigMemory Max to TCStore

If you have an existing Apama application that uses BigMemory Max and you wish to move it to use TCStore instead, simply open the Distributed MemoryStore editor in Apama Plugin for Eclipse, remove the BigMemory Max store and add a new TCStore using the same store name. See Adding a distributed store to a project for more details.

Configure the new TCStore instance as described in TCStore (Terracotta) driver details. There is typically no need to migrate any driver configuration settings from BigMemory Max to TCStore: the options for each driver are quite different, and TCStore requires much less client-side configuration than BigMemory Max. In most cases, the default TCStore driver settings should be left unchanged. If you had configured search attributes or were using custom converters in BigMemory Max, there is no need to do anything similar when moving to TCStore, as row values in TCStore are stored in a standard format and are automatically searchable. The initialMinClusterSize and useCompareAndSwapMap (per-table) settings are supported by BigMemory Max but cannot be set for TCStore. The TCStore driver does not support client-side caching, so there is no need to migrate any cache-related settings from BigMemory Max, or to pass an off-heap storage option (-XX:MaxDirectMemorySize) to the correlator.

In most cases, there will be no need to make changes to your EPL application. If you wish, you can make use of features available in the TCStore driver but not BigMemory Max, such as accessing row fields that are not in the schema. If using “row changed” notifications, the TCStore driver will sometimes send a MissedRowChanges event if it has detected that some of the RowChanged events may have been dropped; BigMemory Max does not support sending MissedRowChanges notifications.

The BigMemory Max driver sets new values in the RowChanged event, but the TCStore driver does not. Therefore, if you are migrating an application from BigMemory Max and you need the current or new values, you need to explicitly get them (using Table.get or similar).

See also the description of com.apama.memorystore.RowChanged in the API reference for EPL (ApamaDoc).

Info
There is no way to automatically transfer data from BigMemory Max to a Terracotta server for use by TCStore.

Changing bean property values when deploying projects

Some bean property values usually need to be changed when a development/testing configuration is deployed to a different environment such as production. This is typically achieved by ensuring that all such bean property values are specified using ${varname} substitution variables, with separate .properties files for the test and production environments. For example, for some distributed memory stores, the clusterName should be changed so that the test and production nodes cannot talk to each other (Apama also recommends locating production nodes on a different network to reduce the chance of accidental errors). For more details on using substitution variables to specify configuration properties, see Substitution variables.
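
For example, the storeName-spring.xml file might reference a substitution variable as follows (the names and values are illustrative):

<property name="clusterName" value="${clusterName}"/>

with the .properties file for the test environment containing clusterName=test-host:9510 and the one for production containing clusterName=prod-host:9510.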

Info

Tip:

Due to the flexibility and simplicity of .properties files, there are many ways this requirement can be addressed. For customers using Apama’s Ant macros for deployment, one option is to maintain a separate set of .properties files for each environment, and customize your project’s Ant script to copy the correct version of the files into the directory defined by the --distMemStoreConfig option just before starting the correlator. Another option is to use Ant’s <propertyfile> task (see the Apache Ant documentation for more information on how to do this) to modify the .properties files in-place, overriding or adding to existing property values as required for the new deployment.

Creating a distributed MemoryStore driver

The Apama installation includes a driver for integrating the distributed MemoryStore with TCStore or the BigMemory Max distributed caching software. If you use other third-party distributed caching software, you need to write a driver that provides the bridge between Apama’s MemoryStore and the third-party software in use. Apama provides a Service Provider Interface (SPI) for you to use when writing drivers.

This topic presents an introduction to the SPI and a description of its essential elements. Complete Javadoc information for the SPI is available in the API reference for Java (Javadoc); see the com.apama.correlator.memstore package.

Overview

A driver for a distributed cache needs to extend the following abstract classes:

  • AbstractStoreFactory
  • AbstractStore
  • AbstractTable

Implementation details:

  • AbstractStoreFactory – This is the abstract class that holds the configuration used to instantiate a distributed Store. The starting point for creating an Apama distributed cache driver is to create a concrete subclass of AbstractStoreFactory. The subclass should have the following:

    • A public no-args constructor.
    • JavaBeans-style setter and getter methods for all provider-specific configuration properties.
    • An implementation of createStore() that makes use of these provider-specific properties, in addition to the generic properties defined on this factory, which are getClusterName(), getLogLevel(), and getBackupCopies().
    • afterPropertiesSet() (optional, but useful). Implementers are encouraged to do as much validation as possible of the configuration in the afterPropertiesSet() method. This method is called by Spring during correlator startup, after setters have been invoked for all properties in the configuration file. The createStore() method will never be called before this has happened.

    The StoreFactory class that is implemented must then be named in the storeName-spring.xml configuration file for the distributed store.

  • AbstractStore – This is the abstract class that provides access to tables whose data is held in a distributed store. Implementers should create a subclass of AbstractStore.

    A driver’s implementation of the AbstractStore needs to implement or override the following methods:

    • createTable()
    • init()
    • close()
    • getTotalClusterMembers()
  • AbstractTable – This is the abstract class that holds Row objects whose data is held in a distributed store.

    If the distributed store provides a java.util.concurrent.ConcurrentMap, Apama recommends that implementers of Apama distributed stores create a subclass of the ConcurrentMapAdapter abstract class for ease of development and maintenance. If the distributed store does not provide a ConcurrentMap, implementers should create a subclass of Apama’s AbstractTable class.

    If you are implementing from AbstractTable, you need to implement or override the following methods:

    • get()
    • clear()
    • remove()
    • replace()
    • putIfAbsent()
    • containsKey()
    • size()

    Drivers may also optionally provide support for EPL applications subscribing to “row changed” data notifications. To allow an EPL application to subscribe to these notifications, subclasses of AbstractTable (or ConcurrentMapAdapter) must provide an instance of RowChangedSubscriptionManager that provides implementations of addRowChangedListener and removeRowChangedListener, and that calls fireRowChanged when changes are detected. Also, if a subclass implements notifications, it should override the getRowChangedSubscriptionManager method and have it return the instance of RowChangedSubscriptionManager for this table. Calls to subscribeRowChanged and unsubscribe are passed to this instance. The default implementation of getRowChangedSubscriptionManager returns null, indicating that “row changed” notifications are not supported; in this case, calls to subscribeRowChanged and unsubscribe will throw OperationNotSupportedException.

    Drivers may also optionally provide support for accessing extra fields that are present in an individual row but not in the table’s schema. To do this, the table should implement the marker interface TableSupportsExtraFields and use the RowValue.getExtraFields() method to get and set a row’s extra fields when converting between RowValue objects and the data type used by the distributed store to represent row values.

  • RowValue – Drivers do not inherit from or implement the RowValue class, but they must be able to store and retrieve objects of the Apama RowValue class. Typically, a cache can store any suitable Java class, but some mapping may be required. For more information about this class, see the API reference for Java (Javadoc) for com.apama.correlator.memstore.RowValue.

Sample driver

To help get started writing a driver, the BigMemory Max driver is provided in source form as a sample. It implements the SPI described above and invokes the Ehcache API in order to use BigMemory Max. The sample is provided in the samples/distmemstore_driver/bigmemory directory of the Apama installation. To avoid confusion with the pre-compiled driver supplied in the product, the sample BigMemory Max driver uses the package name com.apamax.memstore.provider.bigmemory. A README.txt file describes how to build the sample.

Using the Management interface

The Management interface defines actions that let you do the following:

  • Obtain information about the correlator
  • Connect the correlator to another component
  • Control logging
  • Request a persistence snapshot
  • Manage user-defined status values

Actions in the Management interface are defined on several event types, which are documented in the API reference for EPL (ApamaDoc).

To use the Management interface, add the Correlator Management EPL bundle to your Apama project (see Adding EPL bundles to projects or Creating and managing an Apama project from the command line). Alternatively, you can directly use the EPL interfaces provided in APAMA_HOME\monitors\Management.mon.

Obtaining information about the correlator

The Management interface provides actions for obtaining information about the correlator that the Management interface is being used in. These actions are defined in the com.apama.correlator.Component event.

The engine_management tool also provides options that you can use to retrieve the same information from outside the correlator, or to retrieve the same information for IAF processes (see also Shutting down and managing components).

The correlator also logs all of these values to its log file at startup.

Connecting the correlator to another component

The com.apama.correlator.Component event provides actions that allow EPL to create connections to another component, in the same way as the engine_connect tool (see Configuring pipelining with engine_connect).

Controlling logging

You can configure logging using the Management interface. The com.apama.correlator.Logging event provides actions such as setApplicationLogFile, setLogFile and setApplicationLogLevel. These actions are the equivalent of using the engine_management options to configure logging (see also Shutting down and managing components).

The rotateLogs() action, which is also defined in the com.apama.correlator.Logging event, is used for closing the log files in use, opening new log files, and then sending messages to the new log files. This action applies to:

  • the main correlator log file,
  • the correlator input log file if you are using one, and
  • any EPL log files you are using.

For details about log file rotation, see Rotating correlator log files and Rotating all correlator log files.

You can write an EPL monitor that triggers log rotation on a schedule. For example, the code below rotates logs every 24 hours at midnight:

using com.apama.correlator.Logging;

monitor Rotator {

   action onload() {
      on all at (0, 0, *, *, *) {
         Logging.rotateLogs();
      }
   }
}

Requesting a snapshot

In a persistence-enabled correlator, you can use the Management interface to request a snapshot to occur as soon as possible, and be notified of when that snapshot has been committed to disk. The Management interface lets persistent and non-persistent monitors create instances of Persistence events and then call the persist() action on those events.

When the correlator processes the persist() call it takes and commits a snapshot and executes the specified callback action at some point after the snapshot is committed. There are no guarantees about the elapsed time between the persist() call, the snapshot and the callback, especially when large amounts of correlator state are changing. Your code resumes executing immediately after the call to the persist() action. See Using correlator persistence.

The Management interface defines the Persistence event:

package com.apama.correlator;
event Persistence {
   static action persist(action<> callback) {
...
   }
}

Consider the following sample code:

using com.apama.correlator.Persistence;
event Number {
   integer i;
}

persistent monitor MyApplication {
   integer counter := 0;
   sequence<integer>  myNumbers;

   action onload() {
      on all Number(*) as n {
         myNumbers.append(n.i);
         counter := counter + 1;
         if(counter % 10 = 0) {
            doCommit();
         }
      }
   }

   action doCommit() {
      Persistence.persist(logCommit);
   }

   action logCommit() {
      log "Commit succeeded";
   }
}

Because MyApplication is a persistent monitor, the correlator copies its state to disk as that state changes. This monitor listens for Number events and stores their content in the myNumbers sequence. After every tenth Number event, the code executes the doCommit() action, which uses the Persistence event in the Management interface to request that the correlator commits persistent state to disk. When that commit has succeeded, the Management interface calls the action variable that was passed to the persist() action. This action writes "Commit succeeded" to the correlator log.

The Management interface guarantees that at the moment the callback action (logCommit() in this example) is executed, the state of all persistent monitors at a particular point in time will have been committed. The particular point in time is guaranteed only to be between the point at which persist() was called and the point at which the callback action was executed. For example, suppose the following event stream is being sent into the correlator:

Number(1)
Number(2)
Number(3)
...
Number(10)
Number(11)
Number(12)

At the point that Number(10) is received, the myNumbers sequence contains the ten items 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 and so the application initiates a snapshot commit. Suppose that the correlator suddenly terminates after notification of success appears in the log. When the correlator recovers, MyApplication has a myNumbers sequence that contains at least ten items. However, the sequence might contain 11 or even 12 items, if more Number events were received after the commit was requested but before the snapshot was actually taken. The correlator also persists state periodically, or as directed by other monitors that call the Management interface, so the sequence can be persisted at other points as well.

Managing user-defined status values

The Management interface provides actions for managing (setting and returning) user-defined status values. These actions are defined in the com.apama.correlator.Component event and in the com.apama.correlator.EngineStatus event.

Status keys will have leading and trailing whitespace stripped. Keys may not be empty.

When set using setUserStatus, a user-defined status value is only changed if the new value differs from the current value.

Note that the correlator status lines that appear in the log files do not include the user-defined status values and remain unaffected by them.
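
As a minimal sketch, assuming setUserStatus is a static action on com.apama.correlator.Component taking a key and a string value (check the ApamaDoc for the exact signature), a monitor could publish a status value like this:

using com.apama.correlator.Component;

monitor UserStatusExample {
   action onload() {
      // Illustrative key and value; per the description above,
      // setUserStatus only updates the status if the value changes.
      Component.setUserStatus("myApp.state", "running");
   }
}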

You can monitor the status of each component of your application using Prometheus or REST. For more information, see Monitoring with Prometheus and Managing and monitoring over REST.

Using the JSON plug-in

You can use the JSON plug-in to convert EPL objects to JSON strings, and vice versa. When converting from JSON strings, JSON objects are converted to dictionary<any, any> and JSON lists are converted to sequence<any>.

To use the JSON plug-in, add the JSON Support EPL bundle to your Apama project. For details, see Adding EPL bundles to projects or Creating and managing an Apama project from the command line.

The JSON plug-in is provided as a JSONPlugin event in the com.apama.json package. The JSONPlugin event provides the following actions:

  • To convert a JSON string to an EPL object:

    fromJSON(string) returns any

  • To convert an EPL object (including events, dictionaries and sequences) to a JSON string:

    toJSON(any) returns string

For detailed information, see the API reference for EPL (ApamaDoc).

The following is a simple example:

using com.apama.json.JSONPlugin;

event Address {
   integer houseNumber;
   sequence<string> address;
   optional<string> postcode;
}

monitor PrintJSON {
   action onload() {
      Address a1 := new Address;
      Address a2 := new Address;
      a1.houseNumber := 107;
      a1.address.append("The Rounds");
      a1.address.append("Milton");
      a1.postcode := "CB1 2AB";
      a2.houseNumber := 196;
      a2.address.append("Exeter Road");
      a2.address.append("Newmarket");
      print JSONPlugin.toJSON(a1);
      print JSONPlugin.toJSON(a2);
   }
}

The above example prints the following:

{"postcode":"CB1 2AB","address":["The Rounds","Milton"],"houseNumber":107}
{"postcode":null,"address":["Exeter Road","Newmarket"],"houseNumber":196}

Equivalent functionality is available with the JSON codec connectivity plug-in. Use the JSON codec if the JSON is coming in as an entire message of a connectivity chain. Using a codec helps to separate the application logic from the connection over which the data is being sent, and allows the use of mapping codecs if needed. See The JSON codec connectivity plug-in for detailed information.

Use the JSON EPL plug-in if only one field of an event is JSON, or if the events are going to or are coming from a connection that is not using a connectivity chain.

Using the Base64 plug-in

You can use the Base64 plug-in to encode EPL strings as Base64 or to decode Base64 strings to EPL. When decoding Base64, the encoded data must be in UTF-8 character encoding.

To use the Base64 plug-in, add the Base64 Support EPL bundle to your Apama project. For details, see Adding EPL bundles to projects or Creating and managing an Apama project from the command line.

The Base64 plug-in is provided as a Base64 event in the com.apama.util package. The Base64 event provides the following static actions:

  • To convert an EPL string to Base64:

    stringToBase64(string) returns string

  • To convert a Base64 string to an EPL string:

    base64ToString(string) returns string
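
For example, the following simple monitor encodes a string and decodes it again (the expected output is shown in comments):

using com.apama.util.Base64;

monitor Base64Example {
   action onload() {
      string encoded := Base64.stringToBase64("Hello, Apama!");
      print encoded;                        // prints "SGVsbG8sIEFwYW1hIQ=="
      print Base64.base64ToString(encoded); // prints "Hello, Apama!"
   }
}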

Info
The Base64 plug-in does not allow you to handle arbitrary binary data encoded in Base64. Only valid UTF-8 strings can be decoded.

For detailed information, see the API reference for EPL (ApamaDoc).

Similar functionality is available with the Base64 codec connectivity plug-in. That allows arbitrary binary data from a transport to be stored in EPL in Base64 format. See The Base64 codec connectivity plug-in for detailed information.

Using MATLAB® products in an application

Info
Support for MATLAB is deprecated and will be removed in a future release.

To use MATLAB analysis and modeling capability in an Apama application or in an application built using Apama Capital Markets Foundation, you need to add the MATLAB EPL bundle to your project (see Adding EPL bundles to projects or Creating and managing an Apama project from the command line) and ensure that the MATLAB executables and libraries are available to the correlator. The MATLAB bundle provides access to the MATLAB analysis and modeling toolkit from Apama EPL code and includes an EPL plug-in.

For information about the supported MATLAB versions, see the Supported Platforms document for the current Apama version. This can be found at Supported platforms.

This MATLAB plug-in lets you connect to and use the MATLAB engine. However, there are some functions/toolkits for which MATLAB does not support integration with C or Fortran on some operating platforms. Check the MATLAB documentation before using the MATLAB EPL plug-in.

The recommended way to use the MATLAB plug-in is to use the MatlabManager event, calling the relevant action and supplying a callback. The call goes directly to the MATLAB plug-in, so you do not need to route a request event. *Response events are routed from the MATLAB plug-in to the calling context. Each request action automatically sets up a listener for the *Response event that will call the supplied callback. You can supply the relevant doesNothing*Callback() action from the MatlabManager event if you are not interested in the results of the callback. If you use the MatlabManager actions, you do not need to call the initialize() action.

The legacy way to use the MATLAB plug-in is to route *Request events and set up listeners for the *Response events. If you are using the MATLAB plug-in in only the main context, injecting MatlabService.mon sets up all required listeners for the *Request events that call into the MATLAB plug-in. To use the MATLAB plug-in from another context, instantiate a MatlabManager variable, spawn to the other context, and call initialize() on the variable. This sets up the required listeners in the current (non-main) context, and the *Response events are routed to this context.

Info

The MATLAB plug-in is asynchronous (except for the OpenSession requests), so the processing of the input queue, or calling the request actions, does not block.

Due to a shutdown timing issue in MATLAB, if you repeatedly open and close MATLAB sessions, you may see the error Error message - A primary message table for module 77. This is due to a race with shutting down the MATLAB COM server. The workaround for this is to have an unused idle MATLAB instance running that will keep the COM server running.

The MATLAB plug-in is multi-context aware. The *Response events are routed to the calling context.

To include MATLAB capabilities in your application

  1. Ensure that the directory containing the MATLAB plug-in library is included in the library search path: on Windows platforms, %APAMA_HOME%\bin should be in the PATH; for deployment on Linux operating systems, $APAMA_HOME/lib should be in the LD_LIBRARY_PATH.

  2. Import the MATLAB plug-in in the application’s EPL code.

  3. Set the appropriate values for your PATH environment variable:

    • 64-bit Windows: Add MATLAB_HOME/bin and MATLAB_HOME/bin/win64 to %PATH%.
    • 64-bit Linux: Add MATLAB_HOME/bin to $PATH. Also, add MATLAB_HOME/sys/os/glnxa64 and MATLAB_HOME/bin/glnxa64 to the end of $LD_LIBRARY_PATH (after the Apama library).
Info

On Linux, as per the MATLAB documentation, /bin/csh is required to be installed for the MATLAB engine to be started.

On Linux, if you have problems starting Apama and MATLAB, especially with library incompatibilities such as with libssl, libcrypto or libz, ensure that LD_LIBRARY_PATH is ordered to load the newest version of the library first (which may be in the Apama or MATLAB installation, or the version in the operating system). It may help to try running a standalone MATLAB from an Apama Command Prompt, or to use the ldd utility to diagnose the problem.

MatlabManager actions

The MatlabManager event provides the following actions. For complete reference information, see the API reference for EPL (ApamaDoc).

Action

Description

openSession(
   string sessionID,
   string messageID,
   boolean singleUse,
   integer precision,
   action<string, string,
      boolean, string>
   callback)

Starts a MATLAB process for the purpose of using MATLAB as a computational engine. Uses the MATLAB API function engOpen() if singleUse is false and engOpenSingleUse() if singleUse is true. Single use is unavailable on Linux. The response to this action call is an OpenSessionResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

closeSession(
   string sessionID,
   string messageID,
   action<string, string,
      boolean, string>
   callback)

Closes a MATLAB session. Uses the MATLAB API function engClose(). The response to this action call is a CloseSessionResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

initialize()

You must call this action when you are using MATLAB by means of routed events in a context other than the main context. Spawn to another context, set up the relevant listeners in the new context, and then call initialize(). You do not need to call initialize() when you are calling the MatlabManager actions.

putFloat(
   string sessionID,
   string messageID,
   string name,
   float value,
   action <string, string,
      boolean, string>
   callback)

Puts a float variable into a MATLAB engine workspace. Uses the MATLAB API function engPutVariable(). The response to this action call is a PutFloatResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

Info
By default, this event creates a local variable in the MATLAB session. If you need the variable to have a global scope, call evaluate() before you call the putFloat() action. In the evaluate() call, declare the variable as being global (for example, “global x”).
getFloat(
   string sessionID,
   string messageID,
   string name,
   action<string, string,
      float, boolean, string>
   callback)

Gets a float variable from the MATLAB engine workspace. Uses the MATLAB API function engGetVariable(). The response to this action call is a GetFloatResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

putFloatSequence(
   string sessionID,
   string messageID,
   string name,
   sequence<float> values,
   action<string, string,
      boolean, string>
   callback)

Puts a float sequence variable in a MATLAB engine workspace. Uses the MATLAB API function engPutVariable(). The response to this action call is a PutFloatSequenceResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

getFloatSequence(
   string sessionID,
   string messageID,
   string name,
   action<string, string,
      sequence<float>, boolean,
      string>
   callback)

Gets a float sequence variable from the MATLAB engine workspace. Uses the MATLAB API function engGetVariable(). The response to this action call is a GetFloatSequenceResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

putFloatMatrix(
 string sessionID,
 string messageID,
 string name,
 sequence<sequence<float>> values,
 action<string, string,
    boolean, string>
 callback)

Puts a two-dimensional matrix variable into a MATLAB engine workspace. Uses the MATLAB API function engPutVariable(). The response to this action call is a PutFloatMatrixResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

getFloatMatrix(
   string sessionID,
   string messageID,
   string name,
   action<string, string,
      sequence<sequence<float>>,
      boolean, string>
   callback)

Gets a two-dimensional matrix variable from the MATLAB engine workspace. Uses the MATLAB API function engGetVariable(). The response to this action call is a GetFloatMatrixResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

evaluate(
   string sessionID,
   string messageID,
   string expression,
   integer outputSize,
   action<string, string,
      string, sequence<string>,
      boolean, string>
   callback)

Evaluates an expression for the MATLAB engine session and returns textual output from evaluating the expression, including possible error messages. Uses the MATLAB API function engEvalString(). The response to this action call is an EvaluateResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

setVisible(
   string sessionID,
   string messageID,
   boolean value,
   action<string, string,
      boolean, string>
   callback)

Makes the window for the MATLAB engine session either visible or invisible on the Windows desktop. Uses the MATLAB API function engSetVisible(). The response to this action call is a SetVisibleResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

getVisible(
   string sessionID,
   string messageId,
   action<string, boolean,
      boolean, string>
   callback)

Returns the current visibility setting for the MATLAB engine session. Uses the MATLAB API function engGetVisible(). The response to this action call is a GetVisibleResponse event routed from the plug-in to the calling context and the supplied callback is invoked.

MATLAB examples

To use MATLAB features in your Apama or Apama Capital Markets Foundation application, you must create a MATLAB session. The following examples show how to create a MATLAB session and how to use it to set or get floating point scalar values, arrays or matrices. Each get or set request has an associated response that indicates whether the request successfully completed.

Creating a MATLAB session

The following example creates a MATLAB session. A boolean value indicates whether MATLAB should open a new session or re-use an existing session.

monitor MatlabExample2
{
   // ***** Creating a MATLAB session:
   com.apamax.matlab.MatlabManager matlabManager;

   action onload() {
      // Spawn to a new context:
      spawn run() to context("New Context");
   }

   action run() {
      // Running in a context other than main, open a MATLAB session:
      matlabManager.openSession(
         "Session1", "openSessionRequest", false, 6, sessionOpened);
   }

   action sessionOpened(
      string sessionID, string messageID, boolean success, string error) {
         if (success) {
            log "Session Opened";
         } else {
            log "Session failed to open - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }

Working with scalar values

The following example shows how to set a scalar value:

   action putFloatExample() {
      matlabManager.putFloat(
         "Session1", "putFloatRequest", "x", 10.0, putFloatCallback);
   }

   action putFloatCallback(
      string sessionID, string messageID, boolean success, string error) {
         if (success) {
            log "Put Float Succeeded";
         } else {
            log "Put Float Failed - " + sessionID + ", " + messageID + ", "
                + success.toString() + ", " + error;
         }
   }

The following example shows how to get a scalar value:

   action getFloatExample() {
      matlabManager.getFloat(
         "Session1", "getFloatRequest", "x", getFloatCallback);
   }

   action getFloatCallback(string sessionID, string messageID, float value,
      boolean success, string error) {
         if (success) {
            log "Get Float Succeeded - value = " + value.toString();
         } else {
            log "Get Float Failed - " + sessionID + ", " + messageID + ", "
               + success.toString() + ", " + error;
      }
   }

Working with arrays

To set an array:

   action putFloatSequenceExample() {
      sequence<float> y := [0.0, 1.0, 2.71828, 3.14159];
      matlabManager.putFloatSequence("Session1", "putFloatSequenceRequest",
         "y", y, putFloatSequenceCallback);
   }

   action putFloatSequenceCallback(
      string sessionID, string messageID, boolean success, string error) {
         if (success) {
            log "Put Float Sequence Succeeded";
         } else {
            log "Put Float Sequence Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }

To get an array:

   action getFloatSequenceExample() {
      matlabManager.getFloatSequence(
         "Session1", "getFloatSequenceRequest", "y", getFloatSequenceCallback);
   }

   action getFloatSequenceCallback(string sessionID, string messageID,
      sequence<float> value, boolean success, string error) {
         if (success) {
            log "Get Float Sequence Succeeded - value = " + value.toString();
         } else {
            log "Get Float Sequence Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }

Working with matrices

To set a matrix:

   action putFloatMatrixExample() {
      sequence<sequence<float>> matrix := [];
      sequence<float> row1 := [-2.1, 3.5];
      sequence<float> row2 := [5.0, 1.0, 7.9, 17.0];
      sequence<float> row3 := [-20.0, -90.0, 25.0];

      matrix.append(row1);
      matrix.append(row2);
      matrix.append(row3);

      matlabManager.putFloatMatrix("Session1", "putFloatMatrixRequest",
         "m", matrix, putFloatMatrixCallback);
      }

   action putFloatMatrixCallback(
      string sessionID, string messageID, boolean success, string error) {
         if (success) {
            log "Put Float Matrix Succeeded";
         } else {
            log "Put Float Matrix Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }

To get a matrix:

   action getFloatMatrixExample() {
      matlabManager.getFloatMatrix(
         "Session1", "getFloatMatrixRequest", "m", getFloatMatrixCallback);
   }

   action getFloatMatrixCallback(string sessionID, string messageID,
      sequence<sequence<float>> value, boolean success, string error) {
         if (success) {
            log "Get Float Matrix Succeeded - value = " + value.toString();
         } else {
            log "Get Float Matrix Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
      }
   }

As well as setting MATLAB variables, applications may also send requests to the MATLAB plug-in to evaluate any appropriate MATLAB expressions using the evaluate() action.

The following example shows how to use the MATLAB plug-in to add two matrices and get the result:

   action evaluateRequestExample() {
      // First matrix:
      sequence<sequence<float>> matrix1 := [];
      sequence<float> m1row1 := [1.0,2.0,3.0];
      sequence<float> m1row2 := [4.0,5.0,6.0];
      sequence<float> m1row3 := [7.0,8.0,9.0];
      matrix1.append(m1row1);
      matrix1.append(m1row2);
      matrix1.append(m1row3);

      // The MATLAB manager also provides 'doesNothing*' callbacks that can
      // process the returns silently if the response is not needed.
      matlabManager.putFloatMatrix("Session1", "putFloatMatrixRequest1",
         "matrix1", matrix1, matlabManager.doesNothingCallback);

      // Second matrix:
      sequence<sequence<float>> matrix2 := [];
      sequence<float> m2row1 := [1.0,4.0,7.0];
      sequence<float> m2row2 := [2.0,5.0,8.0];
      sequence<float> m2row3 := [3.0,6.0,9.0];
      matrix2.append(m2row1);
      matrix2.append(m2row2);
      matrix2.append(m2row3);
      matlabManager.putFloatMatrix("Session1", "putFloatMatrixRequest1",
         "matrix2", matrix2, matlabManager.doesNothingCallback);

      // Use MATLAB to add the two matrices.
      // The expected size of the string to be returned:
      integer STANDARD_OUTPUT_SIZE := 256;

      // Although use of the MATLAB plug-in is asynchronous, requests are
      // queued. This guarantees that the two putFloatMatrix() actions
      // have already been processed.
      matlabManager.evaluate("Session1", "evaluateRequest",
         "matrix3 = matrix1 + matrix2", STANDARD_OUTPUT_SIZE, evaluateCallback);
   }

   action evaluateCallback(
      string sessionID, string messageID, string output,
      sequence<string> outputLines, boolean success, string error) {
         if (success) {
            matlabManager.getFloatMatrix(
               "Session1", "getMatrixRequest", "matrix3", getMatrix3Callback);
         } else {
            log "Evaluate Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }

   action getMatrix3Callback(string sessionID, string messageID,
      sequence<sequence<float>> value, boolean success, string error) {
         if (success) {
            log "Get Float Matrix Succeeded - value = " + value.toString();
         } else {
            log "Get Float Matrix Failed - " + sessionID + ", "
               + messageID + ", " + success.toString() + ", " + error;
         }
   }
}

Using the R plug-in

Info
The R plug-in is deprecated and will be removed in a future release.

Introduction to using the R plug-in

R is a free software environment for statistical computing. See https://www.r-project.org/ for more information on R.

The Apama R plug-in connects to R using the Rserve package. See https://rforge.net/Rserve/index.html for more information on Rserve.

Prerequisites

You must install R, and you must install the Rserve package into R. After that, you can run Rserve using an appropriate configuration.

The Apama R plug-in uses a TCP connection to communicate with R. A host and port are supplied when opening the connection. The R server process can run locally on the same machine as Apama, or on a remote server. Rserve uses 6311 as the default port. You can change this default in the Rserve configuration and when creating the connection within Apama. See Steps for using the R plug-in for further information.

Info
Rserve has different session functionality depending on whether the server is running on Windows or Linux. On Windows, new sessions are not created when you re-connect to Rserve; previously set variables will still be valid.

To use the R plug-in, you need to add the R Support bundle to your Apama project (see also Adding the R Support bundle to your project). This lets you interact with a local or remote R running Rserve, setting and getting variables and evaluating R script from within EPL.

Adding the R Support bundle to your project

To use the R plug-in, you need only add the R Support EPL bundle to your project. For details, see Adding EPL bundles to projects or Creating and managing an Apama project from the command line.

Adding the R Support bundle to your project makes the RPlugin.mon file available to the monitors in your project. When you run your project, Apama Plugin for Eclipse automatically injects the RPlugin.mon file. If you want to examine this file, you can find it in the monitors/R directory of your Apama installation.

The file RPlugin.mon is the interface between the monitors in your application and the R plug-in. Your application creates an RConnection event using the static open() action in the RFactory event, and it then calls set() or evaluate() actions on the RConnection event to communicate with R. There is never any need to import or call the plug-in directly.

Steps for using the R plug-in

The events within the R Support bundle are in the com.apama.r package. It is assumed that R has been installed with Rserve and that it is running on a known host and port (see Introduction to using the R plug-in for more information).

After you have added the R Support bundle, you write EPL that does the following:

  1. Create an RConnection event using the static open() or openDefault() action in the RFactory event with the known host and port. On Linux, the connection can be made using local sockets if the port is set to -1 and if the host is set to the socket filename.

  2. Use the set() action in RConnection to set named variables in R. The set() action uses the any type, which means that all types can be passed into it.

    Supported types are integer, float, string, boolean and sequence, including nested sequences or sequences of different types using any.

    The decimal and dictionary types are not supported. All unsupported types cause an exception to be thrown.

  3. Use the evaluate() action to evaluate an R expression and return the result. This action returns an any type, and you must process the result to the correct type.

    Alternatively, use one of the evaluateAs<type>() actions to evaluate an R expression, casting the result to the requested type. If the type is not correct, an exception is thrown.

  4. Close the connection using close().
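
Putting these steps together, a minimal sketch might look as follows; the host, port, variable name and expression are illustrative, and the exact action signatures should be checked in the API reference for EPL (ApamaDoc):

using com.apama.r.RFactory;
using com.apama.r.RConnection;

monitor RExample {
   action onload() {
      // Step 1: open a connection to a local Rserve on the default port.
      RConnection conn := RFactory.open("localhost", 6311);
      // Step 2: set a named variable in R.
      sequence<float> xs := [1.0, 2.0, 3.0];
      conn.set("xs", xs);
      // Step 3: evaluate an R expression; the result is returned as an "any".
      any result := conn.evaluate("mean(xs)");
      print result.valueToString();
      // Step 4: close the connection.
      conn.close();
   }
}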

See the API reference for EPL (ApamaDoc) for more information on these actions.

open() can be called from different contexts. However, calls that specify the same host and port as an existing open connection share that connection.

Within R, everything is an array, and R does not differentiate between a single value and an array holding a single value. Because of this, the Apama R plug-in converts an array containing a single value into a single value of the correct type. Therefore, if you set a sequence<integer> variable to [123], it will be returned as the integer 123 when you retrieve the value.

Calls to the R plug-in are synchronous and block execution in the current context. You can create an asynchronous service in a dedicated context; a sample is provided to demonstrate this. See also R plug-in samples.

R plug-in samples

Sample code for using the R plug-in is provided in the samples/epl/RPlugin directory of your Apama installation. See the README.txt file in that directory for more information.

  • The file RPluginSample.mon demonstrates how to set and get variables of different types from R, how to create a matrix using an R script and get the dimensions, and how to load an R script from an external file.
  • The file AsyncR.mon demonstrates an asynchronous R service running in a dedicated context where requests and responses are sent via events.

Interfacing with user-defined EPL plug-ins

Although EPL is very powerful and enables complex applications, it is foreseeable that some applications might require additional specialized operations. For example, an application might need to carry out advanced arithmetic operations that are not provided in EPL. You can address this situation by writing custom EPL plug-ins using Apama’s C++, Java, or Python plug-in development kits. For detailed information on developing your custom EPL plug-in, see Developing EPL plug-ins.

Info
The correlator’s plug-in interface is versioned. If you upgrade the major or minor version of Apama, you may need to recompile your plug-ins against the new libraries to be compatible with the newer version of the correlator.

In order to access a function implemented in an EPL plug-in, you must import the plug-in. If the plug-in is written in Java, you must first inject the jar file containing the plug-in. For any plug-in, an EPL monitor or event must use the import statement to load it:

import "apama_math" as math;

For a Java plug-in, this will load the plug-in from the injected jar file. For a C++ plug-in, this will look for the Apama plug-in file libapama_math.so (on Linux) or apama_math.dll (on Windows); these must be located on the standard library path (by default, in $APAMA_WORK/lib). The import statement then maps the plug-in to the internal alias math.

Info
Insert the import statement in the monitor that uses the plug-in functions.

If the apama_math plug-in defines a method called cos that takes a single floating point value as an argument and returns a float value, this would be called from EPL as follows:

float a, b;
//... some other EPL
a := math.cos(b);

Standard float, integer and boolean types are passed by-value to external functions while string and sequence types (which map to native arrays in the plug-in) are passed by-reference. In addition, the chunk type can be used to “pass-through” data returned from one function call to another plug-in function, as shown in About the chunk type.

About the chunk type

The chunk type allows data to be referenced from EPL that has no equivalent EPL type. It is not possible to perform operations on data of type chunk from EPL directly; the chunk type exists purely to allow data output by one external library function to pass through to another function. Apama does not modify the internal structure of chunk values in any way. As long as a receiving function expects the same type as that output by the original function, any complex data structure can be passed around using this mechanism.

To use chunks with plug-ins, you must first declare a variable of type chunk. You can then assign the chunk to the return value of an external function or use the chunk as the value of the out parameter in the function call.

The following example illustrates this. The functions setRealVal and setImagVal set internal values of the chunk, while the functions getRealVal and getImagVal retrieve values from the chunk. The function addComplexNumber adds the second chunk to the first chunk. The source code for complex_plugin is in the samples\correlator_plugin\cpp directory of your Apama installation directory.

monitor ComplexPlugin {
   import "complex_plugin" as plugin;

   action onload() {
      // Creates a first complex number chunk
      chunk complexNumberFull := plugin.makeComplexNumberFull(4.0, -1.4);
      printComplexNumber(complexNumberFull);

      // Creates a second complex number chunk
      chunk complexNumberEmpty := plugin.makeComplexNumberEmpty();

      // Set the real and imaginary values of the number
      plugin.setRealVal(complexNumberEmpty, 2.0);
      plugin.setImagVal(complexNumberEmpty, 3.0);
      printComplexNumber(complexNumberEmpty);

      // Add the second complex number to the first complex number
      plugin.addComplexNumber(complexNumberFull, complexNumberEmpty);
      printComplexNumber(complexNumberFull);
   }

   action printComplexNumber(chunk complexNumber)
   {
      float real := plugin.getRealVal(complexNumber);
      float imag := plugin.getImagVal(complexNumber);
      string sign := "";
      if(imag >= 0.0) {
             sign := "+";
      }
      log real.toString() + sign + imag.toString() + "i";
   }
}

Although the chunk type was designed to support unknown data types, it is also a useful mechanism to improve performance. Where data returned by external plug-in functions does not need to be accessed from EPL, using a chunk can cut down on unnecessary type conversion. For example, suppose the output of a localtime() method is a 9-element array of type float. While you could declare this output to be of type sequence<float>, there is no need to do so because the EPL never accesses the value. Consequently, you can declare the output to be of type chunk and avoid an unnecessary conversion from native array to EPL sequence and back again.