DataPipe
Как найти DataPipe?
Кнопки:
- Кнопка
Import YML Config- позволяет загрузить конвейер с вашего локального устройства в редкатор - Кнопка
Export YML Config- позволяет скачать конвейер, который на данный момент актуален на сервере - Кнопка
Export CSV Data- позволяет скачать все накопленные данные в форматеCSV, если данных нет выдаст ошибку - Кнопка
Set Config- позволяет отправить на сервер новый конвейер - Кнопка
Del Saved Pipe Data- позволяет удалить все данные которые получились в результате работы конвейера, для данного UnitNode
Особенности редактора:
- Редактор выводит базовые ошибки синтаксиса YML
- Редактор умеет выводить
бизнес ошибки, если формат не соответствует структуре ожидаемой сервером - При попытке отправки неверного YML конфига, так же будет выведена ошибка
Значение параметров
Всего есть 4 основных этапа конвейера: active_period, filtres, transformations, processing_policy
active_period
Основная цель данного этапа ограничить время сбора данных:
- определённый период
- начиная с определённой даты
- до определённой даты
- постоянно
| Название параметра | Возможные значения | Обязательный? | Комментарий |
|---|---|---|---|
type | DateRange, FromDate, ToDate, Permanent | Да | Выбор ограничения времени |
start | Дата в формате 2023-11-15T12:43:56Z | Обязателен для типов: DateRange и FromDate | Время начала периода DateRange или время начала работы FromDate |
end | Дата в формате 2023-11-15T12:43:56Z | Обязателен для типов: DateRange и ToDate | Время конца периода DateRange или время конца работы ToDate |
filtres
| Название параметра | Возможные значения | Обязательный? | Комментарий |
|---|---|---|---|
type_input_value | Text или Number | Да | Тип обрабатываемых данных. Number это float64 из go |
type_value_filtering | WhiteList и BlackList | Нет | Тип фильтрации значений |
filtering_values | Список только числовых или только текстовых значений | Обязателен в случае если есть type_value_filtering | Проверка завязана на тип из type_input_value |
type_value_threshold | Min, Max или Range | Нет | Тип фильтрации диапазонов числовых значений |
threshold_min | float64 из go | Обязателен если есть type_value_threshold с типами: Min и Range | Работает только с type_input_value = Number |
threshold_max | float64 из go | Обязателен если есть type_value_threshold с типами: Max и Range | Работает только с type_input_value = Number |
max_rate | 0 <= max_rate <= 86400 только целые | Да | Определят через сколько секунд будет обработано следующее сообщение. 0 без ограничения |
last_unique_check | true, false | Да | Если true пропустит только если новое значение отличается от предыдущего, по умолчанию false |
max_size | 0 <= max_size <= MQTT_MAX_PAYLOAD_SIZE * 1024 | Да | Максимальный размер сообщения, если рамер привысит, сообщение будет пропущено |
transformations
Данный этап не является обязательным, но если вы его используете, он требует знания о типе type_input_value из предыдущего этапа:
Для type_input_value = Number:
| Название параметра | Возможные значения | Обязательный? | Комментарий |
|---|---|---|---|
multiplication_ratio | float64 из go | Нет | На это число можно умножить заданное значение - линейное преобразование |
round_decimal_point | 0 <= round_decimal_point <= 7 | Нет | Сколько чисел после запятой останется ? |
Для type_input_value = Text:
| Название параметра | Возможные значения | Обязательный? | Комментарий |
|---|---|---|---|
slice_start | Целое число | Нет | Работает как первый элемент среза в python3 - any_list[slice_start:] |
slice_end | Целое число | Нет | Работает как второй элемент среза в python3 - any_list[:slice_end] |
processing_policy
| Название параметра | Возможные значения | Обязательный? | Комментарий |
|---|---|---|---|
policy_type | LastValue, NRecords, TimeWindow, Aggregation | Да | Одна из 4 политик обработки |
n_records_count | Число хранимых записей 0 < n_records_count =< 1024 | Обязателен только для policy_type = NRecords | Определяет сколько записей будет храниться |
time_window_size | Одно из значений: [60, 300, 600, 900, 1200, 1800, 3600, 7200, 10800, 14400, 21600, 28800, 43200, 86400] | Обязателен для policy_type = TimeWindow или Aggregation | Размер окна в секундах, оно должно нацело делить 86400 |
aggregation_functions | Avg, Min, Max, Sum | Обязателен для policy_type = Aggregation | Фунация на основе которой будет расчитано итоговое значение, на основе time_window_size |
Примеры DataPipe
- Функционал подразумевает в будущем отображение через
Grafana, на данном этапе доступны только выгрузки черезcsvпо кнопке, а также REST и GQL запросы, получения данных порциями.
LastValue
Например нам нужно отображать только самое последнее значение в UnitNode интерфейсе:
Подойдёт конфигурация с постоянным временем действия, которая бы не позволяла обновляться чаще чем 1 раз в 5 секунд, без проверки на уникальность, с максимальной длинной текста 100. Политика cохранения LastValue:
TIP
Данная конфигурация почти не потребляет CPU и RAM. За счёт политики LastValue. Ведь постоянно хранится только 1 запись.
active_period:
type: Permanent
filters:
type_input_value: Text
max_rate: 5
last_unique_check: false
max_size: 100
processing_policy:
policy_type: LastValueNRecords
Например нам нужно увидеть некоторые ивентовые события, открытие заслонки или включение чего-либо. Для этого нужно хранить N записей, и как только их становится больше определённого числа, затирать старые:
Подойдёт конфигурация которая начнёт действовать в определённый момент и без лимита. С текстовым типом данных. Ограничим возможные значения через WhiteList, будем сохранять все значения, но одинаковые подряд будем игнорировать. 512 записей вполне подойдёт.
TIP
Данная конфигурация почти не потребляет CPU и RAM. За счёт дедупликации и WhiteList. Так же сама политика не позволяет создать больше ~= 512 записей.
active_period:
type: FromDate
start: '2023-11-15T00:00:00+00:00'
filters:
type_input_value: Text
type_value_filtering: WhiteList
filtering_values:
- Alarm
- Error
- Warning
max_rate: 0
last_unique_check: true
max_size: 100
processing_policy:
policy_type: NRecords
n_records_count: 512TimeWindow
Например мы хотим в течении 1 недели оценивать динамику влажности в комнате с окном в 1 день:
Мы установим до какой даты мы будем проводить замеры. Определим ограничение возможного значения влажности от 0 до 100 %. Ограничимся разрешением в 1 минуту, но будем учитвать все значения. Округлим до значащих 2 цифр. И будем наблюдать за окном в 86400 секунд.
TIP
Данная конфигурация почти не потребляет CPU и RAM, за счёт разрешения max_rate. Число записей для данной политики будет постоянным ~= 1440.
active_period:
type: ToDate
end: '2025-11-15T00:00:00+00:00'
filters:
type_input_value: Number
type_value_threshold: Range
threshold_min: 0
threshold_max: 100
max_rate: 60
last_unique_check: false
max_size: 50
transformations:
round_decimal_point: 2
processing_policy:
policy_type: TimeWindow
time_window_size: 86400Aggregation
Например нам нужно накапливать статистику с датчика температуры ds18b20, мы не хотим тратить большой обьём памяти, но нам нужны данные за очень длительный период c разрешением 15 минут:
Подойдёт концигурация с заданным периодом действия, ограничением ошибочных значений, ограничением реальных значений, проверкой всех значений, дедупликацией. Ограничим до 2 значащих цифр, ведь точность датчика 12 бит. Выставим период 900 секунд, а функцию расчёта как среднее.
TIP
Данная конфигурация почти не потребяет CPU и RAM, за счёт применения агрегации. Данный DataPipe будет создвать 96 записей в бд в день.
active_period:
type: DateRange
start: '2023-11-15T00:00:00+00:00'
end: '2200-11-15T00:00:00+00:00'
filters:
type_input_value: Number
type_value_filtering: BlackList
filtering_values:
- -127.00
- 85.00
type_value_threshold: Range
threshold_min: -50
threshold_max: 120
max_rate: 0
last_unique_check: true
max_size: 100
transformations:
round_decimal_point: 2
processing_policy:
policy_type: Aggregation
time_window_size: 900
aggregation_functions: AvgImport CSV Data
Данная кнопка позволяет загрузить правильно отформатированный CSV файл в DataPipe. При этом все данные пройдут проверки, указанные в YML конфигурации. Важным условие является отсортированность по create_datetime и end_window_datetime в asc - первая запись старше чем последняя.
Далее примеры файлов для разных policy_type, с минимальным набором полей для импорта:
NRecords
state,create_datetime
9.95,2025-09-10 08:27:00
4.21,2025-09-10 09:27:00
5.69,2025-09-10 10:27:00
1.43,2025-09-10 11:27:00
7.84,2025-09-10 12:27:00
6.47,2025-09-10 13:27:00
8.93,2025-09-10 14:27:00
1.85,2025-09-10 15:27:00
7.26,2025-09-10 16:27:00
2.96,2025-09-10 17:27:00
6.58,2025-09-10 18:27:00TimeWindow
state,create_datetime
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:42
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:44
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:46
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:48
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:50
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:52
"{""level"": ""error"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:54
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:56
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:58
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:00
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:02
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:04
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:06Aggregation
state,create_datetime,start_window_datetime,end_window_datetime
-15.13,2025-09-13 02:08:00,2025-09-13 02:07:00,2025-09-13 02:08:00
-5.91,2025-09-13 02:09:00,2025-09-13 02:08:00,2025-09-13 02:09:00
8.3,2025-09-13 02:10:00,2025-09-13 02:09:00,2025-09-13 02:10:00
-8.72,2025-09-13 02:11:00,2025-09-13 02:10:00,2025-09-13 02:11:00
7.56,2025-09-13 02:12:00,2025-09-13 02:11:00,2025-09-13 02:12:00
-15.75,2025-09-13 02:13:00,2025-09-13 02:12:00,2025-09-13 02:13:00
-0.94,2025-09-13 02:14:00,2025-09-13 02:13:00,2025-09-13 02:14:00
-1.45,2025-09-13 02:15:00,2025-09-13 02:14:00,2025-09-13 02:15:00
1.49,2025-09-13 02:16:00,2025-09-13 02:15:00,2025-09-13 02:16:00
-12.69,2025-09-13 02:17:00,2025-09-13 02:16:00,2025-09-13 02:17:00
-6.51,2025-09-13 02:18:00,2025-09-13 02:17:00,2025-09-13 02:18:00LastValue
Не предусмотрен