# Pattern: REST API Ingestion
Pull data from third-party APIs, normalise it, and land it in your data lake. This pattern covers single-request, paginated, and incremental ingestion strategies.
## Single-request ingestion
For small payloads that fit in one response:
```yaml
MODEL: etl
VERSION: "1.0.0"

CONFIG:
  inputs:
    exchange_rates:
      format: rest_api
      url: "https://api.exchangerate.host/latest"
      params:
        base: USD
        symbols: EUR,GBP,ZAR,JPY
      response:
        root_key: rates
  transform:
    type: noop
  outputs:
    rates:
      format: delta
      path: s3://datalake/forex/rates/
      mode: append
```
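Conceptually, `root_key` tells the connector which part of the JSON body holds the rows. A minimal sketch of that extraction (illustrative only, not ubunye's actual code; the payload shape mimics the exchangerate.host response):

```python
def extract_root(payload: dict, root_key: str):
    """Return the sub-object named by root_key; its contents become
    the rows/columns that land in the output table."""
    return payload[root_key]

# Payload shaped like the exchangerate.host response:
payload = {"base": "USD", "date": "2024-06-01",
           "rates": {"EUR": 0.92, "GBP": 0.79, "ZAR": 18.4}}
rates = extract_root(payload, "rates")
```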
## Paginated ingestion (offset)
For APIs that return large datasets across multiple pages:
```yaml
CONFIG:
  inputs:
    orders:
      format: rest_api
      url: "https://api.shopify.example.com/v1/orders"
      auth:
        type: api_key
        header: X-Shopify-Access-Token
        key: "{{ env.SHOPIFY_TOKEN }}"
      pagination:
        type: offset
        page_size: 250
        max_pages: 200
      response:
        root_key: orders
      schema:
        - name: id
          type: long
        - name: created_at
          type: timestamp
        - name: total_price
          type: double
        - name: currency
          type: string
        - name: status
          type: string
```
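The offset strategy boils down to a loop like the one below (a sketch of the general technique, not ubunye's internals): request `page_size` rows at increasing offsets until a short page signals the end, or `max_pages` caps the run.

```python
def fetch_all_offset(fetch_page, page_size=250, max_pages=200):
    """Collect rows page by page; a page shorter than page_size
    means the API has no more data."""
    rows = []
    for page in range(max_pages):
        batch = fetch_page(offset=page * page_size, limit=page_size)
        rows.extend(batch)
        if len(batch) < page_size:  # short page => stop
            break
    return rows

# Fake API with 600 orders to show the loop terminating:
orders = list(range(600))
calls = []
def fake_page(offset, limit):
    calls.append(offset)
    return orders[offset:offset + limit]

result = fetch_all_offset(fake_page)
# 600 rows fetched in three requests at offsets 0, 250, 500
```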
## Incremental ingestion (cursor / date filter)
Pass the last-seen cursor or date as a Jinja variable:
```shell
ubunye run -d pipelines -u crm -p ingest -t contacts \
    --var since=2024-06-01 \
    --profile prod \
    --lineage
```
```yaml
CONFIG:
  inputs:
    contacts:
      format: rest_api
      url: "https://crm.example.com/api/contacts"
      params:
        updated_after: "{{ since | default('2020-01-01') }}"
      auth:
        type: bearer
        token: "{{ env.CRM_TOKEN }}"
      pagination:
        type: next_link
        link_field: _links.next.href
      response:
        root_key: contacts
```
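The value you pass as `--var since` on the next run is a watermark: the highest `updated_at` seen on this run, falling back to the Jinja default on an empty pull. A hedged sketch of computing it (the field names mirror the config above; ISO-8601 date strings compare correctly as plain strings):

```python
def next_watermark(rows, field="updated_at", floor="2020-01-01"):
    """Highest timestamp seen this run, or the floor (the Jinja
    default) when the pull returned nothing."""
    return max((r[field] for r in rows), default=floor)

rows = [{"id": 1, "updated_at": "2024-05-30"},
        {"id": 2, "updated_at": "2024-06-01"}]
since = next_watermark(rows)
```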
## Cursor-based pagination
For APIs that return a cursor/token to request the next page:
```yaml
CONFIG:
  inputs:
    events:
      format: rest_api
      url: "https://api.mixpanel.com/export/events"
      auth:
        type: basic
        username: "{{ env.MP_USER }}"
        password: "{{ env.MP_SECRET }}"
      pagination:
        type: cursor
        cursor_field: next_page_token
        page_size: 1000
```
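Unlike offset paging, the client never computes positions here: it echoes back whatever token the server issued until no token comes back. A sketch of that loop (illustrative; `cursor_field` matches the config above):

```python
def fetch_all_cursor(fetch_page, cursor_field="next_page_token"):
    """Follow the server-issued cursor until the API stops
    returning one."""
    rows, cursor = [], None
    while True:
        page = fetch_page(cursor)
        rows.extend(page["results"])
        cursor = page.get(cursor_field)
        if not cursor:
            break
    return rows

# Fake three-page response stream keyed by cursor:
pages = {
    None: {"results": [1, 2], "next_page_token": "t1"},
    "t1": {"results": [3, 4], "next_page_token": "t2"},
    "t2": {"results": [5]},  # no token => last page
}
events = fetch_all_cursor(lambda c: pages[c])
# events == [1, 2, 3, 4, 5]
```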
## Rate-limited API
For APIs with strict rate limits:
```yaml
CONFIG:
  inputs:
    github_repos:
      format: rest_api
      url: "https://api.github.com/orgs/my-org/repos"
      headers:
        Authorization: "Bearer {{ env.GITHUB_TOKEN }}"
        X-GitHub-Api-Version: "2022-11-28"
      pagination:
        type: next_link
        link_field: next  # parsed from the Link header (or response body)
      rate_limit:
        requests_per_second: 1  # GitHub: 5000 req/hr authenticated
        retry_on: [429, 503]
        max_retries: 5
```
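The `retry_on`/`max_retries` pair typically implements exponential backoff: resend on the listed status codes, doubling the wait each attempt. A self-contained sketch of that behaviour (assumed, not ubunye's actual retry code; `base_delay=0` in the example just keeps it fast):

```python
import time

def request_with_retry(send, retry_on=(429, 503), max_retries=5,
                       base_delay=1.0):
    """Retry on throttling (429) or unavailable (503) responses,
    sleeping base_delay * 2**attempt between tries."""
    for attempt in range(max_retries + 1):
        status, body = send()
        if status not in retry_on:
            return status, body
        if attempt < max_retries:
            time.sleep(base_delay * 2 ** attempt)
    return status, body

# Fake endpoint: throttled twice, then succeeds.
responses = [(429, None), (429, None), (200, {"ok": True})]
status, body = request_with_retry(lambda: responses.pop(0), base_delay=0)
# status == 200 after two retried attempts
```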
## POST with body
For APIs that require POST requests:
```yaml
CONFIG:
  inputs:
    search_results:
      format: rest_api
      url: "https://api.elastic.example.com/search"
      method: POST
      headers:
        Content-Type: application/json
        Authorization: "Bearer {{ env.ES_TOKEN }}"
      params:
        query: "fraud risk"
        size: 100
```
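The difference from the GET examples is where `params` ends up: as a query string for GET, but serialised into the JSON body for POST. A sketch of that split (an assumption about the connector's behaviour, shown as a plain helper):

```python
import json

def build_request(method, url, params, headers=None):
    """GET: params become a query string. POST: params are sent as a
    JSON body with a Content-Type header (assumed behaviour)."""
    headers = dict(headers or {})
    if method.upper() == "POST":
        headers.setdefault("Content-Type", "application/json")
        return {"url": url, "body": json.dumps(params), "headers": headers}
    query = "&".join(f"{k}={v}" for k, v in params.items())
    return {"url": f"{url}?{query}", "body": None, "headers": headers}

req = build_request("POST", "https://api.elastic.example.com/search",
                    {"query": "fraud risk", "size": 100})
```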
## Writing back to an API
After enrichment, send results back to an external system:
```yaml
CONFIG:
  outputs:
    scored_customers:
      format: rest_api
      url: "https://crm.example.com/api/bulk/scores"
      method: POST
      auth:
        type: bearer
        token: "{{ env.CRM_TOKEN }}"
      batch_size: 200
      rate_limit:
        requests_per_second: 10
        retry_on: [429, 503]
        max_retries: 3
```
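`batch_size: 200` means the writer chunks the output rows and issues one POST per chunk rather than one per row. The chunking itself is simple (a sketch of the technique, with a list standing in for the HTTP call):

```python
def post_in_batches(rows, post, batch_size=200):
    """Send rows in fixed-size chunks; returns the request count.
    The last chunk may be smaller than batch_size."""
    for i in range(0, len(rows), batch_size):
        post(rows[i:i + batch_size])
    return (len(rows) + batch_size - 1) // batch_size

# 450 rows at batch_size 200 => three requests (200, 200, 50):
batches = []
n = post_in_batches(list(range(450)), batches.append)
```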
## Handling nested JSON
The `response.root_key` option only extracts one level of the payload. For deeply nested responses, use a transform to flatten:
```python
from pyspark.sql import functions as F

from ubunye.core.interfaces import Task


class FlattenTask(Task):
    def transform(self, sources: dict) -> dict:
        df = sources["api_data"]
        # Promote nested struct fields to top-level columns, and explode
        # the line_items array so each item becomes its own row; the
        # final select unpacks every field of the exploded struct.
        flat = df.select(
            "id",
            F.col("address.street").alias("street"),
            F.col("address.city").alias("city"),
            F.col("metadata.tags").alias("tags"),
            F.explode("line_items").alias("item"),
        ).select("id", "street", "city", "tags", "item.*")
        return {"api_data": flat}
```
## Scheduling
Run nightly via Airflow:
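One common shape is a `BashOperator` that shells out to the CLI, with Airflow's `{{ ds }}` macro supplying the incremental date. A hedged sketch (DAG and task names are illustrative, and the DAG itself is shown in comments since it requires `apache-airflow` to import):

```python
# The nightly CLI invocation; Airflow's {{ ds }} macro fills in the
# logical date so each run only ingests recently updated contacts.
INGEST_CMD = (
    "ubunye run -d pipelines -u crm -p ingest -t contacts "
    "--var since={{ ds }} --profile prod --lineage"
)

# Wrapped in a DAG (requires apache-airflow; names are illustrative):
# from datetime import datetime
# from airflow import DAG
# from airflow.operators.bash import BashOperator
#
# with DAG("crm_contacts_ingest", schedule="0 2 * * *",
#          start_date=datetime(2024, 1, 1), catchup=False) as dag:
#     BashOperator(task_id="ingest_contacts", bash_command=INGEST_CMD)
```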