## **Complete REST Connection Structure**
```yaml
connection:
  kind: Rest                     # REQUIRED: Must be "Rest"

  # ====== CLIENT CONFIGURATION (API Identity & Connection) ======
  client:                        # REQUIRED: API client settings
    base_url: string             # REQUIRED: Base API URL, e.g., "https://api.example.com/v1"
    timeout: number              # OPTIONAL: Request timeout in seconds, default 5, 0 = no timeout

    # --- Authentication ---
    auth:                        # CONDITIONAL: Required if API needs authentication
      method: string             # REQUIRED: One of: basic|digest|header|oauth2|bearer_token|api_key
      values: dict               # REQUIRED: Key-value pairs for chosen auth method
      # Examples:
      #   basic:        {username: "user", password: "pass"}
      #   header:       {"X-Api-Key": "key123"}
      #   bearer_token: {token: "token123"}
      #   api_key:      {"x-api-key": "key123", "x-api-secret": "secret123"}

    # --- Advanced Client Settings (rarely used) ---
    headers: dict                # OPTIONAL: Additional HTTP headers for all requests
    proxies: dict                # OPTIONAL: Proxy configuration if needed
    verify_ssl: boolean          # OPTIONAL: Verify SSL certificates, default true

  # ====== REQUEST CONFIGURATION (What to Ask For) ======
  request:                       # REQUIRED: Request definition
    method: string               # REQUIRED: HTTP method: get|post|put|patch|delete
    endpoint: string             # REQUIRED: API endpoint path, e.g., "/customers" or "/v2/orders"
    content_type: string         # OPTIONAL: Request content type, default "application/json"

    # --- Request Parameters ---
    query: dict                  # OPTIONAL: URL query parameters, e.g., {status: "active", limit: 100}
    headers: dict                # OPTIONAL: Request-specific headers (override client headers)
    body: string|dict            # OPTIONAL: Request body for POST/PUT/PATCH methods

    # --- Request Body Template Syntax ---
    # Body can use template variables with special syntax. The @json prefix
    # indicates a Jinja2-style template that renders to JSON:
    body: >
      @json[
      <% for item in API.look("previous_stage", variables) %>
        {
          "id": << item.id >>,
          "value": << item.value >>
        }<% if not loop.last %>,<% endif %>
      <% endfor %>
      ]

    # --- Error Handling ---
    errorhandler:                # OPTIONAL: Custom error handling
      error_status_codes: list[int]  # OPTIONAL: Additional HTTP codes to treat as errors
      condition: string          # OPTIONAL: JMESPath expression; error if it evaluates to true
      message: string            # OPTIONAL: JMESPath to extract error message from response

    # --- Retry Logic ---
    max_retries: integer         # OPTIONAL: Max retries on failures, default 0
    initial_backoff: float       # OPTIONAL: Initial backoff in seconds, default 0.5

    # --- Throttling (Rate Limiting) ---
    throttle: number             # OPTIONAL: Wait seconds between requests

  # ====== RESPONSE HANDLING (Processing What Comes Back) ======
  response:                      # OPTIONAL: Response processing configuration
    content_type: string         # OPTIONAL: Expected response type: json|xml|csv|text, default json

    # --- Data Extraction ---
    locator: string              # OPTIONAL: JMESPath to extract data from response
    # Examples:
    #   locator: "data.results"                      # Extract nested array
    #   locator: "items[?status=='active']"          # Filter active items
    #   locator: "results[*].{id: id, name: name}"   # Transform structure

    # --- Response Handler (for complex APIs) ---
    handler:                     # OPTIONAL: Choose ONE handler type for special API patterns
      # OPTION 1: Pagination Handler (for multi-page APIs)
      kind: paginator            # REQUIRED for pagination
      page_param: string         # REQUIRED: Parameter name for page number
      page_size: integer         # REQUIRED: Records per page
      increment: integer         # REQUIRED: Page increment step, usually 1
      param_locator: string      # OPTIONAL: Where param goes: query|body, default query
      total_records: string      # OPTIONAL: JMESPath to total records count
      throttle: number           # OPTIONAL: Wait seconds between page requests

      # OPTION 2: State Polling Handler (for async APIs)
      kind: state_poller         # REQUIRED for polling
      ready_status: string       # REQUIRED: JMESPath expression, true when ready
      # Example: "job_status == 'completed'"

      # OPTION 3: URL Polling Handler (for redirect-to-results APIs)
      kind: url_poller           # REQUIRED for URL polling
      ready_status: string       # OPTIONAL: JMESPath to check if URL is ready

      # OPTION 4: Basic Handler (default, no special handling)
      kind: basic                # Simple single-response handler

  # ====== COMMON CONNECTION FIELDS (Inherited) ======
  fields:                        # OPTIONAL: list; output schema definition (overrides inferred schema)
    - name: string               # REQUIRED: Column name
      type: string               # REQUIRED: Data type
  show_schema: boolean           # OPTIONAL: Auto-detect and log response schema

# ====== STAGE-LEVEL FIELDS (Outside connection block) ======
# These belong to the stage, not the connection, but are commonly used with REST:
name: string                     # REQUIRED: Stage name
kind: source|sink                # REQUIRED: Stage type (REST can be source or sink)
throttle: number                 # OPTIONAL: Wait seconds after stage execution
on_error: string                 # OPTIONAL: "fail" (default) or "continue"
```
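When rendered, the `@json` body template emits one JSON object per row from a previous stage, with commas between objects. For comparison, the sketch below builds the same array in plain Python. It assumes `API.look("previous_stage", variables)` returns a list of dicts with `id` and `value` keys; `previous_rows` and `render_body` are hypothetical names. One practical difference: in standard Jinja2 a bare placeholder such as `<< item.id >>` inserts the value without JSON quoting, so string values would need explicit quotes in the template. `json.dumps` quotes them automatically.

```python
import json


def render_body(previous_rows):
    """Build the JSON array the @json template is meant to produce.

    previous_rows is a hypothetical stand-in for
    API.look("previous_stage", variables): a list of dicts.
    """
    # One object per row, keeping only the keys the template uses.
    items = [{"id": row["id"], "value": row["value"]} for row in previous_rows]
    # json.dumps inserts the separating commas and quotes string values.
    return json.dumps(items)


rows = [{"id": 1, "value": 9.5, "extra": "ignored"}, {"id": 2, "value": "N/A"}]
print(render_body(rows))
# [{"id": 1, "value": 9.5}, {"id": 2, "value": "N/A"}]
```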
## Authentication Method Reference
```yaml
# Method 1: Basic Auth (username/password)
auth:
  method: basic
  values:
    username: string        # REQUIRED
    password: string        # REQUIRED

# Method 2: Header-based Auth (API keys)
auth:
  method: header
  values:
    X-API-Key: string       # REQUIRED: Custom header name/value
    X-API-Secret: string    # OPTIONAL: Additional headers

# Method 3: OAuth2 (OAuth style)
auth:
  method: oauth2
  values:
    token: string           # REQUIRED: Bearer token
```
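The sketch below shows the HTTP headers these methods conventionally produce. Basic auth sends `Authorization: Basic` followed by the base64 encoding of `username:password` (RFC 7617). A bearer token is sent as `Authorization: Bearer <token>` (RFC 6750). Header-based auth sends the configured headers unchanged. The sketch assumes the connector follows these standard conventions, and `build_auth_headers` is a hypothetical helper, not part of the connector.

```python
import base64


def build_auth_headers(method, values):
    """Return the request headers an auth block would typically produce."""
    if method == "basic":
        # RFC 7617: "Basic " + base64("username:password")
        raw = f"{values['username']}:{values['password']}".encode("utf-8")
        return {"Authorization": "Basic " + base64.b64encode(raw).decode("ascii")}
    if method in ("oauth2", "bearer_token"):
        # RFC 6750: the token travels as a Bearer credential
        return {"Authorization": f"Bearer {values['token']}"}
    if method == "header":
        # Header-based auth: each key/value pair becomes a request header
        return dict(values)
    raise ValueError(f"method not covered by this sketch: {method}")


print(build_auth_headers("basic", {"username": "user", "password": "pass"}))
# {'Authorization': 'Basic dXNlcjpwYXNz'}
```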
### Example 1: Simple GET Request with Pagination
```yaml
- name: fetch_customers
  kind: source
  connection:
    kind: Rest
    client:
      base_url: "https://api.crm.com/v2"
      auth:
        method: oauth2
        values:
          token: "<<secrets.get('CRM_TOKEN')>>"
    request:
      method: get
      endpoint: "/customers"
      query:
        status: "active"
    response:
      content_type: json
      locator: "data"
      handler:
        kind: paginator
        page_param: "page"
        page_size: 100
        increment: 1
        throttle: 1   # Wait 1s between pages
```
### Example 2: POST with Dynamic Body
```yaml
- name: send_orders
  kind: sink
  connection:
    kind: Rest
    client:
      base_url: "https://api.erp.com"
      auth:
        method: basic
        values:
          username: "<<secrets.get('ERP_USER')>>"
          password: "<<secrets.get('ERP_PASS')>>"
    request:
      method: post
      endpoint: "/orders/batch"
      content_type: "application/json"
      body: >
        @json{
          "batch_id": "<<batch_id>>",
          "orders": <<orders_data>>,
          "timestamp": "<<current_timestamp>>"
        }
    response:
      content_type: json
      locator: "result.ids"   # Extract just the created IDs
```
### Example 3: Async API with Polling
```yaml
- name: fetch_report
  kind: source
  connection:
    kind: Rest
    client:
      base_url: "https://api.analytics.com"
      auth:
        method: header
        values:
          x-api-key: "<<secrets.get('ANALYTICS_KEY')>>"
    request:
      method: post
      endpoint: "/reports/generate"
      body:
        report_type: "monthly_sales"
        format: "csv"
    response:
      handler:
        kind: state_poller
        ready_status: "report_status == 'ready'"
        # After ready, automatically fetches from URL in response
```
The schema at the top of this section lists every parameter available for REST connections and marks each as required, optional, or conditional.
A REST Connection lets your data pipeline talk to web applications and APIs just like a web browser talks to websites. It's your pipeline's way of:
- Fetching data from online services (Salesforce, Shopify, marketing platforms)
- Sending data to web applications
- Automating data exchange between business systems
Think of it as your pipeline's digital assistant that can make phone calls to other systems and have conversations with them.
Every REST connection has three main parts that work together:
1. The Client: Who You Are
This defines your identity and basic connection rules to the API:
```yaml
client:
  base_url: "https://api.salesforce.com/v1"   # The main address
  timeout: 30                                 # Wait up to 30 seconds for a response
  # Authentication (how you prove who you are):
  auth:
    method: basic                             # Simple username/password
    values:
      username: "company_api_user"
      password: "<<secrets.get('SECURE_PASSWORD')>>"   # Securely stored
```
Key Decisions:
- Which authentication method? (Basic, OAuth, API key)
- What's the base URL? (The main service address)
- How long to wait? (Timeout settings)
2. The Request: What You're Asking For
This specifies exactly what you want from the API:
```yaml
request:
  method: get               # What action: get, post, put, patch, delete
  endpoint: "/customers"    # Which specific resource
  query:                    # Optional filters (sent as URL parameters)
    status: "active"
    created_after: "2024-01-01"
  body:                     # Data to send (for POST/PUT/PATCH requests)
    customer_name: "Acme Corp"
    industry: "Manufacturing"
```
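A request is sent to one address: the base URL, then the endpoint path, then the URL-encoded query parameters. The sketch below builds that address with Python's standard library, using the base URL from the client example above. It assumes the connector appends `endpoint` to `base_url`. If it resolved the endpoint as a relative URL instead, `urllib.parse.urljoin("https://api.salesforce.com/v1", "/customers")` would drop the `/v1` prefix. `build_url` is a hypothetical helper.

```python
from urllib.parse import urlencode


def build_url(base_url, endpoint, query=None):
    """Append endpoint to base_url and add URL-encoded query parameters."""
    url = base_url.rstrip("/") + "/" + endpoint.lstrip("/")
    if query:
        url += "?" + urlencode(query)
    return url


print(build_url("https://api.salesforce.com/v1", "/customers",
                {"status": "active", "created_after": "2024-01-01"}))
# https://api.salesforce.com/v1/customers?status=active&created_after=2024-01-01
```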
3. The Response: What You Get Back
This tells the system how to understand what comes back:
```yaml
response:
  content_type: json                # Data format (json, xml, csv)
  locator: "results.customers[*]"   # Where to find the actual data
```
Common Business Scenarios
Scenario 1: Daily Customer Sync from CRM
```yaml
connection:
  kind: Rest
  client:
    base_url: "https://api.crmplatform.com"
    auth:
      method: bearer_token
      values:
        token: "<<secrets.get('CRM_TOKEN')>>"
  request:
    method: get
    endpoint: "/v2/customers"
    query:
      updated_since: "<<API.dt().shift(days=-1).isoformat()>>"   # Only get recent changes
  response:
    content_type: json
    locator: "data"   # Get the 'data' array from response
```
Use Case: Keep your analytics up to date with the latest customer changes from the CRM.
Scenario 2: Sending Marketing Leads
```yaml
connection:
  kind: Rest
  client:
    base_url: "https://api.marketingtool.com"
    auth:
      method: api_key
      values:
        x-api-key: "<<secrets.get('MARKETING_KEY')>>"
  request:
    method: post
    endpoint: "/leads"
    body:   # Send new leads from our system
      leads: <<variables.get('ProcessedLeads')>>   # Data from previous pipeline step
  response:
    content_type: json
```
Use Case: Automatically send qualified leads from your website to marketing automation.
Feature 1: Pagination - Handling Multi-Page Results
Some APIs return data in "pages" (like search results). The system can automatically fetch all pages:
```yaml
response:
  handler:
    kind: paginator
    page_param: "page"   # Which parameter controls pages
    page_size: 100       # How many records per page
    increment: 1         # Increase page number by 1 each time
```
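The loop below sketches what a paginator like this does. It requests a page, advances the page parameter by `increment`, and stops when a page comes back with fewer than `page_size` records. That stop rule, the starting page of 1, and the `fetch_page` callable are assumptions for illustration. The connector may also use `total_records` when that setting is configured.

```python
import time


def paginate(fetch_page, page_param="page", page_size=100, increment=1,
             start_page=1, throttle=0.0):
    """Collect records from every page of a paginated endpoint.

    fetch_page(params) is a hypothetical callable that performs one request
    with the given query parameters and returns that page's list of records.
    """
    records, page = [], start_page
    while True:
        batch = fetch_page({page_param: page})
        records.extend(batch)
        if len(batch) < page_size:   # short (or empty) page: assume it was the last
            return records
        page += increment
        if throttle:
            time.sleep(throttle)     # pause between page requests


# Fake API holding 250 records, served 100 per page.
DATA = list(range(250))
calls = []

def fake_fetch(params):
    calls.append(params["page"])
    start = (params["page"] - 1) * 100
    return DATA[start:start + 100]

records = paginate(fake_fetch)
print(len(records), calls)  # 250 [1, 2, 3]
```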
Feature 2: Polling - Waiting for Results
For APIs that process requests asynchronously (take time to complete):
```yaml
response:
  handler:
    kind: state_poller
    ready_status: "status == 'completed'"   # Keep checking until done
```
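A state poller checks the job status repeatedly until the `ready_status` condition holds. The sketch below shows that loop. The condition is written as a Python function rather than a JMESPath expression. The poll interval, the attempt limit, and the `get_status` callable are assumptions for illustration, not documented connector settings.

```python
import time


def poll_until_ready(get_status, is_ready, interval=0.0, max_attempts=10):
    """Call get_status() until is_ready(response) is true, then return that response."""
    for _ in range(max_attempts):
        response = get_status()
        if is_ready(response):
            return response
        time.sleep(interval)  # wait before checking again
    raise TimeoutError(f"not ready after {max_attempts} attempts")


# Fake job that reports 'completed' on the third check.
statuses = iter(["queued", "running", "completed"])

def fake_status():
    return {"status": next(statuses)}

result = poll_until_ready(fake_status, lambda r: r["status"] == "completed")
print(result)  # {'status': 'completed'}
```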
Feature 3: Error Handling - Graceful Failures
```yaml
request:
  errorhandler:
    error_status_codes: [400, 401, 403, 500]   # HTTP codes to treat as errors
    message: "error.message"                   # JMESPath to the error details
```
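The sketch below shows one plausible reading of how these settings combine. A response counts as an error when its status code is in `error_status_codes`, and the error text is read from the response body at the `message` path. The sketch resolves only simple dotted paths like `error.message`, not full JMESPath, and it leaves out `condition`. `lookup` and `check_response` are hypothetical helpers, not the connector's code.

```python
def lookup(data, dotted_path):
    """Follow a simple dotted path such as 'error.message' through nested dicts."""
    for key in dotted_path.split("."):
        if not isinstance(data, dict) or key not in data:
            return None
        data = data[key]
    return data


def check_response(status_code, body, error_status_codes, message_path):
    """Return body, or raise RuntimeError carrying the API's own error message."""
    if status_code in error_status_codes:
        detail = lookup(body, message_path) or "no message in response"
        raise RuntimeError(f"HTTP {status_code}: {detail}")
    return body


codes = [400, 401, 403, 500]
caught = None
try:
    check_response(401, {"error": {"message": "Token expired"}}, codes, "error.message")
except RuntimeError as exc:
    caught = str(exc)
print(caught)  # HTTP 401: Token expired
```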
Data Transformation with JMESPath
JMESPath is a simple query language for extracting specific parts from JSON responses.
Examples:
```yaml
# Response looks like: {"results": {"customers": [...]}}
locator: "results.customers"                   # Get just the customers array

# Response: {"data": [{"id": 1, "active": true}, ...]}
locator: "data[?active==`true`]"               # Get only active records

# Response with nested data
locator: "orders[*].items[?quantity > `10`]"   # High-quantity items only (one list per order)
```
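Each locator above can be read as a small piece of data-access code. The sketch below mirrors the three expressions in plain Python over made-up sample responses. It uses only the standard library, so the `jmespath` package is not needed to follow it. Note that the last expression returns one list per order rather than a single flat list.

```python
# Made-up responses shaped like the comments above.
resp1 = {"results": {"customers": [{"id": 1}, {"id": 2}]}}
resp2 = {"data": [{"id": 1, "active": True}, {"id": 2, "active": False}]}
resp3 = {"orders": [
    {"items": [{"sku": "A", "quantity": 12}, {"sku": "B", "quantity": 3}]},
    {"items": [{"sku": "C", "quantity": 20}]},
]}

# results.customers
customers = resp1["results"]["customers"]

# data[?active==`true`]  (matches JSON true only, hence the identity check)
active = [row for row in resp2["data"] if row.get("active") is True]

# orders[*].items[?quantity > `10`]  -> one filtered list per order
big_items = [
    [item for item in order["items"] if (item.get("quantity") or 0) > 10]
    for order in resp3["orders"]
    if isinstance(order.get("items"), list)  # the projection skips orders without an items list
]

print(customers)  # [{'id': 1}, {'id': 2}]
print(active)     # [{'id': 1, 'active': True}]
print(big_items)  # [[{'sku': 'A', 'quantity': 12}], [{'sku': 'C', 'quantity': 20}]]
```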
Using Variables in Requests
You can make requests dynamic based on pipeline data:
```yaml
request:
  endpoint: "/sales/<<API.dt().format('MM')>>"      # Use 0-padded month number
  query:
    region: "<<variables.get('current_region')>>"   # Use pipeline variables
```
Date-Based Automation
```yaml
query:
  start_date: "<<API.dt().shift(days=-1).format('YYYY-MM-DD')>>"   # Yesterday's date
  end_date: "<<API.dt().format('YYYY-MM-DD')>>"                    # Today's date (no shift)
```
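`API.dt()` is this tool's own date helper. The sketch below uses Python's `datetime` only to make the date arithmetic visible. Shifting by `days=-1` moves back one day, including across month and leap-year boundaries. The sketch assumes `API.dt()` uses the common Moment/Arrow-style format tokens, so `'YYYY-MM-DD'` corresponds to `%Y-%m-%d` and `'MM'` to the zero-padded month `%m`. The fixed run date is chosen only so the output is reproducible.

```python
from datetime import date, timedelta

today = date(2024, 3, 1)               # fixed run date for a reproducible example
yesterday = today - timedelta(days=1)  # equivalent of shift(days=-1)

query = {
    "start_date": yesterday.strftime("%Y-%m-%d"),  # 'YYYY-MM-DD'
    "end_date": today.strftime("%Y-%m-%d"),
}
endpoint = f"/sales/{today:%m}"  # 'MM' -> zero-padded month

print(query)     # {'start_date': '2024-02-29', 'end_date': '2024-03-01'}
print(endpoint)  # /sales/03
```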
Common REST API Patterns
Pattern: Incremental Updates
```yaml
# Only fetch data changed since last run
query:
  updated_after: "<<variables.get('LastSync')>>"   # Timestamp from previous run
```
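An incremental sync usually keeps a watermark: the timestamp of the last successful run. That timestamp is sent as the filter on the next run and updated only after the fetch succeeds. The sketch below shows this bookkeeping with a plain dict standing in for pipeline variables. `state`, `run_incremental`, and the default starting value are hypothetical, because this guide does not describe how the tool persists `LastSync` between runs.

```python
from datetime import datetime, timezone

EPOCH = "1970-01-01T00:00:00+00:00"  # used when no previous sync is recorded


def run_incremental(state, fetch, run_started):
    """Fetch records changed since state['LastSync'], then advance the watermark.

    state is a plain dict standing in for pipeline variables (hypothetical);
    fetch(query) stands in for the REST request and returns a list of records.
    """
    query = {"updated_after": state.get("LastSync", EPOCH)}
    records = fetch(query)  # if this raises, LastSync is left unchanged
    state["LastSync"] = run_started.isoformat()
    return records


sent = []

def fake_fetch(query):
    sent.append(query)
    return ["changed-record"]

state = {"LastSync": "2024-05-01T00:00:00+00:00"}
records = run_incremental(state, fake_fetch, datetime(2024, 5, 2, tzinfo=timezone.utc))
print(sent)   # [{'updated_after': '2024-05-01T00:00:00+00:00'}]
print(state)  # {'LastSync': '2024-05-02T00:00:00+00:00'}
```

The watermark is the run's start time, captured before the request, not the time the fetch finishes. Records that change while the fetch is in progress are therefore picked up on the next run instead of being skipped.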
Pattern: Retry Logic
```yaml
# Automatically retry failed requests
request:
  max_retries: 3       # Retry up to 3 times after the first failed attempt
  initial_backoff: 1   # Wait 1 second, then 2, then 4 between attempts
```
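With exponential backoff, each wait is double the previous one. The sketch below computes the schedule for `max_retries: 3` and `initial_backoff: 1` and wraps a hypothetical `send` callable in a retry loop. It assumes plain doubling with no jitter and retries only on `ConnectionError`, because the connector's exact formula and retry conditions are not documented here.

```python
import time


def backoff_schedule(max_retries, initial_backoff):
    """Waits before each retry, doubling each time: initial, 2x, 4x, ..."""
    return [initial_backoff * 2 ** n for n in range(max_retries)]


def with_retries(send, max_retries=3, initial_backoff=1.0, sleep=time.sleep):
    """Call send(); after a ConnectionError, wait and retry up to max_retries times."""
    for wait in backoff_schedule(max_retries, initial_backoff):
        try:
            return send()
        except ConnectionError:
            sleep(wait)
    return send()  # last attempt: any error now propagates to the caller


print(backoff_schedule(3, 1))  # [1, 2, 4]
```

With three retries, a request can be attempted up to four times in total: the original attempt plus one attempt after each wait.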
Best Practices for Business Users
1. Start Simple, Then Enhance
Begin with a basic GET request, then add filters, then pagination, then error handling.
2. Use API Documentation
Many APIs publish Postman collections or OpenAPI specifications; these are your blueprint for configuring the connection.
3. Test with Small Data First
```yaml
# Add a limit parameter during testing
query:
  limit: 10   # Only fetch 10 records while testing
```
4. Handle Rate Limits
```yaml
# Be a good API citizen: stay under the API's documented rate limit
request:
  throttle: 1   # Wait 1 second between requests (at most about 60 requests per minute)
```
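The connector's `throttle` setting is the number of seconds to wait between requests, while APIs usually state their limits in requests per minute. To convert a limit into a `throttle` value, divide 60 seconds by the allowed requests per minute. Each request's own duration adds to the interval, so the resulting value errs on the safe side. `throttle_for_limit` is a hypothetical helper for this arithmetic.

```python
import math


def throttle_for_limit(requests_per_minute):
    """Seconds to wait between requests to stay within a per-minute limit."""
    if requests_per_minute <= 0:
        raise ValueError("limit must be positive")
    # 60 s / limit, rounded UP to hundredths so we never wait slightly too little
    return math.ceil(6000 / requests_per_minute) / 100


print(throttle_for_limit(60))   # 1.0
print(throttle_for_limit(100))  # 0.6
print(throttle_for_limit(7))    # 8.58
```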
5. Secure Your Credentials
```yaml
# Never hardcode passwords
auth:
  values:
    password: "<<secrets.get('PASSWORD')>>"   # Reference secure storage
```
Quick-Start Template
```yaml
connection:
  kind: Rest
  client:
    base_url: "https://api.example.com"   # Your API's address
    auth:                                 # From API documentation
      method: ...
      values:
        ...
  request:
    method: get                           # Usually 'get' for reading data
    endpoint: "/your_endpoint_here"       # From API documentation
    query:                                # Optional filters
      param1: "value1"
  response:
    content_type: json                    # Most common format
    locator: "data"                       # Try "data", "results", or "items" based on API
```
Information You'll Need:
- Base URL of the API
- Authentication method and credentials
- API documentation or endpoint examples
- Any rate limits or usage guidelines
By understanding REST connections, you can bridge your data pipeline with virtually any modern business application, enabling automated data flows between all your company's systems.
Common Issues & Solutions:
| Issue | Likely Cause | Quick Fix |
| --- | --- | --- |
| "401 Unauthorized" | Wrong credentials | Check auth method and tokens |
| "404 Not Found" | Wrong endpoint | Verify endpoint path |
| Slow responses | Large data volume | Add pagination or limits |
| Missing data | Wrong locator | Check JMESPath expression |
Quick Debug Commands:
- See raw data: `show: 10` (first 10 rows) or `show: -1` (all rows).
- See the entire raw response: together with `show`, set `connection.response.locator` to `"@"` or `""`.
- Auto-discover schema: `show_schema: true` → logs field names and types.
- See request details: `log_level: DEBUG` → shows requests/responses (credentials masked).
Critical Tip: Once testing is complete, add explicit fields to lock your schema and prevent drift: `fields: [{name: "id", type: "integer"}, {name: "name", type: "string"}]`
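Conceptually, `show_schema` inspects sample records and reports a name and type for each field, which you can then copy into `fields`. The sketch below approximates that with a simple type-inference pass over a few made-up records. The type names it emits are illustrative only. The names your pipeline accepts depend on the tool; this guide uses both `integer` and `BIGINT`. `infer_fields` is a hypothetical helper.

```python
def infer_fields(records):
    """Return [{'name': ..., 'type': ...}] inferred from sample dict records."""
    type_names = {bool: "boolean", int: "integer", float: "double", str: "string"}
    seen = {}  # field name -> inferred type (None until a non-null value appears)
    for record in records:
        for name, value in record.items():
            if value is None:
                seen.setdefault(name, None)
                continue
            inferred = type_names.get(type(value), "json")  # anything else (dict, list, ...) -> "json"
            current = seen.get(name)
            if current is None:
                seen[name] = inferred
            elif current != inferred:
                # int mixed with float widens to double; other conflicts fall back to string
                widen = {current, inferred} == {"integer", "double"}
                seen[name] = "double" if widen else "string"
    # Fields that were only ever null default to string
    return [{"name": n, "type": t or "string"} for n, t in seen.items()]


sample = [
    {"id": 1, "name": "Acme", "score": 3},
    {"id": 2, "name": None, "score": 4.5},
]
print(infer_fields(sample))
# [{'name': 'id', 'type': 'integer'}, {'name': 'name', 'type': 'string'}, {'name': 'score', 'type': 'double'}]
```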
Example of a tap (source) stage with debugging options:
```yaml
- name: your_api_connection
  kind: source
  log_level: DEBUG       # Enable extended logging of raw requests
  show: 10               # See sample data
  connection:
    show_schema: true    # Auto-discover structure
    ...                  # From the examples above
    fields:              # Add after testing; explicit fields turn off auto-discovery
      - name: customer_id
        type: BIGINT
```
After testing:
- Replace `show_schema: true` with explicit `fields: [list]`.
- Rate limits: add `throttle: 1` between requests if needed.
- Error handling: set `on_error: "continue"` for non-critical stages.
- Performance: add a pagination handler for datasets larger than about 1,000 records if the API supports it. Many APIs paginate large results and some require it, so check the specific API documentation for details.
Remember the golden rule: test with `show_schema: true` and `show: N`, then lock the schema with explicit `fields` so that unexpected schema changes cannot break your pipeline.