using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.MeasurementValue;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;

using com.apama.aggregates.avg;
using com.apama.aggregates.last;

/**
 * Raise/Clear Alarm(s) on crossover of long-term & short-term moving averages.
 *
 * This application raises an alarm when the short-term moving average
 * exceeds the long-term moving average and clears any existing
 * alarm(s) if it goes the other way.
 */
monitor CrossoverAlarm {

	/**
	 * Container holding the short-term and long-term moving averages
	 * of a single source at one point in time.
	 */
	event Crossover {
		string source;
		float shortTerm;
		float longTerm;
	}

	/**
	 * Container for one moving average value. It is routed internally by
	 * calculateMovingAverage(); timeWindow identifies which of the two
	 * windows (short-term or long-term) produced the value.
	 */
	event MovingAverage {
		string source;
		float timeWindow;
		float average;
	}

	// Long-term moving average window period in secs
	// for 24 hrs (1*24*60*60)
	float LONG_TERM_TIME_WINDOW := 1.0 * 24.0 * 60.0 * 60.0;

	// Short-term moving average window period in secs
	// for 1 hr (1*60*60)
	float SHORT_TERM_TIME_WINDOW := 1.0 * 60.0 * 60.0;

	// Minimum number of seconds between two periodic publications of the
	// moving averages for the same source
	constant float PUBLISH_INTERVAL_SECS := 5.0;

	// The measurement type (and fragment name) used when creating the
	// new measurement that carries the moving averages
	constant string MEASUREMENT_STATISTICS := "movingAverages";

	// Type of the incoming measurements that are averaged
	constant string MEASUREMENT_TYPE := "c8y_Temperature";

	// Fragment & series of the incoming measurements that are averaged
	constant string FRAGMENT := "c8y_Temperature";
	constant string SERIES := "T";

	// Alarm type to be created if the shorter moving average
	// crosses above the longer moving average
	constant string ALARM_TYPE := "CrossoverAlarm";

	// Per source: time (currentTime) at which the moving averages were
	// last published periodically
	dictionary<string, float> lastPublishTime;

	// Per source: the previously evaluated pair of moving averages
	dictionary<string, Crossover> cache;

	/**
	 * Starts both moving average calculations and pairs every short-term
	 * average with the next long-term average of the same source.
	 */
	action onload() {
		// Calculate the moving average for the short and long duration
		// time windows, grouped by source.
		calculateMovingAverage(SHORT_TERM_TIME_WINDOW);
		calculateMovingAverage(LONG_TERM_TIME_WINDOW);

		on all MovingAverage(timeWindow = SHORT_TERM_TIME_WINDOW) as shortTerm {
			// May be the window has elapsed
			if shortTerm.average.isNaN() {
				return;
			}

			// Wait for the matching long-term average. The "and not" term
			// discards this listener as soon as a newer short-term average
			// for the same source arrives, so a stale short-term value is
			// never paired with a later long-term value.
			// NOTE(review): assumes a short-term average can be emitted
			// without a long-term one (e.g. on window expiry) - confirm.
			on MovingAverage(timeWindow = LONG_TERM_TIME_WINDOW,
			                 source = shortTerm.source) as longTerm
			   and not MovingAverage(timeWindow = SHORT_TERM_TIME_WINDOW,
			                         source = shortTerm.source) {
				// May be the window has elapsed
				if longTerm.average.isNaN() {
					return;
				}

				evaluateCrossover(Crossover(shortTerm.source,
				                            shortTerm.average,
				                            longTerm.average));
			}
		}
	}

	/**
	 * Publishes the moving averages periodically and raises or clears the
	 * crossover alarm by comparing the given pair with the previously
	 * cached pair of the same source.
	 *
	 * @param crossover The current short-term and long-term moving averages.
	 */
	action evaluateCrossover(Crossover crossover) {
		// Publish periodically to Cumulocity IoT
		boolean published := false;
		if currentTime - lastPublishTime.getOrDefault(crossover.source) > PUBLISH_INTERVAL_SECS {
			sendMovingAverage(crossover);
			lastPublishTime.add(crossover.source, currentTime);
			published := true;
		}

		// First pair seen for this source: nothing to compare against yet
		if not cache.hasKey(crossover.source) {
			cache.add(crossover.source, crossover);
			return;
		}

		Crossover prev := cache[crossover.source];

		// Check if the short-term moving average exceeded the long-term
		// moving average. If so, raise an alarm
		if prev.shortTerm <= prev.longTerm and crossover.shortTerm > crossover.longTerm {
			// Store the moving averages at the crossover point, unless the
			// periodic publication above has just sent the very same values
			if not published {
				sendMovingAverage(crossover);
			}
			// An empty id makes this a new alarm rather than an update
			sendAlarm("", crossover.source, "ACTIVE", "MINOR", "CrossoverAlarm created");
		}
		// Check if the short-term moving average went below the long-term
		// moving average. If so, clear any existing alarms
		else if prev.shortTerm >= prev.longTerm and crossover.shortTerm < crossover.longTerm {
			clearActiveAlarms(crossover.source);
		}

		cache.add(crossover.source, crossover);
	}

	/**
	 * Looks up every alarm of type ALARM_TYPE that is still "ACTIVE" for the
	 * given source and sends a "CLEARED" update for each of them.
	 *
	 * @param source The source whose alarms are to be cleared.
	 */
	action clearActiveAlarms(string source) {
		monitor.subscribe(FindAlarmResponse.CHANNEL);

		integer reqId := integer.getUnique();

		// "all" is required so that every response of this request is
		// handled, not only the first one; the listener ends with the Ack.
		on all FindAlarmResponse(reqId = reqId) as resp
		   and not FindAlarmResponseAck(reqId = reqId) {
			sendAlarm(resp.id, source, "CLEARED", "MINOR", "CrossoverAlarm cleared");
		}

		// The Ack marks the end of the responses for this request
		on FindAlarmResponseAck(reqId = reqId) {
			monitor.unsubscribe(FindAlarmResponse.CHANNEL);
		}

		FindAlarm findAlarm := new FindAlarm;
		findAlarm.reqId := reqId;
		findAlarm.params.add("source", source);
		findAlarm.params.add("status", "ACTIVE");
		findAlarm.params.add("type", ALARM_TYPE);

		send findAlarm to FindAlarm.CHANNEL;
	}

	/**
	 * Creates a moving average stream based on the given time window and
	 * routes each result as a MovingAverage event.
	 *
	 * @param timeWindow Length of the averaging window in seconds; it is
	 *                   also stored in the routed MovingAverage events.
	 */
	action calculateMovingAverage(float timeWindow) {
		// Subscribe to Measurement.CHANNEL
		// to receive all available measurements
		monitor.subscribe(Measurement.CHANNEL);

		from m in all Measurement(type = MEASUREMENT_TYPE)
			partition by m.source within timeWindow
			where m.measurements.hasKey(FRAGMENT) and m.measurements[FRAGMENT].hasKey(SERIES)
			group by m.source
			select MovingAverage(m.source, timeWindow,
			                     avg(m.measurements[FRAGMENT][SERIES].value)) as movingAverage {
			route movingAverage;
		}
	}

	/**
	 * Sends an alarm of type ALARM_TYPE, timestamped with currentTime,
	 * to Alarm.CHANNEL.
	 *
	 * @param id       Identifier of the alarm; callers pass "" when raising
	 *                 a new alarm and the found alarm's id when clearing.
	 * @param source   Source the alarm belongs to.
	 * @param status   Alarm status, e.g. "ACTIVE" or "CLEARED".
	 * @param severity Alarm severity, e.g. "MINOR".
	 * @param msg      Alarm text.
	 */
	action sendAlarm(string id, string source, string status, string severity, string msg) {
		Alarm alarm := new Alarm;
		alarm.id := id;
		alarm.status := status;
		alarm.severity := severity;
		alarm.source := source;
		alarm.text := msg;
		alarm.type := ALARM_TYPE;
		alarm.time := currentTime;

		send alarm to Alarm.CHANNEL;
	}

	/**
	 * Sends the given moving averages as a new measurement of type
	 * MEASUREMENT_STATISTICS with the series "shortTermAverage" and
	 * "longTermAverage".
	 *
	 * @param crossover The moving averages to publish.
	 */
	action sendMovingAverage(Crossover crossover) {
		Measurement m := new Measurement;
		m.source := crossover.source;
		m.time := currentTime;
		m.type := MEASUREMENT_STATISTICS;

		dictionary<string, MeasurementValue> fragment :=
			m.measurements.getOrAddDefault(MEASUREMENT_STATISTICS);
		fragment.getOrAddDefault("shortTermAverage").value := crossover.shortTerm;
		fragment.getOrAddDefault("longTermAverage").value := crossover.longTerm;

		// Send new measurement to Cumulocity IoT
		send m to Measurement.CREATE_CHANNEL;
	}
}