Introduction

About TSorage

TSorage is a software solution developed by CETIC for collecting, storing, processing and presenting time series at scale. It is intended primarily for companies looking for reliable, fast, and cost-effective technologies for deploying time series based applications.

The emphasis of TSorage is on providing services for managing time series of any type, with millisecond-resolution timestamps and as few artificial constraints as possible. Among others, TSorage is designed with the following principles in mind.

Availability and Scalability First. Buying high-end infrastructure is a simple way to ensure the reliability of a service, up to the moment its needs start growing, leading to uncontrolled cost increases. TSorage, on the other hand, relies on commodity hardware to ensure a scalable and resilient service. As a natively distributed and decentralized solution, its capabilities are extended by simply adding more resources to a TSorage cluster. When deployed in a cross-site fashion, TSorage offers local read and write performance while transparently supporting worldwide replication and synchronization. When a site recovers from a connection failure, it automatically resynchronizes with the other sites of the group.

Limitation of vendor lock-in. Technology evolves extremely quickly. This is especially true in the (I)IoT domain, where new ways to manage and exploit sensors emerge every year. In order to mitigate the risk of making technological choices that turn out to be inappropriate in the future, TSorage is made of independent modules based on company-backed, open source technologies. With such an approach, it’s easier to update the platform when a new technology supersedes an existing one. All TSorage services are available through a REST API, which offers a standardized way to abstract the underlying technologies. It also fosters the integration with any data source or consumer, making TSorage a platform of choice for your IoT applications.

Flexible type support. Most sensors capture a continuous signal, such as a temperature or a pressure. However, time series cover many more data types, such as geographic positions, trade transactions, and virtually any repetitive event. TSorage provides built-in support for the commonly used data types, and is designed to be easily extended in order to support your specific data types. If you can represent your measurements as JSON objects, TSorage can handle them.

Scales easily and predictably. Adding a new data source (such as a sensor) should be as simple and as fast as possible. In the IoT world, a long and painstaking procedure for extending the sensor coverage would stifle any innovation. Simply start feeding TSorage with a new data stream, and administrate it afterwards, either through a dedicated Web application or programmatically. Each value can be submitted with arbitrary properties (called tags in the TSorage terminology) that help querying and managing data sources more efficiently. Ultimately, users no longer refer to a unique source id, but query, compare, and aggregate sources based on their tags.

As you integrate more and more data sources to the solution, your processing needs increase. TSorage relies on an elastic architecture that efficiently exploits your infrastructure resources. Start with a small number of containerized services, and extend them at will simply by running more worker nodes.

Fits in your infrastructure, ready for the Cloud. When TSorage is used for managing sensitive data, an on-premise deployment may be preferred over a remote hosting solution. For other use cases, a deployment on a public or private Cloud is a better option. In either case, TSorage comes with deployment and monitoring scripts that reduce the burden of deploying and maintaining the solution.

Principles

The most fundamental principle of TSorage is the concept of metric. A metric is an abstract entity to which chronologically ordered measurements are attached. Each measurement is also called a data point or an observation.

In TSorage, all the data points belonging to a metric represent the same physical, digital or logical phenomenon, and therefore typically have the same data type (although this is not a technical limitation of the platform). The data type of an observation determines how TSorage stores and presents it, as well as the transformations this observation can be subject to.

Tags are properties associated with data points. Their purpose is to help the user understand the meaning of a particular observation, or query data points having a particular meaning. Concretely, a tag is an arbitrary text (the key), associated with an arbitrary textual value (the value). Unsurprisingly, the set of tags associated with a data point is called its tagset. One distinguishes dynamic tags, which are directly attached to a data point, from static tags, which are attached to a metric and are automatically inherited by all its data points.

In TSorage, the combination of a metric with a tagset is called a time series. There are two typical ways to use tagsets. According to the first approach, a metric identifies a sensor (in the broad sense of the term), while tags clarify the status of the sensor (name of the manufacturer, geographical position, etc.) or of some of the generated data points (data quality, operational condition, etc.).

With respect to the second approach, the metric refers to a property of interest (CPU usage, for instance), while the tags refer to the item at the origin of the data points (server number 5, for instance).

While mixing the approaches is technically possible, we recommend choosing one of them and sticking with it.

We recommend the use of tagsets for enforcing a semantic series policy. Instead of placing information in the metric’s name, using enterprise-specific conventions that are imperfectly respected and therefore suffer many exceptions, the metric’s name could be almost meaningless, and the semantics should be specified by using tagsets.

Please note that such a policy does not prevent the user from organizing the metrics in a hierarchic way, since tag keys can be used for defining the hierarchical levels of organization. See Tag Management.

Time series are organized hierarchically, since data points and aggregated values from different time series having the same metric can be combined in order to produce a more generic time series.

For instance, consider we have three time series for the metric my-temperature, having the following tagsets:

  • {"unit": "celcius"}
  • {"unit": "celcius", "quality": "good"}
  • {"unit": "celcius", "quality": "bad"}

When a data query is submitted to the system, asking for values belonging to my-temperature with an empty tagset, these three time series will be extracted and transparently merged in order to present all the relevant values. In other words, values with a good quality, a bad quality, or no quality specified will all be taken into account.

Although the technologies involved in TSorage are quite efficient and scale well, be aware that extracting and merging multiple time series may be resource consuming. Therefore, the number of time series that must be merged in order to satisfy a data query should remain reasonable, in order to keep the pressure on the database at an acceptable level.

Message

Exchanging one data point at a time would be highly inefficient, in particular when a request-response protocol is used for communicating. In order to provide better performance, TSorage communications are based on the concept of message. A message is essentially a set of data points that relate to the same time series. In other words, messages are a way to submit multiple data points at once, while only submitting the metric name, the dynamic tagset, and the data type once per message.

Each message must contain the following elements:

  • metric, the metric id for which new data points are provided.
  • tagset, the dynamic tagset associated with all the data points described in the message.
  • type, the type of all the data points described in the message. While using the same type for all data points relating to a metric is generally considered good practice, the type associated with a metric (or a time series) can change from one message to another.
  • values, a list of data points. Each data point is a list of two elements. The first element is a representation of the timestamp associated with the data point, while the second element is the data point value.

From a technical point of view, a message can be represented by a JSON object, or a Protobuf message (using the proto3 language). The representation format actually used by the components depends on their configuration.

JSON Format

The JSON representation of a TSorage message must comply with the following JSON schema:

{
  "$id": "be.cetic.tsorage.messageschema.json",
  "type": "object",
  "properties": {
    "metric": {
      "type": "string"
    },
    "tagset": {
      "type": "object",
      "additionalProperties": {
        "type": "string"
      }
    },
    "type": {
      "type": "string"
    },
    "values": {
      "type": "array",
      "items": [
        {
          "type": "array",
          "items": [
            {
              "type": "string",
          "pattern": "^(-?(?:[1-9][0-9]*)?[0-9]{4})-(1[0-2]|0[1-9])-(3[01]|0[1-9]|[12][0-9])T(2[0-3]|[01][0-9]):([0-5][0-9]):([0-5][0-9])(\\.[0-9]+)?(\\.([0-9]){1,3})?$"
            },
            {}
          ]
        }
      ]
    }
  },
  "required": [
    "metric",
    "type",
    "values"
  ]
}

As described in this schema, the tagset attribute must be a dictionary of strings. Each element of values (representing a data point) is an array containing the timestamp and the value of the data point, in that order. The timestamp is represented by a string in the ISO 8601 format.

The value itself may be any valid JSON object. Its actual schema depends on the specified data type. Several built-in data types are proposed by TSorage, and arbitrarily complex extra data types can be added at will.

The snippet below is an example of a valid message described using the JSON format.

{
  "metric": "my-temperature-sensor",
  "tagset": {
    "quality": "good",
    "owner": "myself"
  },
  "type": "tdouble",
  "values": [
    [ "2020-01-02T03:04:05.678", 42.1337 ],
    [ "2020-01-02T03:04:06.123", 654.72 ]

  ]
}

Protobuf Format

In order to further improve message exchanges, a binary encoding alternative to JSON is proposed, in the form of a Protocol Buffer (a.k.a. Protobuf). Messages encoded that way must comply with the following Protobuf structure definition:

syntax = "proto3";

package com.google.protobuf.util;
option java_package = "com.google.protobuf.util";

import "google/protobuf/timestamp.proto";

message MessagePB {

    message Value
    {
         .google.protobuf.Timestamp datetime = 1;
         bytes payload = 2;
    }

    string metric = 1;
    map<string,string> tagset = 2;
    string type = 3;
    repeated Value values = 4;
}

The payload is encoded as a byte array, the structure of which depends on the specified data type.

In the general case, the payload (which is internally represented as a JSON value) is converted into its compact textual representation. The UTF-8 encoding of this text is then used as the payload attribute. The use of UTF-8 allows the simple representation of textual contents with non-Latin characters. However, this encoding is generally quite inefficient, since each digit of a numeric value is represented by (at least) one byte, the name of each part of the encoded value is encoded as a UTF-8 string, and some additional bytes are used for representing the structural organization of the payload.

In order to mitigate this issue, values with the tlong and tdouble data types are directly encoded as 8-byte signed long and double values (respectively), using the big-endian convention. In IoT-like contexts, these data types are likely to be the most commonly used, so the Protobuf encoding turns out to be a highly efficient message representation.

Following the same idea, values with the tbool data type are encoded as a single byte, where 0 encodes false and any other value encodes true. tbinary values are encoded using their natural binary representation.
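
As an illustration, the Python sketch below (ours, not part of the TSorage codebase) reproduces the payload encodings described above:

import json
import struct

# Hypothetical sketch of the payload encodings described in this section.
def encode_payload(value, data_type):
    if data_type == "tlong":
        return struct.pack(">q", value)       # 8-byte signed long, big endian
    if data_type == "tdouble":
        return struct.pack(">d", value)       # 8-byte double, big endian
    if data_type == "tbool":
        return b"\x01" if value else b"\x00"  # single byte, 0 encodes false
    if data_type == "tbinary":
        return value                          # natural binary representation
    # General case: compact JSON text, encoded as UTF-8 bytes
    return json.dumps(value, separators=(",", ":")).encode("utf-8")

encode_payload(42.1337, "tdouble")  # -> 8 bytes, big endian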

Security Policies

TSorage components interact together in order to provide various services, and some of them are in direct contact with the external world, which includes the enterprise network and sometimes the whole Internet. Depending on their exposure to unreliable systems and the risk of attacks by malicious actors, several security policies can be implemented.

Anonymous Policy

The anonymous security policy allows component communications without any protection. No authentication is required, and the data flow is transmitted unencrypted. This policy should only be used for testing and debugging purposes.

The following parameters must be specified:

  • type must be set to anonymous.

Example of Anonymous policy configuration:

{
    type = "anonymous"
}

Password Policy

This policy enforces security by submitting a login and a password during the initialisation of a communication session. Only authorized users are allowed to share data points. Both the data flow and the account parameters are transmitted unencrypted. This policy offers an interesting trade-off between security and ease of use.

The following parameters must be specified:

  • type must be set to password.
  • login, the user’s login.
  • password, the password associated with the specified login.

Example of Password policy configuration:

{
    type = "password"
    login = "my-login"
    password = "secret-password"
}

SSL Policy

This policy is currently not supported.

Features

Powerful and Extensible Data Type Support

The following data types are currently supported out of the box:
  • tdouble, that represents a signed decimal number.
  • tlong, that represents a signed integer value.
  • tbool, that represents a boolean value.
  • ttext, that represents a textual value.
  • tbinary, that represents a binary blob.
  • pos2d, that represents a geographical coordinate.

Because each project is unique, TSorage can be extended in order to support additional data types.
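
For instance, a pos2d data point inside a JSON message could look like the following sketch. The field names shown here are hypothetical, for illustration only; refer to the definition of the pos2d type for its actual schema.

[ "2020-01-02T03:04:05.678", { "latitude": 50.4542, "longitude": 4.8674 } ]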

Tagging

Tags are arbitrary (key, value) pairs that are used for labelling individual observations or groups of observations. Systematic labelling improves the semantic knowledge about the stored observations, which facilitates their manual and automatic manipulation. Instead of using a hardly enforceable nomenclature for specifying the meaning of a metric, we recommend using almost meaningless (from a human point of view) metric names, and using tags for specifying their semantics.

Automatic Rollup

Rollups consist in aggregating the data points of a time series corresponding to a specific time period. A rollup facilitates the manipulation of large time series while absolving the user from calculating potentially complex operations on raw data points.

For instance, plotting ten years worth of history for a time series having one data point per minute means extracting and displaying more than 5 million data points, which represents unreasonable processing costs and latencies. A daily rollup would reduce the number of data points to be processed and displayed to approximately 3,650, which is a much more sensible number.

In the Processing Layer, incoming data points are processed in real time for systematically calculating and storing aggregated values.

Derivators

The processing layer consumes messages submitted to the raw Kafka topic by the Ingestion layer in order to populate the persistence database with both raw and temporally aggregated values. A temporal aggregation consists in summarizing multiple observations belonging to the same time period into higher-level values (typically a single one). In TSorage, operations for temporal aggregations are named derivators.

Some built-in derivators are installed by default:

  • all data types:
    • count: Counts the number of data points during the period.
    • first: Takes the first data point, in order of time, during the period.
    • last: Takes the last data point, in order of time, during the period.
  • tlong (integer numbers) and tdouble (real numbers):
    • min: Takes the minimal value observed during the period.
    • max: Takes the maximal value observed during the period.
    • sum: Takes the sum of all values observed during the period.
    • s_sum: Takes the sum of the squared values observed during the period.
  • tbool (boolean values):
    • true_count: Counts the number of boolean values being true.

From these derivators, higher-level properties can be calculated. For instance, the mean value can be calculated as the sum divided by the count, and the variance of a time series can be calculated from its s_sum, sum, and count, as shown below.
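
For illustration, here is a small Python sketch (ours, not TSorage code) showing how these statistics can be recovered from the stored aggregates:

# Recovering higher-level statistics from the composable aggregates
# (count, sum, s_sum) produced by the built-in derivators.
values = [1.0, 2.0, 3.0, 4.0]
count = len(values)
total = sum(values)
s_sum = sum(v * v for v in values)

mean = total / count                  # 2.5
variance = s_sum / count - mean ** 2  # E[X^2] - E[X]^2 = 1.25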

In the processing layer, business-specific derivators can be specified in order to meet the final user’s needs. For the moment, the only way to specify these derivators is by editing the source code, which may be a bit tedious. Editing derivators from outside the application is a planned feature for an upcoming version of TSorage.

Time Aggregators

A part of the configuration file associated with the processing layer describes the successive time periods that must be considered when performing prepared aggregations. More precisely, a (potentially empty) sequence of time durations (also known as time aggregators) is set in the configuration file, and used by the processing layer every time a data point is added to the system.

For instance, if the sequence [1m,1h] is set in the configuration file, raw data points will first be aggregated into buckets of one minute, then into buckets of one hour.

Currently, the following time aggregators are supported:

  • 1m: one minute
  • 1h: one hour
  • 1d: one day
  • 1mo: one month

Aggregators must be specified by increasing period duration.
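
To make the bucketing concrete, the Python sketch below (ours, not TSorage code) floors a timestamp to the 1m and 1h buckets it belongs to:

from datetime import datetime

# A data point timestamped 2020-01-02T03:04:05.678 ...
ts = datetime(2020, 1, 2, 3, 4, 5, 678000)

bucket_1m = ts.replace(second=0, microsecond=0)            # 2020-01-02 03:04
bucket_1h = ts.replace(minute=0, second=0, microsecond=0)  # 2020-01-02 03:00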

Aggregations

The processing layer consumes messages published to the raw topic, and transforms them into raw observations. These observations are stored unaltered in the persistence database. After that, aggregations are performed according to the following simplified process:

  1. The time period, corresponding to the first time aggregator applied to an added observation, is calculated.
  2. All the data points stored in the time series database, that belong to the same time series and have a timestamp belonging to the calculated time period, are retrieved.
  3. The data points are aggregated by applying all the derivators that comply with the data type of the time series.
  4. Aggregated values are stored in the time series database, and are further aggregated by applying derivators with the next time aggregator.
  5. Step (4) is repeated until all time aggregators have been used.

Because only composable derivators are carried out, follow-up aggregations can be calculated from the time periods corresponding to a previous time aggregator, instead of retrieving all the raw data points covering each aggregator. This means less pressure on the persistence database and the processing component.
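
As a toy illustration of this composability (a Python sketch, ours): the hourly sum can be computed from the one-minute sums alone, without re-reading the raw data points.

minute_sums = [12.5, 9.0, 14.2]  # "sum" aggregates of three 1m buckets
minute_counts = [60, 58, 60]     # "count" aggregates of the same buckets

hour_sum = sum(minute_sums)      # equals the sum over all raw points
hour_count = sum(minute_counts)  # equals the raw point count of the hour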

Arbitrary Recording Frequency

TSorage supports timestamped observations with a millisecond resolution. If multiple observations coming from the same time series are ingested with the same timestamp, only one of them will be persisted.

No recording frequency is prescribed. No periodic ingestion is expected among the observations of a time series, nor between distinct time series.

Unbounded Ingestion Window

A limitation of most time series databases (including industrial solutions for data historisation) is the existence of an ingestion window: any incoming measurement having a timestamp preceding the window limit will be ignored. One of the main reasons for this limitation is that it allows a definitive compression of the cold data points, using a more efficient approach than the one adopted for storing hot data points.

When it exists, the size of the ingestion window can generally be tuned in order to match the dynamics of the monitored system.

While such an ingestion window can make sense in some cases, its size always represents a trade-off between various technical constraints that are meaningless from a business or operational point of view. It prevents data ingestion from remote data sources with only occasional access to the storage solution, and makes the ingestion of historical data sets quite challenging.

By default, TSorage uses an unbounded ingestion window, and accepts arbitrarily late observations. Data files receiving late data points are automatically reorganized in order to always store them efficiently. For archiving or advanced analytics purposes, an even more efficient, offline, representation of the time series can be exported from the time series database.

Archiving

Data Querying

Real Time Monitoring

Data Quality Monitoring

Alerting

Architecture

Overview

The TSorage project is based on a modular architecture, with all modules being designed to be executed in distinct Docker containers. This makes TSorage a portable solution, with simple and standardized deployment steps. It also offers the possibility to place the components on different physical and virtual machines, making it available on a wide range of platforms and services.

Furthermore, (re)sizing a containerized architecture is easier, since a component can be moved to a platform offering more resources. Under certain conditions, containers can be duplicated in order to increase the performances of the underlying modules.

The figure below provides an overview of the TSorage architecture.

Figure: Architecture Overview.

Time series processing starts with the Gateway layer, which contains ad hoc components for listening to, collecting, or extracting time series values from various data sources.

The Ingestion layer is the entry point for time series values. From this point, the described entities are considered as internal TSorage components, which basically means they are managed by the TSorage cluster. The Ingestion layer is made of different interface modules, each of them providing a specific way for a data source to submit new time series values.

Gateway Layer

The Gateway layer is made of remote components (a.k.a. collection agents) that are not necessarily managed by a TSorage cluster. These components can be the result of third-party developments, and can be deployed close to a data source for extracting data points from sensors, databases, spreadsheets, etc. An agent typically focuses on specific data sources, and is deployed near them for avoiding security or accessibility issues. Besides collecting data points, the purpose of an agent is to submit them to a component of the Ingestion layer.

The gateway consists of three services, namely the Collector, the Buffer, and the Transmitter.

As its name suggests, the Collector collects time series values, either by querying the data source, observing its environment, or receiving data points from external data sources. It comes with extensions that support the connection from and to various types of data sources, including relational databases, Modbus networks, OPC servers, etc. Each extension interacts with its environment for actually collecting the data points. Using the Collector therefore consists in appropriately choosing and configuring the extensions that must be run in order to collect new observations. Concretely, the Collector is distributed as the Collector Docker image, and the running extensions are chosen by providing a specific configuration file when this image is executed.

The Buffer temporarily stores the data points collected by the Collector. The goal of this operation is twofold. First, it allows a continuous collection of data points, even if the Ingestion layer is temporarily unavailable, because of a network failure for instance. Secondly, it allows the Collector to hand data points off as fast as possible, without having to wait for the collected data points to be acknowledged by the Ingestion layer. The Buffer is based on RabbitMQ, a popular and battle-tested message broker.

Finally, the Transmitter takes the data points stored in the Buffer and transmits them to the Ingestion layer. Whenever possible, this module uses a chain of acknowledgements in order to make sure a data point has been properly received by the Ingestion layer before removing it from the Buffer. Similarly to the Collector, this module is distributed as the Transmitter Docker image, and the extensions that actually perform the transmission are chosen by providing a specific configuration file when this image is executed.

  • Collector Docker image: https://hub.docker.com/repository/docker/ceticasbl/tsorage-collector
  • RabbitMQ: https://www.rabbitmq.com/
  • Transmitter Docker image: https://hub.docker.com/repository/docker/ceticasbl/tsorage-transmitter

Ingestion Layer

Components of the Ingestion layer are responsible for accepting incoming connections from Transmitters or other collection agents, in order to ingest data points into TSorage. While basic checks (such as submitter authentication) are performed at this point, Ingestion components are not supposed to perform data processing, and try to forward the incoming data points as fast as possible. This ensures a highly effective ingestion, with high throughput rates. Ingestion components can easily be duplicated in order to face occasional or steadily increasing needs for ingestion capacity.

Ingestion components expect to receive messages, represented either as a JSON value or as a Protobuf payload.

The following components are parts of the Ingestion layer:

  • A REST API, which provides an HTTP(S) endpoint for submitting new values.
  • An MQTT server, which acts as a broker and ingests incoming messages.

All Ingestion components push the incoming messages to a message queuing system for further processing.
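
For instance, a new message could be submitted to the REST interface with a plain HTTP POST. The sketch below is a hypothetical illustration: the endpoint path shown here is an assumption, and the actual URL depends on your deployment (port 8081 matches the default HTTP ingestion port used elsewhere in this documentation).

curl -X POST http://localhost:8081/api/message \
     -H "Content-Type: application/json" \
     -d '{
           "metric": "my-temperature-sensor",
           "tagset": { "quality": "good" },
           "type": "tdouble",
           "values": [ [ "2020-01-02T03:04:05.678", 42.1337 ] ]
         }'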

Message Queuing

In TSorage, Kafka is used as a message queuing system for various purposes. Kafka organizes published records into topics, each of them being a category or feed name to which records are published. In TSorage, the following topics are systematically deployed:

  • raw: for incoming messages.
  • observations: for the data points represented in messages, or data points derived by the processing layer.
  • aggregations: for the aggregated values derived from data points by the processing layer.

Kafka is an internal component, and its topics should only be accessed by other TSorage components and advanced users.

The purpose of the raw topic is to persist incoming messages until the Processing layer consumes them. Consequently, the message queuing component should not be considered as a long-term storage solution for time series data points, but rather as a buffer that gives Processor components the time to process them.

Depending on the resources available on the underlying infrastructure, Kafka can store a few days or weeks worth of messages on disk, and will automatically delete the oldest ones when running out of space.

Depending on your deployment preferences, Kafka can be deployed on many nodes, constituting a Kafka cluster. For a production environment, we recommend deploying at least 3 Kafka nodes, in order to offer resiliency to failures and a better workload distribution. The storage capacity of the Kafka cluster can be extended on demand by adding additional nodes to the cluster.

Processing Layer

The processing layer is instantiated by the processor component. This component is implemented with Akka Streams, and provides modern, reactive, asynchronous, real-time flow processing.

Figure: Global view of the Processor Flow. Each named block corresponds to a Processor graph.

Figure: Message legend.

Tag Block

Figure: Tag block of the Processor component.

The role of the Tag block is to keep the dynamic tag indices up to date as new observations are ingested by the system. Other processing blocks send it information about the tagsets mentioned in the ingested messages or inferred during message processing.

Message Block

Figure: Message block of the Processor component.

Messages stored in the raw Kafka topic are consumed by the Message block, which stores the data points they contain.

The dynamic tags mentioned in the messages are submitted to the Tag block for further processing.

After the extracted data points are processed, they are sent to the Observation block, which can transform them into other observations and calculate aggregated values.

Finally, the Message block performs a first temporal aggregation on the incoming data points, by applying derivators on the raw observations belonging to the same time periods as these data points, according to the first temporal aggregator.

Observation Block

Figure: Observation block of the Processor component.

The Observation block provides a way to further process observations, either by generating derived observations, or by calculating temporal aggregations.

Example of Observation Derivation

A city acquires GPS tracks of the vehicles travelling on its roads, in order to calculate some traffic statistics for its urban planning service.

On the collected data flow, each vehicle periodically emits its GPS coordinates as well as its current speed and other real-time properties. From the urban planning service point of view, this information is not very useful as such, so observations are derived in order to feed time series that focus on road segments, instead of particular vehicle activities.

A derivator can be added in the Observation block, which converts any raw data point by calculating the road segment corresponding to the vehicle position, and then generates a new observation mentioning the observed speed at this moment for the considered segment.

Because a first temporal aggregation has already been performed on observations corresponding to raw data points, such an aggregation is not repeated here. Derived observations may benefit from a first aggregation, though. In that case, this first aggregation is performed in this block.

Aggregation Block

Figure: Aggregation block of the Processor component.

The application of temporal derivators generates aggregated values that are collected by the Aggregation block. The following operations are then applied on these values:

  1. The aggregated values are converted into Kafka messages, which are sent to the aggregations topic. This enables other TSorage components and external tools to further process the values.
  2. Dynamic tags associated with the aggregated values are reported to the Tag block for indexing purposes.
  3. Aggregated values are stored in the time series database for later retrieval. An aggregated value can trigger the execution of follow-up, coarser time aggregators. The resulting aggregated values are sent back to the entry of the Aggregation block, feeding a loop that ends once all time aggregators have been processed.

Once stored, aggregated values are communicated to the Observation block, that can transform them into additional observations.

Storage

The storage of time series values and other TSorage-related information is ensured by Cassandra. Cassandra is a fault-tolerant, performant, decentralized, and scalable NoSQL database.

Cassandra stores raw data points and aggregated values in distinct tables having similar schemas. In these tables, each time series is sharded according to the timestamp, as described in Choose a Sharding. On a typical installation, that means all the data points that relate to the same metric, the same dynamic tagset, and the same day or month (depending on the sharding preferences) are stored as a single, time-ordered data block, so that requesting the data points of a time series for a continuous period of time is blazingly fast.

On typical setups, Cassandra will be deployed as a cluster having multiple nodes for ensuring high availability and failure resiliency. For cross-site deployments, the Cassandra nodes will be geographically spread over distinct sites, so that

  1. Local data producers and consumers can use nearby Cassandra nodes, ensuring fast data access and submission.
  2. Submissions are automatically propagated to the other cluster nodes, so that local submissions can be retrieved from an other site.
  3. The database can survive the loss of an entire site.
  4. If a site is temporarily disconnected from the rest of the cluster (for instance, because of an Internet access outage), each site can still operate normally. Cassandra nodes will automatically resynchronize once the connection is restored.

Figure: Overview of a cross-site Cassandra Cluster. From M. Ben Brahim.

Hub Services

Authentication

Tag Management

Installation

Installation Schemas

All-In-One

Warning

Because it provides no redundancy, the all-in-one schema offers neither highly available services, nor resiliency to software or hardware failure.

This installation schema should only be used for demonstration or testing purpose.

The all-in-one schema consists in deploying a single instance of each module of interest on a single machine. This is a good option for giving TSorage a try, but it is definitely not appropriate for a production environment, since the failure of one component will result in the unavailability of the underlying services, and the failure of the machine could cause the loss of the whole system.

For this schema, Minikube is recommended over the standard Kubernetes system for the container orchestration, since it runs a single-node Kubernetes cluster inside a virtual machine and removes the burden of managing a distributed cluster.

Todo

to be continued. Provide a Helm chart here for a default all-in-one deployment?

Standard

Extended Installation

Cross-Site Installation

Todo

Each site has at least one node for collecting and processing time series, plus N nodes dedicated to Cassandra. Each site has a local replication factor of 2, and a remote RF of 1, either towards the “mother site”, or towards one of the other sites.

Technical Requirements

Sizing

Infrastructure

Software Setting

Choose a Sharding

In TSorage, time series are stored using a Cassandra database. TSorage takes advantage of Cassandra’s wide partitions to store data points efficiently. Basically, that means all the data points of a time series will be stored in the same partition (sometimes improperly named a row), in chronological order. Because a Cassandra partition cannot grow indefinitely, time series are partitioned into shards, each shard covering a time period of the time series history.

The duration of a shard must be set during the installation, and should never be modified afterwards. All time series have the same shard duration. To ensure good performance when writing and reading time series, the number of data points covered by a shard should be limited to a few million.

A sharding period of one month is often a good choice, since it leads to shards with approximately 45,000 or 2,700,000 data points for typical recording frequencies (one point per minute and per second, respectively). For higher frequencies, a sharding period of one day may be preferred, which leads to 86,400,000 data points per shard for a recording frequency of 1 kHz.

On Premise Deployment

Cloud Deployment

Installation Procedure

Introduction

The best way to install TSorage depends on the chosen installation schema. For testing purposes or simple deployments, various Docker images are provided. For production ready and advanced deployments, an additional configuration and management support is provided through Helm charts.

Installing Kafka

Installing Cassandra

Installing a Collector

The Collector component is intended to be installed on a server or a single computer. Contrary to the other components of the TSorage project, the Collector is supposed to be as close as possible to the data sources to collect. Consequently, a typical deployment includes one or more Collector components, deployed on an enterprise server or a single-board computer, permanently feeding the Ingestion layer.

Time and Time Zone

The data points retrieved by a collector must be timestamped in the UTC time zone, so that conversion to other time zones for display remains easy and reliable. For data sources that only expose the current value of the metric of interest, it’s the collector’s responsibility to correctly timestamp any collected data point. In order to do so, this component uses the clock time and time zone specified by the underlying host. These parameters must therefore be set appropriately. When using the provided Docker image, that can be done by mounting the Linux files specifying the appropriate time settings.

Preparing the Collector

  1. Make sure the host clock is correctly set, and that it will periodically be synced.
  2. Make sure the host time zone is correctly set.
  3. Create a Docker Compose file that describes which services must be run. The following snippet shows an example of such a file, designed for Linux-based hosts.
version: '3'

services:

  collector:
    image: ceticasbl/tsorage-collector:0.1.1
    expose:
      - "5000"
    volumes:
      - LOCAL_CONFIG:/var/config:ro
      - /etc/timezone:/etc/timezone:ro
      - /etc/localtime:/etc/localtime:ro
    links:
      - buffer
    networks:
      - network

  buffer:
    image: 'rabbitmq:3.6-management-alpine'
    environment:
      - RABBITMQ_DEFAULT_USER=collector
      - RABBITMQ_DEFAULT_PASS=collectorpwd
    ports:
      - "5672:5672"
      - "15672:15672"
    labels:
      be.cetic.tsorage.component: "collector"
      be.cetic.tsorage.role: "buffer"
    volumes:
      - LOCAL_RABBITMQ/etc/:/etc/rabbitmq/
      - LOCAL_RABBITMQ/data/:/var/lib/rabbitmq/
      - LOCAL_RABBITMQ/logs/:/var/log/rabbitmq/
    networks:
      - network

networks:
  network: {}

Please note that, depending on the servers that are launched by the collector for receiving incoming data points, more ports may need to be added to the collector service. In the same vein, if specific devices are used by some collector modules (for instance, a serial port could be used by a Modbus RTU module), these devices must be mounted in the Docker composition.

The LOCAL_RABBITMQ directory facilitates the configuration of the RabbitMQ broker, the persistence of the buffered data points, and the analysis of RabbitMQ logs.

  4. The behaviour of the collector is determined by a configuration file that must be passed to the Docker image when it starts. In your local configuration directory (LOCAL_CONFIG in the example above), create a directory named collector, and create a configuration file named collector.conf in it. The Buffer host must correspond to the name of the Buffer service (buffer by default), and the Buffer port must correspond to one of the ports declared in the Buffer service (5672 by default).

Because RabbitMQ prevents the default user (guest) from accessing the broker from an external host, and because Docker puts each service into its own host, compliance with the password policy is mandatory in this configuration. The proposed Docker composition automatically creates a collector user having collectorpwd as its password. The snippet below shows an example of a Collector configuration file that matches the proposed Docker composition.

buffer = {
    host = "buffer"
    port = 5672
    queue = "buffer"
    security = {
            type = "password"
            user = "collector"
            password = "collectorpwd"
    }
}

sources = [
    {
        type = "random"
        interval = "1 second"
        metric = [
            "sensor-1", "sensor-2", "sensor-3", "sensor-4", "sensor-5",
            "sensor-6", "sensor-7", "sensor-8", "sensor-9", "sensor-10"
        ]
        tagset = {
            "owner" = "myself",
            "quality" = "good"
        }
    }
]

sinks = [
    { type = "stdout/json" }
]
  5. Run the composition:
docker-compose up
  6. Check that data points are actually ingested, by observing the activity of the Ingestion layer. The internal activity of the collector can be observed by accessing the RabbitMQ administration application, which listens on port 15672 by default.

Installing an Ingestion Component

Installing an MQTT Broker

The MQTT module of the Ingestion layer relies on an MQTT broker for receiving incoming messages. This implies such a broker must be deployed in a software environment that can be reached from both the data collection agents and the Ingestion module. We recommend Mosquitto, a popular, open source MQTT broker that comes with prepared Docker images.

Password File Preparation

When a text file is used for listing the users authorized to access an MQTT channel, this file must be prepared in order to store passwords hashed in a format similar to crypt. First, create a file containing the list of logins and their corresponding unencrypted passwords:

steve:password
jim:secret

Then, this file must be processed by mosquitto_passwd in order to hash the passwords:

mosquitto_passwd -U passwordfile

This command changes the password file in place, replacing the passwords by their hashes:

steve:$6$hQ3Mi32i0K6Zhz2B$y0RZN1+2HcHgDOsmUxqRJQgIDSTdw+/YJukHoSwaUCUWlwKvC6mvOO0n+ojrLlCjOxIe+CymANxKSSvrqr2vvQ==
jim:$6$g+cocox+WgDTwoGI$jMSRggTyse28JBTNHX2IeZJqyJm/KckeRr94jhEh4AyJ2ndfZdUyImmRAMINKSXmhLoqyWu7nr3OBFBPVzoY8Q==

Configuration Settings

Prepare a configuration file describing how Mosquitto must behave. The snippet below shows a simple configuration file, which configures Mosquitto to accept unencrypted communications from clients, and to use a password file for authentication.

port 1883
allow_anonymous false
password_file /path/passwordfile
persistence true
persistence_location /mosquitto/data/
log_dest file /mosquitto/log/mosquitto.log

Running Mosquitto

Finally, you can run Mosquitto and refer to the configuration file:
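
For instance (a sketch, assuming Mosquitto is installed locally; the configuration file path is illustrative):

mosquitto -c /path/mosquitto.conf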

If you are using Docker, you can mount a local configuration file to /mosquitto/config/mosquitto.conf and use it from the running Docker image:
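
For instance (a sketch, assuming the official eclipse-mosquitto Docker image):

docker run -d -p 1883:1883 \
  -v /path/mosquitto.conf:/mosquitto/config/mosquitto.conf \
  eclipse-mosquitto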

Installing Hub Services

Authentication Micro-service

Adding Extra Data Types

Install TSorage in a Kubernetes Cluster using Helm

TL;DR;

Before installing the TSorage chart, add Helm repository of CETIC:

helm repo add cetic https://cetic.github.io/helm-charts
helm repo update

Then install the chart with a release name my-release in the tsorage namespace:

helm install my-release cetic/tsorage -n tsorage

Requirements

Kubernetes cluster 1.16+

The purpose of this documentation is not to go back over the methods of deploying a Kubernetes cluster. You will find information on this subject in the official Kubernetes documentation. However, it is important to note that TSorage has been tested and validated for Kubernetes versions 1.16, 1.17 and 1.18.

Helm 3.0.0+

There are several possible directions for installing an application in a Kubernetes cluster (YAML files, Helm, Kustomize…).

As there are a lot of configuration possibilities in TSorage, the template-based Helm solution was chosen. As version 2.x.x is deprecated, we only maintain a Helm chart tested on Helm versions 3.0.0 and later.

Here is a link to the documentation describing the steps for installing Helm on your machine.

The TSorage Helm chart is hosted in the CETIC repository. To add this repository to your Helm chart sources, run the following commands:

helm repo add cetic https://cetic.github.io/helm-charts
helm repo update

Persistent volume provisioner

TSorage is a tool which processes time series data and stores them in a database. In view of this last constraint, it is important that the data is persistent, i.e. it is not lost if the machine restarts.

The mechanism used by Kubernetes is based on the notion of persistent volume. There are a lot of ways to manage the provisioning of these volumes. The choice of a tool must above all rest on the architecture and the infrastructure under consideration. To support your choice, the Kubernetes project provides very complete documentation about persistent volumes.

Warning

By default, Kafka and Cassandra need persistence.

Install TSorage

As explained earlier, the strategy for deploying TSorage is based on a Helm chart, itself based on the concept of template. To apply modifications to this template, just write a YAML file containing the values that must override the defaults of the template.

Module Configuration

The following paragraphs explain how to configure the TSorage modules via the configuration file in YAML format. The first step is therefore to create this file; the convention is to call it values.yaml.

You will find the complete list of parameters in the TSorage Helm chart.

Gateway

Here are the default values used for the TSorage Gateway module. This module groups together the Collector and the Transmitter, connected to each other by a message queue (RabbitMQ) acting as a data buffer. You can reuse these values as a starting point for your custom configuration. Copy them to your values.yaml file.

gateway:
  enabled: false
  replicaCount: 1
  sources:
    - type: "flow/datadog"
      host: "0.0.0.0"
      port: 8081
  sinks:
    # - type: "stdout/json"

We can see that by default this module is not enabled.

The gateway.sources and gateway.sinks fields are both lists. The values that these elements can take are described in this documentation. However, a small translation between the JSON format described in the documentation and the expected YAML format must be performed.

For example, Collector sources described as follows:

{
  sources = [
    {
      type = "flow/http/json"
      host = "localhost"
      port = 8081
    },
    {
        type = "flow/mqtt/json"
        host = "localhost"
        port = 1883
        max_connections = 50
        buffer_size = 1000
        channel = timeseries
        security = {
            type = "password"
            login = "steve"
            password = "password"
        }
    }
  ]
}

will become:

gateway:
  sources:
    - type: "flow/http/json"
      host: "localhost"
      port: 8081
    - type: "flow/mqtt/json"
      host: "localhost"
      port: 1883
      max_connections: 50
      buffer_size: 1000
      channel: timeseries
      security:
        type: "password"
        login: "steve"
        password: "password"

Ingestion

Here are the default values used for the TSorage Ingestion module. You can reuse these values as a starting point for your custom configuration. Copy them to your values.yaml file and adapt them according to your needs.

ingestion:
  enabled: true
  replicaCount: 1
  sources:
    http:
      enabled: true
      ports:
        - 8081
    mqtt:
      enabled: false
      host: 0.0.0.0
      port: 1883
      security:
        type: anonymous
    random:
      enabled: true
  service:
    type: NodePort

We can see that by default this module is enabled and only 1 instance of it is deployed. We can also observe that by default the Ingestion module expects to receive data from these two channels:

The HTTP REST API Source

The ingestion.sources.http.ports value is a list. You can easily add other listening ports.

ingestion:
  sources:
    http:
      ports:
        - 8082
        - 8083
        - 8084

The MQTT Source

The MQTT source is disabled by default. In addition, to keep a certain control over deployments, the following values are not configurable and have been set directly in the template.

max_connections = 50
buffer_size = 1000
channel = "inbox-json"

To see what these values correspond to, refer to the documentation on the MQTT Ingestion module.

The Pre-Configured Random Source

For the development phase, a random data source is made available. It sends a random value between 0 and 1 every 5 seconds.

Here is the complete configuration of this source:

{
    type = "random"
    interval = "5 second"
    metric = ["sample-metric"]
    tagset = {"platform" = "kubernetes"}
}

Processor

Here are the default values used for the TSorage Processor module. You can reuse these values as a starting point for your custom configuration. Copy them to your values.yaml file.

processor:
  enabled: true
  replicaCount: 1
  config:
    aggregators:
      - "1m"
      - "1h"
      - "1d"
    parallelism: 8
    sharder: "month"
    grouper:
      size: 1000
      duration: 10
      duration_unit: "minutes"

We can see that by default this module is enabled and only 1 instance of it is deployed.

The processor.config field includes all available configurations for the Processor module in a key-value form. Their description can be found in the Processor configuration section of this documentation.

Hub

Here are the default values used for the TSorage Hub module. You can reuse these values as a starting point for your custom configuration. Copy them to your values.yaml file.

hub:
  enabled: true
  replicaCount: 1
  service:
    type: ClusterIP
    port: 8081

We can see that by default this module is enabled and only 1 instance of it is deployed. This instance is served by a ClusterIP service (only reachable by the resources located inside the cluster) on port 8081.

Kafka and Cassandra

Cassandra and Kafka are used as dependencies of the TSorage Helm chart. Therefore, they come from charts which are maintained by third parties.

To modify their configuration, we invite you to consult the documentation of the Cassandra and Kafka Helm charts directly.

It is important to note that these services will only be deployed if the modules on which they depend are enabled. For example, the Cassandra database will only be deployed if the Processor module is enabled.

Bind the Modifications

Now that the values.yaml file is ready, it is time to deploy TSorage.

The command line tool provided by Helm allows you to pass this file using the -f argument. To deploy the chart with a release name my-release in the tsorage namespace with the values.yaml file run the following command:

helm install my-release cetic/tsorage -n tsorage -f <path_of_the_values_files>/values.yaml

Check Installation

If you have installed the kubectl tool, you can verify that your deployment was performed correctly using the command:

kubectl get po,svc -n tsorage

You should get the following result:

Figure: kubectl output listing the TSorage pods and services.

Operation

User and Access Management

Backup and Recovery

User Guide

Overview and Generalities

Generalities

Duration Literals

A duration literal represents an interval of time, which must be positive. In a configuration file, a duration literal is represented by a string, the format of which is <length><unit>, where whitespace is allowed before, between, and after the parts. length must be either a positive integer or a positive floating-point number. unit must be one of the following:

  • d, day or days
  • h, hour or hours
  • min, minute or minutes
  • s, second or seconds
  • ms, milli, millisecond or milliseconds
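
For instance, "15 seconds", "1 minute", and "1.5 h" are all valid duration literals.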

Collecting Time Series

Gateway Component

The Gateway component is part of the collection layer. It aims to gather data points coming from various data sources, and to transmit them to the Ingestion layer. Technically, the Gateway is made of three modules, namely the Collector, the Buffer, and the Transmitter, which are distributed as Docker images. The behaviour of these modules is specified by editing configuration files that are communicated to the images when they are instantiated.

Figure: Example of Gateway flow, involving three data sources and a connector to the HTTP REST service for ingesting time series.

Configuring the Collector Module

The configuration file of a Collector module contains two parts.

  • The buffer part details how collected data points are temporarily stored in the Buffer module before being sent to the Ingestion layer.
  • The sources part covers a list of data sources from which data points must be collected. Each type of data source is declared according to a specific configuration document.

Best Practice

While a single Collector module can handle several data sources, it’s generally a better idea to declare a single data source per Collector instance. This fosters the deployment of containers with a single responsibility, which in turn facilitates system management and improves its elasticity.

An example of a configuration document for the Collector module is listed below:

buffer = {
    host = "localhost"
    port = 5672
    queue = "buffer"
    max_ttr = 15 seconds

    security = {
        type = "password"
        user = "guest"
        password = "guest"
    }
}

sources = [
    { configuration for an RDBMS connection },
    { configuration for a Modbus connection }
]

The buffer part of this configuration specifies how collected data points must be published to the temporary storage mechanism. Its configuration is identical to that of an AMQP source without any type specified, and contains the following parameters:

  • host [mandatory], the IP address or the domain name of the AMQP broker used for the buffering.
  • port [mandatory], the port that the AMQP broker is listening on. The typical value is 5672 for unencrypted communications, while 5671 is the registered port when AMQP is used over TLS/SSL (AMQPS).
  • queue [mandatory], the queue on which the publishers publish new data points.
  • max_ttr, the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to establish a connection with the Buffer after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • security, the security policy to apply.

The sources part of the document describes how data points are collected from data sources. The schema of each entry of this list is specific to the considered data source. Each source is identified by its type attribute. Each type has its own configuration schema.

A source is either a poll source or a drain source.

Poll sources periodically act (typically by interacting with their environment or an external source) in order to fetch new data points. These sources are characterized by the time interval separating two consecutive poll rounds.

While the collector tries to respect the specified time interval, the actual inter-round duration typically varies depending on unpredictable factors. If a round takes more time to complete than the specified time interval, the next round will start right after the current one comes to an end.

At any time, at most one round is delayed. If the delay becomes so large that two or more rounds are waiting to start, the oldest delayed rounds are cancelled.

On the other hand, drain sources are continuously listening for incoming data points pushed by external data sources, and transmit new data points as soon as possible.

Poll Sources

All poll sources have a mandatory interval attribute, which textually represents the expected time interval between two successive poll rounds. The value of this attribute must be a duration literal.

Currently, the following poll sources are supported:

Random Poll Source

This module randomly generates double values, mostly for testing purposes. Each generated message contains a single data point associated with the current timestamp.

It accepts the following parameters:

  • type [mandatory], must be random.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • interval [optional], the time interval between two successive messages. Must be a duration literal. Default value: 1 minute.
  • metric [mandatory], a non-empty list of texts representing the potential metric names to be associated with the message. Each time a message is generated, one of the specified metrics is randomly picked.
  • tagset [mandatory], a dictionary of (key, value) pairs that will be added as the message tagset. Both keys and values must be textual.

The snippet below shows an example of Random source:

{
    type = "random"
    interval = "1 second"
    metric = [
        "sensor-1", "sensor-2", "sensor-3", "sensor-4", "sensor-5",
        "sensor-6", "sensor-7", "sensor-8", "sensor-9", "sensor-10"
    ]
    tagset = {
        "owner" = "myself",
        "quality" = "good"
    }
}
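
With this configuration, each generated message would resemble the following sketch (the metric, timestamp, and value vary from message to message):

{
    "metric": "sensor-3",
    "tagset": { "owner": "myself", "quality": "good" },
    "type": "tdouble",
    "values": [ [ "2020-01-02T03:04:05.678", 0.7274 ] ]
}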

Test this extension by executing the steps below:

  1. Install Docker on your laptop.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
up --remove-orphans
  6. Docker compose shows messages from the services involved in the composition. In particular, after a few dozen seconds, the transmitter service should print JSON objects representing valid messages to its standard output. If you can see these messages, the random extension is working properly.

For performing steps 2-5, you could run the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/random.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans

Modbus RTU Poll Source

The Modbus RTU Poll component periodically polls Modbus slaves over an RS232 network. Only one request is submitted to the network at a time.

The configuration of this source supports the following parameters:

  • type must be poll/modbus/rtu (should not be confused with poll/modbus/rtu-tcp).
  • max_ttr, the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • interval, the desired polling interval.
  • name, a human friendly name of the network.
  • max_response_bytes, the maximum number of bytes a Modbus data response can contain. Requests will be sent in order to keep the responses length under this limit. Because Modbus response frames are limited to 256 bytes, the maximum value for this parameter is 252.
  • port, the serial port on which the requests must be sent. Example: /dev/ttyS0
  • baud_rate [optional], an integer representing the baud rate. Depending on the master and slave devices, accepted values may include 300, 600, 1200, 2400, 4800, 9600, 19200, 38400, 57600, 115200. Default is 9600.
  • parity [optional], the parity checking mode. Supported values are no, odd, and even. Please note the Modbus specification states that even parity is required. Default is even.
  • output_coils, the list of discrete output coils (coil numbers between 1 and 9999; Modbus function 1) to be collected.
  • input_contacts, the list of discrete input contacts (coil numbers between 10001 and 19999; Modbus function 2) to be collected.
  • holding_registers, the list of analog holding registers (register numbers between 40001 and 49999; Modbus function 3) to be collected.
  • input_registers, the list of analog input registers (register numbers between 30001 and 39999; Modbus function 4) to be collected.
  • response_timeout [optional], the period of time allowed to a slave for answering a request. Once this deadline has expired, the module no longer expects an answer and terminates the current message session. The timeout must be expressed as a duration literal. The default value is 1 second.

The coil and register tables (output_coils, input_contacts, input_registers, holding_registers) are polled in a cyclic fashion, in that order.

The items in output_coils, input_contacts, holding_registers, and input_registers are called extracts and describe the properties of Modbus registers.

The snippet below shows an example of Modbus RTU source:

{
    type = "poll/modbus/rtu"
    interval = 5 seconds
    name = "my device"
    port = "/dev/cu.usbserial-AQ00CKHN"
    baud_rate = 9600
    parity = no
    input_registers = [
        {
            unit_id = 1
            address = 1
            metric = "cetic.2.dside.temperature"
            tagset = {
                type = "temperature"
                unit = "celcius"
            }
            type = sint16
            rank = -1
        }
    ]
}

Modbus TCP Poll Source

The Modbus TCP Poll component frequently polls a Modbus TCP slave (server) for extracting values from its registers.

The configuration of this source supports the following parameters:

  • type must be poll/modbus/tcp.
  • max_ttr, the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • interval, the desired polling interval.
  • name, a human friendly name of the slave.
  • max_response_bytes [optional], the maximum number of bytes a Modbus response can contain. Requests will be sent in order to keep the response length under this limit. The total response length will be 9 + 2 * registerNumber bytes. The supported response length depends on the slave device.
  • host, the IP address or host name of the slave.
  • port, the port on which the slave is waiting for queries. The standard value is 502.
  • output_coils, the list of discrete output coils (coil numbers between 1 and 9999; Modbus function 1) to be collected.
  • input_contacts, the list of discrete input contacts (coil numbers between 10001 and 19999; Modbus function 2) to be collected.
  • holding_registers, the list of analog holding registers (register numbers between 40001 and 49999; Modbus function 3) to be collected.
  • input_registers, the list of analog input registers (register numbers between 30001 and 39999; Modbus function 4) to be collected.

The coil and register tables (output_coils, input_contacts, input_registers, holding_registers) are polled in a cyclic fashion, in that order.

The snippet below shows an example of coil description:

output_coils = [
    {
        address = 1234,
        metric = "door-is-closed",
        tagset = {
            asset_type = "door"
        }
    },
    {
        address = 0x04D3,
        metric = "parked-head",
        tagset = {
            asset_type = "drive head",
            computer = "14a284d5-4aa0"
        }
    }
]

The items in output_coils, input_contacts, holding_registers, and input_registers are called extracts and describe the properties of Modbus registers.

Because Modbus TCP requests contain a transaction Id that is echoed by the responding slave, multiple requests can be submitted simultaneously (i.e., without waiting for the response to a request before sending the next one). Also, because the target slave is identified by its IP address, simultaneous communications can be established with multiple slaves. The Modbus TCP module adopts this behaviour by simultaneously sending multiple requests for each of the declared sources.
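
For completeness, the sketch below outlines a full Modbus TCP source; the host, addresses, and metric name are placeholders:

{
    type = "poll/modbus/tcp"
    interval = "5 seconds"
    name = "my plc"
    host = "192.168.1.10"
    port = 502
    holding_registers = [
        {
            unit_id = 1
            address = 1
            metric = "plant.line1.flow"
            type = uint16
        }
    ]
}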

Modbus RTU over TCP Poll Source

Since Modbus RTU over TCP essentially consists in wrapping RTU messages in a TCP frame, this source is a mix between Modbus RTU and Modbus TCP sources.

The configuration of this source supports the following parameters:

  • type must be poll/modbus/rtu-tcp.
  • max_ttr, the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • interval, the desired polling interval.
  • name, a human friendly name of the slave.
  • max_response_bytes [optional], the maximum number of bytes a Modbus response can contain. Requests will be sent in order to keep the response length under this limit. The total response length will be 9 + 2 * registerNumber bytes. The supported response length depends on the slave device.
  • host, the IP address or host name of the slave.
  • port, the port on which the slave is waiting for queries. The standard value is 502.
  • output_coils, the list of discrete output coils (coil numbers between 1 and 9999; Modbus function 1) to be collected.
  • input_contacts, the list of discrete input contacts (coil numbers between 10001 and 19999; Modbus function 2) to be collected.
  • holding_registers, the list of analog holding registers (register numbers between 40001 and 49999; Modbus function 3) to be collected.
  • input_registers, the list of analog input registers (register numbers between 30001 and 39999; Modbus function 4) to be collected.
  • response_timeout [optional], the period of time allowed to a slave for answering a request. Once this deadline has expired, the module no longer expects an answer and terminates the current message session. The timeout must be expressed as a duration literal. The default value is 1 second.

The coil and register tables (output_coils, input_contacts, input_registers, holding_registers) are polled in a cyclic fashion, in that order.

The items in output_coils, input_contacts, holding_registers, and input_registers are called extracts and describe the properties of Modbus registers.
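
A configuration sketch for this source, mixing the TCP connection settings with RTU-style extracts (all values are placeholders):

{
    type = "poll/modbus/rtu-tcp"
    interval = "5 seconds"
    name = "my gateway"
    host = "192.168.1.20"
    port = 502
    input_registers = [
        {
            unit_id = 1
            address = 1
            metric = "cetic.2.dside.temperature"
            type = sint16
        }
    ]
}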

Modbus Extracts

For the Modbus RTU, Modbus RTU over TCP and Modbus TCP modules, the items in output_coils, input_contacts, holding_registers, and input_registers are called extracts and describe the properties of Modbus registers as follows:

  • unit_id, the unit Id to which the Modbus requests will be addressed in order to perform the extract. For TCP connections, this parameter is generally not used by the slave and is set to 0x00 or 0xFF by the master.
  • address, the register address. Should be a value between 0x0000 and 0x270E.
  • metric, the id of the metric associated with the coil.
  • tagset, an optional dictionary of the dynamic tagset to be reported with each collected value.
  • type, the type of data stored in the register. It helps the module to properly decode the register, and transmit a message in the right format. Must be one of the following values:
    • bool16, a boolean value stored in a single word. When this type is used, the position attribute must be set with an integer between 0 (lowest bit) and 15 (highest bit), as the reference to the position of the word bit to be considered as a boolean (an example sketch is given at the end of this parameter description).
    • ubyte, an unsigned byte value. When this type is used, the byte attribute must be set to high or low, as the reference to the position of the byte to be considered.
    • sbyte, a signed byte value. When this type is used, the byte attribute must be set to high or low, as the reference to the position of the byte to be considered.
    • uint16, an unsigned integer value, stored as a single word.
    • uint32, an unsigned integer value, stored as two words. The register that follows the addressed one will be used to read the value.
    • sint16, a signed integer value, stored as a single word.
    • sint32, a signed integer value, stored as two words. The register that follows the addressed one will be used to read the value.
    • sfloat32, a signed floating-point number, stored as two words. The register that follows the addressed one will be used to read the value.
    • enum16, a categorical value, stored as a single word. The values property of the register can be used for decoding each possible integer as a string.
    • charN, where N is a positive integer. Represents a character string stored as N words. Each word is decoded as a pair of characters, using the ASCII encoding standard.
  • byte_order, a string representing the way bytes are ordered in the register. May be LOW_BYTE_FIRST or HIGH_BYTE_FIRST. Default is HIGH_BYTE_FIRST.
  • word_order, a string representing the way words are ordered in multi-register values. May be LOW_WORD_FIRST or HIGH_WORD_FIRST. Default is HIGH_WORD_FIRST.

Additionally, some data types accept extra parameters:

  • Numeric types (i.e., ubyte, sbyte, uint16, uint32, sint16, sint32, and sfloat32) accept threshold values for data sanity checking purposes. Please note that, if the min_threshold or the max_threshold parameter is specified, and if the collected value remains within the thresholds, this value will automatically be associated with the tag quality=good.
    • min_threshold: a numeric value, below which collected values will automatically be associated with the tag quality=bad.
    • max_threshold: a numeric value, above which collected values will automatically be associated with the tag quality=bad.
  • The enumeration type (i.e., enum16) can optionally have the following extra parameters:
    • mask, which can be applied for limiting the value extraction to the bits covered by this mask. The big endian binary representation of the specified value is used for converting this value to a mask.
    • values, a mapping between integers (represented as literal integers or hexadecimal strings) and arbitrary strings.
  • Integer types (i.e., ubyte, sbyte, uint16, uint32, sint16, and sint32) can optionally have the rank parameter. This integer specifies the decimal rank of the integer unit: the integer value will be multiplied by 10^rank. For instance, if a register supplies a temperature in tenths of a degree, the rank of the extract should be set to -1 in order to get messages expressing the temperature in degrees (a raw value of 215 then yields 21.5). If the rank is specified (with a non-zero value), the type of the resulting messages will be tdouble instead of tlong.

When the mask, rank, and values attributes are jointly used, they are applied in that order: the mask is applied to the raw register value, the rank is then applied to the masked value, and finally the resulting value is substituted with a text according to values.
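
As an example of the position attribute required by the bool16 type, the hypothetical extract below reads bit 3 of a register as a boolean:

{
    unit_id = 1
    address = 0x10
    metric = "pump-running"
    type = bool16
    position = 3
}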

Address and Mask Literals

Address and mask values can be specified as a decimal string (e.g. 1234, or "1234"), a hexadecimal string (e.g. 0x04D2, or "0x04D2"), or an octal string (e.g. 01234, or "01234").

Because the (mandatory) leading zero of the octal representation may be confused with the (pointless) leading zero of a decimal representation, the octal representation should be avoided. For instance, 01234 read as an octal literal denotes the decimal value 668, while a reader might intend the decimal value 1234.

Register Extended Addresses

According to the Modbus specification, register addresses must lie between 0x0000 and 0x270E. Extended addresses, i.e. addresses from 0x270F to 0xFFFF, are supported by the Modbus module, but it is up to the user to ensure such addresses are supported by the queried slave nodes.

Byte and Word Ordering

The Modbus specification doesn’t mention how a register value should be decoded. Over time, device manufacturers adopted contradictory conventions about the byte ordering within a word, as well as the word ordering within records combining multiple words.

Consequently, generic Modbus masters have to deal with any combination of high/low byte first, and high/low word first configurations.

HIGH_BYTE_FIRST ordering is normally used in Modbus messages, and HIGH_WORD_FIRST is the more frequently encountered word ordering. Using both HIGH_BYTE_FIRST and HIGH_WORD_FIRST corresponds to the big endian representation, while using both LOW_BYTE_FIRST and LOW_WORD_FIRST corresponds to the little endian representation.
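
To illustrate the ordering attributes, the hypothetical extract below reads a 32-bit counter from a device that transmits the low word first:

{
    unit_id = 1
    address = 0x0A
    metric = "energy-counter"
    type = uint32
    word_order = LOW_WORD_FIRST
}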

The snippet below shows an example of extract description:

holding_registers = [
    {
        address = 0x1f
        metric = "pump-temperature"
        type = sint16
        rank = 2
        mask = 0x0F
        min_threshold = -100
        max_threshold = 100
    }
]

Flow Sources

Flow sources are time series sources that continuously retrieve new data points. They may be servers that permanently consume the content of incoming connections, clients that keep a connection permanently open for receiving data points from a remote server, clients that subscribe to a publish/subscribe system, etc.

HTTP Source

By using its HTTP module, the component can deploy an HTTP service for collecting JSON formatted messages submitted by any HTTP-compliant client. For that reason, the HTTP module is probably the simplest way for an active data source to communicate data points to TSorage.

One or many JSON objects that comply with the Message schema can be POSTed for submission. Although the request payload is practically unlimited, a good practice is to limit the number of data points contained in a single request for a smoother traffic flow. In practice, a trade-off between occasional but heavyweight requests and frequent but smaller ones should be found, depending on the resources available for both the data sources and the target component.

The configuration file of this source has the following parameters:

  • type [mandatory], must be flow/http/json.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [optional], the IP address or name of the server on which the service is running. Default is localhost.
  • port [optional], the port on which the service is listening for incoming connections. Default is 8081.

Although token-based security was implemented in preliminary versions of this module, this feature is no longer available. It should be proposed again in a later release.

The snippet below shows an example of HTTP source configuration:

{
    type = "flow/http/json"
    host = "localhost"
    port = 8081
}

Test this collector module by following the steps below:

  1. Install Docker on your laptop.
  2. Install curl on your laptop. This tool is installed by default on most Linux distributions. Alternatively, you can install and use another client for transferring data over HTTP.
  3. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  4. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  5. Download the configuration and service composition files (see the wget commands below).
  6. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/http_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans
  7. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  8. Run
curl  -X POST -H "Content-type: application/json" \
-d " {
  \"metric\": \"my-temperature-sensor\",
  \"tagset\": {
    \"quality\": \"good\",
    \"owner\": \"myself\"
  },
  \"type\": \"tdouble\",
  \"values\": [
    [ \"2020-01-02T03:04:05.678\", 42.1337 ],
    [ \"2020-01-02T03:04:06.123\", 654.72 ]
  ]
}" \
"http://localhost:7000/api/v1/series"
  9. Docker compose shows messages from the services involved in the composition. In particular, the transmitter service should print JSON objects representing the messages sent to the HTTP module. If you can see these messages, the HTTP module is working properly.

For performing steps 3-8, you could type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/http.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/http_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/http_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/http_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/http_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans -d

echo "[Waiting]"
sleep 30

curl  -X POST -H "Content-type: application/json" \
    -d " {
      \"metric\": \"my-temperature-sensor\",
      \"tagset\": {
        \"quality\": \"good\",
        \"owner\": \"myself\"
      },
      \"type\": \"tdouble\",
      \"values\": [
        [ \"2020-01-02T03:04:05.678\", 42.1337 ],
        [ \"2020-01-02T03:04:06.123\", 654.72 ]
      ]
    }" \
    "http://localhost:7000/api/v1/series"

With this approach, you should get a response similar to the following:

{"status":"ok","msg":"Total messages received: 1"}

Additionally, the logs of the Docker container for the Transmitter component should show the received message. Display them with the following command:

docker logs `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.role=transmitter"`

Finally, shut the composition down after you tested it:

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/http_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down
AMQP Source

This source acts as an AMQP client that establishes a connection to an AMQP broker (such as RabbitMQ) and consumes messages from a specific queue. Using this source implies the deployment and configuration of an AMQP broker, which are not covered in this section.

Warning

Depending on the server settings, messages that have been sent by a publisher while the collector is not connected to the message queue may be definitively lost. If this behaviour is not desired, please ensure the publisher submits persistent messages (delivery mode 2) and that the queue is durable.

The configuration file of this source accepts the following parameters:

  • type [mandatory], must be flow/amqp/json or flow/amqp/pb, depending on whether the message representation is a JSON object or a Protobuf message (respectively).
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory], the IP address or the domain name of the AMQP broker.
  • port [mandatory], the port that the AMQP broker is listening on. Typical values are 5672 for unencrypted communications, while 5671 is the registered port when AMQP is used over TLS/SSL (AMQPS).
  • queue [mandatory], must correspond to the queue on which the publishers publish new data points.
  • security [mandatory], the security policy to apply.

The snippet below shows an example of AMQP source configuration:

{
    type = "flow/amqp/pb"
    host = "localhost"
    port = 5672
    queue = "my-queue"
    security = {
        type = "anonymous"
    }
}

Test the behaviour of this collector module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans
  6. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  7. In your favorite Web browser, open a new tab at http://localhost:15672/. Enter collector as login and collectorpwd as password. You are now on the administrative application of RabbitMQ, an AMQP broker.
  8. Go to the Queues tab, and click on my-queue in the table. my-queue is the name of the queue the collector is listening to for incoming messages. (buffer is the queue internally used by the collector for temporarily storing incoming messages. Under normal use, this queue should be empty almost all the time.)
  9. Click on the Publish message menu item to reveal the online form for publishing messages.
  10. Leave the Headers and Properties fields empty, and type the following content in the Payload field:
{
  "metric": "my-temperature-sensor",
  "tagset": {
    "quality": "good",
    "owner": "myself"
  },
  "type": "tdouble",
  "values": [
    [ "2020-01-02T03:04:05.678", 42.1337 ],
    [ "2020-01-02T03:04:06.123", 654.72 ]
  ]
}
  11. Click on the Publish message button.
  12. Docker compose shows messages from the services involved in the composition. The last printed line should be a JSON representation of the message you just submitted.

For performing steps 2-5, you could type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/amqp_json.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans

MQTT Source

When this source is added to a component, the component connects to a MQTT broker and consumes incoming MQTT messages. Consequently, a MQTT broker must be installed before any use of this module. Because a RabbitMQ server must be deployed for internally buffering incoming messages, a typical collector deployment already uses this server as the MQTT broker.

When the Collector and the Ingestion components are deployed in the same environment, submitting data points to the MQTT module of the Ingestion component should be preferred.

The MQTT module accepts TSorage messages represented as JSON objects (flow/mqtt/json type) or Protobuf messages (flow/mqtt/pb type).

The configuration of this module accepts the following parameters:

  • type [mandatory]: must be either flow/mqtt/json or flow/mqtt/pb.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory]: The host on which the MQTT broker is running.
  • port [optional]: The port on which the MQTT broker listens for incoming connections. Default is 1883.
  • max_connections [optional]: The maximum number of simultaneous connections to the MQTT broker.
  • client_id [optional]: The client identifier that is sent to the MQTT broker. By default, a random value is set.
  • buffer_size [optional]: The maximum number of messages that can be buffered before the ingestion process applies backpressure to the data flow (which can ultimately result in the forced closing of some incoming connections). Default is 1000.
  • channel [optional]: The name of the channel from which incoming messages will be consumed. Default is timeseries.
  • security [mandatory]: The security policy to implement. Must correspond to the installation choices.

Example of configuration for the MQTT module:

{
    type = "flow/mqtt/pb"
    host = "localhost"
    port = 1883
    max_connections = 50
    buffer_size = 1000
    channel = timeseries

    security = {
        type = "password"
        login = "steve"
        password = "password"
    }
}

Test the behaviour of this collector module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans
  6. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  7. Run the following command to publish a message to the MQTT broker:
docker exec `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.sourcetype=mqtt"` \
  mosquitto_pub -t timeseries-json \
  -m "{ \"metric\": \"my-temperature-sensor\", \"tagset\": { \"quality\": \"good\", \"owner\": \"myself\" }, \"type\": \"tdouble\", \"values\": [ [ \"2020-01-02T03:04:05.678\", 42.1337 ], [ \"2020-01-02T03:04:06.123\", 654.72 ] ] }"

This one-liner executes a mosquitto_pub command on the Docker container on which Mosquitto is running. mosquitto_pub will publish a message to the MQTT channel the collector is listening to.

  8. Docker compose shows messages from the services involved in the composition. The last printed line should be a JSON representation of the message you just submitted.

For performing steps 2-7, you could type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/mqtt_json_anonymous.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml
wget -O /tmp/tsorage/mqtt_default_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/mqtt_default_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans -d

echo "[Waiting]"
sleep 30

docker exec `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.sourcetype=mqtt"` \
  mosquitto_pub -t timeseries-json \
  -m "{ \"metric\": \"my-temperature-sensor\", \"tagset\": { \"quality\": \"good\", \"owner\": \"myself\" }, \"type\": \"tdouble\", \"values\": [ [ \"2020-01-02T03:04:05.678\", 42.1337 ], [ \"2020-01-02T03:04:06.123\", 654.72 ] ] }"

The Docker container for the Transmitter component should show the received message with the following command:

docker logs `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.role=transmitter"`

Finally, shut the composition down after you tested it:

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

OPC-UA Source

The OPC-UA source extracts item values using OPC Unified Architecture (or OPC-UA), a leading standard for platform independent, industrial communication. With this architecture, a server stores data from various data sources and presents them as properties of variable nodes. This TSorage source exploits the publish-subscribe mechanism provided by OPC-UA for subscribing to a selection of nodes of interest, so that the server automatically notifies it when an update is available. This drastically reduces bandwidth requirements as well as the resource pressure placed on the server, compared to a request-response based approach, and allows both clients and servers to support very large item sets.

The configuration of the OPC-UA source accepts the following properties:

  • type [mandatory], must be flow/opc-ua.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory], the IP address or domain name of the OPC-UA server to contact for discovering the available resources.
  • port [mandatory], the port that the server is listening on.
  • path [optional], the path on the specified server to request for accessing the available items.
  • subscription_interval [optional], the time interval at which the client subscriptions are submitted to the server. The value of this attribute must be a duration literal. The default value is 30 seconds.
  • queue_size [optional], the maximum number of item values that can be buffered in the internal source queue before triggering its backpressure mechanism. The default value is 500.
  • nodes [mandatory], the list of variable nodes of interest for which a subscription must be submitted to the server.
    • ns [mandatory], the identifier of the namespace the node of interest belongs to. Must be an integer.
    • i, s, g, b [strictly one of them mandatory], the node identifier within the specified namespace. Must be an integer, a character string, a GUID (an implementation dependent 16-byte number used as a unique identifier), or a ByteString (an opaque array of bytes), respectively. ByteString identifiers must be represented as base64 encoded strings. (A sketch of the different identifier kinds is given after this list.)
    • metric [optional], the name of the metric that must be used for the time series associated with this node. If not specified, a textual version of the node identifier will be used as the metric name.
    • tagset [optional], the dynamic tagset to associate with any message based on this node. Please note the source will automatically add a tag named opc_status, with a value corresponding to the textual description of the update status provided by the server along with the update messages.
    • interval [optional], the sampling time interval the server is supposed to observe. In other words, a sampling interval of 4 seconds means the server is asked to sample the node every four seconds. However, this does not imply the server will provide an update every four seconds, and the actual update frequency may vary over time, depending on the server workload, the value dynamics, etc. The value of this attribute must be a duration literal. The default value is 1 minute.
  • security [mandatory], the security policy to apply.
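
Besides string identifiers (s), nodes can be designated by the other identifier kinds. The sketch below shows hypothetical nodes identified by an integer, a GUID (written here in its usual textual form, an assumption of this sketch), and a base64 encoded ByteString:

nodes = [
    { ns = 2, i = 1001, metric = "pressure" },
    { ns = 2, g = "72962B91-FA75-4AE6-8D28-B404DC7DAF63" },
    { ns = 2, b = "M/RbKBsRVkePCePcx24oRA==" }
]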

The source automatically determines the most appropriate data type for each received update, and accordingly encodes the message payload.

Although there is no technical constraint enforcing it, a good practice is to set a single OPC-UA module per server.

The snippet below shows an example of OPC-UA source configuration:

{
    type = "flow/opc-ua"
    host = "myserver.com"
    port = 48484
    path = ""
    subscription_interval = 30 seconds
    queue_size = 500

    nodes = [
        {
            ns = 1
            s = "Countries.AF.Kabul.FlagInitialize"
            interval = 4 seconds
        },
        { ns = 1, s = "Countries.AF.Kabul.Latitude", metric = "latitude" },
        { ns = 1, s = "Countries.AF.Kabul.Longitude" },
        { ns = 1, s = "Countries.AF.Kabul.Timezone" },
        { ns = 1, s = "Countries.AF.Kabul.Icon" },
        {
            ns = 1,
            tagset = { importance = high }
            s = "Countries.AF.Kabul.Temperature"
            metric = "temperature"
        },
        { ns = 1, s = "Countries.AF.Kabul.ApparentTemperature" },
        { ns = 1, s = "Countries.AF.Kabul.Humidity" },
        { ns = 1, s = "Countries.AF.Kabul.Pressure" },
        { ns = 1, s = "Countries.AF.Kabul.WindSpeed" }
    ]

    security = {
        type = "anonymous"
    }
}

Test the behaviour of this collector module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/opcua_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans
  6. Docker compose shows messages from the services involved in the composition. In particular, after a few dozen seconds, the transmitter service should print JSON objects representing valid messages to its standard output. If you can see these messages, the OPC-UA extension is working properly.

For performing steps 2-5, you can type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/opcua_anonymous.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml
wget -O /tmp/tsorage/opcua_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/opcua_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/opcua_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/opcua_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans

DataDog Source

DataDog is a solution for monitoring and analysing servers, databases, tools, and services. It provides an agent, released under the Apache 2 license, that can be deployed on servers of interest for collecting metrics.

The TSorage Collector component provides a module that records metrics collected by the DataDog agent and converts them to TSorage messages.

The configuration of this source supports the following parameters:

  • type [mandatory], must be flow/datadog.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • port [optional], the port on which the service is listening for DataDog messages. Default is 8080.
  • host [optional], the host on which the service is listening for DataDog messages. Default is localhost.

The snippet below shows an example of DataDog source configuration:

{
    type = "flow/datadog"
    host = "localhost"
    port = 80
}

Test the behaviour of this collector module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/datadog_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans
  6. Docker compose shows messages from the services involved in the composition. In particular, after a few dozen seconds, the transmitter service should print JSON objects representing valid messages to its standard output. If you can see these messages, the Datadog extension is working properly.

For performing steps 2-5, you can type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/datadog.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/stdout.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml
wget -O /tmp/tsorage/datadog_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/datadog_service.yml

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/datadog_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/datadog_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  up --remove-orphans

Sink Configuration

In the Gateway component, sinks are modules that submit extracted data points to the Ingestion layer. Because this layer relies on different technologies, the user has to specify how to connect each sink to an operating Ingestion component.

Each sink consumes messages that are temporarily stored in the Buffer, and tries to submit them to a component of the Ingestion layer. The Buffer implements a mechanism that forces a consumer to acknowledge receipt before a buffered message is considered as actually processed. Because there is no guarantee that a submission will succeed, Gateway sinks acknowledge receipt of buffered messages only after they have received an acknowledgement from the Ingestion component they are connected to, or after they have other evidence that the messages have been properly received. In other words, a copy of each buffered message remains in the Buffer until at least one sink considers the message as properly submitted and acknowledges it.

When several sinks are declared in the configuration file of the Collector component, each sink has the opportunity to acknowledge a buffered message. A single acknowledgement is enough for considering a message as properly submitted. Consequently, if several sinks are declared, some of them could miss the submission of some messages when the associated Ingestion components are unavailable. This situation is preferred over suspending the submission of all the sinks when one of them encounters an issue.

Best Practice

While a single Transmitter module can handle several data sinks, it’s generally a better idea to declare a single data sink per Transmitter instance. This fosters the deployment of containers with a single responsibility, which in turn facilitates system management and improves its elasticity.

Stdout Sink

This simple sink prints every incoming message to the standard output of the running process. A message is printed as a single-line JSON document. Printed messages are separated by a newline character.

Please note the Stdout sink always acknowledges consumed messages.

The following parameters must be specified:

  • type must be set to stdout/json
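
A complete Stdout sink configuration is therefore minimal:

{
    type = "stdout/json"
}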

HTTP Sink

The HTTP Sink can communicate with the REST HTTP Ingestion module of the Ingestion layer. Each message is sent using an HTTP POST query, the payload of which is a JSON representation of the message.

For a REST HTTP Ingestion service, the following parameters are accepted:

  • type [mandatory], must be set to flow/http/json.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the sink after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory], the IP address of the HTTP service.
  • port [optional], the port number that the HTTP service is listening on. Default value: 8081.
  • prefix [optional], the path for the HTTP service. Default value: api/v1/series
  • buffer_size [optional], the maximum number of datapoints to accumulate in a chunk before submitting it to the HTTP service. Because messages may contain a varying number of datapoints, and because messages are not split or merged before submission, the actual number of datapoints sent in an HTTP request may be less or greater than this threshold. Default value: 1, which means each HTTP request always contains a single message.
  • wait_duration [optional], the maximum duration the sink is waiting for other messages before sending a request. Default value: 10 seconds.
  • max_connections [optional], the maximum number of simultaneous HTTP connections this sink can establish with the Ingestion layer. Default value: 5.

Example

The snippet below shows an example of HTTP sink configuration:

{
    type = "flow/http/json"
    host = "127.0.0.1"
    port = 8080
    prefix= "api/v1/series"
    buffer_size = 100
    wait_duration = 1 minute
}

MQTT Sink

The MQTT Sink can communicate with the MQTT Ingestion module of the Ingestion layer. Messages are sent using a MQTT session. The following parameters are accepted:

  • type [mandatory], must be set to flow/mqtt/pb or flow/mqtt/json, depending on whether the Ingestion module is expecting Protobuf or JSON messages (respectively).
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the sink after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory], the IP address or the domain name of the MQTT broker.
  • port [optional], the port that the MQTT broker is listening on. The default value is 1883 for unencrypted communications, while 8883 is the registered port when MQTT is used over SSL.
  • queue [optional], must correspond to the queue used by the MQTT Ingestion module. Default value: timeseries.
  • security [mandatory], the security policy to apply. It must be set according to the configuration of the Ingestion module.

Example

The snippet below shows an example of MQTT sink configuration:

{
    type = "flow/mqtt/pb"
    host = "localhost"
    port = 1883
    queue = "timeseries"
    security = {
        type = "anonymous"
    }
}

Give it a Try

In order to observe the Transmitter sending collected messages to a MQTT broker, simply follow the steps below:

  1. Install Docker on your laptop.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/collector and /tmp/tsorage/config/transmitter subdirectories.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  -f /tmp/tsorage/mqtt_service.yml \
  up --remove-orphans
  6. Run the following command to subscribe to the MQTT channel on which the Transmitter is publishing messages:
docker exec `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.sinktype=mqtt"` \
  mosquitto_sub -t inbox-json

This one-liner executes a mosquitto_sub command on the Docker container on which Mosquitto is running. mosquitto_sub will subscribe to the channel on which the Transmitter is publishing messages, and should print JSON formatted messages on the standard output.
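
Since the downloaded collector configuration uses the random source, you should see single-line JSON messages similar to the following sketch (field order, metric, and values will vary):

{"metric":"sensor-7","tagset":{"owner":"myself","quality":"good"},"type":"tdouble","values":[["2020-01-02T03:04:05.678",0.7274]]}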

For performing steps 2-5, you can type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/collector
mkdir -p /tmp/tsorage/config/transmitter

wget -O /tmp/tsorage/config/collector/collector.conf https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/configs/random.conf
wget -O /tmp/tsorage/config/transmitter/transmitter.conf https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/configs/mqtt_json_anonymous.conf
wget -O /tmp/tsorage/collector_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/collector_service.yml
wget -O /tmp/tsorage/buffer_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/collector/src/main/resources/buffer_service.yml
wget -O /tmp/tsorage/transmitter_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/transmitter_service.yml
wget -O /tmp/tsorage/mqtt_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/transmitter/src/main/resources/mqtt_default_service.yml

docker-compose \
    -f /tmp/tsorage/collector_service.yml \
    -f /tmp/tsorage/buffer_service.yml \
    -f /tmp/tsorage/transmitter_service.yml \
    -f /tmp/tsorage/mqtt_service.yml \
    down --remove-orphans

docker-compose \
  -f /tmp/tsorage/collector_service.yml \
  -f /tmp/tsorage/buffer_service.yml \
  -f /tmp/tsorage/transmitter_service.yml \
  -f /tmp/tsorage/mqtt_service.yml \
  up -d

Finally, shut the composition down after you tested it:

docker-compose \
    -f /tmp/tsorage/collector_service.yml \
    -f /tmp/tsorage/buffer_service.yml \
    -f /tmp/tsorage/transmitter_service.yml \
    -f /tmp/tsorage/mqtt_service.yml \
    down

Ingesting Time Series

The Ingestion component, which is responsible for receiving data points from the outside (typically, from a collector agent), is essentially defined as a list of source modules that wait for new data points, and a list of sink modules that forward these data points to the TSorage internal components.

While multiple Ingestion components can be deployed in order to cope with an increasing data volume, a typical installation initially contains a single Ingestion component with as many source and sink modules as necessary. Source modules can be added to its configuration for supporting different kinds of data ingestion, while a single Kafka sink is used in most cases.
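
The exact schema is defined by the example configuration files downloaded in the sections below. As a purely hypothetical sketch, an Ingestion configuration combining a random source with a stdout sink could be pictured as follows:

sources = [
    {
        type = "random"
        interval = "1 second"
        metric = [ "sensor-1" ]
        tagset = { "owner" = "myself" }
    }
]

sinks = [
    { type = "stdout/json" }
]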

Source Modules

Random Ingestion Module

This module randomly generates double values, mostly for testing purposes. Each generated message contains a single data point associated with the current timestamp.

It accepts the following parameters:

  • type [mandatory], must be random.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurred. Must be expressed as a duration literal. The default value is 15 seconds.
  • interval [optional], the time interval between two successive messages. Must be a duration literal. Default value: 1 minute.
  • metric [mandatory], a non-empty list of texts representing the potential metric names to be associated with the message. Each time a message is generated, one of the specified metrics will be randomly picked.
  • tagset [mandatory], a dictionary of (key, value) that will be added as the message tagset. Both keys and values must be textual.

Example

The snippet below shows an example of Random source:

{
    type = "random"
    interval = "1 second"
    metric = [
        "sensor-1", "sensor-2", "sensor-3", "sensor-4", "sensor-5",
        "sensor-6", "sensor-7", "sensor-8", "sensor-9", "sensor-10"
    ]
    tagset = {
        "owner" = "myself",
        "quality" = "good"
    }
}

Give it a Try

Test this Ingestion module by following the steps below:

  1. Install Docker on your laptop.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/ingestion subdirectory.
  4. Download the configuration and service composition files (see the wget commands below).
  5. Run
docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  up --remove-orphans
  6. Docker compose shows messages from the services involved in the composition. In particular, after a few dozen seconds, the ingestion service should print JSON objects representing valid messages to its standard output. If you can see these messages, the random module is working properly.

For performing steps 2-5, you can type the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/ingestion

wget -O /tmp/tsorage/config/ingestion/ingestion.conf https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/configs/random_to_stdout.conf
wget -O /tmp/tsorage/ingestion_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/ingestion_service.yml
wget -O /tmp/tsorage/kafka_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/kafka_service.yml

docker-compose \
    -f /tmp/tsorage/ingestion_service.yml \
    -f /tmp/tsorage/kafka_service.yml \
    down --remove-orphans

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  up

HTTP REST Ingestion Module

By using its HTTP module, the component can deploy an HTTP service for collecting JSON formatted messages submitted by any HTTP-compliant client. For that reason, the HTTP module is probably the simplest way for an active data source to communicate data points to TSorage.

One or many JSON objects that comply with the Message schema can be POSTed for submission. Although the request payload is practically unlimited, a good practice is to limit the number of data points contained in a single request for a smoother traffic flow. In practice, a trade-off between occasional but heavyweight requests and frequent but smaller ones should be found, depending on the resources available for both the data sources and the target component.

The configuration file of this source has the following parameters:

  • type [mandatory], must be flow/http/json.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurs. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [optional], the IP address or name of the server on which the service is running. Default is localhost.
  • port [optional], the port on which the service is listening for incoming connections. Default is 8081.

Although token-based security was implemented in preliminary versions of this module, this feature is no longer available. It should be reintroduced in a later release.

Example

The snippet below shows an example of an HTTP source configuration:

{
    type = "flow/http/json"
    host = "localhost"
    port = 8081
}
Give it a Try

Test this Ingestion module by following the steps below:

  1. Install Docker on your laptop.
  2. Install curl on your laptop. This tool is installed by default on most Linux distributions. Alternatively, you can install and use another client for transferring data using HTTP.
  3. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  4. Create the /tmp/tsorage/config/ingestion subdirectory.
  5. Download the required configuration files, as shown in the wget commands below.
  6. Run
docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/http_service.yml \
  up --remove-orphans
  7. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  8. Run
curl  -X POST -H "Content-type: application/json" \
-d " {
  \"metric\": \"my-temperature-sensor\",
  \"tagset\": {
    \"quality\": \"good\",
    \"owner\": \"myself\"
  },
  \"type\": \"tdouble\",
  \"values\": [
    [ \"2020-01-02T03:04:05.678\", 42.1337 ],
    [ \"2020-01-02T03:04:06.123\", 654.72 ]
  ]
}" \
"http://localhost:8081/api/v1/series"
  9. Docker Compose shows messages from the services involved in the composition. In particular, the ingestion service should print JSON objects representing the messages sent to the HTTP module. If you can see these messages, the HTTP module is working properly.

Steps 3 to 8 can be performed by typing the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/ingestion

wget -O /tmp/tsorage/config/ingestion/ingestion.conf https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/configs/http_to_stdout.conf
wget -O /tmp/tsorage/ingestion_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/ingestion_service.yml
wget -O /tmp/tsorage/http_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/http_service.yml

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/http_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/http_service.yml \
  up -d

echo "[Waiting]"
sleep 30
echo "[Submitting a message]"

curl  -X POST -H "Content-type: application/json" \
  -d " {
    \"metric\": \"my-temperature-sensor\",
    \"tagset\": {
      \"quality\": \"good\",
      \"owner\": \"myself\"
    },
    \"type\": \"tdouble\",
    \"values\": [
      [ \"2020-01-02T03:04:05.678\", 42.1337 ],
      [ \"2020-01-02T03:04:06.123\", 654.72 ]
    ]
  }" \
  "http://localhost:8081/api/v1/series"

With this approach, you should get a response similar to the following:

{"status":"ok","msg":"Total messages received: 1"}

Additionally, the logs of the Docker container running the Ingestion component should show the received message. They can be displayed with the following command:

docker logs `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.role=ingestor"`

Finally, stop the Docker composition with

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/http_service.yml \
  down

MQTT Ingestion Module

When this source is added to a component, a server is started that listens on a TCP port for incoming MQTT messages.

The MQTT module consumes messages from an MQTT broker. Consequently, an MQTT broker must be installed before this module can be used. Since a RabbitMQ server must already be deployed for internally buffering incoming messages, a typical collector deployment reuses this server as its MQTT broker.

When the Collector and the Ingestion components are deployed in the same environment, submitting data points to the MQTT module of the Ingestion component should be preferred.

The MQTT module accepts TSorage messages represented as JSON objects (flow/mqtt/json type) or Protobuf messages (flow/mqtt/pb type).

The configuration of this module accepts the following parameters:

  • type [mandatory]: must be either flow/mqtt/json or flow/mqtt/pb.
  • max_ttr [optional], the maximum time to retry. Corresponds to the maximum duration the collector waits before trying to re-establish a connection with the source after an error occurs. Must be expressed as a duration literal. The default value is 15 seconds.
  • host [mandatory]: The host on which the MQTT broker is running.
  • port [optional]: The port on which the MQTT broker listens for incoming connections. Default is 1883.
  • max_connections [optional]: The maximum number of simultaneous connections to the MQTT broker.
  • client_id [optional]: The client identifier that is sent to the MQTT broker. By default, a random value is used.
  • buffer_size [optional]: The maximum number of messages that can be buffered before the Ingestion process applies back pressure to the data flow (which can ultimately result in the forced closing of some incoming connections). Default is 1000.
  • channel [optional]: The name of the channel from which incoming messages will be consumed. Default is timeseries.
  • security [mandatory]: The security policy to implement. Must correspond to the installation choices.
Example

Example of configuration for the MQTT module:

{
    type = "flow/mqtt/pb"
    host = "localhost"
    port = 1883
    max_connections = 50
    buffer_size = 1000
    channel = "timeseries"

    security = {
        type = "password"
        login = "steve"
        password = "password"
    }
}
Give it a Try

Test the behaviour of this Ingestion module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/ingestion subdirectory.
  4. Download the required configuration files, as shown in the wget commands below.
  5. Run
docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  up --remove-orphans
  6. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  7. Run the following command to publish a message to the MQTT broker:
docker exec `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.sourcetype=mqtt"` mosquitto_pub \
  -t inbox-json \
  -m "{ \"metric\": \"my-temperature-sensor\", \"tagset\": { \"quality\": \"good\", \"owner\": \"myself\" }, \"type\": \"tdouble\", \"values\": [ [ \"2020-01-02T03:04:05.678\", 42.1337 ], [ \"2020-01-02T03:04:06.123\", 654.72 ] ] }"

This one-liner executes a mosquitto_pub command on the Docker container on which Mosquitto is running. mosquitto_pub publishes a message to the MQTT channel the collector listens to.

  8. Docker Compose shows messages from the services involved in the composition. The last printed line should be a JSON representation of the message you just submitted.

Steps 2 to 7 can be performed by typing the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/ingestion

wget -O /tmp/tsorage/config/ingestion/ingestion.conf https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/configs/mqtt_json_anonymous_to_stdout.conf
wget -O /tmp/tsorage/ingestion_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/ingestion_service.yml
wget -O /tmp/tsorage/mqtt_default_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/mqtt_default_service.yml

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  down --remove-orphans

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  up -d

echo "[Waiting]"
sleep 30
echo "[Submitting a message]"

docker exec `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.sourcetype=mqtt"` mosquitto_pub \
  -t inbox-json \
  -m "{ \"metric\": \"my-temperature-sensor\", \"tagset\": { \"quality\": \"good\", \"owner\": \"myself\" }, \"type\": \"tdouble\", \"values\": [ [ \"2020-01-02T03:04:05.678\", 42.1337 ], [ \"2020-01-02T03:04:06.123\", 654.72 ] ] }"

With this approach, the logs of the Docker container running the Ingestion component should show the received message. They can be displayed with the following command:

docker logs `docker ps -aq --filter  "status=running" --filter "label=be.cetic.tsorage.role=ingestor"`

Finally, stop the Docker composition with

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/mqtt_default_service.yml \
  down

Sink Modules

Stdout Sink

When this sink is mentioned in the configuration of an Ingestion component, a one-line JSON representation of every ingested message is written to the standard output of the underlying system. This module is principally used for testing and debugging purposes, and does not actually forward the incoming messages to internal TSorage components. In a production deployment, it should therefore be combined with a Kafka sink.
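
For illustration, declaring this sink could be as simple as the snippet below. The stdout type name is an assumption, made by analogy with the naming of the other modules; please refer to the reference configuration files (such as random_to_stdout.conf, used in the examples above) for the exact value:

{
    type = "stdout"
}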

Kafka Sink

All internal TSorage components communicate with each other through Kafka topics. In particular, regardless of how data points are ingested by an Ingestion component, they are published in the raw Kafka topic for further processing. Consequently, a typical Ingestion component contains one Kafka sink, through which the incoming data flow is actually processed.

The configuration of this sink has the following properties:

  • type [mandatory], must be flow/kafka/pb.

  • topic [optional], the name of the Kafka topic on which incoming data points will be published. Default is raw.

  • nodes [mandatory], a list of Kafka nodes that can be used for bootstrapping the connection to the Kafka cluster. The configuration of each node must contain the following properties:

    • host [mandatory], the IP address or the domain name of the node.
    • port [optional], the port on which the node is listening for incoming connections. Default is 9092.
  • security [mandatory], the security policy to apply.

Example
{
    type = "kafka",
    topic = "raw",
    nodes = [
        {
            host = "localhost"
            port = 9092
        }
    ],
    security = {
        type = "anonymous"
    }
}
Give it a Try

Test the behaviour of this sink module by following the steps below:

  1. Install Docker on your computer.
  2. Create the /tmp/tsorage/ directory. If you decide to use another directory, please adapt both the steps below and the configuration files.
  3. Create the /tmp/tsorage/config/ingestion and /tmp/tsorage/config/kafka subdirectories.
  4. Download the required configuration files, as shown in the wget commands below.
  5. Run
docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  up --remove-orphans
  6. Wait a moment (30 seconds should be enough) to be sure all services have finished their initialisation procedures.
  7. Run the following command:
docker exec \
`docker ps -aq --filter status=running --filter label=be.cetic.tsorage.sourcetype=kafka` \
/opt/bitnami/kafka/bin/kafka-console-consumer.sh \
--bootstrap-server localhost:9092 \
--topic raw

This one-liner prints messages submitted to the Kafka topic by the Ingestion component.

Steps 2 to 7 can be performed by typing the following commands:

rm -rf /tmp/tsorage

mkdir -p /tmp/tsorage/config/ingestion
mkdir -p /tmp/tsorage/config/kafka

wget -O /tmp/tsorage/config/ingestion/ingestion.conf https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/configs/random_to_kafka_json_anonymous.conf
wget -O /tmp/tsorage/ingestion_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/ingestion_service.yml
wget -O /tmp/tsorage/kafka_service.yml https://raw.githubusercontent.com/cetic/tsorage/dev/ingestion/src/main/resources/kafka_service.yml

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  down

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  up -d

echo "[Waiting]"
sleep 30
echo "[Reading Kafka messages]"

docker exec \
    `docker ps -aq --filter status=running --filter label=be.cetic.tsorage.sourcetype=kafka` \
    /opt/bitnami/kafka/bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 \
    --topic raw

Finally, stop the Docker composition with

docker-compose \
  -f /tmp/tsorage/ingestion_service.yml \
  -f /tmp/tsorage/kafka_service.yml \
  down

Processing Time Series

The Processor component is responsible for carrying out the operations needed to take incoming data points into account. More precisely, this component provides the following features:

  • Message processing, which consists in storing data points represented by incoming messages.
  • Tagset processing, which consists in adding the dynamic tagsets to the underlying database, for querying purposes.
  • Datapoint derivation, which consists in transforming datapoints into other datapoints, according to arbitrarily defined derivation functions.
  • Temporal aggregation, which consists in grouping datapoints belonging to the same time series by time periods, and then summarizing these groups by applying arbitrary functions. Built-in functions calculate the number of datapoints in a group, its last value, etc. (see the illustration below).
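
For instance, with the 1m aggregator and built-in functions computing the number of datapoints in a group and its last value, the datapoints of a given time series would be summarized as follows (timestamps and values are purely illustrative):

raw datapoints:    (12:00:10, 21.5)  (12:00:40, 21.7)  (12:01:05, 21.6)
1m aggregation:    [12:00, 12:01) -> count = 2, last value = 21.7
                   [12:01, 12:02) -> count = 1, last value = 21.6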

Configuration

The behaviour of the Processor component is driven by a configuration file describing how to connect to a Kafka topic for consuming incoming data points, how to process these data points, and how to store them in a Cassandra cluster.

The component accepts the following configuration parameters:

  • source [mandatory], that describes the Kafka topic containing the data points to be consumed.

    • type [mandatory], must be either flow/kafka/json or flow/kafka/pb.

    • topic [mandatory], the topic from which data points will be consumed.

    • group [optional], the consumer group with which the Processor component will consume incoming data points. This group must be the same for all component instances, so that incoming data points are distributed among these instances. If not specified, a random value is assigned.

    • nodes [mandatory], a list of Kafka nodes that can be used for bootstrapping the connection to the Kafka cluster. The configuration of each node must contain the following properties:

      • host [mandatory], the IP address or the domain name of the node.
      • port [optional], the port on which the node is listening for incoming connections. Default is 9092.
    • security [mandatory], the security policy to apply.

  • aggregators [mandatory], the list of temporal aggregators to apply to incoming data points. Currently accepted values are:

    • 1m: one minute
    • 1h: one hour
    • 1d: one day
    • 1mo: one month
  • parallelism [mandatory]: the number of simultaneous queries that can be submitted by the component to the Cassandra cluster.

  • sharder [mandatory]: the sharder to use for time sharding. Currently accepted values are:

    • day
    • month (recommended in most cases)
  • grouper [mandatory], a description of how processed values should be buffered before being sent to Cassandra.

    • size [mandatory], the maximum number of elements to be buffered. If the number of waiting elements exceeds this threshold, the buffer is flushed to Cassandra.
    • duration [mandatory], the duration value corresponding to the maximum waiting time for a buffered element.
    • duration_unit [mandatory], the duration unit corresponding to the maximum waiting time for a buffered element.
  • cassandra [mandatory], a description of the Cassandra cluster to use for data points and tagsets storage.

    • nodes [mandatory], a list of Cassandra nodes that can be used for bootstrapping the connection to the Cassandra cluster. The configuration of each node must contain the following properties:
      • host [mandatory], the IP address or the domain name of the node.
      • port [optional], the port on which the node is listening for incoming connections. Default is 9042.
    • keyspaces [mandatory], a description of the keyspaces to be used for data points and tagsets storage.
      • raw [mandatory], the name of the keyspace on which raw data points are stored.
      • aggregated [mandatory], the name of the keyspace on which aggregated data points and tagsets are stored.
    • security [mandatory], the security policy to apply.

Example

The snippet below shows an example of a valid configuration document for the Processor component.

source = {
    type = "flow/kafka/pb"
    topic = "raw"
    group = "ts_processing"
    nodes = [
      {
        host = "localhost"
        port = 9092
      }
    ],
    security = {
      type = "anonymous"
    }
}

aggregators = [ "1m", "1h", "1d" ]
parallelism = 8
sharder = "month"

grouper {
  size = 1000,
  duration = 10,
  duration_unit = "minutes"
}

cassandra {
  nodes = [
    {
        host = "localhost"
        port = 9042
    }
  ]
  keyspaces {
    raw = "tsorage_ts"
    aggregated = "tsorage"
  }
  security = {
    type = "password"
    login = "cassandra"
    password = "cassandra"
  }
}

Monitoring

Alerting

Rule Specification

Sending Alerts

Tag Management

Tag Edition

Tag Querying

Archiving