A pipeline is a sequence of steps that moves and transforms data from sources to destinations. Think of it as an assembly line for your data: raw data goes in, gets cleaned, enriched, and analyzed, then insights come out.
Every pipeline follows the same basic pattern:
# SOURCE → TRANSFORM → SINK (sink is optional)
- name: EcommerceData
  kind: source                  # This step LOADS data
  connection:
    kind: Parquet               # Data format
    locator: ecommerce.parquet  # File location
  show: -1                      # Preview data (-1 = show all rows)
Key attributes:
name: Reference this data in later steps
kind: source marks the step as a data loader
connection: How to access the data
show: Controls preview display (-1 = show all rows)
Other source types you can use:
# CSV file
connection:
  kind: CSV
  locator: data.csv

# Excel file
connection:
  kind: File
  locator: data.xlsx
  params:
    kind: xlsx
    sheet_name: Sheet1

# HTTP API
connection:
  kind: Rest
  client:
    base_url: https://api.example.com
    auth:
      kind: header
      values:
        x-Key: <<secrets.get('API_KEY')>>
  request:
    method: POST
    endpoint: /data
    body: <<variables.get('MyData')>>
# Database
connection:
  kind: Clickhouse
  locator: sales
  query: SELECT * FROM sales   # Query runs against the table named by locator
- name: CleanAndAnalyze
  kind: transform               # This step MODIFIES data
  query: |
    SELECT
      InvoiceNo,
      CustomerID,
      Country,
      -- Calculate new fields
      Quantity * UnitPrice as LineTotal,
      -- Add business logic
      CASE
        WHEN Description LIKE '%POSTAGE%' THEN 'Shipping'
        ELSE 'Product'
      END as LineType
    FROM EcommerceData            -- Reference the source step by name
    WHERE CustomerID IS NOT NULL  -- Filter out bad data
      AND Quantity > 0
Key attributes:
name: Give transformations meaningful names
kind: transform marks the step as a data processor
query: SQL that defines the transformation
Use FROM StepName to reference previous steps
Common transformation patterns:
-- 1. Clean data
SELECT
  TRIM(Name) as CleanName,
  COALESCE(Email, 'unknown@example.com') as Email,
  CAST(Price AS DECIMAL(10,2)) as Price
FROM RawData

-- 2. Aggregate data
SELECT
  Country,
  COUNT(*) as OrderCount,
  SUM(Revenue) as TotalRevenue,
  AVG(Revenue) as AvgOrderValue
FROM Orders
GROUP BY Country

-- 3. Join data
SELECT
  o.OrderID,
  c.CustomerName,
  p.ProductName,
  o.Quantity * p.Price as LineTotal
FROM Orders o
JOIN Customers c ON o.CustomerID = c.ID
JOIN Products p ON o.ProductID = p.ID
-- 4. Window functions (advanced)
SELECT
  Date,
  Revenue,
  SUM(Revenue) OVER (ORDER BY Date) as RunningTotal,
  AVG(Revenue) OVER (ORDER BY Date ROWS 6 PRECEDING) as WeeklyAvg  -- current row + 6 prior = 7 days
FROM DailySales
- name: SaveResults
  kind: sink                    # This step OUTPUTS data
  connection:
    kind: CSV
    locator: results.csv        # Output location
- name: SendToDashboard
  kind: sink
  connection:
    kind: HTTP
    url: https://dashboard.company.com/api/data
    method: POST
    headers:
      Authorization: Bearer <<secrets.get('API_TOKEN')>>
SOURCE             TRANSFORM          SINK
• Load data   ───▶ • Clean       ───▶ • Save
• Validate         • Calculate       • Export
• Cache            • Filter          • Notify
     ↓                  ↓                  ↓
  Raw data         Clean data       Business insights
1. Variable Substitution
- name: SalesData
  kind: source
  connection:
    kind: Parquet
    locator: <<context.get('YEAR')>>_sales.parquet  # Will be replaced at runtime
2. Error Handling
- name: LoadData
  kind: source
  connection:
    kind: Parquet
    locator: data.parquet
  on_error: skip                # Options: skip, fail (default)
3. Conditional Execution
- name: CheckDataQuality
  kind: transform
  query: |
    SELECT COUNT(*) as RowCount FROM SalesData
  skip_if: "<<True if RowCount <= 0 else False>>"  # Skip when there is no data
4. Looping
- name: MakeLoop
  kind: transform
  query: SELECT 'US' AS value UNION ALL SELECT 'MX' AS value

- name: Process
  kind: pipeline                # Nested pipeline, executed once per loop value
  stages:
    - name: Data
      kind: transform
      query: SELECT * FROM SalesData WHERE Country = '<<API.look('loop_control.value', variables)>>'
1. Naming Conventions
# Good
- name: Load_Customer_Orders
- name: Clean_Order_Data
- name: Calculate_Monthly_Revenue

# Avoid
- name: step1
- name: data
- name: transform
2. Step Documentation
- name: EnrichWithCustomerData
  kind: transform
  description: "Join orders with customer profiles"
  tags: [enrichment, customer, join]
  query: |
    -- Purpose: Add customer demographics to orders
    -- Author: Data Team
    -- Last Updated: 2024-01-15
    SELECT
      o.*,
      c.CustomerTier,
      c.JoinDate,
      c.Region
    FROM Orders o
    LEFT JOIN Customers c ON o.CustomerID = c.ID
3. Modular Design
Instead of one massive pipeline, consider splitting the logic into multiple pipelines and adding them to a job. This approach enables both easier maintenance and reuse of the individual pieces, as sketched below.
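A minimal sketch of the idea; the job layout below is illustrative only (the kind: job and pipelines keys are assumptions, not confirmed syntax):

# Hypothetical job that chains three smaller pipelines
- name: DailySalesJob
  kind: job                      # assumed step kind
  pipelines:                     # assumed key listing pipeline files
    - load_and_clean.yaml        # sources + basic cleaning
    - enrich_and_aggregate.yaml  # joins + business logic
    - publish.yaml               # sinks only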
Try modifying the demo pipeline:
Add a filter:
-- Change the WHERE clause to:
WHERE Country = 'United Kingdom'
  AND CustomerID IS NOT NULL
Add aggregation:
SELECT
  Country,
  COUNT(DISTINCT CustomerID) as CustomerCount,
  SUM(Quantity * UnitPrice) as TotalRevenue,
  AVG(Quantity * UnitPrice) as AvgOrderValue
FROM EcommerceData
GROUP BY Country
Now that you understand pipeline basics:
Connections: Connect to databases, APIs, cloud storage
Advanced Transformations: Window functions, pivots, ML
Pipeline Templating: Reusable pipeline patterns
Error Handling: Make pipelines resilient
Remember: Every pipeline follows the same pattern: Source → Transform → Sink. Start simple, then add complexity as needed!
Q: Can I have multiple sources?
Yes! Declare several source steps and combine them in a later transform:
- name: SalesData
  kind: source
  # ... load sales
- name: CustomerData
  kind: source
  # ... load customers
- name: JoinedData
  kind: transform
  query: SELECT * FROM SalesData JOIN CustomerData ...
Q: Can transforms reference multiple previous steps?
Yes! SQL can JOIN, UNION, or reference any previous step by name.
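For example, a single transform can UNION two earlier steps (the step names here are illustrative):

- name: AllSales
  kind: transform
  query: |
    SELECT * FROM CurrentSales    -- one previous step
    UNION ALL
    SELECT * FROM ArchivedSales   -- another previous step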
Q: Do I always need a sink?
No! Pipelines can end with a transform if you just want to preview results; sinks are for saving or outputting data. A common pattern is to build the pipeline as a chain of transforms first and add sinks once the results look right.
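A minimal preview-only pipeline, reusing the source from the earlier example:

- name: EcommerceData
  kind: source
  connection:
    kind: Parquet
    locator: ecommerce.parquet
- name: Preview
  kind: transform               # No sink: results are only previewed
  query: SELECT * FROM EcommerceData LIMIT 10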
Q: Can I generate data without a source?
Yes! Use SQL Macros (pre-built functions) or inline data:
# Generate a date calendar (4 years back, 1 year forward)
- name: DateCalendar
  kind: transform
  query: |
    SELECT * FROM Fn.calendar()  -- No parameters needed!
    -- Returns: date, year, month, month_no, day, weekday, etc.
# Arithmetic macros
- name: Arithmetic
  kind: transform
  query: |
    SELECT
      Fn.add(1, 2) as Three,    -- 1 + 2 = 3
      Fn.minus(10, 3) as Seven, -- 10 - 3 = 7
      Fn.mult(4, 5) as Twenty,  -- 4 * 5 = 20
      Fn.div(10, 2) as Five     -- 10 / 2 = 5
# Type conversion macros
- name: CleanTypes
  kind: transform
  query: |
    SELECT
      Fn.str(mixed_column) as CleanString,
      Fn.int(string_number) as CleanInteger,
      Fn.float(decimal_text) as CleanDecimal,
      Fn.dt(date_string) as CleanTimestamp
    FROM RawData
Q: What are these Fn.* macros?
They're convenience functions built into the platform:
Fn.calendar() - Date series with attributes: SELECT * FROM Fn.calendar()
Fn.add(a, b) - Safe addition: Fn.add(revenue, tax)
Fn.minus(a, b) - Safe subtraction: Fn.minus(total, discount)
Fn.mult(a, b) - Safe multiplication: Fn.mult(quantity, price)
Fn.div(a, b) - Safe division (handles zero): Fn.div(revenue, customers)