Developer Guide¶
Architecture deep-dive and extension patterns for contributors and advanced users.
Architecture overview¶
Entry points:
```
CLI (ubunye run) ──┐
Python API       ──┤
                   ↓
             ConfigLoader  ← YAML + Jinja2 → Pydantic v2
                   ↓
             Engine.run(cfg)
               ├── Backend.start()              ← SparkBackend or DatabricksBackend
               ├── Registry.get_reader(format)  → Reader.read()     [per input]
               ├── Registry.get_transform(type) → Transform.apply()
               ├── Registry.get_writer(format)  → Writer.write()    [per output]
               └── Backend.stop()
                   ↓
             LineageRecorder (if --lineage)
```
Config loading pipeline¶
1. **Read raw YAML** — `ConfigLoader.load(path, variables)`.
2. **Jinja2 render** — `ConfigResolver.resolve(raw_yaml, variables)` renders all string values. Variables come from `--var` flags, `os.environ` (as `env.*`), and any extra context.
3. **Pydantic validation** — the rendered YAML is parsed into `UbunyeConfig`. Validation is strict; unknown top-level keys raise an error.
4. **Profile merge** — `UbunyeConfig.merged_spark_conf(profile)` merges base + profile Spark conf.
Key files:
- `ubunye/config/loader.py` — `ConfigLoader`
- `ubunye/config/resolver.py` — Jinja resolver
- `ubunye/config/schema.py` — all Pydantic models
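The variable-context assembly in step 2 can be sketched as follows. This is a minimal illustration, not the real resolver: the function name `build_template_context` and the precedence of `--var` flags over extra context are assumptions; only the three sources (`--var` flags, `os.environ` as `env.*`, extra context) come from the text above.

```python
import os

def build_template_context(var_flags, extra=None):
    """Assemble a Jinja2 rendering context from the three sources named
    in the config-loading pipeline (precedence here is an assumption)."""
    context = dict(extra or {})
    context.update(var_flags)          # --var key=value flags win over extras
    context["env"] = dict(os.environ)  # usable as {{ env.SOME_VAR }} in templates
    return context
```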
Plugin registry¶
ubunye.core.registry.PluginRegistry discovers plugins at startup via importlib.metadata.entry_points.
```
# Entry point groups
ubunye.readers    → Reader subclasses
ubunye.writers    → Writer subclasses
ubunye.transforms → Transform subclasses
ubunye.monitors   → Monitor implementations
```
`registry.get_reader("hive")` looks up the `hive` key and returns the class.
The class is instantiated fresh for each task run.
Engine runtime¶
Engine.run(cfg) in ubunye/core/runtime.py:
1. Merge Spark conf (base + profile).
2. Start `SparkBackend`.
3. For each input in `cfg.CONFIG.inputs`: call `Reader.read(io_cfg_dict, backend)`.
4. Call `Transform.apply(inputs_dict, transform_cfg_params, backend)`.
5. For each output in `cfg.CONFIG.outputs`: call `Writer.write(df, io_cfg_dict, backend)`.
6. Stop `SparkBackend`.
7. If `LineageRecorder` is attached (via monitors): write run record.
The task_dir is added to sys.path before step 4, so transformations.py and
model.py can be imported without explicit path configuration.
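The steps above can be sketched as a single function. This is not the real `Engine.run(cfg)` (which takes only `cfg`); here the backend and registry are passed in, config is modeled as plain dicts, and method names are taken from the step list — treat every signature as an assumption.

```python
import sys

def run(cfg, backend, registry):
    """Sketch of the Engine.run() flow: read inputs, transform, write outputs."""
    backend.start()
    try:
        # Step 3: one Reader per input, instantiated fresh for this run.
        inputs = {
            name: registry.get_reader(io["format"])().read(io, backend)
            for name, io in cfg["inputs"].items()
        }
        # task_dir goes on sys.path before the transform step, so
        # transformations.py / model.py resolve without path configuration.
        sys.path.insert(0, cfg["task_dir"])
        # Step 4: apply the configured transform to all inputs.
        transform = registry.get_transform(cfg["transform"]["type"])()
        df = transform.apply(inputs, cfg["transform"].get("params", {}), backend)
        # Step 5: one Writer per output.
        for io in cfg["outputs"]:
            registry.get_writer(io["format"])().write(df, io, backend)
        return df
    finally:
        backend.stop()  # step 6, even if a step above raised
```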
Backends¶
SparkBackend¶
ubunye/backends/spark_backend.py:
- Lazily imports `pyspark` — no import error if PySpark is not installed and Spark isn't used.
- Implements the context-manager protocol (`with SparkBackend(...) as be:`).
- `be.spark` — the `SparkSession`.
- `be.conf_effective` — dict of the active configuration.
- Safe to call `start()` multiple times (idempotent); `stop()` terminates the session.
DatabricksBackend¶
ubunye/backends/databricks_backend.py:
- Wraps an existing `SparkSession` instead of creating a new one.
- If no session is passed explicitly, retrieves the active session via `SparkSession.getActiveSession()`.
- `start()` attaches to the active session (or no-ops if already attached).
- `stop()` is a no-op — we don't own the session, so we never stop it.
- Use this on Databricks, where a `SparkSession` is always available.
Auto-detection in the Python API¶
ubunye.run_task() and ubunye.run_pipeline() auto-detect the backend:
- If `spark=` is passed explicitly → `DatabricksBackend(spark=session)`.
- If an active `SparkSession` exists → `DatabricksBackend`.
- Otherwise → `SparkBackend` with config-driven Spark conf.
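The detection order above can be sketched like this. The stand-in classes exist only to make the sketch self-contained; the real backends live in `ubunye/backends/`, and their constructor signatures here are assumptions.

```python
# Minimal stand-ins so the sketch runs on its own; not the real classes.
class SparkBackend:
    def __init__(self, spark_conf=None):
        self.spark_conf = spark_conf or {}

class DatabricksBackend:
    def __init__(self, spark=None):
        self.spark = spark

def detect_backend(spark=None, spark_conf=None):
    """Sketch of the backend auto-detection order described above."""
    if spark is not None:
        return DatabricksBackend(spark=spark)       # 1. explicit session wins
    try:
        from pyspark.sql import SparkSession        # lazy: pyspark may be absent
        active = SparkSession.getActiveSession()    # 2. look for an active session
    except ImportError:
        active = None
    if active is not None:
        return DatabricksBackend(spark=active)
    return SparkBackend(spark_conf=spark_conf)      # 3. fall back to local Spark
```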
Python API¶
ubunye/api.py exposes run_task() and run_pipeline() — the same execution
pipeline as the CLI but callable from Python code (notebooks, scripts, tests).
Key differences from the CLI path:
- Backend auto-detection: picks `DatabricksBackend` if an active session exists, otherwise `SparkBackend`.
- No subprocess: runs in-process, so the caller can inspect returned DataFrames directly.
- Shared session lifecycle: when using `DatabricksBackend`, the session stays alive after the run.
Both functions are re-exported from `ubunye.__init__`.
Lineage system¶
ubunye/lineage/:
| Module | Responsibility |
|---|---|
| `context.py` | `RunContext` frozen dataclass — run ID, task metadata, timestamps |
| `recorder.py` | `LineageRecorder` — implements `Monitor` protocol; writes step records |
| `storage.py` | `FileSystemLineageStore` — reads/writes JSON under `.ubunye/lineage/` |
| `hasher.py` | `hash_dataframe()` — SHA-256 of sampled rows + schema (Spark-optional) |
The LineageRecorder is attached as a monitor and called at task start and end.
On task end it writes a RunRecord JSON file keyed by run_id.
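The row-and-schema hashing idea behind `hash_dataframe()` can be sketched with plain Python structures. The real function operates on a Spark DataFrame; the name `hash_rows`, the JSON canonicalization, and the sample size are all assumptions — only "SHA-256 of sampled rows + schema" comes from the table above.

```python
import hashlib
import json

def hash_rows(rows, schema, sample_size=100):
    """Sketch: stable SHA-256 over a schema dict plus a bounded row sample."""
    h = hashlib.sha256()
    # Canonical JSON so key order never changes the digest.
    h.update(json.dumps(schema, sort_keys=True).encode())
    for row in rows[:sample_size]:  # bounded, deterministic sample
        h.update(json.dumps(row, sort_keys=True, default=str).encode())
    return h.hexdigest()
```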
Telemetry¶
ubunye/telemetry/ — all telemetry is opt-in via UBUNYE_TELEMETRY=1:
| Module | Backend |
|---|---|
| `events.py` | JSON Lines log file |
| `prometheus.py` | Prometheus counters and histograms (port `UBUNYE_PROM_PORT`) |
| `otel.py` | OpenTelemetry spans (OTLP or console exporter) |
| `mlflow.py` | MLflow run logging (experiment, metrics, params) |
All monitors implement the Monitor protocol:
```python
class Monitor(Protocol):
    def on_task_start(self, ctx: RunContext) -> None: ...
    def on_task_end(self, ctx: RunContext, success: bool) -> None: ...
```

Monitors are wrapped with `safe_call()` — a failing monitor never crashes the task.
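A wrapper with the behavior described above might look like this. The log-and-swallow strategy and the exact signature are assumptions; only the name `safe_call()` and the guarantee that a failing monitor never crashes the task come from the text.

```python
import logging

def safe_call(monitor_method, *args, **kwargs):
    """Sketch: invoke a monitor hook, logging (not propagating) any failure."""
    try:
        monitor_method(*args, **kwargs)
    except Exception:
        logging.getLogger("ubunye.telemetry").exception(
            "monitor %r failed; continuing", monitor_method
        )
```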
ML architecture¶
Two separate ML systems coexist:
| System | Location | Purpose |
|---|---|---|
| Internal wrappers | `ubunye/plugins/ml/` | `SklearnModel`, `SparkMLModel` — engine-owned adapters |
| User contract | `ubunye/models/` | `UbunyeModel` ABC — user-implemented; engine never imports ML libs |
UbunyeModel (ubunye/models/base.py) is the only interface the engine calls.
ModelTransform (ubunye/plugins/transforms/model_transform.py) loads the user's class
dynamically via load_model_class() and calls train() or predict().
The ModelRegistry (ubunye/models/registry.py) stores artifacts and metadata on the
filesystem. JSON serialization uses dataclasses.asdict().
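The shape of the user contract can be sketched as an ABC. The method names `train()`/`predict()` come from the text above; the exact signatures are assumptions, and the real class lives in `ubunye/models/base.py`.

```python
from abc import ABC, abstractmethod

class UbunyeModel(ABC):
    """Sketch of the user-implemented model contract (signatures assumed)."""

    @abstractmethod
    def train(self, df, params):
        """Fit the model on df; the engine calls this for train tasks."""

    @abstractmethod
    def predict(self, df):
        """Score df; the engine calls this for predict tasks."""
```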
Writing tests¶
Unit tests (Spark-free)¶
Use MockDF for any test that would otherwise require PySpark:
```python
class MockDF:
    def __init__(self, rows=None):
        self._rows = rows or [{"id": 1, "val": 2.0}]

    def count(self):
        return len(self._rows)

    def toPandas(self):
        import pandas as pd
        return pd.DataFrame(self._rows)
```
All tests in tests/unit/ must pass without pyspark installed.
Integration tests¶
Mark with @pytest.mark.integration and use SparkBackend:
```python
import pytest

from ubunye.backends.spark_backend import SparkBackend


@pytest.fixture(scope="session")
def spark():
    with SparkBackend(
        app_name="test",
        spark_conf={"spark.sql.shuffle.partitions": "1"},
    ) as be:
        yield be.spark


@pytest.mark.integration
def test_hive_reader(spark):
    ...
```
Run them with `pytest -m integration`.
Orchestration exporters¶
ubunye/orchestration/:
- `base.py` — `OrchestrationExporter` ABC with `export(cfg, output_path)`.
- `airflow.py` — generates an Airflow DAG Python file.
- `databricks.py` — generates a Databricks Jobs API JSON.
Exporters read from cfg.ORCHESTRATION and cfg.ENGINE (for profile-specific cluster settings).
They do not interact with the running cluster — they only produce configuration artifacts.
Adding a new orchestration target¶
1. Subclass `OrchestrationExporter`.
2. Implement `export(cfg: UbunyeConfig, output_path: str, profile: str)`.
3. Add a new value to the `OrchestrationType` enum in `schema.py`.
4. Register in the CLI `export` command.
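Steps 1–2 for a hypothetical crontab target might look like the sketch below. The `OrchestrationExporter` stand-in exists only to make the example self-contained (the real ABC lives in `ubunye/orchestration/base.py`); the `CrontabExporter` class, the `schedule`/`task` config keys, and the emitted command line are all invented for illustration.

```python
from abc import ABC, abstractmethod

# Stand-in for ubunye.orchestration.base.OrchestrationExporter so this
# sketch runs on its own.
class OrchestrationExporter(ABC):
    @abstractmethod
    def export(self, cfg, output_path, profile): ...

class CrontabExporter(OrchestrationExporter):
    """Hypothetical exporter: emits a crontab line instead of a DAG/Jobs JSON."""

    def export(self, cfg, output_path, profile):
        schedule = cfg.get("schedule", "0 2 * * *")  # assumed config key
        line = f'{schedule} ubunye run {cfg["task"]} --profile {profile}\n'
        with open(output_path, "w") as f:
            f.write(line)  # artifact only — never touches a running cluster
        return output_path
```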