Connecting Apama to Zementis and other microservices

Overview

Streaming analytics applications built with Apama can call applications running in other microservices. This section uses a Predictive Analytics application built with Zementis as its example, but the steps apply to any other microservice running inside Cumulocity. It shows how to create a connection to the Cumulocity platform from within Apama EPL, how to use that connection to invoke another microservice directly, and how to make a request and decode the result.

We assume that you are developing an Apama EPL application in the Cumulocity EPL editor, and we demonstrate talking to a predictive model loaded through the Zementis microservice. The same steps work however you create your Apama application, and they can be used to interact with any microservice.

Creating an EPL application

Click the Apama-epl icon in the application switcher to create a new EPL application using the EPL editor. Select New EPL app to begin. You will now see an EPL editor window in which to create the application which interacts with the Zementis microservice.

Connecting to the Cumulocity platform

To support these requests, we provide a helper event with actions that connect to the Cumulocity platform and create requests for calling other microservices. Copy the helper event and its associated imports to the top of your EPL file. The event provides a static action which connects to Cumulocity and returns an instance of the event. That instance has an action which creates a request to a specific microservice. The connection is configured automatically, whether the code runs within a microservice, within the Cumulocity platform itself, or in a remote correlator.

Import this code into your application:

using com.apama.correlator.Component;
using com.softwareag.connectivity.ConnectivityPlugins;
using com.softwareag.connectivity.Chain;
using com.apama.util.AnyExtractor;
using com.softwareag.connectivity.httpclient.Request;
using com.softwareag.connectivity.httpclient.Response;
using com.softwareag.connectivity.httpclient.HttpTransport;
using com.softwareag.connectivity.httpclient.HttpOptions;
 
/** Cumulocity Request Interface.
 *
 * This is for making generic REST requests to other
 * Cumulocity microservices with JSON payloads.
 */
event CumulocityRequestInterface
{
   /** @private */
   HttpTransport transport;
    
   /**
   * Creates an HttpTransport configured with
   * Cumulocity-specific connection details.
   *
   * @returns The instance of the event that contains a transport
   */
   static action connectToCumulocity() returns CumulocityRequestInterface
   {
      string baseUrl := "";
      string basePath := "";
      string host := "";
      integer port := 0;
      string user := "";
      string password := "";
      boolean https := true;
      string tlsFile := "";
 
      dictionary<string, string> config := {};
      dictionary<string, string> envp := Component.getInfo("envp");
    
       
      if envp.hasKey("C8Y_BASEURL") and envp["C8Y_BASEURL"] != "" { // Running internal
         baseUrl := envp["C8Y_BASEURL"];
          
         user := envp["C8Y_TENANT"] + "/" + envp["C8Y_USER"];
         password := envp["C8Y_PASSWORD"];
      }
      else { // Get the settings from the config properties when running remotely
         string k;
         dictionary<string, string> props := Component.getConfigProperties();
         for k in props.keys() {
            if (k = "CUMULOCITY_SERVER_URL") {
               baseUrl := props[k];
            }
            else if (k = "CUMULOCITY_USERNAME"){
               user := props[k];
            }
            else if (k = "CUMULOCITY_PASSWORD"){
               password := props[k];
            }
            else if (k = "CUMULOCITY_TLS_CERT_AUTH_FILE"){
               tlsFile := props[k];
            }
         }       
      }
 
      if baseUrl.find("/") < 0 {
         baseUrl := baseUrl + "/";
      }
   
      // Check if the baseUrl starts with either http or https
      if baseUrl.length()>=7 and baseUrl.substring(0,7).toLower() = "http://"{
         https := false;
         baseUrl := baseUrl.substring(7, baseUrl.length());
      }
      else if baseUrl.length()>=8 and baseUrl.substring(0,8).toLower() = "https://"{
         https := true;
         baseUrl := baseUrl.substring(8, baseUrl.length());
      }
      // Otherwise assume HTTPS and that the URL does not have such a prefix as http or https
 
      basePath := baseUrl.replace("[^/]*(/.*)?", "$1");
      host := baseUrl.replace("(?:(.*):|(.*)/|(.*)).*", "$1$2$3");
      port := baseUrl.replace("[^:]*:([0-9]*).*", "$1").toInteger();
      if (port = 0){
         if https = true{
            port := 443;
         }
         else{
            port := 80;
         }
      }
       
      config := {
         HttpTransport.CONFIG_USERNAME:user,
         HttpTransport.CONFIG_PASSWORD:password,
         HttpTransport.CONFIG_AUTH_TYPE:"HTTP_BASIC",
         HttpTransport.CONFIG_BASE_PATH:basePath
      };
       
      if https = true{
         config.add(HttpTransport.CONFIG_TLS,"true");
         config.add(HttpTransport.CONFIG_TLS_CERT_AUTH_FILE,tlsFile);
         config.add(HttpTransport.CONFIG_TLS_ACCEPT_UNRECOGNIZED_CERTS,"true");
      }
       
       
      log config.toString() at DEBUG;
      return CumulocityRequestInterface(HttpTransport.getOrCreateWithConfigurations(host, port, config));
   }
    
   /**
   * Allows creation of a request on a transport that
   * has been configured for a Cumulocity connection.
   *
   * @param method The type of HTTP request, for example "GET".
   * @param path A specific path to be appended to the request.
   * @param payload A dictionary of elements to be included in the request.
   */
   action createRequest(string method, string path, any payload) returns Request
   { 
      return transport.createRequest(method, path, payload, new HttpOptions);
   }
}

Connecting to Cumulocity

To create the connection from your own code, call the static connectToCumulocity action and store the result:

CumulocityRequestInterface cumulocity := CumulocityRequestInterface.connectToCumulocity();

This will automatically create a connection using the credentials and connection details provided to your microservice, or using the configuration for the Cumulocity IoT transport when connecting from an external Apama instance.
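To make the connection details more concrete, the following Python sketch (not EPL, and not part of the helper event) approximates how connectToCumulocity breaks a base URL into scheme, host, port and base path. The function name split_base_url, the Endpoint tuple and the sample URLs are assumptions made for illustration only; the EPL helper remains the reference.

```python
from typing import NamedTuple


class Endpoint(NamedTuple):
    https: bool
    host: str
    port: int
    base_path: str


def split_base_url(base_url: str) -> Endpoint:
    """Approximate the URL handling of the helper event (hypothetical function)."""
    # A URL without any "/" gets a trailing slash, as in the helper.
    if "/" not in base_url:
        base_url += "/"

    https = True  # a URL without a scheme prefix is treated as HTTPS
    lowered = base_url.lower()
    if lowered.startswith("http://"):
        https = False
        base_url = base_url[len("http://"):]
    elif lowered.startswith("https://"):
        base_url = base_url[len("https://"):]

    # Everything from the first "/" onwards is the base path.
    authority, slash, rest = base_url.partition("/")
    base_path = slash + rest

    # An explicit port follows a ":" in the authority part.
    host, _, port_text = authority.partition(":")
    port = int(port_text) if port_text.isdigit() else 0
    if port == 0:
        port = 443 if https else 80
    return Endpoint(https, host, port, base_path)


print(split_base_url("https://tenant.example.com/"))
print(split_base_url("http://localhost:8080/base"))
```

The sketch does not attempt to handle every form of URL (IPv6 literals, for example); it is only meant to show which parts of the configured URL end up as the host, the port and the base path of the transport.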

Making microservice requests

The CumulocityRequestInterface instance has an action on it to create a request:

/**
* Allows creation of a request on a transport that
* has been configured for a Cumulocity connection.
*
* @param method The type of HTTP request, for example "GET".
* @param path A specific path to be appended to the request.
* @param payload A dictionary of elements to be included in the request.
*/
action createRequest(string method, string path, any payload) returns Request

This takes the HTTP method to use (usually GET, PUT or POST), a path including the Cumulocity service prefix (typically something like /service/serviceName/path/on/service) and the payload. The payload will be converted to a JSON document before submitting to the microservice. The action returns a Request object which is part of the HTTP Client interface, documentation of which can be found on the Apama community website.

To execute a request, pass a callback action to its execute action. The callback is invoked with the response as its argument when the request completes. If you need to set any options, query parameters or headers on the request, set them on the Request object before executing it. For example:

action responseHandled(Response resp) {
    string objectId := resp.payload.getString("id");
    ...
}
...
Request req := cumulocity.createRequest("GET", "/service/otherService/data", any());
req.setQueryParameter("type", "object");
req.execute(responseHandled);

The response payload is decoded from JSON and is accessed through the AnyExtractor pattern, which is linked from the Response event in the HTTP Client transport documentation. The example above is equivalent to the REST request GET http://cumulocity/service/otherService/data?type=object.

Combining streaming analytics with predictive analytics

Apama can process incoming data and then use a predictive model in the Zementis microservice to make decisions on the processed data. We will assume you have already created a model following the Predictive Analytics guide. Each model has a name and you execute the model with a JSON-encoded data string in the query parameters of a GET request to that name. For example, you might execute a simple model with a request like this:

GET /service/zementis/apply/modelName?record=%7B%22name%22:%22fred%22,%22age%22:37%7D

Special characters like quotes (") and curly braces must be encoded in the request. This will happen automatically when using the setQueryParameter API.
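As an illustration of what this encoding looks like, the following Python sketch (outside EPL) percent-encodes the record from the example request. The function name encode_record is hypothetical, and the choice to leave colons and commas unencoded is an assumption made only to reproduce the example line; it does not describe how setQueryParameter is implemented.

```python
import json
from urllib.parse import quote


def encode_record(record: dict) -> str:
    """Serialise a record to compact JSON and percent-encode it for a query string."""
    compact = json.dumps(record, separators=(",", ":"))
    # Keep ":" and "," readable, as in the example request;
    # quotes and curly braces are percent-encoded.
    return quote(compact, safe=":,")


path = "/service/zementis/apply/modelName?record=" + encode_record({"name": "fred", "age": 37})
print(path)
# prints /service/zementis/apply/modelName?record=%7B%22name%22:%22fred%22,%22age%22:37%7D
```

When writing EPL you do not need to do this yourself. Pass the unencoded JSON string to setQueryParameter.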

The rest of this guide will assume you have a model with a single parameter which analyzes the RSSI value of WiFi networks.

The response will be a JSON document with the results of executing the model.

{
  "model" : "modelName",
  "outputs" : [ {
    "normalizedAnomalyScore" : 0.36550809046915766,
    "decisionFunction" : -0.27619546519420546,
    "rawAnomalyScore" : 5.5437220118668105,
    "outlier" : false
  } ]
}

We will start with EPL which connects to Cumulocity and starts listening for measurements from a specific device.

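// Add these using declarations alongside the others at the top of your EPL file.
using com.apama.json.JSONPlugin;
using com.apama.cumulocity.Measurement;
using com.apama.cumulocity.Alarm;
 
monitor LookForWifiAnomalies
{
    CumulocityRequestInterface cumulocity;
    action onload()
    {
        // Connect once and reuse the connection for every request.
        cumulocity := CumulocityRequestInterface.connectToCumulocity();
        listenForSignalStrength("idOfDevice", "nameOfZementisModel");
    }
    // The actions defined in the following sections go here, inside the monitor.
}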

Replace the device identifier and the Zementis model name with the values for your installation.

Looking for events

First we need to collect some data from measurements. This will use techniques which were previously introduced in this guide. In this case, we will be looking for measurements which arrive from a particular device, check whether they have a given key and if so query the Zementis microservice to decide how we should respond.

action listenForSignalStrength(string deviceId, string modelName)
{
    monitor.subscribe(Measurement.CHANNEL);
    on all Measurement(source = deviceId) as m {
        if (m.measurements.hasKey("c8y_SignalStrengthWifi")) {
            string record := convertMeasurementToRecord(m);
            Request zementisRequest := cumulocity.createRequest("GET", "/service/zementis/apply/"+modelName, any());
            zementisRequest.setQueryParameter("record", record);
            zementisRequest.execute(ZementisHandler(deviceId).requestHandler);
        }
    }
}

Converting measurements to Zementis records

In order to execute the Zementis model we need to convert the Cumulocity measurement into a record suitable for passing to the Zementis microservice. This consists of constructing a dictionary corresponding to a JSON object and then encoding it as a string with the JSON EPL plug-in.

action convertMeasurementToRecord(Measurement m) returns string
{
    dictionary<string, any> json := {};
    json["rssi"] := m.measurements.getOrDefault("c8y_SignalStrengthWifi").getOrDefault("rssi").value;
    json["source"] := m.source;
    json["time"] := m.time;
    return JSONPlugin.toJSON(json);
}
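To show the shape of the record this produces, here is a small Python sketch (outside EPL) that applies the same flattening to a sample measurement. The device identifier, timestamp and RSSI value are invented for illustration, and the function name to_record is hypothetical.

```python
import json

# Hypothetical measurement, laid out like the EPL Measurement event:
# fragment name -> series name -> value and unit.
measurement = {
    "source": "12345",
    "time": 1700000000.0,
    "measurements": {
        "c8y_SignalStrengthWifi": {"rssi": {"value": -67.0, "unit": "dBm"}},
    },
}


def to_record(m: dict) -> str:
    """Flatten the fields passed to the model into a JSON string."""
    fragment = m["measurements"].get("c8y_SignalStrengthWifi", {})
    series = fragment.get("rssi", {})
    record = {
        # 0.0 stands in for the default value that getOrDefault supplies in EPL.
        "rssi": series.get("value", 0.0),
        "source": m["source"],
        "time": m["time"],
    }
    return json.dumps(record)


print(to_record(measurement))
```

The resulting string is what the EPL code passes as the record query parameter. The field names in the record must match the input fields your own model expects.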

Receiving the response from the Zementis microservice

The response from the Zementis microservice will be passed to the request handler once the model has finished executing. It will contain a payload which has been parsed from JSON and will tell us if this is an outlier. We want to raise alarms in Cumulocity for any outliers, which we will do by sending an Alarm event. We are using an event with an action on it so that we can create a closure around the device identifier.

event ZementisHandler
{
    string deviceId;
    action requestHandler(Response zementisResponse)
    {
        integer statusCode := zementisResponse.statusCode;
        if (statusCode = 200 and <boolean> zementisResponse.payload.getSequence("outputs")[0].getEntry("outlier") = true) {
            send Alarm("", "AnomalyDetectionAlarm", deviceId, currentTime,
                "Anomaly detected", "ACTIVE", "CRITICAL", 1, new dictionary<string, any>) to Alarm.CHANNEL;
        }
    }
}
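The decision made by the handler can be sketched in Python (outside EPL) as follows. The function name is_outlier is hypothetical, and the extra guard for a missing or empty outputs list is an assumption about how you might want to treat such responses; the EPL handler above does not include it.

```python
def is_outlier(status_code: int, payload: dict) -> bool:
    """Return True only for a successful response whose first output is an outlier."""
    if status_code != 200:
        return False
    outputs = payload.get("outputs") or []
    if not outputs:
        return False  # nothing to inspect, so no alarm is raised
    return outputs[0].get("outlier") is True


sample = {
    "model": "modelName",
    "outputs": [{"normalizedAnomalyScore": 0.36550809046915766, "outlier": False}],
}
print(is_outlier(200, sample))
```

An alarm would be raised only when the function returns True, which corresponds to the condition in the if statement of requestHandler.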

Other microservices

This section demonstrated how to talk to the Zementis microservice to execute a model. However, you can access any other microservice through Cumulocity in the same way, as long as it uses standard REST requests with JSON payloads. You simply need to construct the appropriate /service URL using the name of your microservice followed by the path of the request within your microservice.
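As a minimal illustration of that path convention, the following Python sketch (outside EPL) joins a microservice name and a request path. The function name service_path is hypothetical; in EPL you would build the same string with ordinary string concatenation and pass it to createRequest.

```python
def service_path(service_name: str, request_path: str) -> str:
    """Join a microservice name and a request path into a /service/... path."""
    return "/service/" + service_name.strip("/") + "/" + request_path.lstrip("/")


print(service_path("zementis", "apply/modelName"))
print(service_path("otherService", "/data"))
```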