Developing Your Own Data Historian: A Practical Guide
A practical guide to implementing your own Data Historian for IIoT environments using open-source technologies.
Introduction to Data Historian
Data Historians are specialized software systems designed to capture, preserve, and optimize the vast streams of data emanating from a multitude of sources within IIoT environments. A Data Historian enables OT-IT convergence and serves as a central repository for historical data, allowing organizations to analyze and visualize data trends, make informed decisions, monitor process and device issues, and improve overall operational efficiency. In this article, I will walk you through the key aspects of developing your own Data Historian and share insights into what makes an IIoT implementation successful.
What Makes a Successful IIoT?
Before we dive into the details of building a Data Historian, it’s important to understand that a successful IIoT system should possess the following characteristics.
Protocols
Protocols form the foundation of any IIoT system, ensuring that data can flow seamlessly between devices and the Data Historian. The Data Historian must support a wide range of communication protocols so that it can talk to different assets, devices, and sensors. Here’s a table outlining different protocols and their characteristics.
Interoperability
The ability to integrate seamlessly with existing SCADA systems and various devices is crucial for a successful IIoT implementation. The collected data should be exported to a central system where monitoring and analytics can be applied for decision-making. To achieve this, the Data Historian must support industry-standard communication protocols.
Ability to Scale
As you add more assets and devices to the IIoT implementation, the volume of data generated by these assets will also grow. This data can be of various types, including analog and digital. Therefore, our Data Historian must be scalable to accommodate increasing data loads without compromising performance.
Quality of Service
Reliable collection and storage of device data is paramount in the world of IIoT. The Data Historian should have robust data collection and storage mechanisms to ensure data integrity and availability. It should also reconcile missing data and fast-forward through any backlog when latency or unreliable network speeds hinder the data feed.
Alerts and Notifications
The ability to configure digital inputs, customize alarms based on the OEM specifications of the assets, and trigger notifications are essential features of the Data Historian.
Custom Tag / Parameters
Often, user-specific data requirements cannot be fulfilled from raw data alone. Therefore, the Data Historian should be able to create new custom tags or parameters by applying rules to real-time data.
Architectural Building Blocks
The following high-level architecture illustrates the primary components of our custom Data Historian.
1. Collector: This component is responsible for collecting, customizing, and aggregating data from devices using adapters for protocols such as OPC UA, Modbus, MQTT, and others. Each collector operates as an independent microservice for one specific protocol. These microservices collect data from SCADA/devices and publish it to the “//datahistorian” topic on the Internal Communication Bus (ICB), typically using MQTT or a similar protocol.
2. Internal Communication Bus (ICB): A message broker (e.g., VerneMQ or RabbitMQ) serves as the channel for the data published by the Collector modules. Each topic is unique and represents a measurement tag associated with a specific asset.
3. DB-Collector: This component subscribes to all topics published on the “//datahistorian” channel. It streams the subscribed data into a time-series database, typically TimescaleDB. TimescaleDB is preferred for its compressed storage, performance, and data analysis capabilities, especially when handling large volumes of IoT data.
4. Exporters: These components are responsible for exporting collected data to client systems at a prescribed resolution and frequency. Standard export options include FTP, MQTT, Kafka, and OPC UA.
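To make the "one topic per measurement tag per asset" idea concrete, here is a minimal sketch of a topic helper. It assumes the root/asset/tag layout used in the publishing code later in this article; the names `buildTopic`, `parseTopic`, and `SUBSCRIBE_ALL` are hypothetical and only serve as an illustration.

```javascript
const ROOT_TOPIC = "//datahistorian"; // root topic used in this article

// Build the topic for one measurement tag of one asset
function buildTopic(assetId, tagCode) {
  return `${ROOT_TOPIC}/${assetId}/${tagCode}`;
}

// Split a topic back into asset and tag; returns null for foreign topics
function parseTopic(topic) {
  const prefix = ROOT_TOPIC + "/";
  if (!topic.startsWith(prefix)) return null;
  const rest = topic.slice(prefix.length);
  const sep = rest.indexOf("/");
  // Both an asset part and a tag part must be present
  if (sep <= 0 || sep === rest.length - 1) return null;
  return { assetId: rest.slice(0, sep), tagCode: rest.slice(sep + 1) };
}

// MQTT multi-level wildcard covering every asset and tag under the root
const SUBSCRIBE_ALL = `${ROOT_TOPIC}/#`;
```

A DB-Collector could subscribe to `SUBSCRIBE_ALL` and call `parseTopic` on each incoming topic. Note that asset IDs and tag codes used this way should not contain the MQTT wildcard characters `#` and `+`.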
Implementation of Collectors
As shown in the architecture diagram in Figure 1, data collectors play a critical role in the Data Historian: they gather data from various sources using the appropriate protocols. Collectors can be implemented in Node.js or Python, depending on your preferred language. Both ecosystems offer libraries for industrial protocols such as OPC UA, Modbus, and AMQP. The choice of library depends on factors such as open-source versus licensed options, availability of documentation, security requirements, data complexity, and system compatibility.
For Node.js-based implementations, I have used the following libraries:
- OPC UA → node-opcua (node-opcua.github.io)
- Modbus → modbus-serial (npmjs.com)
- MQTT → mqtt (npmjs.com)
The following sections discuss the OPC UA collector and the remaining data flow mechanisms.
1) Create an OPC UA client object and create a session based on the authentication mode and desired message encryption level.
const opcua = require('node-opcua');
// Helper is a project-specific module (not shown here); the path is a placeholder
const Helper = require('./helper');

// Create the client through the factory method provided by node-opcua
const client = opcua.OPCUAClient.create({});

// Define the endpoint URL of the OPC UA server you want to connect to
const endpointUrl = 'opc.tcp://localhost:41850/opc-ua-server/server/';

// Helper functions that return the user identity (authentication mode)
// and the list of tags to read
const userIdentityInfo = Helper.getUserIdentityInfo();
const tagMapping = Helper.getTagMapping();

async function createConnection() {
  try {
    // Connect to the server
    await client.connect(endpointUrl);
    console.log('Connected to OPC UA server.');
    // Create a session
    const session = await client.createSession(userIdentityInfo);
    console.log('Session created.');
    // Now you can have a separate function to subscribe to the desired tags
    // and read the data from the OPC UA server
    subscribeAndPublish(session, tagMapping);
  } catch (err) {
    console.error('Error:', err.message);
  }
}
2) Subscribe to the required tags and then “poll” or “monitor” them according to the nature of the data. For instance, alarm-related data must be monitored because it is event-driven, whereas sensor readings need to be polled at a specific interval, e.g., every 5 or 10 seconds.
// Latest value of each tag, keyed by tag code
const tag_data = {};

// logger is the application's logging utility (not shown here)
function subscribeAndPublish(devSession, tagMapping) {
  if (!devSession) return;
  try {
    tagMapping.forEach((_node) => {
      // Read the value of a specific tag, e.g. "ns=2;s=default:@IIOT1.STRUCDATA.BLK1.INV1.AC_PWR"
      devSession.read({ nodeId: _node.code }, 0, (err, nodeData) => {
        if (err) {
          logger.error("[read error] err:", err);
          return;
        }
        if (nodeData && typeof nodeData.value !== "undefined") {
          // Apply a formula to the tag when one is defined, e.g. a scaling
          // factor to convert MW into kW: acKW = actualMW * 1000
          if (_node.hasFormula && typeof _node.formula === "function") {
            tag_data[_node.code] = _node.formula(nodeData.value.value);
          } else {
            tag_data[_node.code] = nodeData.value.value;
          }
        }
      });
    });
  } catch (err) {
    logger.error("[session error] err:", err);
  }
}
At this point, all the data is in the ‘tag_data’ dictionary. For simplicity, the polling mechanism for different frequencies is not explained in this article. However, you can build a wrapper function that calls ‘subscribeAndPublish’ to retrieve measurement data at specific intervals.
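One possible shape for such a wrapper is sketched below. It is only an illustration: the `intervalSec` field on each tag mapping entry, the 10-second default, and the function names `groupByInterval` and `startPolling` are assumptions, and `pollFn` stands in for ‘subscribeAndPublish’.

```javascript
// Group tags by polling interval so that each interval gets one timer.
// `intervalSec` is a hypothetical field on each tag mapping entry.
function groupByInterval(tagMapping, defaultIntervalSec = 10) {
  const groups = new Map();
  for (const tag of tagMapping) {
    const interval = tag.intervalSec || defaultIntervalSec;
    if (!groups.has(interval)) groups.set(interval, []);
    groups.get(interval).push(tag);
  }
  return groups;
}

// Start one timer per interval; `pollFn(session, tags)` is called on each tick.
function startPolling(session, tagMapping, pollFn) {
  const timers = [];
  for (const [intervalSec, tags] of groupByInterval(tagMapping)) {
    timers.push(setInterval(() => pollFn(session, tags), intervalSec * 1000));
  }
  // Return a function that stops all timers
  return () => timers.forEach((t) => clearInterval(t));
}

// Usage (inside createConnection, once the session exists):
// const stopPolling = startPolling(session, tagMapping, subscribeAndPublish);
```

Grouping by interval keeps the number of timers small even when there are thousands of tags, since all tags sharing an interval are read in the same tick.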
3) Invoke Custom Tag capabilities
One of the key features of a Data Historian is allowing users to define their own data points or variables. This feature enables them to collect, store, and analyze specific information that may not be inherently available or relevant in the SCADA/devices connected to the historian. This can include simple arithmetic calculations and aggregations.
For example:
total_wf_energy = turbine1.today_energy + turbine2.today_energy
It can also include complex rules that require some conditional coding.
For example:
function isWindFarmOutage(turbine1, turbine2, wind_speed) {
  var wf_outage = false;
  var total_wf_energy = turbine1.today_energy + turbine2.today_energy;
  if (total_wf_energy < 0 && wind_speed > 0) wf_outage = true;
  return wf_outage;
}
This can be achieved by defining the rule as a JavaScript function in Node.js.
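// Prepare the function parameters (fnParams) from tag_data
const fnParams = [];
fnParams.push(tag_data[tagCode]);
// Get the function from its stored definition; wrapping the text in parentheses
// makes eval return a function expression. Only evaluate trusted definitions.
const customTagFormula = eval("(" + custom_tag_def.func + ")");
// Call the function with the desired parameters
const customTagValue = customTagFormula(fnParams);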
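The fragment above can be turned into a small self-contained sketch. Everything here is illustrative: the shape of `customTagDef` (with `name`, `inputs`, and `func`), the function name `evaluateCustomTag`, and the use of `new Function` instead of `eval` are assumptions, not the definitive implementation.

```javascript
// Hypothetical custom tag definition, e.g. loaded from configuration.
const customTagDef = {
  name: "total_wf_energy",
  inputs: ["turbine1.today_energy", "turbine2.today_energy"],
  // Function stored as text; it must come from a trusted source only
  func: "(values) => values[0] + values[1]",
};

function evaluateCustomTag(def, tagData) {
  // Collect input values in the order declared by the definition
  const params = def.inputs.map((code) => tagData[code]);
  // Skip evaluation when any input has not been read yet
  if (params.some((v) => typeof v === "undefined")) return undefined;
  // Compile the stored text into a function
  const formula = new Function(`return (${def.func});`)();
  return formula(params);
}

// Usage with sample readings
const sample = { "turbine1.today_energy": 120, "turbine2.today_energy": 80 };
console.log(evaluateCustomTag(customTagDef, sample)); // prints 200
```

Because user-supplied text is executed as code, this approach is only appropriate when the tag definitions are authored by trusted users; otherwise a restricted expression parser would be a safer choice.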
4) Publish data to the Internal Communication Bus (ICB).
The next step in the collector module is to send all the data and alarms to a message broker (i.e., the ICB). In this way, each collector takes responsibility for acquiring data over its underlying protocol and applying the necessary scaling factor to convert it into meaningful data for the downstream systems.
The following code snippet takes the data from step 2 and pushes it into the ICB.
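const mqtt = require("mqtt");

const mqttClient = mqtt.connect(global.env.MQTT_URL, {
  username: global.env.MQTT_USR,
  password: global.env.MQTT_PASSWD,
});

// Publish every value in tag_data for the given device (devId)
function publishTagData(devId, tag_data) {
  if (!mqttClient || !mqttClient.connected) return;
  try {
    // tag_data is a plain object, so iterate over its keys
    Object.keys(tag_data).forEach((_tagCode) => {
      mqttClient.publish(
        "//datahistorian" + "/" + devId + "/" + _tagCode,
        JSON.stringify({
          // asset_id and timestamp are included because the DB-Collector stores them
          asset_id: devId,
          timestamp: new Date().toISOString(),
          tag_name: _tagCode,
          value: tag_data[_tagCode],
        })
      );
    });
  } catch (err) {
    logger.error("[publish DataValue][MQTT] error mqtt pub: ", err);
  }
}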
Persisting Data
Another key feature of Data Historians is the persistence of data for both intermediate processing and historical purposes. There are several benefits to data persistence:
- Data Recovery and Reconciliation: Data is always stored in the historian, even when the export to a central/cloud system fails.
- Compliance Requirements: Many industries and customers require the retention of historical data.
The DB-Collector component presented in the architecture subscribes to the root topic in the ICB. All measurement data, including alarms and derived data points, is pushed into the time-series database. To stream the data, we can use Sequelize, a popular open-source ORM library for Node.js. Sequelize works with PostgreSQL, and therefore with TimescaleDB, which is built on PostgreSQL.
Let’s first initialize the libraries and connect to the IoT database.
var sequelize = require('sequelize');
var sequelizeStream = require('node-sequelize-stream');

// Initialize connection to database (using environment variables)
var connection = new sequelize('iot_db', process.env.DB_SERVER_USER_NAME, process.env.DB_SERVER_USER_PASSWORD, {
  host: process.env.DB_SERVER_HOST,
  port: process.env.DB_SERVER_PORT,
  dialect: 'postgres',
  logging: false // disable SQL logging
});

// Define ORM model for the iot schema
var meas_model = connection.define('iot_meas_by_min', {
  .....
});
The DB-Collector then opens an MQTT connection to the ICB and subscribes to the root topic. When a new message arrives, it parses the payload and streams the data into the IoT database.
// Create the table once if it does not exist yet, then start handling messages
connection.sync()
  .then(() => {
    // Handle message payload from ICB
    mqttclient.on('message', async (topic, message) => {
      try {
        // MQTT payloads arrive as a Buffer; parse the JSON published by the collector
        const payload = JSON.parse(message.toString());
        // Stream the data into the iot table
        await meas_model.create({
          asset_id: payload.asset_id,
          meas_time: payload.timestamp,
          meas_name: payload.tag_name,
          meas_val: payload.value
        });
      } catch (err) {
        console.log('Failed to stream into the database:', err);
      }
    });
  })
  .catch((error) => console.log('Failed to sync with the database:', error));
At this point, all the data collected from the SCADA/devices has been pushed into the time-series database. TimescaleDB needs specific configuration to support partitioning (hypertables and chunks), time bucketing, and compression. That configuration is not discussed here; I will publish a separate article on the subject.
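Inserting one row per message can become a bottleneck as the number of tags grows. As a hedged sketch of one way to reduce the number of database round trips, the buffer below collects rows and hands them over in batches. The class name `RowBuffer`, the batch size of 500, and the `flushFn` callback are assumptions; with Sequelize, `flushFn` could be `(rows) => meas_model.bulkCreate(rows)`.

```javascript
// Buffers rows and hands them to `flushFn` in batches.
// `flushFn` is an assumption; with Sequelize it could be
// (rows) => meas_model.bulkCreate(rows).
class RowBuffer {
  constructor(flushFn, maxRows = 500) {
    this.flushFn = flushFn;
    this.maxRows = maxRows;
    this.rows = [];
  }

  // Add one row; flush automatically once the batch is full
  async add(row) {
    this.rows.push(row);
    if (this.rows.length >= this.maxRows) await this.flush();
  }

  // Send all buffered rows and clear the buffer
  async flush() {
    if (this.rows.length === 0) return;
    const batch = this.rows;
    this.rows = [];
    await this.flushFn(batch);
  }
}

// Usage (hypothetical): flush full batches, plus a timer for partial ones
// const buffer = new RowBuffer((rows) => meas_model.bulkCreate(rows));
// setInterval(() => buffer.flush().catch(console.error), 5000);
```

The trade-off is that rows still in the buffer are lost if the process crashes, so the batch size and flush interval should be chosen with the required quality of service in mind.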
Data Exporter
The Data Exporter is another sub-process in the Data Historian that enables customers to harness real-time and historical data collected from the field or industry. This collected data is used for data-driven decision-making, process optimization, regulatory compliance, and predictive maintenance. Customers may expect the data to be exported over multiple protocols (FTP, MQTT, OPC UA, etc.) and in a variety of formats such as CSV and JSON. To support multiple export mechanisms, we will have a separate microservice for each protocol, as represented in Figure 1.
Some key aspects to consider when implementing the Data Exporter are as follows:
- Scheduler: This serves as the main controller that triggers an export event based on the frequency specified by the user. Users need to specify the dataset to be exported, the format, the frequency, and the granularity of the data. For example: export 1-minute data for the entire wind farm as a CSV file at 5-minute intervals.
- Data Format: Data can be exported in various formats, depending on the requirements of the receiving system or application. Common formats include CSV, JSON, and XML.
- Communication Protocol: Determine the protocol over which the data will be exported or integrated with the other systems the customer may have.
- Security: Data export mechanisms should have proper access control, encryption, and user authentication.
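As an illustration of the data format aspect, here is a minimal sketch that turns measurement rows into CSV text. The function name `toCsv` is hypothetical, and the default column names simply reuse the model fields from the persistence code earlier in this article; a real exporter would take both from the user's export configuration.

```javascript
// Convert measurement rows into CSV text.
// Default columns follow the model fields used earlier in the article.
function toCsv(rows, columns = ["asset_id", "meas_time", "meas_name", "meas_val"]) {
  const escape = (v) => {
    const s = v === null || v === undefined ? "" : String(v);
    // Quote fields containing commas, quotes, or newlines
    return /[",\n]/.test(s) ? `"${s.replace(/"/g, '""')}"` : s;
  };
  const header = columns.join(",");
  const lines = rows.map((row) => columns.map((c) => escape(row[c])).join(","));
  return [header, ...lines].join("\n");
}

// Usage with one sample row
const csv = toCsv([
  { asset_id: "wtg1", meas_time: "2024-01-01T00:00:00Z", meas_name: "AC_PWR", meas_val: 1.5 },
]);
console.log(csv);
```

A scheduler could call a function like this at the configured frequency and hand the resulting text to the protocol-specific exporter microservice (FTP, MQTT, and so on).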
Conclusion
Building your own Data Historian is a pivotal step in harnessing the power of the Industrial Internet of Things: it gives you full control, enables OT-IT convergence, and reduces licensing costs. It involves careful consideration of communication protocol requirements, the related open-source libraries, interoperability, scalability, and quality of service. By understanding these key elements, you can embark on a journey to create a Data Historian tailored to your organization’s specific needs.
In this article, I have discussed most of the key architectural components that are essential to building your own Data Historian.
I hope this was useful. Please leave your feedback and comments.