using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;

using com.apama.aggregates.avg;
using com.apama.aggregates.max;
using com.apama.aggregates.min;
using com.apama.aggregates.stddev;
using com.apama.aggregates.last;

/**
 * Calculate statistics, persist Measurement(s) and raise Alarm(s) if required.
 *
 * This application calculates aggregates such as average, min, max, standard
 * deviation for measurements of a specific fragment and series.
 * Raises an Alarm with MINOR severity if incoming measurement value is greater
 * than max value or less than min value. This application also stores
 * statistics in Cumulocity IoT as Measurement(s) and retrieves them on
 * application restart.
 */
monitor CalculateMeasurementStatistics {

    /**
     * This is a container for storing the measurement statistics.
     */
    event Statistics {
        string source;           // source field copied from the original measurement
        float average;           // calculated statistic - average
        float stddev;            // calculated statistic - standard deviation
        float min;               // calculated statistic - min value
        float max;               // calculated statistic - max value
        float last;              // copy of the most recent measurement fragment series value
        float time;              // time copied from the most recent measurement
        float lastPublishedTime; // time at which this statistic was last published
                                 // to Cumulocity IoT
    }

    // Measurement Type
    constant string MEASUREMENT_TYPE := "c8y_Temperature";

    // Measurement Fragment & Series
    constant string FRAGMENT := "c8y_Temperature";
    constant string SERIES := "T";

    // The measurement type to be used while creating new measurement for
    // statistics.
    constant string MEASUREMENT_STATISTICS := "TemperatureStatistics";

    // The window size to use for calculating averages and standard deviation
    constant float WINDOW_SIZE := 60.0;

    // Alarm type to be created if measurement value goes above the max value or
    // goes below the min value
    constant string ALARM_TYPE := "ThresholdBreachedAlarm";

    // Local cache to store the recorded values, keyed by measurement source
    dictionary<string, Statistics> cache;

    // Time of the newest statistics snapshot restored from Cumulocity IoT for
    // each source. Only filled in by getHistoricalStatistics().
    dictionary<string, float> latestSnapshotTimes;

    // Used while retrieving historical information from Cumulocity IoT.
    // Stores the oldest timestamp among the latest statistics snapshots of all
    // sources; stays at float.MAX while no snapshot has been found.
    float lastRecordedTime := float.MAX;

    // Time at which this monitor instance was created; used as the upper bound
    // when querying for measurements missed while the application was down.
    float applicationStartTime := currentTime;

    /**
     * Entry point: start the historical lookup, the live statistics
     * calculation and the publishing of statistics.
     */
    action onload {
        // Get historical statistics which are stored as Measurement(s)
        getHistoricalStatistics();

        // Get new measurements
        listenForMeasurements();

        // Periodically publish statistics to Cumulocity IoT
        publishStatistics();
    }

    /**
     * Calculate the average, standard deviation, min and max values for the given
     * window size and group based on the source.
     *
     * Every result whose aggregates are all valid numbers is routed as a
     * Statistics event so that publishStatistics() can pick it up.
     */
    action listenForMeasurements {
        // Subscribe to Measurement.CHANNEL to receive all available measurements
        monitor.subscribe(Measurement.CHANNEL);

        from m in all Measurement(type=MEASUREMENT_TYPE)
            partition by m.source
            within WINDOW_SIZE
            where m.measurements.hasKey(FRAGMENT)
                and m.measurements[FRAGMENT].hasKey(SERIES)
            group by m.source
            select Statistics(m.source,
                avg(m.measurements[FRAGMENT][SERIES].value),
                stddev(m.measurements[FRAGMENT][SERIES].value),
                min(m.measurements[FRAGMENT][SERIES].value),
                max(m.measurements[FRAGMENT][SERIES].value),
                last(m.measurements[FRAGMENT][SERIES].value),
                last(m.time),
                currentTime) as stats
        {
            // Skip results where any of the aggregates is not a number
            if not stats.average.isNaN() and not stats.stddev.isNaN()
                and not stats.min.isNaN() and not stats.max.isNaN() {
                route stats;
            }
        }
    }

    /**
     * Get past statistics from Cumulocity IoT. We use this to capture the
     * historical min and max values.
     *
     * For every source the newest statistics measurement wins. Once all
     * responses have been received, lastRecordedTime is set to the oldest of
     * these per-source snapshot times and, if at least one snapshot exists,
     * the measurements recorded since then are retrieved as well.
     */
    action getHistoricalStatistics {
        monitor.subscribe(FindMeasurementResponse.CHANNEL);

        integer reqId := integer.getUnique();

        // Retrieve historical min and max values from Cumulocity IoT
        on all FindMeasurementResponse(reqId=reqId) as resp
            and not FindMeasurementResponseAck(reqId=reqId) {
            // Only accept a snapshot that is newer than what is cached for
            // this source (a missing entry is created with time 0.0)
            if resp.measurement.measurements.hasKey(MEASUREMENT_STATISTICS)
                and cache.getOrAddDefault(resp.measurement.source).time < resp.measurement.time {
                Statistics stats := cache[resp.measurement.source];
                stats.source := resp.measurement.source;

                dictionary<string, MeasurementValue> fragment :=
                    resp.measurement.measurements[MEASUREMENT_STATISTICS];
                stats.min := fragment.getOrDefault("min").value;
                stats.max := fragment.getOrDefault("max").value;
                stats.time := resp.measurement.time;

                // Remember the newest snapshot per source. The overall
                // lastRecordedTime is derived from these once the Ack
                // arrives, so the result does not depend on the order in
                // which the responses are delivered.
                latestSnapshotTimes[stats.source] := stats.time;
            }
        }

        on FindMeasurementResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindMeasurementResponse.CHANNEL);

            // Oldest of the latest per-source snapshots: everything up to
            // that time is already reflected in the restored min/max values
            float snapshotTime;
            for snapshotTime in latestSnapshotTimes.values() {
                if snapshotTime < lastRecordedTime {
                    lastRecordedTime := snapshotTime;
                }
            }

            // This means that we have at least one historical statistic
            if lastRecordedTime != float.MAX {
                // Get measurements since last recorded time
                getMeasurementsSinceLastSnapshot();
            }
            log "Done retrieving past measurement statistics" at INFO;
        }

        FindMeasurement fm := FindMeasurement(reqId, new dictionary<string, string>);
        // Get all measurements with type = TemperatureStatistics
        fm.params.add("type", MEASUREMENT_STATISTICS);
        send fm to FindMeasurement.CHANNEL;
    }

    /**
     * Get measurements received since the time the last statistics snapshot is
     * stored in Cumulocity IoT. This is to check if any new measurements are
     * received during the time the application is not active (i.e. may be
     * restarting from a failure).
     *
     * The query covers the range lastRecordedTime .. applicationStartTime and
     * folds every returned value into the cached min/max of its source.
     */
    action getMeasurementsSinceLastSnapshot {
        monitor.subscribe(FindMeasurementResponse.CHANNEL);

        integer reqId := integer.getUnique();

        // Retrieve all measurements since the last time so that min and max
        // values are up to date
        on all FindMeasurementResponse(reqId=reqId) as resp
            and not FindMeasurementResponseAck(reqId=reqId) {
            if resp.measurement.measurements.hasKey(FRAGMENT)
                and resp.measurement.measurements[FRAGMENT].hasKey(SERIES) {
                MeasurementValue mv := resp.measurement.measurements[FRAGMENT][SERIES];

                if cache.hasKey(resp.measurement.source) {
                    // Known source: widen the min/max range if required
                    Statistics stats := cache[resp.measurement.source];
                    if mv.value < stats.min {
                        stats.min := mv.value;
                    } else if mv.value > stats.max {
                        stats.max := mv.value;
                    }
                    if stats.time < resp.measurement.time {
                        stats.time := resp.measurement.time;
                    }
                } else {
                    // First value seen for this source: it is both min and max
                    Statistics stats := new Statistics;
                    stats.source := resp.measurement.source;
                    stats.max := mv.value;
                    stats.min := mv.value;
                    stats.time := resp.measurement.time;
                    cache.add(stats.source, stats);
                }
            }
        }

        on FindMeasurementResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindMeasurementResponse.CHANNEL);
            log "Done retrieving missed measurements" at INFO;
        }

        FindMeasurement fm := FindMeasurement(reqId, new dictionary<string, string>);
        fm.params.add("type", MEASUREMENT_TYPE);
        fm.params.add("fragmentType", FRAGMENT);
        fm.params.add("fromDate", lastRecordedTime.formatFixed(3));
        fm.params.add("toDate", applicationStartTime.formatFixed(3));
        send fm to FindMeasurement.CHANNEL;
    }

    /**
     * Handle every routed Statistics event: raise an alarm when the latest
     * value leaves the known min/max range, update the cached min/max and
     * publish the statistics as a Measurement at most once every 5 seconds
     * per source.
     */
    action publishStatistics {
        on all Statistics() as stats {
            // This is the first entry, just add to cache and return
            if not cache.hasKey(stats.source) {
                cache.add(stats.source, stats);
                return;
            }

            Statistics prev := cache[stats.source];

            // Check if we need to raise an alarm
            // Raise a MINOR alarm if new value goes above the max value or below
            // the min value
            if stats.last > prev.max {
                sendAlarm(stats.source, "ACTIVE", "MINOR", ALARM_TYPE,
                    "Measurement value " + stats.last.toString() +
                    " exceeded current max value of " +
                    prev.max.toString());
            } else if stats.last < prev.min {
                sendAlarm(stats.source, "ACTIVE", "MINOR", ALARM_TYPE,
                    "Measurement value " + stats.last.toString() +
                    " is lower than current min value of " +
                    prev.min.toString());
            }

            // Update historical min and max values
            if stats.min < prev.min {
                prev.min := stats.min;
            }
            if stats.max > prev.max {
                prev.max := stats.max;
            }

            // Publish only once every 5 seconds
            if prev.lastPublishedTime = 0.0
                or prev.lastPublishedTime + 5.0 <= currentTime {
                // Add the statistics to a new Measurement object
                Measurement m := new Measurement;
                m.source := stats.source;
                m.time := currentTime;
                m.type := MEASUREMENT_STATISTICS;

                dictionary<string, MeasurementValue> fragment :=
                    m.measurements.getOrAddDefault(MEASUREMENT_STATISTICS);
                // average/stddev come from the current window, min/max are
                // the historical values kept in the cache
                fragment.getOrAddDefault("average").value := stats.average;
                fragment.getOrAddDefault("stddev").value := stats.stddev;
                fragment.getOrAddDefault("min").value := prev.min;
                fragment.getOrAddDefault("max").value := prev.max;

                // Send new measurement to Cumulocity IoT
                send m to Measurement.CREATE_CHANNEL;

                prev.lastPublishedTime := currentTime;
            }
        }
    }

    /**
     * Create a new alarm in Cumulocity IoT.
     *
     * @param source   value for the alarm's source field
     * @param status   value for the alarm's status field (e.g. "ACTIVE")
     * @param severity value for the alarm's severity field (e.g. "MINOR")
     * @param type     value for the alarm's type field
     * @param text     value for the alarm's text field
     */
    action sendAlarm(string source, string status, string severity,
                     string type, string text) {
        Alarm alarm := new Alarm;
        alarm.source := source;
        alarm.status := status;
        alarm.severity := severity;
        alarm.type := type;
        alarm.text := text;
        alarm.time := currentTime;
        send alarm to Alarm.CHANNEL;
    }
}