Информация

Дата основания
1993
Местоположение
США
Сайт
www.epam.com
Численность
свыше 10 000 человек
Дата регистрации

Блог на Хабре

Обновить
EPAM
Компания для карьерного и профессионального роста

Реализация кастомной Edge I-IoT платформы

Блог компании EPAMNoSQLПромышленное программированиеSCADAИнтернет вещей
В предыдущей статье был краткий обзор промышленного интернета вещей I-IoT и описание платформы граничных вычислений. В этой статье я хочу показать простой пример релизации Edge I-IoT платформы, используя популярные открытые технологии.

image

С архитектурной точки зрения платформа IoT требует решить следующие задачи:

  • Объем данных, получаемых, принимаемых и обрабатываемых, требует высокой пропускной способности, хранения и вычислительных возможностей.
  • Устройства могут быть распределены по обширной географической области
  • Компании требуют, чтобы их архитектура постоянно развивалась, чтобы можно было предлагать новые услуги клиентам.

Одной из особенностей IoT-платформы является независимость между объектами и сигналами, что позволяет выполнять параллельные вычисления, повышая производительность.

Данные, поступающие с датчиков, собираются из источников: PLC, DCS, микроконтроллеров и т.п и могут храниться во временной области во избежание потери данных из-за проблем с подключением. Данные могут быть временными рядами (события), полуструктурированными данными (логи и двоичные файлы), или неструктурированными (изображения). Данные и события временного ряда собираются часто (от каждой секунды до нескольких минут). Затем они отправляются по сети и сохраняются в централизованном озере данных (data lake) и базе данных временных рядов (time-series database TSDB). Data lake может быть облачным, локальным центром обработки данных или сторонней системой хранения.

Данные могут быть немедленно обработаны с использованием анализа потока данных, который называется «hot path», с механизмом проверки правил, основанном на простой или интеллектуальной уставке. Продвинутая аналитика может включать цифровых близнецов, машинное обучение, глубокое обучение или аналитику на основе физических характеристик. Такая система может обрабатывать большой объем данных (от десяти минут до месяца) с разных датчиков. Эти данные хранятся в промежуточном хранилище. Такая аналитика называется «cold path», и как правило, запускается планировщиком или при доступности данных и требует большого количества вычислительных ресурсов. Продвинутая аналитика часто нуждается в такой дополнительной информации, как модель контролируемой машины и эксплуатационные атрибуты — её можно найти в asset registry. Asset registry содержит информацию о типе актива, включая его имя, серийный номер, символическое имя, местоположение, рабочие возможности, историю комплектующих, из которых он состоит, и роль, которую он играет в производственном процессе. В asset registry мы можем хранить список измерений каждого актива, логическое имя, единицу измерения и диапазон границ. В промышленном секторе эта статическая информация важна для правильной аналитической модели.

Причины разработки кастомной платформы:

  • Возврат инвестиций: небольшой бюджет;
  • Технология: использование технологии независимо от поставщика;
  • Конфиденциальность данных;
  • Интеграция: необходимость разработки уровня интеграции с новой или устаревшей платформой;
  • Другие ограничения.

image

Cквозной поток данных в I-IoT

Пример кастомной реализации Edge-платформы


На данном рисунке показана реализация следующих звеньев платформы:

  • Источник данных: как пример выбран симулятор контроллера Simatic PLCSIM Advanced с активированным OPC сервером, как описано в предыдущей статье;
  • В качестве граничного шлюза выбрана популярная платформа Node-Red c установленным плагином node-red-contrib-opcua;
  • MQTT брокер Mosquitto используется как диспетчер передачи данных между другими звеньями потока;
  • Apache Kafka используется как распределенная потоковая платформа, выполняющая роль аналитики hot path с помощью kafka-streams.

image

Node-red Edge gateway


В качестве шлюза граничных вычислений будем использовать Node-red — простую настраиваемую платформу, имеющую множество различных плагинов. Роль промышленного адаптера (Industrial adapter) играет плагин node-red-contrib-opcua. Для множественного сбора данных с контроллера способом подписки используются ноды: OpcUa-Browser и OpcUa-client. В OPC-браузер ноде настраивается url OPC-сервера (endpoint) и топик, в котором указано пространство имен и имя читаемого блока данных, например: ns=3;s=«HMI_Alarms_Area». В OPC-клиент ноде также указывается url OPC-сервера, в качестве действия (Action) устанавливается SUBSCRIBE и интервал обновления данных.

Node-red main flow
image

Настройка ноды OPC-browser
image

Настройка ноды OPC-client
image

Для того, чтобы выполнилась подписка на чтение множественных данных, необходимо подготовить и загрузить теги с контроллера, согласно OPC протокола. Для этого вначале используется inject нода с чекбоксом only once, которая триггерит единоразовое чтение блоков данных, указанных в нодах OPC-браузера. Затем данные обрабатываются функцией Decode&filter. После чего нода OPC-клиента подписывается и читает изменяющиеся данные с контроллера. Дальнейшая обработка потока зависит от конкретной реализации и требований. В своем примере я обрабатываю данные для дальнейшей отсылки их в MQTT брокер на разные топики.

Вкладки HMI control и Office представляют собой простую реализацию HMI на базе Scadavis.io и node-red dashboard, как описано ранее в статье.

image

Пример парсинга данных OPC-browser ноды:

var items = msg.payload;
for (var i=0; i<items.length; i++) {
    var item = items[i];
	var ref = item.item;
	var nodeClass = ref.$nodeClass;
	var typeDef = ref.typeDefinition;
	var bname = ref.browseName;
	var ns=bname.namespaceIndex;
	var name=bname.name;
	var value = ref.value;
	var datatype = ref.dataType;
	// Select only want namespace variables
	if (ns==3) {
	    var newmsg={};
		newmsg.topic = 
		    ref.nodeId+
		    ";datatype="+datatype;
		newmsg.payload=value;
		node.send(newmsg);
	}
}

MQTT брокер


В качестве брокера можно использовать любую реализацию. В моем случае уже установлен и настроен Mosquitto брокер. Брокер выполняет функцию транспорта данных между Edge gateway и другими участниками платформы. Есть примеры с балансировкой нагрузки и распределенной архитектурой (как здесь). В данном случае ограничимся одним mqtt брокером с передачей данных без шифрования.

Локальное хранилище данных временных рядов


Данные временного ряда удобно записывать и хранить в NoSql time-series базе данных. Для наших целей удачно подходит стек InfluxData. Нам необходимо четыре сервиса из этого стека:

InfluxDB — это база данных временных рядов с открытым исходным кодом, которая является частью стека TICK (Telegraf, InfluxDB, Chronograf, Kapacitor). Предназначена для обработки данных с высокой нагрузкой и предоставляет SQL-подобный язык запросов InfluxQL для взаимодействия с данными.

Telegraf — это агент для сбора и отправки метрик и событий в InfluxDB из внешних IoT систем, датчиков и т.п. Настраивается на сбор данных из mqtt топиков.

Kapacitor — это встроенный механизм обработки данных для InfluxDB 1.x и интегрированный компонент в платформу InfluxDB. Этот сервис можно настроить на мониторинг различных уставок и тревог, а также установить обработчик отправки событий во внешние системы, как Kafka, email и т.п.

Chronograf — это пользовательский интерфейс и административный компонент платформы InfluxDB. Используется для быстрого создания панелей мониторинга с визуализацией в реальном времени.

Все компоненты стека можно запустить локально или настроить Docker-контейнер.

image
Выборка данных и настройка дашбордов с помощью Chronograf

Для запуска InfluxDB достаточно выполнить команду influxd, в настройках influxdb.conf можно указать место хранения данных и другие свойства, по умолчанию данные хранятся в пользовательском каталоге в .influxdb директории.

Для запуска telegraf необходимо выполнить команду telegraf -config telegraf.conf, где в настройках можно указать источники метрик и событий, в нашем примере для mqtt это выглядит так:

# # Read metrics from MQTT topic(s)
 [[inputs.mqtt_consumer]]
   servers = ["tcp://192.168.1.107:1883"]
   qos = 0
   topics = ["HMI_Status_Area/#", "HMI_Alarms_Area/#"]
   data_format = "value"
   data_type = "float"  


В свойстве servers указываем url к mqtt брокеру, qos можем оставить 0, если достаточно записывать данные без подтверждения. В свойстве topics указываем маски mqtt топиков, из которых будем читать данные. Например HMI_Status_Area/# означает, что мы читаем все топики, имеющие префикс HMI_Status_Area. Таким образом telegraf для каждого топика создаст свою метрику в базе, куда будет писать данные.

Для запуска kapacitor необходимо выполнить команду kapacitord -config kapacitor.conf. Свойства можно оставить по умолчанию и дальнейшие настройки выполнить с помощью chronograf.
Чтобы запустить chronograf достаточно выполнить одноименную команду chronograf. Веб интерфейс будет доступен localhost:8888/

Для настройки уставок и тревог с помощью Kapacitor можно воспользоваться мануалом. Вкратце – нужно перейти во вкладку Alerting в Chronograf и создать новое правило с помощью кнопки Build Alert Rule, интерфейс интуитивно понятен, все выполняется визуально. Для настройки отсылки результатов обработки в kafka и т.п. необходимо добавить обработчик в разделе Conditions

Настройки обработчика Kapacitor
image

Распределенная потоковая обработка с Apache Kafka


Для предлагаемой архитектуры необходимо отделить сбор данных от обработки, улучшив масштабируемость и независимость уровней. Для достижения этой цели мы можем использовать очередь. В качестве реализации может быть Java Message Service (JMS) или Advanced Message Queuing Protocol (AMQP), но в данном случае будем использовать Apache Kafka. Kafka поддерживается большинством аналитических платформ, имеет очень высокую производительность и масштабируемость, а также имеет хорошую библиотеку потоковой обработки Kafka-streams.

Для взаимодействия с Kafka можно использовать плагин Node-red node-red-contrib-kafka-manager. Но, учитывая разделение сбора от обработки данных, установим плагин MQTT, который подписывается на топики Mosquitto. Плагин MQTT доступен по ссылке.

Для настройки коннектора необходимо в каталог kafka/libs/ скопировать библиотеки kafka-connect-mqtt-1.1-SNAPSHOT.jar и org.eclipse.paho.client.mqttv3-1.0.2.jar (или другую версию). Затем в каталоге /config необходимо создать файл свойств mqtt.properties со следующим содержимым:

name=mqtt
connector.class=com.evokly.kafka.connect.mqtt.MqttSourceConnector
tasks.max=1
 
kafka.topic=streams-measures
mqtt.client_id=mqtt-kafka-123456789
 
mqtt.clean_session=true
mqtt.connection_timeout=30
mqtt.keep_alive_interval=60
 
mqtt.server_uris=tcp://192.168.1.107:1883
mqtt.topic=mqtt


Предварительно запустив zookeeper-server и kafka-server можем запустить коннектор с помощью команды:

connect-standalone.bat …\config\connect-standalone.properties …\config\mqtt.properties

Из топика mqtt (mqtt.topic=mqtt) данные будут записываться в Kafka-топик streams-measures (kafka.topic=streams-measures).

В качестве простого примера можно создать maven-проект, используя библиотеку kafka-streams.
С помощью kafka-streams можно реализовать различные сервисы и сценарии hot-аналитики и потоковой обработки данных.

Пример сравнения текущей температуры с уставкой за период.
StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-measures");

        KStream<Windowed<String>, String> max = source
                .selectKey((String key, String value) -> {
                        return getKey(key, value);
                    }
                )
                .groupByKey()
                .windowedBy(TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE)))
                .reduce((String value1, String value2) -> {
                        double v1=getValue(value1);
                        double v2=getValue(value2);
                        if ( v1 > v2)
                            return value1;
                        else
                            return value2;
                    }
                )
                .toStream()
                .filter((Windowed<String> key, String value) -> {
                        String measure = tagMapping.get(key.key());
                        double parsedValue = getValue(value);

                        if (measure!=null) {
                            Double threshold = excursion.get(measure);
                            if (threshold!=null) {
                                if(parsedValue > threshold) {
                                    log.info(String.format("%s : %s; Threshold: %s", key.key(), parsedValue, threshold));
                                    return true;
                                }
                                return false;
                            }
                        } else {
                            log.severe("UNKNOWN MEASURE! Did you mapped? : " + key.key());
                        }
                        return false;
                    }
                );

        final Serde<String> STRING_SERDE = Serdes.String();
        final Serde<Windowed<String>> windowedSerde = Serdes.serdeFrom(
                new TimeWindowedSerializer<>(STRING_SERDE.serializer()),
                new TimeWindowedDeserializer<>(STRING_SERDE.deserializer(), TimeWindows.of(Duration.ofSeconds(WINDOW_SIZE)).size()));

        // the output
        max.to("excursion", Produced.with(windowedSerde, Serdes.String()));


Asset registry


Реестр активов, вообще-то, не является структурной составляющей Edge платформы и представляет собой часть облачной IoT среды. Но в данном примере показано взаимодействие Edge и Cloud.

В качестве asset registry будем использовать популярную IoT платформу ThingsBoard, интерфейс которой также достаточно понятен интуитивно. Установка возможна с демо-данными. Платформу можно установить локально, в докере или использовать готовую облачную среду.

В набор демо-данных входят тестовые устройства (можно легко создать новое), в которые можно отправлять значения. По умолчанию ThingsBoard запускается со своим mqtt брокером, к которому необходимо подключаться и отсылать данные в json формате. Допустим, мы хотим отсылать данные в ThingsBoard с устройства TEST DEVICE A1. Для этого нам необходимо подключиться к ThingBoard брокеру по адресу localhost:1883, используя A1_TEST_TOKEN в качестве логина, который можно скопировать из настроек устройства. После чего можем публиковать данные в топик v1/devices/me/telemetry: {“temperature”:26}

image

В документации платформы имеется мануал по настройке передачи данных и обработке аналитики в Kafka — IoT data analytics using Kafka, Kafka Streams and ThingsBoard

Пример использования kafka-ноды в Thingsboard
image


Заключение


Современные IT-технологии и открытые протоколы позволяют проектировать системы любой сложности. Edge платформа — это точка соединения между промышленной средой и облачной IoT платформой. Она может быть разложена на макрокомпоненты, среди которых граничный шлюз выпоняет ключевую роль, отвечающую за пересылку данных от устройств в IoT дата-хаб. Открытые инструменты потоковой обработки данных позволяют эффективно реализовать аналитику и граничные вычисления.
Теги:IoTkafkanode-redPLCsiemens 1500OPC-server
Хабы: Блог компании EPAM NoSQL Промышленное программирование SCADA Интернет вещей
Рейтинг +3
Количество просмотров 1,4k Добавить в закладки 13
Комментарии
Комментарии 6

Похожие публикации

Лучшие публикации за сутки