using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.FindMeasurement;
using com.apama.cumulocity.FindMeasurementResponse;
using com.apama.cumulocity.FindMeasurementResponseAck;

/**
 * An object that holds query parameters for a measurement lookup.
 *
 * Every with.../by.../fromDate/toDate action stores one entry in the
 * parameter dictionary and returns this same object, so calls can be chained:
 *
 *     MeasurementFilter.init().byType("t").withPageSize(10)
 */
event MeasurementFilter {
    /** Param to indicate how many entries of the collection
     *  are to be retrieved from Cumulocity IoT. */
    constant string PAGE_SIZE := "pageSize";

    /** Param to search for measurements based on the fragmentType. */
    constant string FRAGMENT_TYPE := "fragmentType";

    /** Param to search for measurements based on the type. */
    constant string TYPE := "type";

    /** Param to fetch measurements based on the device identifier. */
    constant string SOURCE := "source";

    /** Param to search for measurements from a specific date. */
    constant string FROM_DATE := "fromDate";

    /** Param to search for measurements up to a specific date. */
    constant string TO_DATE := "toDate";

    /**
     * Create and return a new MeasurementFilter object.
     *
     * @return A filter with no query parameters set.
     */
    static action init() returns MeasurementFilter {
        return new MeasurementFilter;
    }

    /**
     * Request with pageSize.
     *
     * @param pageSize Value stored (as a string) under the "pageSize" parameter.
     * @return This filter, to allow chaining.
     */
    action withPageSize(integer pageSize) returns MeasurementFilter {
        return addParam(PAGE_SIZE, pageSize.toString());
    }

    /**
     * Filter based on type.
     *
     * @param type Value stored under the "type" parameter.
     * @return This filter, to allow chaining.
     */
    action byType(string type) returns MeasurementFilter {
        return addParam(TYPE, type);
    }

    /**
     * Filter based on fragmentType.
     *
     * @param fragmentType Value stored under the "fragmentType" parameter.
     * @return This filter, to allow chaining.
     */
    action byFragmentType(string fragmentType) returns MeasurementFilter {
        return addParam(FRAGMENT_TYPE, fragmentType);
    }

    /**
     * Filter based on source.
     *
     * @param source Value stored under the "source" parameter.
     * @return This filter, to allow chaining.
     */
    action bySource(string source) returns MeasurementFilter {
        return addParam(SOURCE, source);
    }

    /**
     * Search for measurements from a specific date.
     *
     * @param fromDate Value stored (via float.toString()) under the "fromDate"
     *        parameter. NOTE(review): presumably seconds since the epoch -
     *        confirm against the FindMeasurement documentation.
     * @return This filter, to allow chaining.
     */
    action fromDate(float fromDate) returns MeasurementFilter {
        return addParam(FROM_DATE, fromDate.toString());
    }

    /**
     * Search for measurements up to a specific date.
     *
     * @param toDate Value stored (via float.toString()) under the "toDate"
     *        parameter. NOTE(review): presumably seconds since the epoch -
     *        confirm against the FindMeasurement documentation.
     * @return This filter, to allow chaining.
     */
    action toDate(float toDate) returns MeasurementFilter {
        return addParam(TO_DATE, toDate.toString());
    }

    /**
     * Add a query parameter.
     *
     * @param paramName  Name of the query parameter.
     * @param paramValue Value of the query parameter.
     * @return This filter, to allow chaining.
     */
    action addParam(string paramName, string paramValue) returns MeasurementFilter {
        params.add(paramName, paramValue);
        return self;
    }

    /**
     * Return query parameters.
     *
     * @return The dictionary of parameter name to parameter value held by this filter.
     */
    action getParameters() returns dictionary<string, string> {
        return params;
    }

    /** Query parameters, keyed by parameter name. */
    dictionary<string, string> params;
}

/**
 * Helper to query for measurements.
 *
 * Both callbacks are optional: a callback that was never registered is
 * skipped when the corresponding event arrives.
 */
event MeasurementLookupHelper {
    /** Callback for successful responses; receives the request id and one measurement. */
    optional<action<integer, Measurement> > responseCallback;

    /** Callback for completion acknowledgement; receives the request id. */
    optional<action<integer> > responseCompletedCallback;

    /**
     * Create and return a new MeasurementLookupHelper object.
     *
     * @return A helper with no callbacks registered.
     */
    static action init() returns MeasurementLookupHelper {
        return new MeasurementLookupHelper;
    }

    /**
     * Register query response callback.
     *
     * @param responseCb Action invoked with (reqId, measurement) for each
     *        FindMeasurementResponse matching the request.
     * @return This helper, to allow chaining.
     */
    action withResponseCallback(action<integer, Measurement> responseCb) returns MeasurementLookupHelper {
        responseCallback := responseCb;
        return self;
    }

    /**
     * Register query completed callback.
     *
     * @param responseCompletedCb Action invoked with (reqId) when the
     *        FindMeasurementResponseAck matching the request arrives.
     * @return This helper, to allow chaining.
     */
    action withResponseCompletedCallback(action<integer> responseCompletedCb) returns MeasurementLookupHelper {
        responseCompletedCallback := responseCompletedCb;
        return self;
    }

    /**
     * Simple request without any params to fetch available measurements.
     *
     * @return The request identifier used for the query.
     */
    action get() returns integer {
        return getWithFilter(MeasurementFilter.init());
    }

    /**
     * Query for measurements based on a filter.
     *
     * Subscribes to the response channel, sets up listeners for the
     * responses and for the completion acknowledgement, then sends the
     * FindMeasurement request.
     *
     * @param filter Filter whose parameters are sent with the request.
     * @return The request identifier used for the query.
     */
    action getWithFilter(MeasurementFilter filter) returns integer {
        // Subscribe to FindMeasurementResponse.CHANNEL to listen for responses.
        monitor.subscribe(FindMeasurementResponse.CHANNEL);

        // Unique request identifier, used to correlate responses with this request.
        integer reqId := integer.getUnique();

        // Listen for matching responses until the acknowledgement for this
        // request has been seen.
        on all FindMeasurementResponse(reqId=reqId) as resp
                and not FindMeasurementResponseAck(reqId=reqId) {
            ifpresent responseCallback {
                responseCallback(reqId, resp.measurement);
            }
        }

        // Listen for the request-completed acknowledgement; the subscription
        // taken above is released here.
        on FindMeasurementResponseAck(reqId=reqId) {
            monitor.unsubscribe(FindMeasurementResponse.CHANNEL);
            ifpresent responseCompletedCallback {
                responseCompletedCallback(reqId);
            }
        }

        // Send request to find available measurements.
        send FindMeasurement(reqId, filter.getParameters()) to FindMeasurement.CHANNEL;
        return reqId;
    }
}

/**
 * Query for Measurement(s) based on fragmentType.
 *
 * This application sends a request to query for Measurements with fragmentType
 * "SpeedMeasurement" and creates listeners to listen for 0 or more
 * FindMeasurementResponse events followed by the FindMeasurementResponseAck
 * completion acknowledgement event.
 */
monitor FindMeasurementSample {
    /** Fragment type used in the sample query. */
    constant string FRAGMENT_TYPE := "SpeedMeasurement";

    /** Build the filter and issue the query when the monitor is loaded. */
    action onload() {
        // Create a filter to look up measurements based on fragmentType.
        MeasurementFilter filter := MeasurementFilter.init().byFragmentType(FRAGMENT_TYPE);

        // Query for measurements.
        integer reqId := MeasurementLookupHelper.init()
                .withResponseCallback(queryResponse)
                .withResponseCompletedCallback(queryCompleted)
                .getWithFilter(filter);
    }

    /**
     * Receive each individual response.
     *
     * @param reqId       Identifier of the request this response belongs to.
     * @param measurement Measurement carried by the response.
     */
    action queryResponse(integer reqId, Measurement measurement) {
        log "Find Measurement Response : " + measurement.toString() at DEBUG;
    }

    /**
     * Request completed acknowledgement.
     *
     * @param reqId Identifier of the request that has completed.
     */
    action queryCompleted(integer reqId) {
        log "Find Measurement Request completed " + reqId.toString() at DEBUG;
    }
}