using com.apama.cumulocity.Event;
using com.apama.cumulocity.FindEvent;
using com.apama.cumulocity.FindEventResponse;
using com.apama.cumulocity.FindEventResponseAck;

/**
 * An object that holds query parameters.
 *
 * Each builder-style action stores one name/value pair in <tt>params</tt>
 * and returns this same object, so calls can be chained.
 */
event EventFilter {
    /** Param to indicate how many entries of the collection
     * are to be retrieved from Cumulocity IoT. */
    constant string PAGE_SIZE := "pageSize";

    /** Param to search for events based on the type. */
    constant string TYPE := "type";

    /** Param to fetch events based on the device identifier. */
    constant string SOURCE := "source";

    /** Search for events from a specific CreationDate. */
    constant string FROM_CREATION_DATE := "fromCreationDate";

    /** Search for events up to a specific CreationDate. */
    constant string TO_CREATION_DATE := "toCreationDate";

    /** Search for events from a specific date. */
    constant string FROM_DATE := "fromDate";

    /** Search for events up to a specific date. */
    constant string TO_DATE := "toDate";

    /**
     * Creates and returns a new EventFilter object.
     *
     * @return A new filter with no query parameters set.
     */
    static action init() returns EventFilter {
        return new EventFilter;
    }

    /**
     * Request with pageSize.
     *
     * @param pageSize Value stored (as a string) under the "pageSize" parameter.
     * @return This filter, to allow chaining.
     */
    action withPageSize(integer pageSize) returns EventFilter {
        return addParam(PAGE_SIZE, pageSize.toString());
    }

    /**
     * Filter based on Type.
     *
     * @param type Value stored under the "type" parameter.
     * @return This filter, to allow chaining.
     */
    action byType(string type) returns EventFilter {
        return addParam(TYPE, type);
    }

    /**
     * Filter based on Source.
     *
     * @param source Value stored under the "source" parameter.
     * @return This filter, to allow chaining.
     */
    action bySource(string source) returns EventFilter {
        return addParam(SOURCE, source);
    }

    /**
     * Search for events from a specific CreationDate.
     *
     * @param fromCreationDate Value stored (via float.toString()) under
     *        the "fromCreationDate" parameter.
     * @return This filter, to allow chaining.
     */
    action fromCreationDate(float fromCreationDate) returns EventFilter {
        return addParam(FROM_CREATION_DATE, fromCreationDate.toString());
    }

    /**
     * Search for events up to a specific CreationDate.
     *
     * @param toCreationDate Value stored (via float.toString()) under
     *        the "toCreationDate" parameter.
     * @return This filter, to allow chaining.
     */
    action toCreationDate(float toCreationDate) returns EventFilter {
        return addParam(TO_CREATION_DATE, toCreationDate.toString());
    }

    /**
     * Search for events from a specific date.
     *
     * @param fromDate Value stored (via float.toString()) under the
     *        "fromDate" parameter.
     * @return This filter, to allow chaining.
     */
    action fromDate(float fromDate) returns EventFilter {
        return addParam(FROM_DATE, fromDate.toString());
    }

    /**
     * Search for events up to a specific date.
     *
     * @param toDate Value stored (via float.toString()) under the
     *        "toDate" parameter.
     * @return This filter, to allow chaining.
     */
    action toDate(float toDate) returns EventFilter {
        return addParam(TO_DATE, toDate.toString());
    }

    /**
     * Add a query parameter.
     *
     * @param paramName Name of the query parameter.
     * @param paramValue Value of the query parameter.
     * @return This filter, to allow chaining.
     */
    action addParam(string paramName, string paramValue) returns EventFilter {
        params.add(paramName, paramValue);
        return self;
    }

    /**
     * Return query parameters.
     *
     * @return The name/value pairs collected so far.
     */
    action getParameters() returns dictionary<string, string> {
        return params;
    }

    /** Query parameters, keyed by parameter name. */
    dictionary<string, string> params;
}

/**
 * Helper to query for events.
 *
 * Callbacks are registered with the with...() actions and a request is
 * issued with get() or getWithFilter().
 */
event EventLookupHelper {
    /** Callback for successful responses; called with the request id
     * and the Event carried by each FindEventResponse. */
    action<integer, Event> responseCallback;

    /** Callback for completion acknowledgement; called with the request id. */
    action<integer> responseCompletedCallback;

    /**
     * Create and return a new EventLookupHelper object.
     *
     * @return A new helper with no callbacks registered.
     */
    static action init() returns EventLookupHelper {
        return new EventLookupHelper;
    }

    /**
     * Register query response callback.
     *
     * @param responseCb Action to call for each matching response.
     * @return This helper, to allow chaining.
     */
    action withResponseCallback(action<integer, Event> responseCb) returns EventLookupHelper {
        responseCallback := responseCb;
        return self;
    }

    /**
     * Register query completed callback.
     *
     * @param responseCompletedCb Action to call when the request is acknowledged.
     * @return This helper, to allow chaining.
     */
    action withResponseCompletedCallback(action<integer> responseCompletedCb) returns EventLookupHelper {
        responseCompletedCallback := responseCompletedCb;
        return self;
    }

    /**
     * Simple request without any params to fetch available events.
     *
     * @return The identifier of the request that was sent.
     */
    action get() returns integer {
        return getWithFilter(EventFilter.init());
    }

    /**
     * Query for events based on a filter.
     *
     * @param filter Supplies the query parameters sent with the request.
     * @return The identifier of the request that was sent.
     */
    action getWithFilter(EventFilter filter) returns integer {
        /** Subscribe to FindEventResponse.CHANNEL to listen for responses. */
        monitor.subscribe(FindEventResponse.CHANNEL);

        /** Unique request identifier. */
        integer reqId := integer.getUnique();

        /** Listen for matching responses until the acknowledgement for
         * this request arrives. */
        on all FindEventResponse(reqId=reqId) as resp
            and not FindEventResponseAck(reqId=reqId)
        {
            /* Only invoke the callback if one has been registered. */
            ifpresent responseCallback {
                responseCallback(reqId, resp.evt);
            }
        }

        /** Listen for request completed acknowledgement. */
        on FindEventResponseAck(reqId=reqId) {
            /* Release the subscription taken at the top of this action. */
            monitor.unsubscribe(FindEventResponse.CHANNEL);
            /* Only invoke the callback if one has been registered. */
            ifpresent responseCompletedCallback {
                responseCompletedCallback(reqId);
            }
        }

        /** Send request to find available events. The listeners above are
         * created first so they exist before the request goes out. */
        send FindEvent(reqId, filter.getParameters()) to FindEvent.CHANNEL;

        return reqId;
    }
}

/**
 * Query for Event(s) based on type.
 *
 * This application sends a request to query for Events of type
 * "DoorOpenedEvent" and creates listeners to listen for 0 or more
 * FindEventResponse events followed by the FindEventResponseAck completion
 * acknowledgement event.
 */
monitor FindEventSample {
    /** Event type used as the "type" query parameter. */
    constant string EVENT_TYPE := "DoorOpenedEvent";

    /** Builds a type filter and issues the query when the monitor loads. */
    action onload() {
        /** Create a filter to lookup events based on type. */
        EventFilter filter := EventFilter.init().byType(EVENT_TYPE);

        /** Query for events. */
        integer reqId := EventLookupHelper.init()
            .withResponseCallback(queryResponse)
            .withResponseCompletedCallback(queryCompleted)
            .getWithFilter(filter);
    }

    /**
     * Receive each individual response.
     *
     * @param reqId Identifier of the request this response belongs to.
     * @param evt The event carried by the response.
     */
    action queryResponse(integer reqId, Event evt) {
        log "Find Event Response : " + evt.toString() at DEBUG;
    }

    /**
     * Request completed acknowledgement.
     *
     * @param reqId Identifier of the request that has completed.
     */
    action queryCompleted(integer reqId) {
        log "Find Event Request completed " + reqId.toString() at DEBUG;
    }
}