using com.apama.cumulocity.ManagedObject;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Alarm;
using com.apama.cumulocity.FindManagedObject;
using com.apama.cumulocity.FindManagedObjectResponse;
using com.apama.cumulocity.FindManagedObjectResponseAck;
using com.apama.cumulocity.FindAlarm;
using com.apama.cumulocity.FindAlarmResponse;
using com.apama.cumulocity.FindAlarmResponseAck;
using com.apama.cumulocity.SendEmail;
using com.apama.cumulocity.Util;

/**
 * Raise an Alarm if no Measurement(s) are received in a specified period.
 *
 * This application raises an alarm if measurements are not being received from
 * devices. No alarms are raised for devices under maintenance.
 *
 * Every TIME_PERIOD seconds each known device is checked:
 *  - devices in maintenance mode are skipped,
 *  - devices that reported a measurement within the period have any active
 *    alarm of type ALARM_TYPE cleared,
 *  - silent devices get an alarm raised, and an email is sent if the alarm
 *    recorded on the previous check is still active.
 */
monitor UnavailabilityAlarm {

	/** Per-device bookkeeping, stored in managedObjects keyed by device id. */
	event LastSeen {
		/** Time a measurement (or a managed object update) was last seen. */
		float lastSeenTime;
		/** Time the current outage was first noticed; 0.0 when there is none. */
		float alarmTime;
		/** The managed object last received for this device. */
		any object;
	}

	/** Known devices, keyed by managed object id (the measurement source). */
	dictionary<string, LastSeen> managedObjects;

	/** Polling interval, and maximum allowed silence, in seconds. */
	constant float TIME_PERIOD := 30.0;
	constant string ALARM_TYPE := "UnavailabilityAlarm";
	constant string ALARM_SEVERITY := "MINOR";

	// Populate this with the list of users email has to be sent to
	sequence<string> recipients := [];

	// Populate this with the email identifier to which replies can be sent
	// Usually this will be something similar to noreply@domain.com
	// NOTE(review): placeholder value - must be configured before deployment.
	string replyTo := "";

	string alarmMsg := "No measurement(s) seen in the last "
			+ TIME_PERIOD.toString() + " seconds";

	// Persist the application start time to quicken the lookup time
	float applicationLastQueryTime := 0.0;

	/**
	 * Entry point: starts device discovery and measurement tracking, then
	 * evaluates every known device once per TIME_PERIOD.
	 */
	action onload {
		getAllDevices();
		listenForMeasurements();

		on all wait(TIME_PERIOD) {
			sequence<string> alarmsToClear := new sequence<string>;
			sequence<string> alarmsToRaise := new sequence<string>;
			sequence<string> sendEmail := new sequence<string>;

			string source;
			for source in managedObjects.keys() {
				LastSeen lastSeen := managedObjects[source];

				// Device is currently under maintenance, don't proceed further
				if Util.inMaintenanceMode(lastSeen.object) {
					log "Source " + source + " is under maintenance." at DEBUG;
					// Reset this so that we don't actually escalate it when the
					// source is active again
					lastSeen.alarmTime := 0.0;
					continue;
				}

				// We received a measurement in the last polling interval
				if lastSeen.lastSeenTime + TIME_PERIOD >= currentTime {
					log "Received measurement(s) in the last "
							+ TIME_PERIOD.toString() + " seconds for " + source
							+ ", clearing existing alarm(s), if present." at DEBUG;
					lastSeen.alarmTime := 0.0;
					// We started receiving measurements from the device
					// Check if any alarms exist and if so clear them
					alarmsToClear.append(source);
					continue;
				}

				// Check if an already raised alarm has been acknowledged or
				// cleared, if not send an email to escalate this.
				// The condition only holds while the outage was first recorded
				// no more than TIME_PERIOD ago, i.e. on the check that follows
				// the one which recorded alarmTime.
				if lastSeen.alarmTime != 0.0
						and lastSeen.alarmTime + TIME_PERIOD >= currentTime {
					string msg := "No measurement(s) seen in the last "
							+ (2.0*TIME_PERIOD).toString() + " seconds";
					log msg + " for " + source + ", will send an email if alarm"
							+ " is not acknowledged or cleared" at DEBUG;
					// Send email for this source
					sendEmail.append(source);
				}

				// No new measurements are received
				log alarmMsg + " for " + source + ", raising alarm" at DEBUG;

				// Delay raising alarms till we clear existing alarms
				// This can speed up the lookup time later on.
				alarmsToRaise.append(source);
				if lastSeen.alarmTime = 0.0 {
					lastSeen.alarmTime := currentTime;
				}
			}

			handleAlarmsAndEmails(alarmsToClear, sendEmail, alarmsToRaise);
		}
	}

	/**
	 * Looks up the currently active alarms of type ALARM_TYPE and, for each
	 * one found, clears it and/or sends an escalation email depending on which
	 * list its source appears in. The new alarms are raised only after the
	 * lookup has completed.
	 *
	 * @param alarmsToClear Sources whose active alarm should be cleared.
	 * @param emailsToSend  Sources to email about if their alarm is still active.
	 * @param alarmsToRaise Sources to raise an alarm for once the lookup is done.
	 */
	action handleAlarmsAndEmails(sequence<string> alarmsToClear,
			sequence<string> emailsToSend, sequence<string> alarmsToRaise) {

		// Nothing to look up, so raise the alarms straight away
		if alarmsToClear.size() = 0 and emailsToSend.size() = 0 {
			raiseAlarmForSources(alarmsToRaise);
			return;
		}

		monitor.subscribe(FindAlarmResponse.CHANNEL);

		integer reqId := integer.getUnique();
		on all FindAlarmResponse(reqId=reqId) as resp
				and not FindAlarmResponseAck(reqId=reqId) {
			integer idx := alarmsToClear.indexOf(resp.alarm.source);
			// Alarm still active, we can go ahead and clear it
			if idx >= 0 {
				sendAlarm(resp.id, resp.alarm.source, "CLEARED", "Clearing Alarm");
				// Handle each source at most once per lookup
				alarmsToClear.remove(idx);
			}

			idx := emailsToSend.indexOf(resp.alarm.source);
			// The alarm is still not acknowledged or cleared, send an email
			if idx >= 0 {
				sendMail(resp.alarm.source);
				emailsToSend.remove(idx);
			}
		}

		// The Ack marks the end of the responses for this request
		on FindAlarmResponseAck(reqId=reqId) {
			monitor.unsubscribe(FindAlarmResponse.CHANNEL);
			raiseAlarmForSources(alarmsToRaise);
		}

		// Get all active alarms
		FindAlarm fa := new FindAlarm;
		fa.reqId := reqId;
		fa.params.add("type", ALARM_TYPE);
		fa.params.add("status", "ACTIVE");
		// Restrict the query to alarms since the previous lookup, when there was one
		if applicationLastQueryTime != 0.0 {
			fa.params.add("fromDate", applicationLastQueryTime.formatFixed(3));
		}
		applicationLastQueryTime := currentTime;

		send fa to FindAlarm.CHANNEL;
	}

	/**
	 * Sends an ACTIVE alarm carrying alarmMsg for every given source.
	 *
	 * @param sources Device ids to raise the alarm for.
	 */
	action raiseAlarmForSources(sequence<string> sources) {
		string source;
		for source in sources {
			// Empty id: no existing alarm is being referenced
			sendAlarm("", source, "ACTIVE", alarmMsg);
		}
	}

	/**
	 * Records the arrival time of every measurement whose source is a known
	 * device. Measurements from unknown sources are ignored.
	 */
	action listenForMeasurements {
		monitor.subscribe(Measurement.CHANNEL);
		on all Measurement() as m {
			if managedObjects.hasKey(m.source) {
				managedObjects[m.source].lastSeenTime := currentTime;
			}
		}
	}

	/**
	 * Populates managedObjects: listens for managed objects carrying the
	 * "c8y_IsDevice" fragment and requests the devices that already exist.
	 * Each device seen has its lastSeenTime set to the current time.
	 */
	action getAllDevices {
		// Get new devices being created/updated
		monitor.subscribe(ManagedObject.CHANNEL);
		on all ManagedObject() as managedObject {
			if managedObject.params.hasKey("c8y_IsDevice") {
				LastSeen ls := managedObjects.getOrAddDefault(managedObject.id);
				ls.object := managedObject;
				ls.lastSeenTime := currentTime;
			}
		}

		// Get list of existing devices
		monitor.subscribe(FindManagedObjectResponse.CHANNEL);

		integer reqId := integer.getUnique();
		on all FindManagedObjectResponse(reqId=reqId) as resp
				and not FindManagedObjectResponseAck(reqId=reqId) {
			LastSeen ls := managedObjects.getOrAddDefault(resp.id);
			ls.object := resp.managedObject;
			ls.lastSeenTime := currentTime;
		}

		// The Ack marks the end of the responses for this request
		on FindManagedObjectResponseAck(reqId=reqId) {
			monitor.unsubscribe(FindManagedObjectResponse.CHANNEL);
		}

		FindManagedObject findDevice := new FindManagedObject;
		findDevice.reqId := reqId;
		// Get all devices
		findDevice.params.add("fragmentType", "c8y_IsDevice");

		send findDevice to FindManagedObject.CHANNEL;
	}

	/**
	 * Builds an Alarm of type ALARM_TYPE and severity ALARM_SEVERITY, stamped
	 * with the current time, and sends it to Alarm.CHANNEL.
	 *
	 * @param id     Alarm id; callers pass "" when raising a new alarm.
	 * @param source Device id the alarm belongs to.
	 * @param status Alarm status; this monitor uses "ACTIVE" and "CLEARED".
	 * @param text   Alarm text.
	 */
	action sendAlarm(string id, string source, string status, string text) {
		Alarm alarm := new Alarm;
		alarm.id := id;
		alarm.source := source;
		alarm.status := status;
		alarm.text := text;
		alarm.time := currentTime;
		alarm.type := ALARM_TYPE;
		alarm.severity := ALARM_SEVERITY;

		send alarm to Alarm.CHANNEL;
	}

	/**
	 * Sends the escalation email for a source to the configured recipients,
	 * using replyTo as the reply address.
	 *
	 * @param source Device id that stopped sending measurements.
	 */
	action sendMail(string source) {
		string sub := "Missing measurements from source " + source;
		string text := "No new Measurements were received from source "
				+ source + " in more than " + (2.0*TIME_PERIOD).toString()
				+ " seconds";

		SendEmail email := new SendEmail;
		email.subject := sub;
		email.text := text;
		email.receiver := recipients;
		email.replyTo := replyTo;
		// cc and bcc are left unset; assign email.cc / email.bcc here if needed

		send email to SendEmail.CHANNEL;
	}
}