
S3 / Delta Connector

Two closely related connectors handle path-based and Delta Lake storage.

  • s3 — any Spark-readable path (S3, HDFS, ADLS, local). Supports Parquet, CSV, JSON, ORC, Avro.
  • delta — Delta Lake tables, accessed by path or by three-part table name.

s3 — Path-based reads and writes

Read

CONFIG:
  inputs:
    raw_events:
      format: s3
      path: s3://my-bucket/raw/events/dt=2024-06-01/
      options:
        mergeSchema: "true"

The default format is Parquet. Override it by setting format inside options to any Spark data source name (csv, json, orc, avro):

    csv_data:
      format: s3
      path: s3://my-bucket/exports/customers.csv
      options:
        format: csv
        header: "true"
        inferSchema: "true"
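A minimal sketch of how a connector like this presumably resolves the read format from the options block (the helper name and its exact behavior are assumptions, not the connector's actual code):

```python
def resolve_read_format(options: dict) -> tuple[str, dict]:
    """Split the connector-level 'format' key out of the Spark options.

    The s3 connector defaults to Parquet; a 'format' entry inside
    options overrides it. Everything else is passed through to the
    Spark reader as .option(key, value) pairs.
    """
    opts = dict(options)  # don't mutate the caller's dict
    fmt = opts.pop("format", "parquet")
    return fmt, opts

# The csv_data input above would resolve to:
fmt, reader_opts = resolve_read_format(
    {"format": "csv", "header": "true", "inferSchema": "true"}
)
print(fmt)          # csv
print(reader_opts)  # {'header': 'true', 'inferSchema': 'true'}
```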

Write

CONFIG:
  outputs:
    clean_events:
      format: s3
      path: s3://my-bucket/clean/events/
      mode: overwrite
      options:
        partitionBy: event_date
        compression: snappy
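In Spark, partitionBy is a DataFrameWriter method rather than a plain .option() string, so a writer built from this config presumably lifts it out of options first. A sketch under that assumption (the helper and the comma-separated column convention are not confirmed connector behavior):

```python
def split_writer_options(options: dict) -> tuple[list, dict]:
    """Separate partition columns from pass-through Spark writer options."""
    opts = dict(options)
    partition_by = opts.pop("partitionBy", "")
    # Allow a single column or a comma-separated list.
    cols = [c.strip() for c in partition_by.split(",") if c.strip()]
    return cols, opts

cols, opts = split_writer_options(
    {"partitionBy": "event_date", "compression": "snappy"}
)
print(cols)  # ['event_date']
print(opts)  # {'compression': 'snappy'}
```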

delta — Delta Lake

Read by path

CONFIG:
  inputs:
    transactions:
      format: delta
      path: s3://my-bucket/delta/transactions/

Read by table name

    transactions:
      format: delta
      table: main.finance.transactions    # or db_name + tbl_name

Write

CONFIG:
  outputs:
    predictions:
      format: delta
      path: s3://my-bucket/delta/predictions/
      mode: overwrite
      options:
        mergeSchema: "true"
        overwriteSchema: "true"

MERGE (upsert)

    customer_features:
      format: delta
      path: s3://my-bucket/delta/features/
      mode: merge
      options:
        merge_keys: id,dt    # comma-separated; used by the Delta MERGE writer
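Under mode: merge, the writer presumably expands merge_keys into the ON clause of a Delta MERGE along these lines (a sketch; the t/s aliases and the UPDATE SET * / INSERT * shape are assumptions about the generated SQL):

```python
def merge_condition(merge_keys: str, target: str = "t", source: str = "s") -> str:
    """Build a Delta MERGE ON clause from a comma-separated key list."""
    keys = [k.strip() for k in merge_keys.split(",") if k.strip()]
    return " AND ".join(f"{target}.{k} = {source}.{k}" for k in keys)

cond = merge_condition("id,dt")
print(cond)  # t.id = s.id AND t.dt = s.dt

# The full statement the writer might issue against the table path:
sql = (
    "MERGE INTO delta.`s3://my-bucket/delta/features/` AS t\n"
    "USING updates AS s\n"
    f"ON {cond}\n"
    "WHEN MATCHED THEN UPDATE SET *\n"
    "WHEN NOT MATCHED THEN INSERT *"
)
```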

Fields — s3

Field     Type                 Required  Description
format    "s3"                 Yes       Selects this connector
path      string               Yes       S3 / HDFS / local path
mode      overwrite | append   No        Write mode
options   dict                 No        Spark reader/writer options

Fields — delta

Field     Type                         Required     Description
format    "delta"                      Yes          Selects this connector
path      string                       Conditional  Delta table path (required unless table is set)
table     string                       Conditional  Table name (alternative to path)
mode      overwrite | append | merge   No           Write mode
options   dict                         No           Spark reader/writer options; merge_keys for MERGE mode
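The Conditional rows above can be expressed as a small validator (a hypothetical helper for illustration, not part of the connector):

```python
def validate_delta_config(cfg: dict) -> list:
    """Return a list of error strings for a delta input/output block."""
    errors = []
    if cfg.get("format") != "delta":
        errors.append("format must be 'delta'")
    if not cfg.get("path") and not cfg.get("table"):
        errors.append("one of 'path' or 'table' is required")
    mode = cfg.get("mode")
    if mode not in (None, "overwrite", "append", "merge"):
        errors.append("unsupported mode: %s" % mode)
    if mode == "merge" and "merge_keys" not in cfg.get("options", {}):
        errors.append("mode: merge requires options.merge_keys")
    return errors

# A merge output missing both path/table and merge_keys fails twice:
print(validate_delta_config({"format": "delta", "mode": "merge"}))
```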

Spark configuration for S3

ENGINE:
  spark_conf:
    spark.hadoop.fs.s3a.access.key: "{{ env.AWS_ACCESS_KEY_ID }}"
    spark.hadoop.fs.s3a.secret.key: "{{ env.AWS_SECRET_ACCESS_KEY }}"
    spark.hadoop.fs.s3a.endpoint: "s3.amazonaws.com"
    # For Delta Lake on open-source Spark:
    spark.jars.packages: "io.delta:delta-core_2.12:2.4.0"
    spark.sql.extensions: "io.delta.sql.DeltaSparkSessionExtension"
    spark.sql.catalog.spark_catalog: "org.apache.spark.sql.delta.catalog.DeltaCatalog"

On Databricks, Delta and S3 are pre-configured — no extra Spark settings needed.


Partitioned paths with Jinja

CONFIG:
  outputs:
    daily_events:
      format: delta
      path: "s3://my-bucket/delta/events/dt={{ dt | default('2024-01-01') }}/"
      mode: overwrite
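How the dt default behaves at render time, sketched with the jinja2 library (an assumption about what this templating runs on; the rendering behavior shown is standard Jinja, not connector-specific):

```python
from jinja2 import Template

path_tpl = Template(
    "s3://my-bucket/delta/events/dt={{ dt | default('2024-01-01') }}/"
)

# dt not supplied -> the default filter kicks in
print(path_tpl.render())
# s3://my-bucket/delta/events/dt=2024-01-01/

# dt supplied -> the passed value wins
print(path_tpl.render(dt="2024-06-15"))
# s3://my-bucket/delta/events/dt=2024-06-15/
```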