Working with DataHub

This section describes how to offload data from the Operational Store of Cumulocity IoT to a data lake using Cumulocity IoT DataHub.

Info: You need configuration or administration permissions to work with offloading pipelines. See section Defining DataHub permissions and roles for details.

Configuring offloading jobs

Cumulocity IoT DataHub provides functionality to configure, manage, and execute offloading pipelines that extract and transform data from the Operational Store of Cumulocity IoT and offload it to a data lake.

Basic functionality

On the Offloading page you do the offloading management and monitoring tasks:

Configuration of offloading tasks

In the action bar you have a search control to search for all offloading configurations whose task name, description, filter predicate, additional columns, or UUID contain the search string. You can use the Active/Inactive filter controls to show/hide corresponding configurations. The action bar comprises also controls for adding a collection for offloading, reloading the list of configurations and their status, and importing/exporting configurations. Below the action bar you will find the current set of configurations.

The following steps describe how to set up and start an offloading pipeline.

Defining an offloading configuration

To define an offloading configuration, click Offload collection to start a wizard which guides you through the main steps:

The wizard prepopulates settings for the different steps to ease the configuration process. You can modify those settings according to your needs.

Select collection

In the dropdown box select one of the Cumulocity IoT base collections, which are:

Info: You can define multiple offloading pipelines for each Cumulocity IoT collection, except for the case of a TrendMiner offloading configuration, which must be singleton. For example, you can filter the alarms collection by different criteria with each one resulting in a separate pipeline.

Later in this section you will find a summary of the default attributes being offloaded per base collection.

Define an offloading task

Click Next to proceed with the next configuration step. Click Cancel to cancel the offloading configuration.

Configure target table

Once you have selected a collection for offloading, you have to specify the target table in the data lake. The Target table name denotes the folder name in the data lake. In this folder the offloaded data is stored. In Dremio a table is created with the same name, pointing to this data lake folder. This table is used when querying the corresponding data lake folder and thus the offloaded data. The target table name must follow these syntax rules:

Each pipeline must create its own so-called target table in the data lake, i.e., you must choose distinct target table names for each offloading configuration.

For alarms, events, inventory collection, you only need to specify the target table name in this step.

For the measurements collection, additional settings are required. The target table layout refers to the way the measurements are stored. Measurements in the base collection may have different types, e.g., the collection may contain temperature, humidity, and pressure measurements. Depending on your layout choice, measurements are stored differently in the target table.

The layout One table for one measurement type (Default) will create a table containing only measurements of one specific type; measurements of other types are not included. When selecting this layout, you have to additionally specify the measurement type to which the offloaded measurements are restricted. DataHub automatically inspects a subset of the data and identifies existing measurement types. In the measurement type dropdown box, these auto-detected types are listed. In case a specific type you are looking for has not been detected, you can manually enter it in this box as well.

The layout All measurement types in one table (TrendMiner) will create a table containing measurements of all types. To distinguish the measurements, the table has a column which lists for each measurement its corresponding type. The specific table schema for this layout is listed later in this section. This layout is only for use cases where you want to offload the data into the data lake, so that TrendMiner can consume the data for its time-series analytics. When this layout is selected, the target table name is set to a fixed, non-editable name, which TrendMiner expects for its data import. To learn more about the interaction between TrendMiner and DataHub, see Integrating DataHub with TrendMiner.

For each base collection, a corresponding pipeline extracts a default set of data fields during offloading. This set defines the default schema of the target table with the columns capturing the data fields. The set is fix for each collection and cannot be modified. Select Show default schema to show the columns of the default schema with their corresponding name and type.

Click Next to proceed with the next configuration step. Click Finish to jump directly to the final step. Both steps will fail if the associated base collection is empty, as it prevents necessary schema investigations. In such a case you have to ensure that the base collection is not empty before you can proceed with the offloading configuration. Click Previous to go one configuration step back. Click Cancel to cancel the offloading configuration.

Set additional result columns

If you have added additional top-level fields while feeding data into Cumulocity IoT and you want to access them in your DataHub queries, then you can include them as additional result columns. You can also use additional result columns to offload data fields in the base collection which are not part of the default schema. Additional result columns can be configured optionally. The TrendMiner case does not support this option.

Auto-detected columns

To ease the configuration process, DataHub auto-detects additional result columns. Using a sample of the base collection, DataHub searches for additional top-level fields and provides them as additional result columns. You can either include such an auto-detected column in your offloading or not. As the auto-detection logic relies on a sample, not all additional top-level fields might be captured. You can manually add a column to include a field you miss.

Structure of additional result columns

Each additional result column, whether it is manually configured or auto-detected, has the following properties:

When entering the configuration step for additional result columns, all columns and their properties are shown in a table, with one additional result column per row. In the top right corner the Hide auto-detected columns checkbox allows you to either show the auto-detected columns or not. On the right side of each additional result column, a collapse button and a context menu is available. With the collapse button you can expand/collapse more details of the column. In the details section you can explore the source definition as well as sample data of the column. In the context menu of an additional result column you find actions for editing, duplicating, or deleting the column. The column name can also be edited inline by clicking into the name field, adapting the name, and clicking once outside the field.

In the top-right corner of the table you find a button for manually adding an additional result column.

If you enter the additional result columns step for a running offloading pipeline, i.e., the pipeline is scheduled, you cannot modify the columns.

Overview of additional result columns

Add an additional result column

When adding an additional result column, a modal dialog for defining the column opens. You must define a unique column name. Then the source definition needs to be specified.

First step is to define a field from the base collection in the source definition. Then you can optionally apply SQL functions to adapt the data of this field to your needs, e.g., by trimming whitespace or rounding decimal values. The source definition editor supports you in this process with content completion and syntax highlighting.

If you want to derive additional result columns from nested content, you can specify the nested fields using the prefix “src.” and the path to the nested field. For example, if you have a top-level field “someField” with a nested field “someSubField”, add “src.someField.someSubField” as additional result column. In the same way you can access nested arrays. If you have a top-level field “someField” with a nested array field “someArraySubField”, add “src.someField.someArraySubField[0]” as additional result column to access the first array entry.

Add additional result column

Click Apply to add the column, which will be selected for offloading by default. If the source definition is invalid, e.g. when accessing an unknown column, you get an error message like Column “UnknownColumn” not found in any table. You have to fix the source definition before you can proceed. Click Cancel to cancel the configuration of the additional result column.

Edit an additional result column

In the context menu of an additional result column, select Edit to open the dialog for editing the column name and the source definition. Click Apply to update the column with the new settings. The new column name must be unique and the source definition must be valid in order to proceed. Click Cancel to quit editing the column.

Note that for auto-detected columns the source definition cannot be modified. If you want to modify the source definition, you have to duplicate the auto-detected column.

Duplicate an additional result column

In the context menu of an additional result column, select Duplicate to open the dialog for duplicating the column. The source definition of the duplicate column is the same as of the original column and can be adapted to your needs. The column name uses the original column name plus a counter as suffix to make the name unique. You can adapt the name to your needs, provided the name is unique.

Click Apply to complete and Cancel to quit duplicating the column.

Delete an additional result column

In the context menu of an additional result column, select Delete to open the dialog for deleting the column. Click Confirm to proceed or Cancel to cancel the deletion.

Auto-detected columns cannot be deleted.

When deleting an additional result column, the data will no longer be included in the next offloading run. Data which has already been offloaded to the data lake is not affected by the deletion of the column.

Migration of additional result columns

DataHub versions prior to version 10.10 offer a single text field for defining a comma-separated list of additional result columns. Offloading configurations defined with such an old version internally rely on a different format for managing additional result columns. DataHub version 10.10 and above includes an auto-migration procedure in its version upgrade process to automatically migrate an old configuration to the new additional result columns format. In rare cases this auto-migration might fail, e.g., when the SQL expression is invalid. Such a configuration can still be scheduled, but its settings cannot be modified.

To migrate to the new format manually, proceed as follows:

  1. In the context menu of the column click Show and navigate through the configuration.
  2. Copy the task name, the additional result columns definition, and the target table name to a text editor.
  3. Create a new configuration with an arbitrary target table name and an arbitrary task name.
  4. Navigate to the additional result columns step. Rebuild the additional result columns by manually adding the same columns as given in the old definition. For example, the expression “‘Hello’ AS Col1, ‘World’ AS Col2” results in two columns, one with name “Col1” and source definition “‘Hello’” and one with name “Col2” and source definition “‘World’”. In case columns in the old definition were not named, Dremio has automatically assigned a column name like “EXPR$1”. Use the preview of the old configuration to get the corresponding column names and use them when defining the new additional result columns. Complete the configuration.
  5. Delete the old configuration.
  6. Edit the new configuration and set the task name and the target table name of the old configuration.
  7. When activating the new configuration, you are prompted for either flushing or appending to the existing data. Use the latter option to base the new configuration on the data the old configuration has offloaded so far.

Click Next to proceed with the next configuration step. Click Previous to go one configuration step back. Click Cancel to cancel the offloading configuration.

Set filter predicate

Optionally you can define an additional filter predicate. Per default, all entries in the base collection are offloaded to the data lake; you can use the predicate to filter out entries you do not want to persist in the data lake. For example, you can filter out invalid values or outliers. In the Additional filter predicate field, you can specify such a filter in SQL syntax. For example, for the alarms collection the filter might be “status=‘ACTIVE’ AND severity=‘WARNING’” to only persist active alarms with severity warning. The filter predicate functionality supports complex SQL statements, i.e., a combination of AND/OR, clauses like “IN(…)” / “NOT IN(…)”, and functions, e.g. “REGEXP_LIKE(text, ‘MyText\S+')”.

In the filter predicate you can query all standard attributes of the base collection as well as the custom fields. The additional result columns defined in the previous configuration step cannot be accessed by their name in the filter predicate. You have to use the source definition as defined in the corresponding column instead.

Info: For querying the attribute “id”, you have to use “_id”. For querying the time attributes, see also Working with DataHub > DataHub best practices for example snippets for widely-used temporal filter predicates.

When defining an additional filter predicate, you can click Validate to validate your predicate. If the validation fails, you will get an error description. For example, if you want to apply the trim function to a numeric value “TRIM(numeric_value)”, you get an error message that the trim function cannot be applied in that case. You should fix these errors as otherwise the offloading execution will fail. If the underlying collection is empty and no schema information is available, the validation step cannot be executed.

Click Next to proceed with the next configuration step. Click Previous to go one configuration step back. Click Cancel to cancel the offloading configuration.

Configure task

The task configuration step includes the offloading task name and the description. The Offloading task name is an identifier for the offloading pipeline. It has to have at minimum one non-whitespace character. Even though the task name does not have to be unique, it is advisable to use a unique name.

In the Description field, you can add a description for this offloading pipeline. The description is optional, but we recommend you to use it, as it provides additional information about the pipeline and its purpose.

Click Next to proceed with the next configuration step. Click Previous to go one configuration step back. Click Cancel to cancel the offloading configuration.

Finish configuration

The final step provides a summary of your settings as well as a result preview. The summary includes the settings from the previous steps as well as the internal UUID of this configuration. The UUID is generated by the system and cannot be modified. With the UUID you can distinguish configurations having the same task name, e.g., when browsing the audit log or the offloading status. In the summary, you also get the schedule with which the offloading pipeline will be executed once it is started, e.g., “every hour at minute 6”. The schedule cannot be modified. With the Inactive/Active toggle at the end of the summary you choose whether the periodic offloading execution should be activated upon save or not.

In the offloading preview you can inspect what the actual data that will be offloaded looks like. For this purpose, the offloading is executed, returning a sample of the resulting data. The header row of the sample data incorporates the column name as well as the column type. Use Hide time columns to either show the default columns with a temporal notion or not. Note that no data is persisted to the data lake when running the preview.

Finally, click Save to save the offloading pipeline. Otherwise click Cancel to cancel the offloading configuration. You can also navigate back to adapt previous settings, using the Previous buttons.

Validate an offloading configuration

Overview of offloading pipelines

In the main panel of the Offloading page, you will find all pipelines as well as their current status.

Each pipeline is shown as a card. A card has controls for managing the offloading process. Besides the description you will find the schedule with which this pipeline will be executed once it is started. In addition to that you will find for active pipelines the time of their last execution and the planned next execution. When expanding Additional information, the additional columns, the filter predicate, and the UUID of the configuration are shown.

Scheduling an offloading job

Once you have defined an offloading configuration and saved it, you can start the offloading pipeline.

Starting periodic offloading

Click the Active/Inactive toggle in an offloading card to start the periodic execution of the offloading pipeline, if it was not already activated when configuring the pipeline. The scheduler component of DataHub will then periodically trigger the pipeline.

The initial offload denotes the first execution of an offloading pipeline. While subsequent executions only offload data increments, the initial offload moves all collection data from the Operational Store of Cumulocity IoT to the data lake. Thus, the initial offload may need to deal with vast amounts of data. For this reason, the initial offload does not process one big data set, but instead partitions the data into batches and processes the batches. If the initial offload fails, e.g. due to a data lake outage, the next offload checks which batches were already completed and continues with those not yet completed.

If the same pipeline has already been started and stopped in the past, a new start of the pipeline opens a dialog asking you whether you want to flush the existing data or append the data to the existing data. The latter option offloads only data that has been added after the last execution. The first option flushes the data lake. Then the next execution will offload the complete collection. This option should be used with caution.

Before restarting the periodic offloading, you may have changed the result schema by adding or removing columns (via adding or removing additional result columns). When you restart the pipeline, existing data in the data lake is not modified, but the new data being offloaded incorporates the new schema. When querying such a data set comprising different schemas, the system computes a “merged” schema and (virtually) fills it up with null values where fields have not yet been provided. This usually works without problems if additional attributes are included or removed from the offloading configuration. However, schema merging might fail or lead to unexpected behavior in certain cases. One example is if you change data types, e.g., if the old configuration contained “myCustomField1” as a string and you change it to a number via “CAST(myCustomField1 AS Integer) AS myCustomField1”. Therefore you should take care that the data you offload is consistent.

A previous offloading pipeline may have already written into the same target table, i.e., the data is stored in the same folder on the data lake. In this case, when starting the new offloading pipeline, you are asked whether you want to flush the existing data or append the data to the existing data. You should only append the data if old and new data share the same schema. Otherwise, you might end up with a table consisting of disparate data, which hinders meaningful analysis. If the new data differs from the old data, you should use a new target table. Alternatively, you can flush the existing table if its old contents are not needed anymore. Again, you should be careful when flushing a table as the data most likely cannot be recovered.

Scheduling settings

The scheduler is configured to run the offloading pipeline once an hour. The precise minute of the hour at which the offloading starts is shown in the pipeline configuration. This minute is assigned by the system to balance the load on the Operational Store of Cumulocity IoT, i.e., to avoid that all offloading jobs from different tenants run at the same time. The schedule settings cannot be modified.

Stopping periodic offloading

Use the Active/Inactive toggle in an offloading card to stop the periodic offloading. Then the scheduler stops scheduling new jobs; active jobs will complete.

Viewing the execution status

At the bottom of each pipeline card, the execution status is shown:

Managing an offloading pipeline

In the context menu of each offloading pipeline, you will find actions for managing and monitoring the pipeline.

Context menu of an offloading configuration
Editing/showing an offloading pipeline

Click Edit to edit the current settings. Only inactive pipelines can be edited. Note that you cannot change the Cumulocity IoT base collection selected for this pipeline. For the measurements collection, the target table layout cannot be changed as well. Also note that changes to additional filter predicates, and additional result columns are not applied to already exported data, i.e., a change to the offloading pipeline only affects data to be exported in the future.

For active pipelines, click Show to browse through the configuration. You cannot edit the settings.

Copying an offloading pipeline

Click Copy to copy the current configuration. The new configuration is an identical copy of the selected configuration except for the task name and the target table, both of which will have a unique suffix appended. You can change the settings according to your needs.

A TrendMiner offloading configuration cannot be copied, as only one TrendMiner configuration is allowed.

Deleting an offloading pipeline

Click Delete to delete a configuration. Only inactive pipelines can be deleted. Data in the data lake which has already been exported by this offloading pipeline is not deleted. To delete the actual data in your data lake, you have to use the tooling offered by the data lake provider, e.g. AWS S3 Console or Azure Storage Explorer.

Triggering a manual offloading job

If the periodic offloading is enabled, you can also manually trigger an offloading job between two scheduled executions. For example, you might not want to wait for the next scheduled execution to offload recent data into the data lake. Click Offload now to trigger a manual offloading. As with periodic offloading, a manual offloading execution processes only incremental data that has been added since the last offloading execution (independent of whether this last execution was triggered manually or by the scheduler).

However, we recommend you to rely on the periodic offloading instead of triggering it manually.

Monitoring an offloading pipeline

Click Show offloading history to examine the execution history of a pipeline. See section Monitoring offloading jobs for details.

Importing/exporting offloading configurations

The import/export functionality allows you to copy offloading configurations from one DataHub instance to another. Import/export solely includes the configuration of a pipeline; it includes neither the runtime status of a pipeline nor already exported data.

Export of offloading configurations

The action bar provides an Export button, which exports all offloading configurations. The button is disabled if no offloading configurations are defined. If you click Export, all offloading configurations are exported into a file. The file is located in the local download folder used by your browser.

Warning: You must not modify the contents of the export file as this might corrupt the import step.

Import of offloading configurations

The action bar provides an Import button, which imports offloading configurations from a file with previously exported configurations.

Click Import to open the import dialog. Either drop the file in the import canvas or click into the canvas to browse your file system to select the import file. Once the file is selected, a table with all configurations in the file is shown. For each entry, the table lists the task name, the description, and the internal UUID of the original configuration. The IMPORT checkbox defines whether the configuration is imported or not. Duplicate entries cannot be imported and therefore the checkbox is not shown for such an entry. An entry to import is a duplicate if an already existing configuration has the same target table name or the same internal UUID.

To import the selected configurations, click Import. Click Cancel to cancel the import process.

To change the import file, click the delete icon next to the file name and select a new file to import the configurations from.

Offloading the base collections

The following tables summarize the resulting schemas for each of the Cumulocity IoT standard collections. These schemas additionally include virtual columns “dir0”, …, “dir3”, which are used for internal purposes. The columns are generated during the extraction process, but neither do they have corresponding data in the Operational Store of Cumulocity IoT, nor are they persisted in the data lake. You must not use “dir0”, …, “dir3” as additional columns or you must rename them accordingly in your offloading configuration.

Offloading the alarms collection

The alarm collection keeps track of alarms which have been raised. During offloading, the data of the alarm collection is flattened, with the resulting schema being defined as follows:

Column name Column type
id VARCHAR
count INTEGER
creationTime TIMESTAMP
creationTimeOffset INTEGER
creationTimeWithOffset TIMESTAMP
time TIMESTAMP
timeOffset INTEGER
timeWithOffset TIMESTAMP
lastUpdated TIMESTAMP
lastUpdatedOffset INTEGER
lastUpdatedWithOffset TIMESTAMP
year VARCHAR
month VARCHAR
day VARCHAR
severity VARCHAR
history OTHER
source VARCHAR
status VARCHAR
text VARCHAR

Info: The column firstOccurrenceTime is not included in the default schema. If you want to include it in the offloading, it must be added manually.

The alarms collection keeps track of alarms. An alarm may change its status over time. The alarms collection also supports updates to incorporate these changes. Therefore an offloading pipeline for the alarms collection encompasses additional steps:

  1. Offload those entries of the alarms collection that were added or updated since the last offload. They are offloaded with the above mentioned standard schema into the target table of the data lake.
  2. Two views over the target table are defined in the tenant’s space in Dremio. Their names are defined as target table name plus “_all” and “_latest” respectively. The following examples use “alarms” as target table name:
    • alarms_all: A view with the updates between two offloading executions, not including the intermediate updates. For example, after the first offloading execution, the status of an alarm is ACTIVE. Then it changes its status from ACTIVE to INACTIVE and afterwards back to ACTIVE. When the next offloading is executed, it will persist the latest status ACTIVE, but not the intermediate status INACTIVE (because it happened between two offloading runs and thus is not seen by DataHub).
    • alarms_latest: A view with the latest status of all alarms, with all previous transitions being discarded.

Both views are provided in your Dremio space. For details on views and spaces in Dremio see section Refining Offloaded Cumulocity IoT Data.

Offloading the events collection

The events collection manages the events. During offloading, the data of the events collection is flattened, with the resulting schema being defined as follows:

Column name Column type
id VARCHAR
creationTime TIMESTAMP
creationTimeOffset INTEGER
creationTimeWithOffset TIMESTAMP
time TIMESTAMP
timeOffset INTEGER
timeWithOffset TIMESTAMP
lastUpdated TIMESTAMP
lastUpdatedOffset INTEGER
lastUpdatedWithOffset TIMESTAMP
year VARCHAR
month VARCHAR
day VARCHAR
source VARCHAR
text VARCHAR
type VARCHAR

Events, just like alarms, are mutable, i.e., they can be changed after their creation. Thus, the same logic as for alarms applies.

Two views over the target table are defined in the tenant’s space in Dremio. Their names are defined as target table name plus _all and _latest respectively. The following examples use events as target table name:

Offloading the inventory collection

The inventory collection keeps track of managed objects. During offloading, the data of the inventory collection is flattened, with the resulting schema being defined as follows:

Column name Column type
id VARCHAR
creationTime TIMESTAMP
creationTimeOffset INTEGER
creationTimeWithOffset TIMESTAMP
lastUpdated TIMESTAMP
lastUpdatedOffset INTEGER
lastUpdatedWithOffset TIMESTAMP
year VARCHAR
month VARCHAR
day VARCHAR
name VARCHAR
owner VARCHAR
type VARCHAR
c8y_IsDeviceGroup BOOLEAN
c8y_IsDevice BOOLEAN
childAssets OTHER
childDevices OTHER

The inventory collection keeps track of managed objects. Note that DataHub automatically filters out internal objects of the Cumulocity IoT platform. These internal objects are also not returned when using the Cumulocity IoT REST API. A managed object may change its state over time. The inventory collection also supports updates to incorporate these changes. Therefore an offloading pipeline for the inventory encompasses additional steps:

  1. Offload the entries of the inventory collection that were added or updated since the last offload. They are offloaded with the above mentioned standard schema into the target table of the data lake.
  2. Two views over the target table are defined in the tenant’s space in Dremio. Their names are defined as target table name plus _all and _latest respectively. The following examples use inventory as target table name:
    • inventory_all: a view with the updates between two offloading executions, not including the intermediate updates. For example, after the first offloading execution, the status of a device is ACTIVE. Then it changes its state from ACTIVE to INACTIVE and afterwards to ERROR. When the next offloading is executed, it will persist the status ERROR, but not the intermediate status INACTIVE (because it happened between two offloading runs and thus is not seen by DataHub).
    • inventory_latest: a view with the latest status of all managed objects, with all previous transitions being discarded.

Both views are provided in your Dremio space. For details on views and spaces in Dremio see section Refining Offloaded Cumulocity IoT Data.

Info: The fields childDevices and childAssets are not part of the default offloading columns. They were included in previous versions, but lead to problems for a high number of list items in those fields. In such a case, the columns were no more readable by Dremio. If they need to be included in the offloaded data, they can be defined as additional result columns. However, you have to ensure that the number of list items in those fields does not exceed the Dremio limit configured in your environment.

Offloading the measurements collection

The measurements collection stores device measurements. Offloading the measurements collection differs from the other collections as you have to explicitly select a target table layout, which is either having one table for one type or for the TrendMiner case one table with measurements of all types.

Offloading measurements with the default target table layout

When using the default layout, you have to select a measurement type, so that all offloaded data is of the same type. During offloading, the data of the measurements collection is flattened, with the resulting schema being defined as follows:

Column name Column type
id VARCHAR
creationTime TIMESTAMP
creationTimeOffset INTEGER
creationTimeWithOffset TIMESTAMP
time TIMESTAMP
timeOffset INTEGER
timeWithOffset TIMESTAMP
year VARCHAR
month VARCHAR
day VARCHAR
source VARCHAR
type VARCHAR
fragment.attribute1.name.value Depends on data type, often FLOAT
fragment.attribute1.name.unit String
fragment.attributeN.name.value Depends on data type, often FLOAT
fragment.attributeN.name.unit String
myCustomAttribute1 Depends on data type
myCustomAttributeN Depends on data type

The entries in the measurements collection can have a different structure, depending on the types of data the corresponding device emits. While one sensor might emit temperature and humidity values, another sensor might emit pressure values. The flattened structure of these attributes is defined as fragment_ followed by attribute name and associated type being defined as in the measurements collection. The concrete number of attributes depends on the measurement type, illustrated in the above table with fragment_attribute1_name_value to fragment_attributeN_name_value.

Example mapping

The following excerpt of a measurement document in the base collection

{
    ...
     "c8y_Temperature": {
         "T": {
             "unit": "C",
             "value": 2.0791169082
         }

     }
}

is represented in the target table in the data lake as

c8y_Temperature.T.unit c8y_Temperature.T.value
C 2.0791169082
Offloading measurements with the TrendMiner target table layout

When using the TrendMiner layout, all measurements are offloaded into one table. Their corresponding type is stored in column type. The column unit defines the unit, while the column value defines the value of the measurement. The column tagname is used by TrendMiner to search for specific series. It is composed of the source, the fragment, and the series as stored in the measurements collection.

The resulting schema is defined as follows:

Column name Column type
id VARCHAR
creationTime TIMESTAMP
creationTimeOffset INTEGER
creationTimeWithOffset TIMESTAMP
time TIMESTAMP
timeOffset INTEGER
timeWithOffset TIMESTAMP
source VARCHAR
type VARCHAR
tagname VARCHAR
value VARCHAR
unit VARCHAR

Example mapping

The following excerpt of a measurement document in the base collection

{
    ...
    "source": "857",
    "type": "Temperature",
    ...
     "c8y_Temperature": {
         "T": {
             "unit": "C",
             "value": 2.0791169082
         }
     }
}
...
{
    ...
    "source": "311",
    "type": "Pressure",
    ...
     "c8y_Pressure": {
         "P": {
             "unit": "kPa",
             "value": 98.0665
         }
     }
}

is represented in the target table in the data lake as

type tagname unit value
Temperature 857.c8y_TemperatureMeasurement.T C 2.0791169082
Pressure 311.c8y_PressureMeasurement.P kPa 98.0665

For more details on the TrendMiner/DataHub interaction see also Integrating DataHub with TrendMiner.

Monitoring offloading jobs

Once you have configured and started your offloading pipelines, they regularly offload data to the data lake. DataHub provides insights into the execution status of the different pipelines so that you can investigate whether everything is running as expected.

You can either examine the corresponding latest execution for all configured pipelines or examine the execution history for a specific pipeline.

Info: You need administration or management permissions to monitor the offloading jobs. See section Defining DataHub permissions and roles for details.

Status of all offloading jobs

In the navigator, select Status and then Offloading to get an overview of the latest status of all pipelines. The list shows the corresponding last execution for all pipelines. Each execution consists of the following details:

Component Description
Status icon The status of the execution, which is either running, successful, or error
Execution mode icon The type of execution, which is either scheduled (calendar icon) or manual (spot icon)
Job name The name of the pipeline job
Execution time The point in time the execution was started
Runtime (s) The runtime of the execution in seconds
Next execution time The point in time for which the next execution is scheduled; for a manual execution it is empty
# Records The number of records which have been offloaded during this execution, indicated as number in a blue circle

Click Reload to refresh the status being shown.

You can filter the entries by their task name or their status by using the filter controls in the action bar.

History per offloading job

If you want to examine the execution history for a particular job, select Offloading in the navigation bar and select the offloading job you are interested in.

Click Show offloading history in the context menu of the offloading card to show the history of offloading executions.

The list shows the execution history, with each execution consisting of the following details:

Component Description
Status icon The status of the execution, which is either running, successful, or error
Execution mode icon The type of execution, which is either scheduled (calendar icon) or manual (spot icon)
Job name The name of the pipeline
Execution time The point in time the execution was started
Runtime (s) The runtime of the execution in seconds
Next execution time The point in time for which the next execution is scheduled, provided offloading is activated; for a manual execution it is empty
# Records The number of records which have been offloaded during this execution, indicated as number in a blue circle

The system is configured to keep a limited history of the last job executions.

Click Reload to refresh the list.

You can filter the entries by their status by using the filter control at the top.

Details of offloading job

For a given offloading job, you can examine additional details of its execution.

Info: You need administration permissions to access the job details.

Select a job overview in the history per offloading job or in the status of all offloading jobs. In the corresponding list of jobs click on the job you are specifically interested in. A details view encompasses the following information:

Execution schedule

Component Description
Runtime (s) The runtime of the execution in seconds
Execution mode The mode of the execution, which is either manual or scheduled
Execution time The point in time the execution was started
Scheduled execution time The point in time for which the execution was scheduled
Previous execution time The point in time the previous execution was started; for a manual execution it is empty
Next execution time The point in time for which the next execution is scheduled, provided offloading is activated; for a manual execution it is empty

Results

Component Description
Records The number of records which have been offloaded during this execution

Job details

Component Description
Job name The name of the pipeline
Job id The internal ID of the job
Job execution id The Dremio ID of this execution
Source collection The name of the Cumulocity IoT base collection
Target table The folder name in the data lake
Target folder The path to the target table in the data lake
Last watermark The last watermark which indicates the data in the Cumulocity IoT collection that has already been processed

Offloading results

During offloading Dremio organizes the data in newly created files within the data lake, following a temporal folder hierarchy. For each of those files the following information is provided:

Component Description
File size The size of the file
Fragment The hierarchical ID of the fragment
Partition The partition with which the file is associated
Path The path to the file in the data lake
Records The number of records stored in the file

Monitoring compaction jobs

During offloading, data from the Operational Store of Cumulocity IoT is written into files in the data lake. In order to ensure a compact physical layout of those files, DataHub automatically runs periodic compaction jobs in the background. For each offloading pipeline, a corresponding compaction job is set up and scheduled.

You can examine the latest compaction job for all offloading pipelines or examine the compaction job history for a specific pipeline.

Info: You need administration permissions to access the compaction job histories. See section Defining DataHub permissions and roles for details.

Status of all compaction jobs

In the navigator, select Compaction under Status to get an overview of the latest status of the compaction jobs for each pipeline. The list shows the corresponding last compaction job for all pipelines. Each compaction consists of the following details:

Component Description
Status icon The status of the execution, which is either running, successful, or failed
Job name The name of the pipeline the compaction job is associated with
Execution time The point in time the execution was started
Runtime (s) The runtime of the execution in seconds
Next execution time The point in time for which the next execution is scheduled

Click Reload to refresh the status being shown.

You can filter the entries by their task name or their status by using the filter controls in the action bar.

History of compactions per offloading pipeline

If you want to examine the compaction history for a particular offloading pipeline, select Offloading in the navigation bar and select the offloading job you are interested in.

Click Show compaction history in the context menu of the offloading card to show the compaction history.

The list shows the execution history with each execution consisting of the following details:

Component Description
Status icon The status of the execution, which is either running, successful, or failed
Job name The name of the pipeline the compaction job is associated with
Execution time The point in time the execution was started
Runtime (s) The runtime of the execution in seconds
Next execution time The point in time for which the next execution is scheduled

The system is configured to keep a limited history of the last compaction jobs.

Click Reload to refresh the list.

You can filter the entries by their status by using the filter control at the top.

Details of compaction job

For a given compaction job, you can examine additional details of its execution.

Select a compaction job overview in the compaction history per offloading job or in the status of all compaction jobs. In the corresponding list of jobs click on the job you are specifically interested in. A details view encompasses the following information:

Execution schedule

Component Description
Runtime (s) The runtime of the execution in seconds
Execution time The point in time the execution was started
Scheduled execution time The point in time for which the execution was scheduled
Next execution time The point in time for which the next execution is scheduled

Job details

Component Description
Job name The name of the pipeline
Job ID The internal ID of the job
Job execution ID The Dremio ID of this execution
Target table name The folder name in the data lake
Target folder The path to the target table in the data lake
Daily run Indicates whether the job is a daily execution job
Monthly run Indicates whether the job is is a monthly execution job

Daily compaction results

During daily compaction the files are merged, following a temporal hierarchy. As a result, a folder for each day of the month is built with one or more file(s) combining all values measured for this day. For each of those files the following information is provided:

Component Description
File size The size of the file
Fragment The hierarchical ID of the fragment
Partition The partition with which the file is associated
Path The path to the file in the data lake
Records The number of records stored in the file

Monthly compaction results

During monthly compaction the files are merged, following a temporal hierarchy. As a result, a folder for each month is built with one or more file(s) combining all values measured for this month. For each of those files the following information is provided:

Component Description
File size The size of the file
Fragment The hierarchical ID of the fragment
Partition The partition with which the file is associated
Path The path to the file in the data lake
Records The number of records stored in the file

Querying offloaded Cumulocity IoT data

Cumulocity IoT DataHub offers an SQL interface so that you can efficiently query offloaded (device) data and leverage the results in your own applications. A prerequisite for running SQL queries over device data is that you have configured and executed offloading pipelines that replicate and transform data from the Operational Store of Cumulocity IoT to the data lake.

Overview

As described in section DataHub at a glance, Cumulocity IoT DataHub manages offloading pipelines which periodically extract data from the Operational Store of Cumulocity IoT, transform the data into a relational format, and finally store it in a data lake. Instead of querying the Operational Store, you run your queries against the data lake. The distributed SQL engine Dremio provides the query interfaces to access the data lake.

Different standard interfaces exist for that purpose, namely JDBC, ODBC, and REST. In order to work with one of those interfaces, select Home in the navigation bar. Under Quick links you will find starting points for the different interfaces.

Access to data lake contents

You need a separate Dremio account to run SQL queries. The Dremio account is required to authenticate your requests when running queries against the data lake using Dremio. Contact the administrator for the Dremio account settings.

When you have established a connection to Dremio, you can run SQL queries against your tables in the data lake (to which new data is appended whenever the offloading pipeline has successfully run). The source you refer to in the query is defined by your account name and the target table you have specified in the offloading configuration. The identifier to be used as the source in a SQL query is defined as follows for the different data lake providers:

For example, if your tenantId is t47110815 and you have defined an offloading configuration to write the alarms collection to the target table JohnsAlarms in an Azure Storage account containing a file system named Dremio, then an example query would be:

SELECT * FROM t47110815DataLake.Dremio.t47110815.JohnsAlarms;

You can easily look up the paths to the tables in Dremio’s UI. Click on your data lake under “Sources” at the left, then navigate to the table in the right canvas. When you hover over the table name, a small “copy” icon with the tool tip “Copy Path” will appear right of the table name. Clicking on it will copy the table name into your clipboard.

Info: The offloading pipeline has to be executed at least once with corresponding data being offloaded before you can run a query.

Connecting via JDBC

If you have a Java client, you can use JDBC to run SQL queries against the data lake. You have to download the Dremio JDBC driver. You can obtain the JDBC connection string and the required driver version from DataHub by clicking the JDBC icon in the Quick links section of the Home page. When setting up your JDBC client use as username and password the credentials from your Dremio account.

Connecting via ODBC

If you want to use an ODBC client to run SQL queries against the data lake, you must configure the platform-specific driver, following the associated Dremio installation instructions. To obtain the ODBC connection string, click the ODBC icon in the Quick links section of the Home page. When setting up your ODBC client use as username and password the credentials from your Dremio account.

Connecting via Dremio REST API

Dremio offers a SQL REST API which you can use to run SQL queries against tables in the data lake. You need to authenticate with your Dremio account against Dremio in order to use the API.

Note that the API might change any time and Software AG does not provide any guarantees. Dremio does not send any CORS headers, so direct access from a browser-based application is not possible. It is highly recommended to use DataHub’s REST API, see below.

Connecting via DataHub REST API

The DataHub server also can handle REST requests for Dremio query processing, serving as a proxy to Dremio. DataHub offers two REST APIs for running queries against Dremio. The standard REST API for small to moderate query result sizes and a high-performance REST API for large query result sizes. See the DataHub REST API documentation on the Cumulocity IoT OpenAPI documentation page for details on the endpoints. When using this API, you authenticate with your Cumulocity IoT account, not with your Dremio account.

Connecting other clients

Dremio offers support for connecting a variety of clients, including reporting tools like PowerBI and common analytics languages like Python. The Dremio documentation discusses how to connect these clients to Dremio and leverage its query capabilities.

See also Integrating DataHub with TrendMiner to learn how other Software AG products can connect to DataHub and leverage its query capabilities.

Refining offloaded Cumulocity IoT data

In addition to SQL querying using standard interfaces, you can utilize Dremio functionality to further refine and curate your (device) data.

For a detailed description of all functionalities Dremio provides you can consult the Dremio documentation.

Accessing and logging into Dremio

You access Dremio via a web browser. It has been tested with the following web browsers:

Info: Support for mobile devices like smartphones or tablets has not been tested.

To access Dremio, navigate to the home page. Under Quick links click on the Dremio icon. This will direct you to the Login screen of Dremio.

Info: Your Dremio user does not have administration rights in Dremio.

How to log into Dremio

On the Login screen, enter your Dremio account credentials. Click Login to enter Dremio.

When you log in successfully, you will be taken to the home page of Dremio.

When you want to log out, click on your username at the right of the top bar and then select Log out.

Sources and spaces

On the home page of Dremio you will find at the left under Datasets two panels called Spaces and Sources.

Sources

In the Sources panel there is the data source YourTenantIdDataLake. This source has been auto-configured for you and points to your data lake.

Info: Terminology-wise, Cumulocity IoT DataHub replicates data from the Operational Store of Cumulocity IoT into the data lake. For Dremio the data lake and its target tables is a data source as it allows reading data from it.

When you click on your data source it will be shown in the main panel. Clicking on the source in the main panel navigates into the data source. Here, you see a list of all target tables of your offloading pipelines. Clicking one of these target tables opens an SQL editor which allows you to run queries against that target table.

Info: You might also see a folder named c8y_cdh_temp. The folder is used for DataHub-internal purposes and must not be deleted or altered.

Spaces

A space in Dremio helps in organizing your data sets. Cumulocity IoT DataHub auto-configures a space which is named YourTenantIdSpace, e.g. t47110815Space. A dataset in the space is referred to in queries as YourTenantIdSpace.YourDataset. As described in section Configuring offloading jobs, for the inventory, events, and alarms collections there is a pair of preconfigured views providing either all or latest data.

Job history

The Job History tab at the top of the screen displays jobs/queries you have executed. It allows you to view details of a job and offers filter capabilities (time range, job status, query type, and queue). The Profile view inside the job detail view is very useful to investigate optimization potentials in your queries.

Info: The job history only contains queries that you actively run; the jobs related to the data extraction are hidden.

Creating views

With Cumulocity IoT DataHub, you can replicate data from a Cumulocity IoT collection to a data lake using a default transformation of the data. As requirements for subsequent data analysis of the offloaded device data may vary over time, you should configure your offloading pipeline so that all potentially relevant data is included.

Depending on your use cases, you will often find the need to provide a view on the data, which limits, filters, or transforms the data, e.g. converting Celsius to Fahrenheit or extracting data from JSON fragments.

In Dremio, you can create such a view by defining a corresponding query and saving it as a new dataset. When saving that new dataset, you must select your space as the location and can freely choose a name for the view. Once that is done, you can work with the new dataset as with any other source and run queries against it. This includes in particular querying this view from other clients as described in section Querying offloaded Cumulocity IoT Data.

Info: Such a view is per default not materialized, i.e., it is not stored persistently. Each time you query the view, the underlying query defining the view is run against the source data.

Example

Consider the case that you want to visualize data in a reporting tool. The raw data volume is too high, so you want to instead show the hourly average of the column myValue. You can easily do that by creating a view with the following SQL statement and saving it as a view/virtual data set:

SELECT DATE_TRUNC('HOUR', "time") as "time", avg(myValue) as hourlyAvg
FROM myTable
GROUP BY DATE_TRUNC('HOUR', "time")

The creation (and update) of views can be done via the Dremio SQL API, too. This is especially useful to automate tasks. The above example can be created or updated as follows.

CREATE OR REPLACE VDS YourTenantIdSpace.YourDesiredViewName AS
  SELECT DATE_TRUNC('HOUR', "time") as "time", avg(myValue) as hourlyAvg
  FROM myTable
  GROUP BY DATE_TRUNC('HOUR', "time")

Joining tables/views

Views you have defined and target tables from your data lake can be joined as well. In Dremio you can either define joins using the SQL editor or a graphical interface.

A general use case for joining is to enrich your alarms, events, or measurement values with metadata from the inventory collection, e.g.

SELECT *
FROM t47110815DataLake.Dremio.t47110815.alarms
JOIN t47110815DataLake.Dremio.t47110815.inventory
USING(id)

DataHub best practices

Learn from well-established usage patterns in order to ensure a robust and scalable processing of your SQL queries.

Naming policies

When defining an offloading configuration, you have to specify the task name, the target table, and a description. You should ensure that you provide reasonable names for each of these settings so that afterwards you can easily find the offloading pipelines you are interested in. A reasonable naming scheme also facilitates writing queries.

Also when defining an offloading configuration, you must always define a target table that is unique among the currently saved configurations. You should not re-use a target table from an old offloading configuration which was deleted in the meantime. Otherwise, you might run into the problem that your target table consists of data from multiple configurations with potentially different schemas.

Careful definition of additional columns and filter predicate

An offloading configuration allows you to specify additional columns to be offloaded as well as filter predicates for filtering the data. For both settings, you should carefully think about which data you actually need for your processing. Data being filtered out cannot be retrieved any more. Even if you adapt the filter predicate afterwards, the data which would have qualified in previous offloading executions will not be offloaded. You can, however, stop an offloading, change the configuration to include additional fields, etc., and then restart it. When it is restarted, DataHub will ask you whether you want to flush existing data or append. Flushing will delete all data in the data lake so that with the next offloading execution the complete collection will be offloaded again. Note that DataHub can only import data that is still present in the Operational Store of Cumulocity IoT, i.e., be careful with this option and keep in mind that data retention policies in Cumulocity IoT might have deleted data.

On the other side, data which will definitely be irrelevant for further analysis should not be included in the offloading process.

Decomposition of complex additional columns

You may have stored complex data using JSON fragments as an additional column in a Cumulocity IoT collection. As described in section Configuring offloading jobs, you can add additional columns so that they are included in the offloading process. Within these additional columns, you can apply functions to decompose the fragments into flat data.

Writing these functions is often an iterative process that requires multiple adaptations of the underlying logic. Leverage the Dremio SQL editor and define a dummy offloading configuration which moves a small portion of the data into the data lake for testing purposes. You can use the filter predicate to retrieve such a portion of the data; see below for time filter examples. Then you can open the table created by the offloading configuration with Dremio; using Dremio’s SQL editor, you can develop the extraction logic. Once your decomposition logic for your additional columns is complete, you can copy the column transformations and use them to define a corresponding offloading configuration in DataHub. Once that is done, the dummy offloading pipeline can be deleted.

Examples for time filters

Depending on the collection, you can use different time filters. All collections support creationTime which represents the timestamp when the entity was persisted by the platform (UTC timezone). Mutable entities (alarms, events, inventory) also support lastUpdated which is the timestamp when the entity was last changed (UTC timezone). time is the application timestamp written by the client; it is supported in alarms, events, and measurements.

The below time filters are examples only; you can use much more complex or simpler combinations with a mixture of AND/OR-connected conditions.

Alarms/events

To offload all alarms or events which have the application time set to between 2020-02-08 14:00:00.000 and 2020-02-08 15:00:00.000, use:

src."time"."date" >= {ts '2020-02-08 14:00:00.000'} AND
src."time"."date" <= {ts '2020-02-08 15:00:00.000'}

To offload all alarms or events which were persisted after 2020-02-08 15:00:00.000, use:

src."creationTime"."date" > {ts '2020-02-08 15:00:00.000'}

Restricting the offloading to alarms and events last modified before 2020-02-08 14:00:00.000, use:

src."lastUpdated"."date" < {ts '2020-02-08 14:00:00.000'}
Inventory

To offload all data which was persisted between 2020-02-08 14:00:00.000 and 2020-02-08 15:00:00.000, use:

src."creationTime"."date" >= {ts '2020-02-08 14:00:00.000'} AND
src."creationTime"."date" <= {ts '2020-02-08 15:00:00.000'}

or, to offload all data that was last updated after 2020-02-08 14:00:00.000, use:

src."lastUpdated"."date" > {ts '2020-02-08 14:00:00.000'}
Measurements

To offload all data with application time between 2020-02-08 14:00:00.000 and 2020-02-08 15:00:00.000, use:

src."time"."date" >= {ts '2020-02-08 14:00:00.000'} AND
src."time"."date" <= {ts '2020-02-08 15:00:00.000'}

or, to offload all data received by the platform after 2020-02-08 14:00:00.000, use:

src."creationTime"."date" > {ts '2020-02-08 14:00:00.000'}

Querying additional data with DataHub

Main use-case of DataHub is to offload data from the internal Cumulocity IoT database to a data lake and query the data lake contents afterwards. In some use cases DataHub is required to query additional data which is not kept in the Cumulocity IoT database. For a cloud environment, the additional data must be provided as Parquet files and must be located in the specific bucket of the data lake as configured in the offloading settings of DataHub. The Parquet files must not be in folders that are used as target tables for offloadings as this would corrupt query processing of DataHub. The Parquet files themselves must be compliant with the [https://docs.dremio.com/data-formats/parquet-files/](Dremio limitations for Parquet files).

For a dedicated environment, the additional data can be located somewhere else, provided it can be accessed via Dremio, e.g. in a relational database. For performance and cost reasons, however, data and processing should always be colocated.