
Flow Builder

Build and manage multi-step data transformation pipelines.

Overview

The Flow Builder allows you to create sequential data transformation steps that process dataset records. Common flow step types include:

  • Filter - Remove records based on criteria
  • Transform - Modify field values
  • Aggregate - Group and summarize data
  • Join - Combine with other datasets
  • Sort - Order records
  • Limit - Restrict record count
  • Deduplicate - Remove duplicate records

GET /api/datasets/{id}/flow

Get the complete flow definition for a dataset.

Authentication: Required

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug

Response:

{
  "flow": [
    {
      "type": "filter",
      "index": 0,
      "name": "Filter Active Records",
      "criteria": {
        "status": "active"
      }
    },
    {
      "type": "transform",
      "index": 1,
      "name": "Calculate Profit",
      "field": "profit",
      "expression": "revenue - cost"
    },
    {
      "type": "aggregate",
      "index": 2,
      "name": "Sum by Region",
      "groupBy": ["region"],
      "aggregations": [
        {
          "field": "profit",
          "function": "sum",
          "alias": "total_profit"
        }
      ]
    }
  ]
}
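The following is a rough client-side sketch that fetches a flow with the standard fetch API. The function name getFlow, the baseUrl argument, and the Bearer-token Authorization header are assumptions for illustration. This page says authentication is required but does not say which scheme is used.

```javascript
// Sketch only: getFlow, baseUrl, and Bearer-token auth are assumptions,
// not a documented SDK. fetchImpl defaults to the global fetch
// (browsers, Node 18+) and can be replaced with a stub for testing.
async function getFlow(baseUrl, datasetId, token, fetchImpl = fetch) {
  const url = `${baseUrl}/api/datasets/${encodeURIComponent(datasetId)}/flow`;
  const res = await fetchImpl(url, {
    method: "GET",
    headers: { Authorization: `Bearer ${token}` }, // assumed auth scheme
  });
  if (!res.ok) {
    throw new Error(`GET ${url} failed with status ${res.status}`);
  }
  const body = await res.json();
  // Return steps in execution order (0-based index).
  return [...body.flow].sort((a, b) => a.index - b.index);
}
```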

Flow Step Properties:

Property  Type     Description
type      string   Step type
index     integer  Position in flow (0-based)
name      string   Step description

Additional properties vary by step type.

PUT /api/datasets/{id}/flow

Update the entire flow definition.

Authentication: Required

Permission: dataset:write

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug

Request Body:

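An object with a flow property containing an array of flow step objects. Steps do not need an index; indices are assigned automatically.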

Example Request:

{
  "flow": [
    {
      "type": "filter",
      "name": "Active Orders Only",
      "criteria": {
        "$and": [
          { "status": "active" },
          { "order_date": { "$gte": "2024-01-01" } }
        ]
      }
    },
    {
      "type": "transform",
      "name": "Add Tax",
      "field": "total_with_tax",
      "expression": "amount * 1.08"
    },
    {
      "type": "sort",
      "name": "Sort by Amount",
      "field": "amount",
      "order": "desc"
    },
    {
      "type": "limit",
      "name": "Top 100",
      "count": 100
    }
  ]
}

Response:

Returns the updated dataset with new flow.

Behavior:

  • Replaces entire flow definition
  • Step indices are automatically assigned (0, 1, 2, ...)
  • Flow is executed during dataset refresh
  • Steps are processed sequentially
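Here is a hedged sketch of replacing a flow from JavaScript. It wraps the steps in a flow property, as the example request does. It also strips any index values, because the server assigns them. The names replaceFlow and baseUrl and the Bearer-token header are hypothetical; adjust them to your deployment's actual authentication.

```javascript
// Sketch only: replaceFlow, baseUrl, and Bearer-token auth are assumptions.
// fetchImpl defaults to the global fetch and can be stubbed for testing.
async function replaceFlow(baseUrl, datasetId, token, steps, fetchImpl = fetch) {
  // Indices are assigned by the server, so drop any carried over from a GET.
  const flow = steps.map(({ index, ...step }) => step);
  const url = `${baseUrl}/api/datasets/${encodeURIComponent(datasetId)}/flow`;
  const res = await fetchImpl(url, {
    method: "PUT",
    headers: {
      Authorization: `Bearer ${token}`, // assumed auth scheme
      "Content-Type": "application/json",
    },
    body: JSON.stringify({ flow }), // { flow: [...] } as in the example request
  });
  if (!res.ok) {
    throw new Error(`PUT ${url} failed with status ${res.status}`);
  }
  return res.json(); // the updated dataset, including its new flow
}
```

Stripping index also makes it safe to GET a flow, edit the array locally, and PUT it back.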

DELETE /api/datasets/{id}/flow

Delete the entire flow (reset to no transformations).

Authentication: Required

Permission: dataset:write

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug

Response:

204 No Content

Use Case:

Remove all transformations and return the dataset to its raw query results.

POST /api/datasets/{id}/flow

Add a new step to the end of the flow.

Authentication: Required

Permission: dataset:write

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug

Request Body:

Single flow step object (without index).

Example Request:

{
  "type": "filter",
  "name": "High Value Only",
  "criteria": {
    "amount": { "$gte": 10000 }
  }
}

Response (for a flow that already had three steps, at indices 0, 1, and 2):

{
  "type": "filter",
  "index": 3,
  "name": "High Value Only",
  "criteria": {
    "amount": { "$gte": 10000 }
  }
}

Behavior:

  • Appends step to end of flow
  • Assigns next available index
  • Returns the created step with assigned index
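The index rule can be modeled locally in a few lines. This is not the server's implementation. appendStep is a hypothetical helper that assumes the flow array is ordered by index with no gaps. That assumption holds because deletions re-index the remaining steps.

```javascript
// Local model of the documented append behavior (not the server's code).
// Assumes flow is ordered by index with no gaps (0, 1, 2, ...).
function appendStep(flow, step) {
  const { index, ...rest } = step; // any client-supplied index is discarded
  const created = { ...rest, index: flow.length }; // next available index
  return { flow: [...flow, created], created }; // input array is not mutated
}
```

With the three-step flow from the GET example, appending the step above yields index 3, which matches the example response.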

GET /api/datasets/{id}/flow/{index}

Get a single flow step by index.

Authentication: Required

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug
index      integer  Step index (0-based)

Response:

{
  "type": "filter",
  "index": 0,
  "name": "Filter Active Records",
  "criteria": {
    "status": "active"
  }
}

PUT /api/datasets/{id}/flow/{index}

Update a specific flow step.

Authentication: Required

Permission: dataset:write

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug
index      integer  Step index (0-based)

Request Body:

Updated flow step object (index is ignored).

Example Request:

{
  "type": "filter",
  "name": "Filter Active and Recent Records",
  "criteria": {
    "$and": [
      { "status": "active" },
      { "created_at": { "$gte": "2024-01-01" } }
    ]
  }
}

Response:

Returns the updated flow step.


DELETE /api/datasets/{id}/flow/{index}

Delete a specific flow step.

Authentication: Required

Permission: dataset:write

Path Parameters:

Parameter  Type     Description
id         string   Dataset ID or slug
index      integer  Step index (0-based)

Response:

204 No Content

Behavior:

  • Removes step at specified index
  • Remaining steps are re-indexed sequentially
  • Flow continues to execute with remaining steps
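Here is a local model of this re-indexing behavior, assuming the flow array is ordered by index. deleteStep is hypothetical. The RangeError it throws for a missing index stands in for whatever error the API actually returns, which this page does not specify.

```javascript
// Local model of delete-and-reindex (not the server's code).
// Assumes flow is ordered by index.
function deleteStep(flow, index) {
  if (!Number.isInteger(index) || index < 0 || index >= flow.length) {
    throw new RangeError(`No flow step at index ${index}`);
  }
  return flow
    .filter((_, i) => i !== index) // drop the target step
    .map((step, i) => ({ ...step, index: i })); // renumber 0, 1, 2, ...
}
```

Indices shift after a delete. A client that cached step indices should therefore re-read the flow before issuing further index-based requests.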

Flow Step Types

Filter Step

Remove records based on criteria.

{
  "type": "filter",
  "name": "Active High-Value Orders",
  "criteria": {
    "$and": [
      { "status": "active" },
      { "amount": { "$gte": 1000 } }
    ]
  }
}

Properties:

  • criteria - Filter criteria (same syntax as saved filters)
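The minimal matcher below illustrates how the criteria in the example combine. It handles only the forms used on this page: plain equality, $gte, and $and. The full saved-filter syntax may support more operators; this sketch rejects those rather than guessing at them. matches and applyFilter are hypothetical names.

```javascript
// Minimal criteria matcher: equality, $gte, and $and only (assumption:
// other saved-filter operators exist but are not modeled here).
function matches(record, criteria) {
  return Object.entries(criteria).every(([key, cond]) => {
    if (key === "$and") {
      return cond.every((sub) => matches(record, sub)); // all must hold
    }
    if (cond !== null && typeof cond === "object") {
      return Object.entries(cond).every(([op, value]) => {
        if (op === "$gte") return record[key] >= value; // numbers or ISO dates
        throw new Error(`Operator ${op} is not supported in this sketch`);
      });
    }
    return record[key] === cond; // a plain value means equality
  });
}

function applyFilter(records, step) {
  return records.filter((r) => matches(r, step.criteria));
}
```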

Transform Step

Calculate a new field or modify an existing one.

{
  "type": "transform",
  "name": "Calculate Margin",
  "field": "margin",
  "expression": "(revenue - cost) / revenue * 100",
  "dataType": "number"
}

Properties:

  • field - Field name to create/update
  • expression - Calculation expression
  • dataType - Result data type
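The expression language is not specified here, so this sketch substitutes a plain JavaScript function for the expression string. It shows the create-or-update semantics: the result is written to field, replacing any existing value. applyTransform is hypothetical, and dataType is not modeled.

```javascript
// Sketch: a JS function stands in for the (unspecified) expression language.
function applyTransform(records, field, compute) {
  // Returns new records; existing values of `field` are overwritten.
  return records.map((r) => ({ ...r, [field]: compute(r) }));
}

// Equivalent of "(revenue - cost) / revenue * 100" from the example above.
const margin = (r) => ((r.revenue - r.cost) / r.revenue) * 100;
```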

Aggregate Step

Group records and calculate aggregations.

{
  "type": "aggregate",
  "name": "Sum by Region and Category",
  "groupBy": ["region", "category"],
  "aggregations": [
    {
      "field": "amount",
      "function": "sum",
      "alias": "total_amount"
    },
    {
      "field": "amount",
      "function": "avg",
      "alias": "avg_amount"
    },
    {
      "field": "order_id",
      "function": "count",
      "alias": "order_count"
    }
  ]
}

Properties:

  • groupBy - Array of fields to group by
  • aggregations - Array of aggregation definitions

Aggregation Functions:

  • sum - Sum of values
  • avg - Average of values
  • min - Minimum value
  • max - Maximum value
  • count - Count of records
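This local sketch of the aggregate step helps show which output rows to expect. Each output row holds the groupBy values plus one column per alias. The sketch assumes aggregated fields are non-null numbers and that count counts the records in each group. applyAggregate is a hypothetical name, and the server's null handling may differ.

```javascript
// The five documented aggregation functions (non-null numbers assumed).
const AGGREGATORS = {
  sum: (vals) => vals.reduce((a, b) => a + b, 0),
  avg: (vals) => vals.reduce((a, b) => a + b, 0) / vals.length,
  min: (vals) => vals.reduce((a, b) => Math.min(a, b)),
  max: (vals) => vals.reduce((a, b) => Math.max(a, b)),
  count: (vals) => vals.length, // number of records in the group
};

function applyAggregate(records, { groupBy, aggregations }) {
  const groups = new Map();
  for (const r of records) {
    // JSON-encode the group-by values to get a composite Map key.
    const key = JSON.stringify(groupBy.map((f) => r[f]));
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(r);
  }
  const out = [];
  for (const rows of groups.values()) {
    const row = {};
    for (const f of groupBy) row[f] = rows[0][f]; // carry group keys through
    for (const { field, function: fn, alias } of aggregations) {
      row[alias] = AGGREGATORS[fn](rows.map((r) => r[field]));
    }
    out.push(row);
  }
  return out; // one row per group, in first-seen order
}
```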

Sort Step

Order records by a field.

{
  "type": "sort",
  "name": "Sort by Amount Descending",
  "field": "amount",
  "order": "desc"
}

Properties:

  • field - Field to sort by
  • order - Sort direction: asc or desc
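Here is a sketch of the sort step. Two choices are assumptions: defaulting order to asc when it is omitted, and comparing values with < and >. That comparison orders numbers numerically and ISO date strings chronologically.

```javascript
// Sketch: default order "asc" is an assumption.
function applySort(records, { field, order = "asc" }) {
  const dir = order === "desc" ? -1 : 1;
  // Copy first: Array.prototype.sort mutates in place (and is stable).
  return [...records].sort((a, b) => {
    if (a[field] < b[field]) return -dir;
    if (a[field] > b[field]) return dir;
    return 0;
  });
}
```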

Limit Step

Restrict the number of records.

{
  "type": "limit",
  "name": "Top 100 Records",
  "count": 100,
  "offset": 0
}

Properties:

  • count - Number of records to keep
  • offset - Number of records to skip
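In array terms, count and offset map onto a slice. Treating a missing offset as 0 is an assumption, based on the earlier examples that omit it.

```javascript
// Sketch: skip `offset` records, then keep at most `count` (offset defaults to 0).
function applyLimit(records, { count, offset = 0 }) {
  return records.slice(offset, offset + count);
}
```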

Join Step

Combine with another dataset.

{
  "type": "join",
  "name": "Join with Customer Data",
  "datasetId": "customers",
  "joinType": "left",
  "leftField": "customer_id",
  "rightField": "id",
  "fields": ["customer_name", "customer_email", "customer_region"]
}

Properties:

  • datasetId - Dataset to join with
  • joinType - Join type: inner, left, right, outer
  • leftField - Field in current dataset
  • rightField - Field in joined dataset
  • fields - Fields to include from joined dataset
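This hash-join sketch covers the inner and left join types, showing which records survive and what unmatched rows look like. Three behaviors are assumptions: filling unmatched joined fields with null, producing one output row per match for duplicate right-side keys, and the name applyJoin itself. Right and outer joins are left out to keep it short.

```javascript
// Sketch of inner and left joins only; null-filling is an assumption.
function applyJoin(left, right, { joinType, leftField, rightField, fields }) {
  if (joinType !== "inner" && joinType !== "left") {
    throw new Error(`joinType "${joinType}" is not modeled in this sketch`);
  }
  // Index the right-hand dataset by its join key.
  const byKey = new Map();
  for (const r of right) {
    if (!byKey.has(r[rightField])) byKey.set(r[rightField], []);
    byKey.get(r[rightField]).push(r);
  }
  const out = [];
  for (const l of left) {
    const matched = byKey.get(l[leftField]) ?? [];
    for (const r of matched) {
      // Copy only the requested fields from the joined dataset.
      const picked = Object.fromEntries(fields.map((f) => [f, r[f]]));
      out.push({ ...l, ...picked });
    }
    if (matched.length === 0 && joinType === "left") {
      // Unmatched left rows are kept; joined fields become null.
      out.push({ ...l, ...Object.fromEntries(fields.map((f) => [f, null])) });
    }
  }
  return out;
}
```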

Deduplicate Step

Remove duplicate records.

{
  "type": "deduplicate",
  "name": "Remove Duplicate Customers",
  "fields": ["customer_id"],
  "keepFirst": true
}

Properties:

  • fields - Fields to use for duplicate detection
  • keepFirst - Keep first occurrence (true) or last (false)
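Here is a sketch of the keepFirst semantics. The duplicate key is built from the listed fields. The output keeps each key at the position of its first appearance in the input. With keepFirst set to false, the record at that position is the key's last occurrence. Defaulting an omitted keepFirst to true is an assumption, and applyDeduplicate is a hypothetical name.

```javascript
// Sketch: default keepFirst = true is an assumption.
function applyDeduplicate(records, { fields, keepFirst = true }) {
  const keyOf = (r) => JSON.stringify(fields.map((f) => r[f]));
  const kept = new Map(); // Map keeps each key at its first insertion position
  for (const r of records) {
    const key = keyOf(r);
    // keepFirst: ignore later duplicates; otherwise overwrite so the last wins.
    if (!keepFirst || !kept.has(key)) kept.set(key, r);
  }
  return [...kept.values()];
}
```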

Flow Execution

Execution Order

Flow steps execute sequentially:

  1. Base query runs (from datasource)
  2. Step 0 executes (operates on query results)
  3. Step 1 executes (operates on step 0 results)
  4. Step 2 executes (operates on step 1 results)
  5. Final results are indexed to Elasticsearch
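The sequential execution above amounts to a left fold over the ordered steps. runFlow models it locally. The function, its handlers map, and the deliberately minimal handlers (equality-only filter, numeric sort, limit) are all hypothetical. Its upToIndex argument mimics the flowIndex preview parameter by stopping after that step.

```javascript
// Local model of sequential flow execution (not the server's code).
// Each handler takes (records, step) and returns a new array of records.
function runFlow(records, flow, handlers, upToIndex = flow.length - 1) {
  return [...flow]
    .sort((a, b) => a.index - b.index) // execute in index order
    .filter((step) => step.index <= upToIndex) // stop after upToIndex
    .reduce((current, step) => {
      const handler = handlers[step.type];
      if (!handler) {
        throw new Error(`No handler for step type "${step.type}"`);
      }
      return handler(current, step); // step N sees step N-1's output
    }, records);
}

// Deliberately minimal handlers, for illustration only.
const handlers = {
  // Equality-only criteria; no $and or comparison operators.
  filter: (rows, s) =>
    rows.filter((r) => Object.entries(s.criteria).every(([k, v]) => r[k] === v)),
  // Numeric fields only.
  sort: (rows, s) =>
    [...rows].sort((a, b) =>
      s.order === "desc" ? b[s.field] - a[s.field] : a[s.field] - b[s.field]
    ),
  limit: (rows, s) => rows.slice(s.offset ?? 0, (s.offset ?? 0) + s.count),
};
```

Calling runFlow(records, flow, handlers, 2) corresponds to a flowIndex=2 preview, with steps 0, 1, and 2 applied.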

Preview Results

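Preview the intermediate results after any step by passing flowIndex to the data endpoint:

// Get data after step 2 (steps 0 through 2 applied)
const res = await fetch('/api/datasets/sales-2024/data?flowIndex=2&limit=100');
const preview = await res.json();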

Performance

  • Filter early - Place filter steps first to reduce record count
  • Minimize transformations - Complex expressions slow execution
  • Index group fields - For faster aggregations
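  • Reduce before sorting - Filter or aggregate before a sort step so fewer records are sorted; keep limit after sort when you want the top N records, since limiting first returns an arbitrary subset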
  • Test incrementally - Add steps one at a time and test

Flow Examples

ETL Pipeline

[
  {
    "type": "filter",
    "name": "Remove Test Records",
    "criteria": { "is_test": false }
  },
  {
    "type": "transform",
    "name": "Standardize Region",
    "field": "region",
    "expression": "UPPER(region)"
  },
  {
    "type": "transform",
    "name": "Calculate Age",
    "field": "days_old",
    "expression": "(NOW() - created_at) / 86400000"
  },
  {
    "type": "deduplicate",
    "name": "Remove Duplicates",
    "fields": ["order_id"]
  }
]

Aggregation Pipeline

[
  {
    "type": "filter",
    "name": "Current Year",
    "criteria": {
      "order_date": { "$gte": "2024-01-01" }
    }
  },
  {
    "type": "aggregate",
    "name": "Sum by Month and Region",
    "groupBy": ["month", "region"],
    "aggregations": [
      { "field": "amount", "function": "sum", "alias": "revenue" },
      { "field": "order_id", "function": "count", "alias": "orders" }
    ]
  },
  {
    "type": "transform",
    "name": "Average Order Value",
    "field": "avg_order_value",
    "expression": "revenue / orders"
  },
  {
    "type": "sort",
    "name": "Sort by Revenue",
    "field": "revenue",
    "order": "desc"
  }
]

Top N Analysis

[
  {
    "type": "filter",
    "name": "Completed Orders",
    "criteria": { "status": "completed" }
  },
  {
    "type": "aggregate",
    "name": "Sum by Customer",
    "groupBy": ["customer_id", "customer_name"],
    "aggregations": [
      { "field": "amount", "function": "sum", "alias": "total_spent" }
    ]
  },
  {
    "type": "sort",
    "name": "Sort by Total Spent",
    "field": "total_spent",
    "order": "desc"
  },
  {
    "type": "limit",
    "name": "Top 50 Customers",
    "count": 50
  }
]

Best Practices

Flow Design

  • Plan ahead - Sketch flow on paper before implementing
  • Keep it simple - Fewer steps = easier maintenance
  • Name steps clearly - Describe what each step does
  • Test incrementally - Verify results after each step
  • Document complex logic - Use descriptive step names to explain non-obvious logic

Performance Optimization

  • Filter first - Reduce data volume early
  • Aggregate wisely - Only group by necessary fields
  • Limit transformations - Use calculated fields instead when possible
  • Sort strategically - Sort on indexed fields
  • Monitor execution time - Benchmark complex flows

Data Quality

  • Validate inputs - Filter out invalid data early
  • Handle nulls - Use conditional expressions for null-safe transforms
  • Deduplicate carefully - Understand which record to keep
  • Test edge cases - Empty results, single record, large volumes

Maintainability

  • Version flows - Use drafts for testing changes
  • Document dependencies - Note if flow depends on specific field names
  • Keep flows reusable - Use parameters instead of hardcoded values
  • Review regularly - Update flows as business logic changes

Flow vs. Calculated Fields

Use flows for multi-record operations (filtering, aggregating, sorting). Use calculated fields for single-record computations (formulas, concatenations).