using com.apama.cumulocity.Operation; using com.apama.cumulocity.FindOperation; using com.apama.cumulocity.FindOperationResponse; using com.apama.cumulocity.FindOperationResponseAck; /** An object that holds query parameters. */ event OperationFilter { /** Param to indicate how many entries of the collection * are to be retrieved from Cumulocity IoT. */ constant string PAGE_SIZE := "pageSize"; /** Param to search for operations based on the fragmentType. */ constant string FRAGMENT_TYPE := "fragmentType"; /** Param to search for operations based on the type. */ constant string TYPE := "type"; /** Param to fetch operations based on the device identifier. */ constant string SOURCE := "source"; /** Param to fetch operations based on the agent identifier. */ constant string AGENT := "agent"; /** Param to fetch operations based on the status.*/ constant string STATUS := "status"; /** Search for operations from a specific date. */ constant string FROM_DATE := "fromDate"; /** Search for operations up to a specific date .*/ constant string TO_DATE := "toDate"; /** Create and return a new OperationFilter object. */ static action init() returns OperationFilter { return new OperationFilter; } /** Request with pageSize. */ action withPageSize(integer pageSize) returns OperationFilter { return addParam(PAGE_SIZE, pageSize.toString()); } /** Filter based on FragmentType. */ action byFragmentType(string fragmentType) returns OperationFilter { return addParam(FRAGMENT_TYPE, fragmentType); } /** Filter based on Type. */ action byType(string type) returns OperationFilter { return addParam(TYPE, type); } /** Filter based on Source. */ action bySource(string source) returns OperationFilter { return addParam(SOURCE, source); } /** Filter based on Agent. */ action byAgent(string agent) returns OperationFilter { return addParam(AGENT, agent); } /** Filter based on Status. */ action byStatus(string status) returns OperationFilter { return addParam(STATUS, status); } /** Search for operations from a specific date. */ action fromDate(float fromDate) returns OperationFilter { return addParam(FROM_DATE, fromDate.toString()); } /** Search for operations up to a specific date. */ action toDate(float toDate) returns OperationFilter { return addParam(TO_DATE, toDate.toString()); } /** Add a query parameter. */ action addParam(string paramName, string paramValue) returns OperationFilter { params.add(paramName, paramValue); return self; } /** Return query parameters. */ action getParameters() returns dictionary { return params; } /** Query parameters. */ dictionary params; } /** Helper to query for operations. */ event OperationLookupHelper { /** Callback for successful responses. */ action responseCallback; /** Callback for completion acknowledgement. */ action responseCompletedCallback; /** Create and return a new OperationLookupHelper object. */ static action init() returns OperationLookupHelper { return new OperationLookupHelper; } /** Register query response callback. */ action withResponseCallback(action responseCb) returns OperationLookupHelper { responseCallback := responseCb; return self; } /** Register query completed callback. */ action withResponseCompletedCallback(action responseCompletedCb) returns OperationLookupHelper { responseCompletedCallback := responseCompletedCb; return self; } /** Simple request without any params to fetch available operations. */ action get() returns integer { return getWithFilter(OperationFilter.init()); } /** Query for operations based on a filter. */ action getWithFilter(OperationFilter filter) returns integer { /** Subscribe to FindOperationResponse.CHANNEL to listen for responses. */ monitor.subscribe(FindOperationResponse.CHANNEL); /** Unique request identifier. */ integer reqId := integer.getUnique(); /** Listen for matching responses. */ on all FindOperationResponse(reqId=reqId) as resp and not FindOperationResponseAck(reqId=reqId) { ifpresent responseCallback { responseCallback(reqId, resp.operation); } } /** Listen for request completed acknowledgement. */ on FindOperationResponseAck(reqId=reqId) { monitor.unsubscribe(FindOperationResponse.CHANNEL); ifpresent responseCompletedCallback { responseCompletedCallback(reqId); } } /** Send request to find available operations. */ send FindOperation(reqId, filter.getParameters()) to FindOperation.CHANNEL; return reqId; } } /** * Query for Operation(s) based on status. * * This application sends a request to query for Operations with status as * "PENDING" and creates listeners to listen for 0 or more * FindOperationResponse events followed by the FindOperationResponseAck * completion acknowledgement event. */ monitor FindOperationSample { constant string STATUS := "PENDING"; action onload() { /* Create a filter to lookup operations based on status. */ OperationFilter filter := OperationFilter.init().byStatus(STATUS); /** Query for operations. */ integer reqId := OperationLookupHelper.init(). withResponseCallback(queryResponse). withResponseCompletedCallback(queryCompleted). getWithFilter(filter); } /** Receive each individual response. */ action queryResponse(integer reqId, Operation operation) { log "Find Operation Response : " + operation.toString() at DEBUG; } /** Request completed acknowledgement. */ action queryCompleted(integer reqId) { log "Find Operation Request completed " + reqId.toString() at DEBUG; } }