DataPipe
Как найти DataPipe?
Кнопки:
- Кнопка
Import YML Config
- позволяет загрузить конвейер с вашего локального устройства в редкатор - Кнопка
Export YML Config
- позволяет скачать конвейер, который на данный момент актуален на сервере - Кнопка
Export CSV Data
- позволяет скачать все накопленные данные в форматеCSV
, если данных нет выдаст ошибку - Кнопка
Set Config
- позволяет отправить на сервер новый конвейер - Кнопка
Del Saved Pipe Data
- позволяет удалить все данные которые получились в результате работы конвейера, для данного UnitNode
Особенности редактора:
- Редактор выводит базовые ошибки синтаксиса YML
- Редактор умеет выводить
бизнес ошибки
, если формат не соответствует структуре ожидаемой сервером - При попытке отправки неверного YML конфига, так же будет выведена ошибка
Значение параметров
Всего есть 4 основных этапа конвейера: active_period
, filtres
, transformations
, processing_policy
active_period
Основная цель данного этапа ограничить время сбора данных:
- определённый период
- начиная с определённой даты
- до определённой даты
- постоянно
Название параметра | Возможные значения | Обязательный? | Комментарий |
---|---|---|---|
type | DateRange , FromDate , ToDate , Permanent | Да | Выбор ограничения времени |
start | Дата в формате 2023-11-15T12:43:56Z | Обязателен для типов: DateRange и FromDate | Время начала периода DateRange или время начала работы FromDate |
end | Дата в формате 2023-11-15T12:43:56Z | Обязателен для типов: DateRange и ToDate | Время конца периода DateRange или время конца работы ToDate |
filtres
Название параметра | Возможные значения | Обязательный? | Комментарий |
---|---|---|---|
type_input_value | Text или Number | Да | Тип обрабатываемых данных. Number это float64 из go |
type_value_filtering | WhiteList и BlackList | Нет | Тип фильтрации значений |
filtering_values | Список только числовых или только текстовых значений | Обязателен в случае если есть type_value_filtering | Проверка завязана на тип из type_input_value |
type_value_threshold | Min , Max или Range | Нет | Тип фильтрации диапазонов числовых значений |
threshold_min | float64 из go | Обязателен если есть type_value_threshold с типами: Min и Range | Работает только с type_input_value = Number |
threshold_max | float64 из go | Обязателен если есть type_value_threshold с типами: Max и Range | Работает только с type_input_value = Number |
max_rate | 0 <= max_rate <= 86400 только целые | Да | Определят через сколько секунд будет обработано следующее сообщение. 0 без ограничения |
last_unique_check | true , false | Да | Если true пропустит только если новое значение отличается от предыдущего, по умолчанию false |
max_size | 0 <= max_size <= MQTT_MAX_PAYLOAD_SIZE * 1024 | Да | Максимальный размер сообщения, если рамер привысит, сообщение будет пропущено |
transformations
Данный этап не является обязательным, но если вы его используете, он требует знания о типе type_input_value
из предыдущего этапа:
Для type_input_value
= Number
:
Название параметра | Возможные значения | Обязательный? | Комментарий |
---|---|---|---|
multiplication_ratio | float64 из go | Нет | На это число можно умножить заданное значение - линейное преобразование |
round_decimal_point | 0 <= round_decimal_point <= 7 | Нет | Сколько чисел после запятой останется ? |
Для type_input_value
= Text
:
Название параметра | Возможные значения | Обязательный? | Комментарий |
---|---|---|---|
slice_start | Целое число | Нет | Работает как первый элемент среза в python3 - any_list[slice_start:] |
slice_end | Целое число | Нет | Работает как второй элемент среза в python3 - any_list[:slice_end] |
processing_policy
Название параметра | Возможные значения | Обязательный? | Комментарий |
---|---|---|---|
policy_type | LastValue , NRecords , TimeWindow , Aggregation | Да | Одна из 4 политик обработки |
n_records_count | Число хранимых записей 0 < n_records_count =< 1024 | Обязателен только для policy_type = NRecords | Определяет сколько записей будет храниться |
time_window_size | Одно из значений: [60, 300, 600, 900, 1200, 1800, 3600, 7200, 10800, 14400, 21600, 28800, 43200, 86400] | Обязателен для policy_type = TimeWindow или Aggregation | Размер окна в секундах, оно должно нацело делить 86400 |
aggregation_functions | Avg , Min , Max , Sum | Обязателен для policy_type = Aggregation | Фунация на основе которой будет расчитано итоговое значение, на основе time_window_size |
Примеры DataPipe
- Функционал подразумевает в будущем отображение через
Grafana
, на данном этапе доступны только выгрузки черезcsv
по кнопке, а также REST и GQL запросы, получения данных порциями.
LastValue
Например нам нужно отображать только самое последнее значение в UnitNode интерфейсе:
Подойдёт конфигурация с постоянным временем действия, которая бы не позволяла обновляться чаще чем 1
раз в 5
секунд, без проверки на уникальность, с максимальной длинной текста 100
. Политика cохранения LastValue
:
TIP
Данная конфигурация почти не потребляет CPU
и RAM
. За счёт политики LastValue
. Ведь постоянно хранится только 1
запись.
active_period:
type: Permanent
filters:
type_input_value: Text
max_rate: 5
last_unique_check: false
max_size: 100
processing_policy:
policy_type: LastValue
NRecords
Например нам нужно увидеть некоторые ивентовые события, открытие заслонки или включение чего-либо. Для этого нужно хранить N записей, и как только их становится больше определённого числа, затирать старые:
Подойдёт конфигурация которая начнёт действовать в определённый момент и без лимита. С текстовым типом данных. Ограничим возможные значения через WhiteList
, будем сохранять все значения, но одинаковые подряд будем игнорировать. 512
записей вполне подойдёт.
TIP
Данная конфигурация почти не потребляет CPU
и RAM
. За счёт дедупликации и WhiteList
. Так же сама политика не позволяет создать больше ~= 512
записей.
active_period:
type: FromDate
start: '2023-11-15T00:00:00+00:00'
filters:
type_input_value: Text
type_value_filtering: WhiteList
filtering_values:
- Alarm
- Error
- Warning
max_rate: 0
last_unique_check: true
max_size: 100
processing_policy:
policy_type: NRecords
n_records_count: 512
TimeWindow
Например мы хотим в течении 1
недели оценивать динамику влажности в комнате с окном в 1
день:
Мы установим до какой даты мы будем проводить замеры. Определим ограничение возможного значения влажности от 0
до 100 %
. Ограничимся разрешением в 1
минуту, но будем учитвать все значения. Округлим до значащих 2
цифр. И будем наблюдать за окном в 86400
секунд.
TIP
Данная конфигурация почти не потребляет CPU
и RAM
, за счёт разрешения max_rate
. Число записей для данной политики будет постоянным ~= 1440
.
active_period:
type: ToDate
end: '2025-11-15T00:00:00+00:00'
filters:
type_input_value: Number
type_value_threshold: Range
threshold_min: 0
threshold_max: 100
max_rate: 60
last_unique_check: false
max_size: 50
transformations:
round_decimal_point: 2
processing_policy:
policy_type: TimeWindow
time_window_size: 86400
Aggregation
Например нам нужно накапливать статистику с датчика температуры ds18b20
, мы не хотим тратить большой обьём памяти, но нам нужны данные за очень длительный период c разрешением 15
минут:
Подойдёт концигурация с заданным периодом действия, ограничением ошибочных значений, ограничением реальных значений, проверкой всех значений, дедупликацией. Ограничим до 2
значащих цифр, ведь точность датчика 12 бит
. Выставим период 900
секунд, а функцию расчёта как среднее.
TIP
Данная конфигурация почти не потребяет CPU
и RAM
, за счёт применения агрегации. Данный DataPipe
будет создвать 96
записей в бд в день.
active_period:
type: DateRange
start: '2023-11-15T00:00:00+00:00'
end: '2200-11-15T00:00:00+00:00'
filters:
type_input_value: Number
type_value_filtering: BlackList
filtering_values:
- -127.00
- 85.00
type_value_threshold: Range
threshold_min: -50
threshold_max: 120
max_rate: 0
last_unique_check: true
max_size: 100
transformations:
round_decimal_point: 2
processing_policy:
policy_type: Aggregation
time_window_size: 900
aggregation_functions: Avg
Import CSV Data
Данная кнопка позволяет загрузить правильно отформатированный CSV
файл в DataPipe. При этом все данные пройдут проверки, указанные в YML
конфигурации. Важным условие является отсортированность по create_datetime
и end_window_datetime
в asc - первая запись старше чем последняя.
Далее примеры файлов для разных policy_type
, с минимальным набором полей для импорта:
NRecords
state,create_datetime
9.95,2025-09-10 08:27:00
4.21,2025-09-10 09:27:00
5.69,2025-09-10 10:27:00
1.43,2025-09-10 11:27:00
7.84,2025-09-10 12:27:00
6.47,2025-09-10 13:27:00
8.93,2025-09-10 14:27:00
1.85,2025-09-10 15:27:00
7.26,2025-09-10 16:27:00
2.96,2025-09-10 17:27:00
6.58,2025-09-10 18:27:00
TimeWindow
state,create_datetime
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:42
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:44
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:46
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:48
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:50
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:52
"{""level"": ""error"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:54
"{""level"": ""info"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:23:56
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:23:58
"{""level"": ""warning"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:00
"{""level"": ""warning"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:02
"{""level"": ""error"", ""TitleMessage"": ""Test Info Two""}",2025-09-14 11:24:04
"{""level"": ""info"", ""TitleMessage"": ""Test Info One""}",2025-09-14 11:24:06
Aggregation
state,create_datetime,start_window_datetime,end_window_datetime
-15.13,2025-09-13 02:08:00,2025-09-13 02:07:00,2025-09-13 02:08:00
-5.91,2025-09-13 02:09:00,2025-09-13 02:08:00,2025-09-13 02:09:00
8.3,2025-09-13 02:10:00,2025-09-13 02:09:00,2025-09-13 02:10:00
-8.72,2025-09-13 02:11:00,2025-09-13 02:10:00,2025-09-13 02:11:00
7.56,2025-09-13 02:12:00,2025-09-13 02:11:00,2025-09-13 02:12:00
-15.75,2025-09-13 02:13:00,2025-09-13 02:12:00,2025-09-13 02:13:00
-0.94,2025-09-13 02:14:00,2025-09-13 02:13:00,2025-09-13 02:14:00
-1.45,2025-09-13 02:15:00,2025-09-13 02:14:00,2025-09-13 02:15:00
1.49,2025-09-13 02:16:00,2025-09-13 02:15:00,2025-09-13 02:16:00
-12.69,2025-09-13 02:17:00,2025-09-13 02:16:00,2025-09-13 02:17:00
-6.51,2025-09-13 02:18:00,2025-09-13 02:17:00,2025-09-13 02:18:00
LastValue
Не предусмотрен