Flow Builder
Build and manage multi-step data transformation pipelines.
Overview
The Flow Builder allows you to create sequential data transformation steps that process dataset records. Common flow step types include:
- Filter - Remove records based on criteria
- Transform - Modify field values
- Aggregate - Group and summarize data
- Join - Combine with other datasets
- Sort - Order records
- Limit - Restrict record count
- Deduplicate - Remove duplicate records
GET /api/datasets/{id}/flow
Get the complete flow definition for a dataset.
Authentication: Required
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
Response:
{
"flow": [
{
"type": "filter",
"index": 0,
"name": "Filter Active Records",
"criteria": {
"status": "active"
}
},
{
"type": "transform",
"index": 1,
"name": "Calculate Profit",
"field": "profit",
"expression": "revenue - cost"
},
{
"type": "aggregate",
"index": 2,
"name": "Sum by Region",
"groupBy": ["region"],
"aggregations": [
{
"field": "profit",
"function": "sum",
"alias": "total_profit"
}
]
}
]
}
Flow Step Properties:
| Property | Type | Description |
|---|---|---|
| type | string | Step type |
| index | integer | Position in flow (0-based) |
| name | string | Step description |

Additional properties vary by step type.
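
To illustrate the response shape above, the following sketch turns a flow response into one summary line per step. `summarizeFlow` is a hypothetical helper, not part of the API; it relies only on the documented type, index, and name properties.

```javascript
// Hypothetical helper: one readable line per step, ordered by index.
function summarizeFlow(response) {
  return [...response.flow]
    .sort((a, b) => a.index - b.index)
    .map((step) => `${step.index}: ${step.type} - ${step.name}`);
}

// Sample data copied from the response above (type-specific properties omitted).
const sampleResponse = {
  flow: [
    { type: "filter", index: 0, name: "Filter Active Records" },
    { type: "transform", index: 1, name: "Calculate Profit" },
    { type: "aggregate", index: 2, name: "Sum by Region" },
  ],
};

console.log(summarizeFlow(sampleResponse).join("\n"));
```

Sorting by index is defensive; the response in this document already lists steps in order.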
PUT /api/datasets/{id}/flow
Update the entire flow definition.
Authentication: Required
Permission: dataset:write
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
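Request Body:
An object whose flow property is an array of flow step objects, listed in execution order. Step indices can be omitted because they are assigned automatically.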
Example Request:
{
"flow": [
{
"type": "filter",
"name": "Active Orders Only",
"criteria": {
"$and": [
{ "status": "active" },
{ "order_date": { "$gte": "2024-01-01" } }
]
}
},
{
"type": "transform",
"name": "Add Tax",
"field": "total_with_tax",
"expression": "amount * 1.08"
},
{
"type": "sort",
"name": "Sort by Amount",
"field": "amount",
"order": "desc"
},
{
"type": "limit",
"name": "Top 100",
"count": 100
}
]
}
Response:
Returns the updated dataset, including the new flow.
Behavior:
- Replaces entire flow definition
- Step indices are automatically assigned (0, 1, 2, ...)
- Flow is executed during dataset refresh
- Steps are processed sequentially
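
A minimal sketch of how a client might assemble this request. `buildReplaceFlowRequest`, the base URL, and the Bearer token header are assumptions for illustration; this document states only that authentication is required, not which scheme is used.

```javascript
// Builds (but does not send) a request that replaces a dataset's flow.
// Assumed, not documented: the base URL and the Bearer authorization scheme.
function buildReplaceFlowRequest(baseUrl, datasetId, steps, token) {
  // Drop any client-side index; indices are assigned automatically.
  const flow = steps.map(({ index, ...rest }) => rest);
  return {
    url: `${baseUrl}/api/datasets/${encodeURIComponent(datasetId)}/flow`,
    options: {
      method: "PUT",
      headers: {
        "Content-Type": "application/json",
        Authorization: `Bearer ${token}`,
      },
      body: JSON.stringify({ flow }),
    },
  };
}

const request = buildReplaceFlowRequest(
  "https://example.invalid",
  "sales-2024",
  [
    { type: "filter", index: 7, name: "Active Orders Only", criteria: { status: "active" } },
    { type: "limit", name: "Top 100", count: 100 },
  ],
  "YOUR_TOKEN"
);
console.log(request.url);
// To send it: await fetch(request.url, request.options);
```

Keeping request construction separate from sending makes the payload easy to inspect before it replaces an existing flow.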
DELETE /api/datasets/{id}/flow
Delete the entire flow (reset to no transformations).
Authentication: Required
Permission: dataset:write
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
Response:
204 No Content
Use Case:
Remove all transformations and return the dataset to its raw query results.
POST /api/datasets/{id}/flow
Add a new step to the end of the flow.
Authentication: Required
Permission: dataset:write
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
Request Body:
A single flow step object. Omit index; it is assigned automatically.
Example Request:
{
"type": "filter",
"name": "High Value Only",
"criteria": {
"amount": { "$gte": 10000 }
}
}
Response:
{
"type": "filter",
"index": 3,
"name": "High Value Only",
"criteria": {
"amount": { "$gte": 10000 }
}
}
Behavior:
- Appends step to end of flow
- Assigns next available index
- Returns the created step with assigned index
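
The append behavior can be modeled locally as follows. This is a client-side sketch of the documented rules, not the server's implementation; `appendStep` is a hypothetical name.

```javascript
// Local model of the documented append behavior.
function appendStep(flow, step) {
  const created = { ...step, index: flow.length }; // next available index
  return { flow: [...flow, created], created };
}

const existing = [
  { type: "filter", index: 0, name: "Filter Active Records" },
  { type: "transform", index: 1, name: "Calculate Profit" },
  { type: "aggregate", index: 2, name: "Sum by Region" },
];
const { created } = appendStep(existing, {
  type: "filter",
  name: "High Value Only",
  criteria: { amount: { $gte: 10000 } },
});
console.log(created.index); // 3
```

With three existing steps, the new step receives index 3, matching the example response above.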
GET /api/datasets/{id}/flow/{index}
Get a single flow step by index.
Authentication: Required
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
| index | integer | Step index (0-based) |
Response:
{
"type": "filter",
"index": 0,
"name": "Filter Active Records",
"criteria": {
"status": "active"
}
}
PUT /api/datasets/{id}/flow/{index}
Update a specific flow step.
Authentication: Required
Permission: dataset:write
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
| index | integer | Step index (0-based) |
Request Body:
Updated flow step object (index is ignored).
Example Request:
{
"type": "filter",
"name": "Filter Active and Recent Records",
"criteria": {
"$and": [
{ "status": "active" },
{ "created_at": { "$gte": "2024-01-01" } }
]
}
}
Response:
Returns the updated flow step.
DELETE /api/datasets/{id}/flow/{index}
Delete a specific flow step.
Authentication: Required
Permission: dataset:write
Path Parameters:
| Parameter | Type | Description |
|---|---|---|
| id | string | Dataset ID or slug |
| index | integer | Step index (0-based) |
Response:
204 No Content
Behavior:
- Removes step at specified index
- Remaining steps are re-indexed sequentially
- Flow continues to execute with remaining steps
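
Re-indexing means that indices held by a client can go stale after a delete. The sketch below models the documented behavior locally; `removeStep` is a hypothetical helper, not the server's code.

```javascript
// Removes the step at `index` and renumbers the rest from 0.
function removeStep(flow, index) {
  if (!Number.isInteger(index) || index < 0 || index >= flow.length) {
    throw new RangeError(`No step at index ${index}`);
  }
  return flow
    .filter((_, position) => position !== index)
    .map((step, position) => ({ ...step, index: position }));
}

const flow = [
  { type: "filter", index: 0, name: "Filter Active Records" },
  { type: "transform", index: 1, name: "Calculate Profit" },
  { type: "aggregate", index: 2, name: "Sum by Region" },
];
const remaining = removeStep(flow, 1);
console.log(remaining.map((step) => `${step.index}: ${step.name}`));
```

After removing step 1, "Sum by Region" moves from index 2 to index 1, so re-fetch the flow before issuing further index-based calls.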
Flow Step Types
Filter Step
Remove records based on criteria.
{
"type": "filter",
"name": "Active High-Value Orders",
"criteria": {
"$and": [
{ "status": "active" },
{ "amount": { "$gte": 1000 } }
]
}
}
Properties:
- criteria - Filter criteria (same syntax as saved filters)
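
To make the criteria shapes concrete, here is a small in-memory matcher covering only the forms that appear in this document: plain equality, $and, and $gte. The syntax resembles MongoDB-style query operators, but whether the service follows those semantics exactly is not stated here, so treat this as an assumption-laden sketch.

```javascript
// Minimal matcher for the criteria shapes used in this document.
// Other operators are deliberately out of scope.
function matchesCriteria(record, criteria) {
  return Object.entries(criteria).every(([key, expected]) => {
    if (key === "$and") {
      return expected.every((sub) => matchesCriteria(record, sub));
    }
    const actual = record[key];
    if (expected !== null && typeof expected === "object") {
      // Operator object, e.g. { $gte: 1000 }
      return Object.entries(expected).every(([op, value]) => {
        if (op === "$gte") return actual >= value;
        throw new Error(`Unsupported operator in this sketch: ${op}`);
      });
    }
    return actual === expected;
  });
}

const criteria = {
  $and: [{ status: "active" }, { amount: { $gte: 1000 } }],
};
const orders = [
  { id: 1, status: "active", amount: 2500 },
  { id: 2, status: "active", amount: 400 },
  { id: 3, status: "cancelled", amount: 9000 },
];
// Only order 1 is both active and at least 1000.
console.log(orders.filter((o) => matchesCriteria(o, criteria)).map((o) => o.id));
```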
Transform Step
Calculate a new field or modify an existing one.
{
"type": "transform",
"name": "Calculate Margin",
"field": "margin",
"expression": "(revenue - cost) / revenue * 100",
"dataType": "number"
}
Properties:
- field - Field name to create/update
- expression - Calculation expression
- dataType - Result data type
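
As a worked example of the margin expression above: with revenue 200 and cost 150, (200 - 150) / 200 * 100 = 25. The sketch below reproduces that in JavaScript, with the expression supplied as a function because the service's expression language is not specified here. The null and zero-revenue handling is an assumption about what a null-safe transform should do.

```javascript
// Applies a transform with the expression supplied as a JavaScript function.
function applyTransform(records, field, compute) {
  return records.map((record) => ({ ...record, [field]: compute(record) }));
}

// Null-safe version of "(revenue - cost) / revenue * 100".
function margin({ revenue, cost }) {
  if (revenue == null || cost == null || revenue === 0) return null;
  return ((revenue - cost) / revenue) * 100;
}

const rows = applyTransform(
  [
    { sku: "A", revenue: 200, cost: 150 },
    { sku: "B", revenue: 0, cost: 10 },
  ],
  "margin",
  margin
);
console.log(rows); // margin is 25 for A and null for B
```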
Aggregate Step
Group records and calculate aggregations.
{
"type": "aggregate",
"name": "Sum by Region and Category",
"groupBy": ["region", "category"],
"aggregations": [
{
"field": "amount",
"function": "sum",
"alias": "total_amount"
},
{
"field": "amount",
"function": "avg",
"alias": "avg_amount"
},
{
"field": "order_id",
"function": "count",
"alias": "order_count"
}
]
}
Properties:
- groupBy - Array of fields to group by
- aggregations - Array of aggregation definitions
Aggregation Functions:
- sum - Sum of values
- avg - Average of values
- min - Minimum value
- max - Maximum value
- count - Count of records
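
The grouping and aggregation semantics can be sketched in memory as below. This is an illustrative model, not the service's implementation; in particular, how the service treats null or missing values is not documented, and this sketch does nothing special for them.

```javascript
// One reducer per documented aggregation function.
const reducers = {
  sum: (values) => values.reduce((total, v) => total + v, 0),
  avg: (values) => values.reduce((total, v) => total + v, 0) / values.length,
  min: (values) => Math.min(...values),
  max: (values) => Math.max(...values),
  count: (values) => values.length,
};

function aggregate(records, groupBy, aggregations) {
  // Bucket records by the combined values of the groupBy fields.
  const groups = new Map();
  for (const record of records) {
    const key = JSON.stringify(groupBy.map((field) => record[field]));
    if (!groups.has(key)) groups.set(key, []);
    groups.get(key).push(record);
  }
  // Emit one output row per group: group fields plus each alias.
  return [...groups.values()].map((members) => {
    const row = {};
    for (const field of groupBy) row[field] = members[0][field];
    for (const { field, function: fn, alias } of aggregations) {
      row[alias] = reducers[fn](members.map((m) => m[field]));
    }
    return row;
  });
}

const summary = aggregate(
  [
    { region: "EU", amount: 100 },
    { region: "EU", amount: 300 },
    { region: "US", amount: 50 },
  ],
  ["region"],
  [
    { field: "amount", function: "sum", alias: "total_amount" },
    { field: "amount", function: "avg", alias: "avg_amount" },
    { field: "amount", function: "count", alias: "order_count" },
  ]
);
console.log(summary); // EU: 400 / 200 / 2, US: 50 / 50 / 1
```

Note that the output contains only the groupBy fields and the aliases, which is why steps after an aggregate can reference aliases such as total_amount but not the original fields.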
Sort Step
Order records by field.
{
"type": "sort",
"name": "Sort by Amount Descending",
"field": "amount",
"order": "desc"
}
Properties:
- field - Field to sort by
- order - Sort direction: asc or desc
Limit Step
Restrict number of records.
{
"type": "limit",
"name": "Top 100 Records",
"count": 100,
"offset": 0
}
Properties:
- count - Number of records to keep
- offset - Number of records to skip
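
Sort and Limit are often used together, so here is a sketch of both, showing how count and offset interact. The default values (asc for order, 0 for offset) are assumptions; this document does not state the defaults.

```javascript
// Returns a sorted copy; the input array is left untouched.
function sortRecords(records, field, order = "asc") {
  const direction = order === "desc" ? -1 : 1;
  return [...records].sort((a, b) => {
    if (a[field] < b[field]) return -direction;
    if (a[field] > b[field]) return direction;
    return 0;
  });
}

// Skips `offset` records, then keeps the next `count`.
function limitRecords(records, count, offset = 0) {
  return records.slice(offset, offset + count);
}

const sorted = sortRecords(
  [{ amount: 30 }, { amount: 10 }, { amount: 50 }, { amount: 20 }],
  "amount",
  "desc"
);
const page = limitRecords(sorted, 2, 1);
console.log(page.map((r) => r.amount)); // [ 30, 20 ]
```

With count 2 and offset 1, the largest record (50) is skipped and the next two are kept.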
Join Step
Combine with another dataset.
{
"type": "join",
"name": "Join with Customer Data",
"datasetId": "customers",
"joinType": "left",
"leftField": "customer_id",
"rightField": "id",
"fields": ["customer_name", "customer_email", "customer_region"]
}
Properties:
- datasetId - Dataset to join with
- joinType - Join type: inner, left, right, outer
- leftField - Field in current dataset
- rightField - Field in joined dataset
- fields - Fields to include from joined dataset
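
The difference between join types is easiest to see on a small example. The sketch below models only inner and left joins in memory; filling unmatched fields with null is an assumption, since this document does not say what the service emits for unmatched rows.

```javascript
// In-memory model of inner and left joins (right and outer are omitted).
function joinRecords(left, right, { joinType, leftField, rightField, fields }) {
  if (!["inner", "left"].includes(joinType)) {
    throw new Error(`This sketch does not model joinType "${joinType}"`);
  }
  // Index the joined dataset by its key for constant-time lookups.
  const byKey = new Map();
  for (const row of right) {
    if (!byKey.has(row[rightField])) byKey.set(row[rightField], []);
    byKey.get(row[rightField]).push(row);
  }
  const pick = (row) =>
    Object.fromEntries(fields.map((f) => [f, row ? row[f] : null]));
  const result = [];
  for (const row of left) {
    const matches = byKey.get(row[leftField]) || [];
    if (matches.length === 0 && joinType === "left") {
      result.push({ ...row, ...pick(null) }); // keep unmatched left rows
    }
    for (const match of matches) result.push({ ...row, ...pick(match) });
  }
  return result;
}

const orderRows = [
  { order_id: 1, customer_id: "c1" },
  { order_id: 2, customer_id: "c9" },
];
const customerRows = [{ id: "c1", customer_name: "Ada" }];
const spec = { leftField: "customer_id", rightField: "id", fields: ["customer_name"] };

const leftJoined = joinRecords(orderRows, customerRows, { ...spec, joinType: "left" });
const innerJoined = joinRecords(orderRows, customerRows, { ...spec, joinType: "inner" });
console.log(leftJoined.length, innerJoined.length); // 2 1
```

The left join keeps order 2 with a null customer_name, while the inner join drops it.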
Deduplicate Step
Remove duplicate records.
{
"type": "deduplicate",
"name": "Remove Duplicate Customers",
"fields": ["customer_id"],
"keepFirst": true
}
Properties:
- fields - Fields to use for duplicate detection
- keepFirst - Keep first occurrence (true) or last (false)
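
The effect of keepFirst can be sketched as follows. Defaulting keepFirst to true when it is omitted is an assumption, as is the output order when keeping the last occurrence.

```javascript
// Keeps one record per distinct combination of `fields`.
function deduplicate(records, fields, keepFirst = true) {
  const byKey = new Map();
  for (const record of records) {
    const key = JSON.stringify(fields.map((field) => record[field]));
    // keepFirst: store only the first record seen for a key.
    // Otherwise: overwrite, so the last record wins.
    if (!byKey.has(key) || !keepFirst) byKey.set(key, record);
  }
  return [...byKey.values()];
}

const customers = [
  { customer_id: 1, email: "old@example.com" },
  { customer_id: 2, email: "x@example.com" },
  { customer_id: 1, email: "new@example.com" },
];
const firsts = deduplicate(customers, ["customer_id"], true);
const lasts = deduplicate(customers, ["customer_id"], false);
console.log(firsts[0].email, lasts[0].email); // old@example.com new@example.com
```

In this sketch the output order follows each key's first appearance even when the last record is kept; the service may order results differently.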
Flow Execution
Execution Order
Flow steps execute sequentially:
- Base query runs (from datasource)
- Step 0 executes (operates on query results)
- Step 1 executes (operates on step 0 results)
- Step 2 executes (operates on step 1 results)
- Final results are indexed to Elasticsearch
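
The sequential model above amounts to a fold over the step list, with each step receiving the previous step's output. The sketch below models two step types only; `runFlow`, `handlers`, and `throughIndex` are hypothetical names, and the indexing to Elasticsearch is not modeled.

```javascript
// Each handler takes (records, step) and returns new records.
const handlers = {
  filter: (records, step) =>
    records.filter((r) =>
      Object.entries(step.criteria).every(([key, value]) => r[key] === value)
    ),
  limit: (records, step) => {
    const start = step.offset || 0;
    return records.slice(start, start + step.count);
  },
};

// Runs steps in order; stops after `throughIndex` when one is given.
function runFlow(baseRecords, steps, throughIndex = steps.length - 1) {
  return steps
    .slice(0, throughIndex + 1)
    .reduce((records, step) => handlers[step.type](records, step), baseRecords);
}

const baseRecords = [
  { id: 1, status: "active" },
  { id: 2, status: "inactive" },
  { id: 3, status: "active" },
  { id: 4, status: "active" },
];
const steps = [
  { type: "filter", name: "Active Only", criteria: { status: "active" } },
  { type: "limit", name: "First Two", count: 2 },
];
console.log(runFlow(baseRecords, steps).map((r) => r.id)); // [ 1, 3 ]
console.log(runFlow(baseRecords, steps, 0).map((r) => r.id)); // [ 1, 3, 4 ]
```

Stopping after a given index is the same idea as previewing intermediate results.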
Preview Results
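Preview the results after any step by passing that step's index as flowIndex to the data endpoint:
// Get data after step 2 (flowIndex is the 0-based step index).
// GET is a placeholder for your own authenticated HTTP helper.
const preview = await GET('/api/datasets/sales-2024/data?flowIndex=2&limit=100');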
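
When previewing several steps in a row, it can help to generate the paths rather than hand-edit the query string. A minimal sketch, assuming the flowIndex and limit query parameters behave as in the preview call above; `previewPath` is a hypothetical helper.

```javascript
// Builds the preview path for one step of a dataset's flow.
function previewPath(datasetId, flowIndex, limit) {
  const query = new URLSearchParams({
    flowIndex: String(flowIndex),
    limit: String(limit),
  });
  return `/api/datasets/${encodeURIComponent(datasetId)}/data?${query}`;
}

// One preview path per step of a three-step flow.
const previewPaths = [0, 1, 2].map((index) => previewPath("sales-2024", index, 100));
console.log(previewPaths[2]);
// /api/datasets/sales-2024/data?flowIndex=2&limit=100
```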
Performance
- Filter early - Place filter steps first to reduce record count
- Minimize transformations - Complex expressions slow execution
- Index group fields - For faster aggregations
- Limit before sort - Sorts fewer records, but only when the result does not depend on sort order; a top-N flow must sort first, then limit
- Test incrementally - Add steps one at a time and test
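
One caveat on ordering limit and sort steps: the two orders are not interchangeable, because limiting first keeps whichever records happen to come first. The following sketch shows the difference on plain numbers.

```javascript
const amounts = [40, 900, 15, 700, 300];
const sortDesc = (values) => [...values].sort((a, b) => b - a);
const limit = (values, count) => values.slice(0, count);

// Top 2 by amount: sort, then limit.
const sortThenLimit = limit(sortDesc(amounts), 2);
// First 2 records, then sorted: cheaper, but a different result.
const limitThenSort = sortDesc(limit(amounts, 2));

console.log(sortThenLimit); // [ 900, 700 ]
console.log(limitThenSort); // [ 900, 40 ]
```

Place the limit first only when any subset of records is acceptable, for example when sampling data during development.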
Flow Examples
ETL Pipeline
[
{
"type": "filter",
"name": "Remove Test Records",
"criteria": { "is_test": false }
},
{
"type": "transform",
"name": "Standardize Region",
"field": "region",
"expression": "UPPER(region)"
},
{
"type": "transform",
"name": "Calculate Age",
"field": "days_old",
"expression": "(NOW() - created_at) / 86400000"
},
{
"type": "deduplicate",
"name": "Remove Duplicates",
"fields": ["order_id"]
}
]
Aggregation Pipeline
[
{
"type": "filter",
"name": "Current Year",
"criteria": {
"order_date": { "$gte": "2024-01-01" }
}
},
{
"type": "aggregate",
"name": "Sum by Month and Region",
"groupBy": ["month", "region"],
"aggregations": [
{ "field": "amount", "function": "sum", "alias": "revenue" },
{ "field": "order_id", "function": "count", "alias": "orders" }
]
},
{
"type": "transform",
"name": "Average Order Value",
"field": "avg_order_value",
"expression": "revenue / orders"
},
{
"type": "sort",
"name": "Sort by Revenue",
"field": "revenue",
"order": "desc"
}
]
Top N Analysis
[
{
"type": "filter",
"name": "Completed Orders",
"criteria": { "status": "completed" }
},
{
"type": "aggregate",
"name": "Sum by Customer",
"groupBy": ["customer_id", "customer_name"],
"aggregations": [
{ "field": "amount", "function": "sum", "alias": "total_spent" }
]
},
{
"type": "sort",
"name": "Sort by Total Spent",
"field": "total_spent",
"order": "desc"
},
{
"type": "limit",
"name": "Top 50 Customers",
"count": 50
}
]
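
Before submitting a flow like the examples above, a client can run a quick structural check. The required properties below are inferred from the step examples in this document; treating them as mandatory is an assumption, and `validateFlow` is a hypothetical helper rather than part of the API.

```javascript
// Properties each step type appears to need, per the examples in this document.
const requiredByType = {
  filter: ["criteria"],
  transform: ["field", "expression"],
  aggregate: ["groupBy", "aggregations"],
  sort: ["field", "order"],
  limit: ["count"],
  join: ["datasetId", "joinType", "leftField", "rightField"],
  deduplicate: ["fields"],
};

// Returns a list of human-readable problems; an empty list means no issues found.
function validateFlow(steps) {
  const problems = [];
  steps.forEach((step, position) => {
    const required = requiredByType[step.type];
    if (!required) {
      problems.push(`step ${position}: unknown type "${step.type}"`);
      return;
    }
    for (const property of ["name", ...required]) {
      if (step[property] === undefined) {
        problems.push(`step ${position}: missing "${property}"`);
      }
    }
  });
  return problems;
}

const problems = validateFlow([
  { type: "filter", name: "Completed Orders", criteria: { status: "completed" } },
  { type: "limit", name: "Top 50 Customers" },
  { type: "pivot", name: "Unknown Step" },
]);
console.log(problems);
```

Here the second step is flagged for a missing count and the third for an unrecognized type. The server remains the authority on what is valid.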
Best Practices
Flow Design
- Plan ahead - Sketch flow on paper before implementing
- Keep it simple - Fewer steps = easier maintenance
- Name steps clearly - Describe what each step does
- Test incrementally - Verify results after each step
- Document complex logic - Add comments in step names
Performance Optimization
- Filter first - Reduce data volume early
- Aggregate wisely - Only group by necessary fields
- Limit transformations - Use calculated fields instead when possible
- Sort strategically - Sort on indexed fields
- Monitor execution time - Benchmark complex flows
Data Quality
- Validate inputs - Filter out invalid data early
- Handle nulls - Use conditional expressions for null-safe transforms
- Deduplicate carefully - Understand which record to keep
- Test edge cases - Empty results, single record, large volumes
Maintainability
- Version flows - Use drafts for testing changes
- Document dependencies - Note if flow depends on specific field names
- Keep flows reusable - Use parameters instead of hardcoded values
- Review regularly - Update flows as business logic changes
Use flows for multi-record operations (filtering, aggregating, sorting). Use calculated fields for single-record computations (formulas, concatenations).