Skip to content

DataPipe

WARNING

DataPipe изначально выключены, чтобы UnitNode не потребляли серверные ресурсы.

Как найти DataPipe?

  1. Найдите интересующий вас Unit визуально или через вкладку поиска.
  2. Нажмите ПКМ по Unit если UnitNode ещё не отобразились
  3. Выберите ЛКМ Input или Output UnitNode
  4. Кликните жёлткую кнопку DataPipe

Кнопки:

  • Кнопка Import YML Config - позволяет загрузить конвейер с вашего локального устройства в редкатор
  • Кнопка Export YML Config - позволяет скачать конвейер, который на данный момент актуален на сервере
  • Кнопка Export CSV Data - позволяет скачать все накопленные данные в формате CSV, если данных нет выдаст ошибку
  • Кнопка Set Config - позволяет отправить на сервер новый конвейер
  • Кнопка Del Saved Pipe Data - позволяет удалить все данные которые получились в результате работы конвейера, для данного UnitNode

Особенности редактора:

  1. Редактор выводит базовые ошибки синтаксиса YML
  2. Редактор умеет выводить бизнес ошибки, если формат не соответствует структуре ожидаемой сервером
  3. При попытке отправки неверного YML конфига, так же будет выведена ошибка

Значение параметров

Всего есть 4 основных этапа конвейера: active_period, filtres, transformations, processing_policy

active_period

Основная цель данного этапа ограничить время сбора данных:

  • определённый период
  • начиная с определённой даты
  • до определённой даты
  • постоянно
Название параметраВозможные значенияОбязательный?Комментарий
typeDateRange, FromDate, ToDate, PermanentДаВыбор ограничения времени
startДата в формате 2023-11-15T12:43:56ZОбязателен для типов: DateRange и FromDateВремя начала периода DateRange или время начала работы FromDate
endДата в формате 2023-11-15T12:43:56ZОбязателен для типов: DateRange и ToDateВремя конца периода DateRange или время конца работы ToDate

filtres

Название параметраВозможные значенияОбязательный?Комментарий
type_input_valueText или NumberДаТип обрабатываемых данных. Number это float64 из go
type_value_filteringWhiteList и BlackListНетТип фильтрации значений
filtering_valuesСписок только числовых или только текстовых значенийОбязателен в случае если есть type_value_filteringПроверка завязана на тип из type_input_value
type_value_thresholdMin, Max или RangeНетТип фильтрации диапазонов числовых значений
threshold_minfloat64 из goОбязателен если есть type_value_threshold с типами: Min и RangeРаботает только с type_input_value = Number
threshold_maxfloat64 из goОбязателен если есть type_value_threshold с типами: Max и RangeРаботает только с type_input_value = Number
max_rate0 <= max_rate <= 86400 только целыеДаОпределят через сколько секунд будет обработано следующее сообщение. 0 без ограничения
last_unique_checktrue, falseДаЕсли true пропустит только если новое значение отличается от предыдущего, по умолчанию false
max_size0 <= max_size <= MQTT_MAX_PAYLOAD_SIZE * 1024ДаМаксимальный размер сообщения, если рамер привысит, сообщение будет пропущено

transformations

Данный этап не является обязательным, но если вы его используете, он требует знания о типе type_input_value из предыдущего этапа:

Для type_input_value = Number:

Название параметраВозможные значенияОбязательный?Комментарий
multiplication_ratiofloat64 из goНетНа это число можно умножить заданное значение - линейное преобразование
round_decimal_point0 <= round_decimal_point <= 7НетСколько чисел после запятой останется ?

Для type_input_value = Text:

Название параметраВозможные значенияОбязательный?Комментарий
slice_startЦелое числоНетРаботает как первый элемент среза в python3 - any_list[slice_start:]
slice_endЦелое числоНетРаботает как второй элемент среза в python3 - any_list[:slice_end]

processing_policy

Название параметраВозможные значенияОбязательный?Комментарий
policy_typeLastValue, NRecords, TimeWindow, AggregationДаОдна из 4 политик обработки
n_records_countЧисло хранимых записей 0 < n_records_count =< 1024Обязателен только для policy_type = NRecordsОпределяет сколько записей будет храниться
time_window_sizeОдно из значений: [60, 300, 600, 900, 1200, 1800, 3600, 7200, 10800, 14400, 21600, 28800, 43200, 86400]Обязателен для policy_type = TimeWindow или AggregationРазмер окна в секундах, оно должно нацело делить 86400
aggregation_functionsAvg, Min, Max, SumОбязателен для policy_type = AggregationФунация на основе которой будет расчитано итоговое значение, на основе time_window_size

Примеры DataPipe

  • Функционал подразумевает в будущем отображение через Grafana, на данном этапе доступны только выгрузки через csv по кнопке, а также REST и GQL запросы, получения данных порциями.

LastValue

Например нам нужно отображать только самое последнее значение в UnitNode интерфейсе:

Подойдёт конфигурация с постоянным временем действия, которая бы не позволяла обновляться чаще чем 1 раз в 5 секунд, без проверки на уникальность, с максимальной длинной текста 100. Политика cохранения LastValue:

TIP

Данная конфигурация почти не потребляет CPU и RAM. За счёт политики LastValue. Ведь постоянно хранится только 1 запись.

yml
active_period:
  type: Permanent
filters:
  type_input_value: Text
  max_rate: 5
  last_unique_check: false
  max_size: 100
processing_policy:
  policy_type: LastValue

NRecords

Например нам нужно увидеть некоторые ивентовые события, открытие заслонки или включение чего-либо. Для этого нужно хранить N записей, и как только их становится больше определённого числа, затирать старые:

Подойдёт конфигурация которая начнёт действовать в определённый момент и без лимита. С текстовым типом данных. Ограничим возможные значения через WhiteList, будем сохранять все значения, но одинаковые подряд будем игнорировать. 512 записей вполне подойдёт.

TIP

Данная конфигурация почти не потребляет CPU и RAM. За счёт дедупликации и WhiteList. Так же сама политика не позволяет создать больше ~= 512 записей.

yml
active_period:
  type: FromDate
  start: '2023-11-15T00:00:00+00:00'
filters:
  type_input_value: Text
  type_value_filtering: WhiteList
  filtering_values:
  - Alarm
  - Error
  - Warning
  max_rate: 0
  last_unique_check: true
  max_size: 100
processing_policy:
  policy_type: NRecords
  n_records_count: 512

TimeWindow

Например мы хотим в течении 1 недели оценивать динамику влажности в комнате с окном в 1 день:

Мы установим до какой даты мы будем проводить замеры. Определим ограничение возможного значения влажности от 0 до 100 %. Ограничимся разрешением в 1 минуту, но будем учитвать все значения. Округлим до значащих 2 цифр. И будем наблюдать за окном в 86400 секунд.

TIP

Данная конфигурация почти не потребляет CPU и RAM, за счёт разрешения max_rate. Число записей для данной политики будет постоянным ~= 1440.

yml
active_period:
  type: ToDate
  end: '2025-11-15T00:00:00+00:00'
filters:
  type_input_value: Number
  type_value_threshold: Range
  threshold_min: 0
  threshold_max: 100
  max_rate: 60
  last_unique_check: false
  max_size: 50
transformations:
  round_decimal_point: 2
processing_policy:
  policy_type: TimeWindow
  time_window_size: 86400

Aggregation

Например нам нужно накапливать статистику с датчика температуры ds18b20, мы не хотим тратить большой обьём памяти, но нам нужны данные за очень длительный период c разрешением 15 минут:

Подойдёт концигурация с заданным периодом действия, ограничением ошибочных значений, ограничением реальных значений, проверкой всех значений, дедупликацией. Ограничим до 2 значащих цифр, ведь точность датчика 12 бит. Выставим период 900 секунд, а функцию расчёта как среднее.

TIP

Данная конфигурация почти не потребяет CPU и RAM, за счёт применения агрегации. Данный DataPipe будет создвать 96 записей в бд в день.

yml
active_period:
  type: DateRange
  start: '2023-11-15T00:00:00+00:00'
  end: '2200-11-15T00:00:00+00:00'
filters:
  type_input_value: Number
  type_value_filtering: BlackList
  filtering_values:
  - -127.00
  - 85.00
  type_value_threshold: Range
  threshold_min: -50
  threshold_max: 120
  max_rate: 0
  last_unique_check: true
  max_size: 100
transformations:
  round_decimal_point: 2
processing_policy:
  policy_type: Aggregation
  time_window_size: 900
  aggregation_functions: Avg

Import CSV Data

Данная кнопка позволяет загрузить правильно отформатированный CSV файл в DataPipe. При этом все данные пройдут проверки, указанные в YML конфигурации. Важным условие является отсортированность по create_datetime и end_window_datetime в asc - первая запись старше чем последняя.

Далее примеры файлов для разных policy_type, с минимальным набором полей для импорта:

NRecords

csv
state,create_datetime
9.95,2025-09-10 08:27:00
4.21,2025-09-10 09:27:00
5.69,2025-09-10 10:27:00
1.43,2025-09-10 11:27:00
7.84,2025-09-10 12:27:00
6.47,2025-09-10 13:27:00
8.93,2025-09-10 14:27:00
1.85,2025-09-10 15:27:00
7.26,2025-09-10 16:27:00
2.96,2025-09-10 17:27:00
6.58,2025-09-10 18:27:00

TimeWindow

csv
state,create_datetime
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:42
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:44
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:46
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:48
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:50
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:52
"{""level"": ""error"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:54
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:56
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:58
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:00
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:02
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:04
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:06

Aggregation

csv
state,create_datetime,start_window_datetime,end_window_datetime
-15.13,2025-09-13 02:08:00,2025-09-13 02:07:00,2025-09-13 02:08:00
-5.91,2025-09-13 02:09:00,2025-09-13 02:08:00,2025-09-13 02:09:00
8.3,2025-09-13 02:10:00,2025-09-13 02:09:00,2025-09-13 02:10:00
-8.72,2025-09-13 02:11:00,2025-09-13 02:10:00,2025-09-13 02:11:00
7.56,2025-09-13 02:12:00,2025-09-13 02:11:00,2025-09-13 02:12:00
-15.75,2025-09-13 02:13:00,2025-09-13 02:12:00,2025-09-13 02:13:00
-0.94,2025-09-13 02:14:00,2025-09-13 02:13:00,2025-09-13 02:14:00
-1.45,2025-09-13 02:15:00,2025-09-13 02:14:00,2025-09-13 02:15:00
1.49,2025-09-13 02:16:00,2025-09-13 02:15:00,2025-09-13 02:16:00
-12.69,2025-09-13 02:17:00,2025-09-13 02:16:00,2025-09-13 02:17:00
-6.51,2025-09-13 02:18:00,2025-09-13 02:17:00,2025-09-13 02:18:00

LastValue

Не предусмотрен