DataPipe

How to find DataPipe?

  1. Find the Unit you are interested in visually or via the search tab
  2. Right-click (RMB) on the Unit if its UnitNode is not yet displayed
  3. Left-click (LMB) on the Input or Output UnitNode
  4. Click the yellow DataPipe button

| Name of the button | Purpose |
| --- | --- |
| Import YML Config | Upload a pipeline from your local device into the editor |
| Export YML Config | Download the pipeline that is currently active on the server |
| Import CSV Data | Upload data from your local device into DataPipe |
| Export CSV Data | Download all accumulated data in CSV format; if there is no data, an error will be returned |
| Set Config | Send a new pipeline configuration to the server |
| Del Saved Pipe Data | Delete all data that was produced by the pipeline for this UnitNode |

Editor features:

  1. The editor shows basic YML syntax errors
  2. The editor can show business-logic errors when the config does not match the structure expected by the server
  3. When you try to send an invalid YML config, an error will also be shown

Meaning of parameters

The pipeline has 4 main stages: active_period, filtres, transformations and processing_policy.

active_period

The main goal of this stage is to limit the period of data collection:

  • for a specific period
  • starting from a specific date
  • until a specific date
  • permanently

| Parameter name | Possible values | Required? | Comment |
| --- | --- | --- | --- |
| type | DateRange, FromDate, ToDate, Permanent | Yes | Choose how to limit the time |
| start | Datetime in format 2023-11-15T12:43:56Z | Required for types: DateRange and FromDate | Start time of the DateRange period or the start time of the FromDate mode |
| end | Datetime in format 2023-11-15T12:43:56Z | Required for types: DateRange and ToDate | End time of the DateRange period or the end time of the ToDate mode |
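
The four modes can be illustrated with a minimal Python sketch. This is not the server's implementation: the names `parse_ts` and `is_active` are hypothetical, and treating the `start` and `end` boundaries as inclusive is an assumption.

```python
from datetime import datetime, timezone
from typing import Optional


def parse_ts(value: str) -> datetime:
    """Parse a timestamp such as 2023-11-15T12:43:56Z into an aware UTC datetime."""
    return datetime.strptime(value, "%Y-%m-%dT%H:%M:%SZ").replace(tzinfo=timezone.utc)


def is_active(period_type: str, now: datetime,
              start: Optional[str] = None, end: Optional[str] = None) -> bool:
    """Return True if `now` falls inside the configured active period."""
    # Mirror the "Required?" column: start and end are mandatory for some types.
    if period_type in ("DateRange", "FromDate") and start is None:
        raise ValueError(f"start is required for type {period_type}")
    if period_type in ("DateRange", "ToDate") and end is None:
        raise ValueError(f"end is required for type {period_type}")

    if period_type == "Permanent":
        return True
    if period_type == "FromDate":
        return now >= parse_ts(start)  # assumption: boundary is inclusive
    if period_type == "ToDate":
        return now <= parse_ts(end)  # assumption: boundary is inclusive
    if period_type == "DateRange":
        return parse_ts(start) <= now <= parse_ts(end)
    raise ValueError(f"unknown active_period type: {period_type}")
```

For example, `is_active("FromDate", parse_ts("2023-11-15T12:43:56Z"), start="2023-11-01T00:00:00Z")` evaluates to `True`, because the checked moment is later than `start`.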

filtres

| Parameter name | Possible values | Required? | Comment |
| --- | --- | --- | --- |
| type_input_value | Text or Number | Yes | Type of the processed data. Number is a Go float64 |
| type_value_filtering | WhiteList or BlackList | No | Type of value filtering |
| filtering_values | List of only numeric or only text values | Required if type_value_filtering is set | Validation is tied to the type from type_input_value |
| type_value_threshold | Min, Max or Range | No | Type of filtering for numeric value ranges |
| threshold_min | Go float64 | Required if type_value_threshold is Min or Range | Works only when type_input_value = Number |
| threshold_max | Go float64 | Required if type_value_threshold is Max or Range | Works only when type_input_value = Number |
| max_rate | Integer, 0 <= max_rate <= 86400 | Yes | Number of seconds that must pass before the next message is processed. 0 means no limit |
| last_unique_check | true, false | Yes | If true, a new value passes only if it differs from the previous one. Default is false |
| max_size | 0 <= max_size <= MQTT_MAX_PAYLOAD_SIZE * 1024 | Yes | Maximum message size; if exceeded, the message will be dropped |
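
The value-based filters can be sketched in Python as follows. This is an illustration under stated assumptions, not the server code: `passes_filters` is a hypothetical name, the config is passed as a plain dict, the order of the checks is a guess, and Min and Max are read as inclusive lower and upper bounds. The max_rate and max_size checks are left out.

```python
from typing import Optional, Union

Value = Union[float, str]


def passes_filters(value: Value, cfg: dict, previous: Optional[Value] = None) -> bool:
    """Return True if `value` survives the value-based filters in `cfg`."""
    # WhiteList keeps only listed values; BlackList drops listed values.
    mode = cfg.get("type_value_filtering")
    listed = value in cfg.get("filtering_values", [])
    if mode == "WhiteList" and not listed:
        return False
    if mode == "BlackList" and listed:
        return False

    # Thresholds work only when type_input_value is Number.
    if cfg.get("type_input_value") == "Number":
        threshold = cfg.get("type_value_threshold")
        # Assumption: Min is an inclusive lower bound, Max an inclusive upper bound.
        if threshold in ("Min", "Range") and value < cfg["threshold_min"]:
            return False
        if threshold in ("Max", "Range") and value > cfg["threshold_max"]:
            return False

    # last_unique_check drops a value that equals the previous one.
    if cfg.get("last_unique_check") and previous is not None and value == previous:
        return False
    return True
```

With `type_value_threshold` set to Range and bounds 0 and 100, the value 150.0 is rejected, and with `last_unique_check` enabled a repeated 42.0 is rejected as well.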

transformations

This stage is optional. If you use it, the available parameters depend on the type_input_value set in the previous stage:

For type_input_value = Number:

| Parameter name | Possible values | Required? | Comment |
| --- | --- | --- | --- |
| multiplication_ratio | Go float64 | No | Value to multiply the incoming value by (a linear transformation) |
| round_decimal_point | 0 <= round_decimal_point <= 7 | No | How many digits after the decimal point should remain |
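
A minimal Python sketch of the numeric transformations is shown here. The function name `transform_number` is hypothetical, and applying the multiplication before the rounding is an assumption. Python's `round` may also differ from the server's Go rounding in edge cases such as exact halves.

```python
from typing import Optional


def transform_number(value: float,
                     multiplication_ratio: Optional[float] = None,
                     round_decimal_point: Optional[int] = None) -> float:
    """Apply the optional linear scaling, then the optional rounding."""
    if multiplication_ratio is not None:
        value = value * multiplication_ratio
    if round_decimal_point is not None:
        # Mirror the documented range for round_decimal_point.
        if not 0 <= round_decimal_point <= 7:
            raise ValueError("round_decimal_point must be between 0 and 7")
        value = round(value, round_decimal_point)
    return value
```

For instance, a raw reading of 1.23456 with `multiplication_ratio=10.0` and `round_decimal_point=1` is scaled to 12.3456 and then rounded to one decimal place.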

For type_input_value = Text:

| Parameter name | Possible values | Required? | Comment |
| --- | --- | --- | --- |
| slice_start | Integer | No | Works like the first index in a Python 3 slice: any_list[slice_start:] |
| slice_end | Integer | No | Works like the second index in a Python 3 slice: any_list[:slice_end] |
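
Because both parameters are described in terms of Python 3 slices, the behaviour can be tried directly in Python. The helper name `transform_text` and the sample payloads are hypothetical. That negative indices behave on the server exactly as they do in Python is an assumption based on the comparison above.

```python
from typing import Optional


def transform_text(value: str,
                   slice_start: Optional[int] = None,
                   slice_end: Optional[int] = None) -> str:
    """Cut the text the way value[slice_start:slice_end] does in Python 3."""
    # An omitted parameter is passed as None, which leaves that side of the slice open.
    return value[slice_start:slice_end]


# Drop a 12-character prefix, keeping everything after it.
reading = transform_text("temperature=21.5", slice_start=12)

# Drop a 2-character prefix and a 1-character unit suffix.
trimmed = transform_text("T=21.5C", slice_start=2, slice_end=-1)
```

Both `reading` and `trimmed` end up holding the string "21.5".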

processing_policy

| Parameter name | Possible values | Required? | Comment |
| --- | --- | --- | --- |
| policy_type | LastValue, NRecords, TimeWindow, Aggregation | Yes | One of the 4 processing policies |
| n_records_count | Number of stored records, 0 < n_records_count <= 1024 | Required only for policy_type = NRecords | Defines how many records will be stored |
| time_window_size | One of: [60, 300, 600, 900, 1200, 1800, 3600, 7200, 10800, 14400, 21600, 28800, 43200, 86400] | Required for policy_type = TimeWindow or Aggregation | Window size in seconds; it must evenly divide 86400 |
| aggregation_functions | Avg, Min, Max, Sum | Required for policy_type = Aggregation | Function used to calculate the resulting value based on time_window_size |
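
The Aggregation policy can be pictured with a short Python sketch. It rests on assumptions: the names `aggregate`, `ALLOWED_WINDOWS` and `AGGREGATIONS` are hypothetical, records are taken to be pairs of Unix timestamp in seconds and numeric value, and windows are aligned to multiples of time_window_size counted from the Unix epoch. How the server actually aligns and stores windows is not stated here.

```python
from collections import defaultdict
from typing import Dict, List, Tuple

# Allowed window sizes in seconds, copied from the table above.
ALLOWED_WINDOWS = [60, 300, 600, 900, 1200, 1800, 3600, 7200,
                   10800, 14400, 21600, 28800, 43200, 86400]

AGGREGATIONS = {
    "Avg": lambda xs: sum(xs) / len(xs),
    "Min": min,
    "Max": max,
    "Sum": sum,
}


def aggregate(records: List[Tuple[int, float]],
              time_window_size: int,
              aggregation_function: str) -> Dict[int, float]:
    """Group (timestamp, value) records into windows and reduce each window."""
    if time_window_size not in ALLOWED_WINDOWS:
        raise ValueError(f"unsupported time_window_size: {time_window_size}")
    func = AGGREGATIONS[aggregation_function]

    buckets: Dict[int, List[float]] = defaultdict(list)
    for ts, value in records:
        # Assumption: windows start at multiples of the window size since the epoch.
        window_start = ts - ts % time_window_size
        buckets[window_start].append(value)

    return {start: func(values) for start, values in sorted(buckets.items())}
```

With records at seconds 0, 30 and 60 carrying the values 1.0, 3.0 and 5.0, a 60-second window and Avg yield one result of 2.0 for the first window and one of 5.0 for the second. Since every allowed size divides 86400, epoch-aligned windows never straddle a UTC day boundary.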