API Reference

Auto-generated from docstrings via mkdocstrings.


Python API

The public Python API for running Ubunye tasks without the CLI. Primary use case: Databricks notebooks and jobs where a SparkSession already exists.

import ubunye

# Run a single task
outputs = ubunye.run_task(
    task_dir="pipelines/fraud_detection/ingestion/claim_etl",
    mode="nonprod",
    dt="202510",
)

# Run multiple tasks sequentially
results = ubunye.run_pipeline(
    usecase_dir="pipelines",
    usecase="fraud_detection",
    package="ingestion",
    tasks=["claim_etl", "feature_engineering"],
    mode="nonprod",
    dt="202510",
)

ubunye.api.run_task

run_task(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None)

Run a single Ubunye task and return the outputs map.

Parameters

task_dir : str
    Path to the task directory containing config.yaml and transformations.py.
mode : str
    Run mode, used for Spark profile merging. Default "DEV".
dt : str, optional
    Data timestamp, injected as {{ dt }} in Jinja templates.
dtf : str, optional
    Data timestamp format, injected as {{ dtf }}.
spark : SparkSession, optional
    Explicit SparkSession to reuse. If None, auto-detects an active session (Databricks) or creates a new one.
lineage : bool
    Record lineage for this run.
lineage_dir : str
    Root directory for lineage records.
profile : str, optional
    Config profile for validation (passed to load_config).

Returns

Dict[str, Any]
    Mapping of output name → DataFrame.

ubunye.api.run_pipeline

run_pipeline(usecase_dir, usecase, package, tasks, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None)

Run multiple tasks sequentially and return all outputs.

Parameters

usecase_dir : str
    Root directory for use cases (e.g. "./pipelines").
usecase : str
    Use case name.
package : str
    Package/pipeline name.
tasks : List[str]
    Task names to run in order.
mode, dt, dtf, spark, lineage, lineage_dir, profile
    Same as :func:run_task.

Returns

Dict[str, Dict[str, Any]]
    Mapping of task name → outputs map.
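The nested return shape can be pictured as follows (placeholder string values stand in for real DataFrames):

```python
# Shape of the run_pipeline return value (illustrative values):
# task name -> outputs map (output name -> DataFrame).
results = {
    "claim_etl": {"clean_claims": "<DataFrame>"},
    "feature_engineering": {"features": "<DataFrame>"},
}

# outputs of a given task are addressed by task name, then output name
features = results["feature_engineering"]["features"]
```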


Core Engine

ubunye.core.runtime.Engine

Executes a task by reading inputs, applying one or more transforms, and writing outputs.

Minimal required config structure

cfg[CONFIG][inputs] : mapping of input_name -> reader cfg (must include 'format')
cfg[CONFIG][outputs] : mapping of output_name -> writer cfg (must include 'format')
cfg[CONFIG][transform] : either a single transform dict with 'type', or a list of transform dicts forming a pipeline

Notes
  • This class is intentionally small; orchestration, telemetry, and retries can be layered around it without changing plugin contracts.
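The minimal structure can be sketched as a plain dict (section names follow the docstring; the formats, paths, and the literal "CONFIG" section key are illustrative assumptions, not the engine's exact schema):

```python
# A minimal cfg mapping with the structure Engine.run expects.
# Keys per the docstring above; values are illustrative only.
cfg = {
    "CONFIG": {
        "inputs": {
            # input_name -> reader cfg (must include 'format')
            "claims": {"format": "parquet", "path": "data/claims"},
        },
        "transform": [
            # list form: transform dicts applied in order as a pipeline
            {"type": "task", "params": {}},
        ],
        "outputs": {
            # output_name -> writer cfg (must include 'format')
            "clean_claims": {"format": "delta", "table": "clean_claims"},
        },
    }
}
```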

run

run(cfg, *, dry_run=False)

Run a task using the provided config mapping.

Parameters

cfg : dict
    Parsed task configuration (already rendered/validated).
dry_run : bool, default False
    If True, validates and returns without executing readers/writers.

Returns

Optional[Dict[str, Any]]
    The outputs map, or None when dry_run is True.


Interfaces

ubunye.core.interfaces

Interfaces for Ubunye Engine components.

These abstract base classes define the contracts for backends, readers, writers, transforms, and user-defined tasks.

Backend

Bases: ABC

Abstract execution backend (e.g., Spark or Pandas).

is_spark abstractmethod property
is_spark

Whether this backend is Spark-based.

start abstractmethod
start()

Start a backend session (e.g., create SparkSession).

stop abstractmethod
stop()

Stop the backend session and release resources.

Reader

Bases: ABC

Base interface for input readers.

read abstractmethod
read(cfg, backend)

Read input into a DataFrame-like object.

Task

Bases: ABC

User-defined task contract (lives in transformations.py).

setup
setup()

Optional hook executed before transform.

transform abstractmethod
transform(sources)

Transform input sources into outputs mapping.
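The contract is easiest to see in a sketch. This is a pure-Python stand-in with illustrative names; a real task would subclass ubunye.core.interfaces.Task inside transformations.py:

```python
# Shape of a user-defined Task (names are illustrative).
class ClaimEtl:
    def setup(self):
        # optional hook executed before transform
        pass

    def transform(self, sources):
        # sources: mapping input_name -> DataFrame-like
        claims = sources["claims"]
        # return an outputs mapping: output_name -> DataFrame-like
        return {"clean_claims": claims}

task = ClaimEtl()
outputs = task.transform({"claims": ["row1", "row2"]})
```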

Transform

Bases: ABC

Base interface for transforms.

apply abstractmethod
apply(inputs, cfg, backend)

Transform inputs and return new outputs mapping.

Writer

Bases: ABC

Base interface for output writers.

write abstractmethod
write(df, cfg, backend)

Write a DataFrame-like object to a destination.


Config

ubunye.config.schema.UbunyeConfig

Bases: BaseModel

Top-level Ubunye task config (the full contents of a config.yaml).

merged_spark_conf

merged_spark_conf(profile=None)

Return base spark_conf merged with the named profile's overrides.
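The merge semantics can be sketched in plain Python (keys and profile names are illustrative; the real method reads from the parsed config model):

```python
# Profile overrides win over the base spark_conf.
base_conf = {"spark.sql.shuffle.partitions": "200", "spark.ui.enabled": "true"}
profiles = {"nonprod": {"spark.sql.shuffle.partitions": "8"}}

def merged_spark_conf(profile=None):
    # unknown/None profile -> base conf unchanged
    return {**base_conf, **profiles.get(profile, {})}
```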

resolved_catalog

resolved_catalog(profile=None)

Return the catalog name, with profile override if available.

resolved_schema

resolved_schema(profile=None)

Return the schema name, with profile override if available.

ubunye.config.schema.TaskConfig

Bases: BaseModel

The CONFIG section of a task: inputs, transform, outputs.

ubunye.config.schema.IOConfig

Bases: BaseModel

Input or output connector configuration.

Extra fields are allowed so plugin-specific keys (e.g. rest_api's auth, pagination, headers) pass through to the plugin unchanged via model_dump().

ubunye.config.schema.EngineConfig

Bases: BaseModel

Spark/compute settings with optional per-profile overrides.

ubunye.config.schema.RegistryConfig

Bases: BaseModel

Configuration for model registry integration within a transform.

ubunye.config.schema.ModelTransformParams

Bases: BaseModel

Typed params for transform.type: model — for documentation and validation.


Backends

SparkBackend

Creates and manages a new SparkSession. Use for local development, CI, and non-Databricks environments.

ubunye.backends.spark_backend.SparkBackend

Bases: Backend

Creates and manages a SparkSession for a Ubunye run.

Parameters

app_name : str
    Spark application name (appears in Spark UI/history server).
conf : Optional[Dict[str, str]]
    Spark configuration key-values (e.g., {"spark.master": "yarn"}).

Notes
  • pyspark is imported lazily inside start() to keep installation lightweight.
  • spark property is only valid after start() (or inside the context manager).
app_name property
app_name

Configured Spark app name.

conf_effective property
conf_effective

The effective Spark configuration currently applied to the session.

Returns an empty dict if the session hasn't been started yet.

conf_input property
conf_input

The configuration dict passed into this backend at construction time.

is_spark property
is_spark

Whether this backend is Spark-based (always True here).

spark property
spark

Return the active SparkSession (after start()).

Raises

RuntimeError If start() has not been called.

start
start()

Create (or reuse) a SparkSession.

Safe to call multiple times; a second call is a no-op if a session already exists.

stop
stop()

Stop the SparkSession if running.

DatabricksBackend

Reuses an active SparkSession instead of creating one. Use on Databricks where a session already exists.

ubunye.backends.databricks_backend.DatabricksBackend

Bases: Backend

Backend that reuses the existing Databricks SparkSession.

Parameters

spark : Optional[SparkSession]
    An explicit session to wrap. If None (the default), the active session is retrieved via SparkSession.getActiveSession().

start
start()

Attach to the active SparkSession if one wasn't provided.

stop
stop()

No-op — we don't own the session, so we never stop it.


Models (UbunyeModel contract)

ubunye.models.base.UbunyeModel

Bases: ABC

Abstract contract every user model must implement.

The engine calls these methods during the pipeline lifecycle. Users implement them with whatever ML library they choose — the engine never imports or knows about the underlying library.

Minimal example::

from ubunye.models.base import UbunyeModel
import joblib

class FraudRiskModel(UbunyeModel):
    def __init__(self):
        self._clf = None

    def train(self, df):
        import pandas as pd
        from sklearn.ensemble import GradientBoostingClassifier
        pdf = df.toPandas()
        X, y = pdf.drop("label", axis=1), pdf["label"]
        self._clf = GradientBoostingClassifier().fit(X, y)
        return {"accuracy": self._clf.score(X, y)}

    def predict(self, df):
        pdf = df.toPandas()
        pdf["score"] = self._clf.predict_proba(pdf)[:, 1]
        # reuse the incoming DataFrame's session rather than a bare `spark`
        return df.sparkSession.createDataFrame(pdf)

    def save(self, path):
        import pathlib, joblib
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)
        joblib.dump(self._clf, f"{path}/clf.joblib")

    @classmethod
    def load(cls, path):
        import joblib
        m = cls()
        m._clf = joblib.load(f"{path}/clf.joblib")
        return m

    def metadata(self):
        import sklearn
        return {
            "library": "sklearn",
            "library_version": sklearn.__version__,
            "features": list(self._clf.feature_names_in_),
            "params": self._clf.get_params(),
        }

load abstractmethod classmethod

load(path)

Deserialize a model from the given directory path.

Parameters

path : str
    Directory path where model files were saved.

Returns

UbunyeModel
    An instance of the model ready for prediction.

metadata abstractmethod

metadata()

Return model metadata for the registry.

Should include at minimum:
  • library : str — e.g. "xgboost", "sklearn", "pytorch"
  • library_version : str — e.g. "2.0.3"
  • features : List[str] — feature names used during training
  • params : dict — model hyperparameters

Additional keys are allowed and will be stored in the registry.

predict abstractmethod

predict(df)

Generate predictions on the provided DataFrame.

Parameters

df : Any
    Spark DataFrame with feature columns.

Returns

Any
    Spark DataFrame (or DataFrame-like) with prediction columns added.

save abstractmethod

save(path)

Serialize the model to the given directory path.

The user chooses the serialization format (pickle, joblib, ONNX, etc.). The engine only provides the path — it never inspects the saved files.

Parameters

path : str
    Directory path where model files should be written.

train abstractmethod

train(df)

Train the model on the provided DataFrame.

Parameters

df : Any
    Spark DataFrame (or any DataFrame-like) with training data.

Returns

Dict[str, Any]
    Dict of metrics, e.g. {"auc": 0.94, "f1": 0.87, "accuracy": 0.91}. These metrics are stored in the registry alongside the model version.

validate

validate(df)

Optional: validate model on holdout data.

Returns

Dict[str, Any]
    Metrics dict in the same format as :meth:train.

Raises

NotImplementedError
    If the subclass has not overridden this method.

ubunye.models.loader.load_model_class

load_model_class(task_dir, class_name)

Dynamically load a UbunyeModel subclass.

The class_name is a dotted path like "model.FraudRiskModel" where the first segment is the Python module filename (without .py) and the rest is the class name.

Resolution order:
  1. If task_dir is provided → load <module_segment>.py directly from that directory using importlib.util.spec_from_file_location.
  2. If task_dir is None → attempt a standard importlib.import_module (the module must already be on sys.path; _run_single_task adds the task directory to sys.path before transforms are invoked, so this works automatically when called from the engine).

Parameters

task_dir : Optional[str]
    Absolute or relative path to the directory containing the model file (e.g. the pipeline task directory). Pass None to rely on sys.path.
class_name : str
    Dotted path to the class, e.g. "model.FraudRiskModel" or "models.risk.FraudRiskModel".

Returns

Type[UbunyeModel]
    The model class (not an instance). Callers should do cls() or cls.load(path) as needed.

Raises

FileNotFoundError
    The module file does not exist in task_dir.
ImportError
    The class name was not found in the module.
TypeError
    The class does not subclass :class:ubunye.models.base.UbunyeModel.
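The file-based branch of the resolution order can be sketched with importlib.util.spec_from_file_location. This handles only two-segment dotted paths; load_class_from_dir and the demo module are illustrative names, not the real loader:

```python
import importlib.util
import pathlib
import tempfile

def load_class_from_dir(task_dir, dotted_path):
    # "model.Demo" -> module file "model.py", class "Demo"
    module_name, _, class_name = dotted_path.partition(".")
    file_path = pathlib.Path(task_dir) / f"{module_name}.py"
    if not file_path.exists():
        raise FileNotFoundError(file_path)
    spec = importlib.util.spec_from_file_location(module_name, file_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    if not hasattr(module, class_name):
        raise ImportError(f"{class_name} not found in {file_path}")
    return getattr(module, class_name)

# demo: write a tiny module to a temp "task dir" and load a class from it
with tempfile.TemporaryDirectory() as d:
    (pathlib.Path(d) / "model.py").write_text("class Demo:\n    answer = 42\n")
    Demo = load_class_from_dir(d, "model.Demo")
```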


Model Registry

ubunye.models.registry.ModelRegistry

Manages model versions and lifecycle transitions.

Parameters

store_path : str
    Root directory for all model artifacts and registry JSON files.

archive

archive(use_case, model_name, version)

Archive a model version.

Returns

ModelVersion
    The updated :class:ModelVersion.

compare_versions

compare_versions(use_case, model_name, version_a, version_b)

Compare metrics between two versions.

Returns

Dict[str, Dict[str, Any]]
    Dict mapping metric name → {"a": val_a, "b": val_b, "delta": diff}. Only metrics present in at least one version are included.
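The return shape can be built up by hand like this (metric values are made up, and delta = b − a is an assumption about the sign convention):

```python
# Illustrative construction of the compare_versions return shape.
metrics_a = {"auc": 0.91, "f1": 0.84}
metrics_b = {"auc": 0.94, "f1": 0.87, "accuracy": 0.90}

comparison = {
    name: {
        "a": metrics_a.get(name),
        "b": metrics_b.get(name),
        # treat a missing metric as 0 for the delta (assumption)
        "delta": (metrics_b.get(name) or 0) - (metrics_a.get(name) or 0),
    }
    # union: metrics present in at least one version
    for name in set(metrics_a) | set(metrics_b)
}
```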

demote

demote(use_case, model_name, version, to_stage)

Demote a model version to a lower stage.

Parameters

to_stage : ModelStage
    Target stage (typically development or staging).

Returns

ModelVersion
    The updated :class:ModelVersion.

get_model

get_model(use_case, model_name, task_dir=None, version=None, stage=None)

Return the artifact path and version metadata for a model.

Specify either version (exact) or stage (active version in that stage).

Parameters

use_case : str
    Use-case grouping.
model_name : str
    Model identifier.
task_dir : Optional[str]
    Task directory containing model.py (passed to caller for loading). Default None.
version : Optional[str]
    Exact version string. Default None.
stage : Optional[ModelStage]
    Stage to look up the active version for. Default None.

Returns

Tuple[str, ModelVersion]
    (artifact_path, model_version) — the caller calls ModelClass.load(artifact_path) to obtain the model instance.

Raises

ValueError
    No matching version found.

list_versions

list_versions(use_case, model_name)

List all registered versions for a model (newest first by registered_at).

Raises

FileNotFoundError
    Model not found in registry.

promote

promote(use_case, model_name, version, to_stage, promoted_by=None, gates=None)

Promote a model version to a higher stage.

If gates are provided, all gate checks must pass before promotion. If promoting to production, the current production version is automatically archived.

Parameters

use_case : str
    Use-case grouping.
model_name : str
    Model identifier.
version : str
    Version string to promote.
to_stage : ModelStage
    Target :class:ModelStage.
promoted_by : Optional[str]
    Optional username. Default None.
gates : Optional[Dict[str, Any]]
    Optional dict of promotion gate rules (see :class:PromotionGate). Default None.

Raises

ValueError
    Version not found, or one or more promotion gates failed.

Returns

ModelVersion
    The updated :class:ModelVersion.

register

register(use_case, model_name, version, model, metrics, lineage_run_id=None, registered_by=None)

Register a trained model as a new version (stage=development).

  1. Saves model artifacts to versions/<version>/model/.
  2. Writes metadata.json and metrics.json.
  3. Adds a new :class:ModelVersion entry to registry.json.

Parameters

use_case : str
    Logical use-case grouping (e.g. "fraud_detection").
model_name : str
    Model identifier (e.g. "FraudRiskModel").
version : Optional[str]
    Semver string. Pass None to auto-generate.
model : UbunyeModel
    A trained :class:UbunyeModel instance.
metrics : Dict[str, Any]
    Metrics dict returned by :meth:UbunyeModel.train.
lineage_run_id : Optional[str]
    Optional lineage run_id from the training run. Default None.
registered_by : Optional[str]
    Optional username for the audit trail. Default None.

Returns

ModelVersion
    The newly created :class:ModelVersion.

rollback

rollback(use_case, model_name, to_version)

Roll back production to a specific previous version.

  1. Archives the current production version.
  2. Promotes to_version to production.

Parameters

to_version : str
    The version string to restore to production.

Returns

ModelVersion
    The restored :class:ModelVersion (now in production stage).

ubunye.models.registry.ModelVersion dataclass

Metadata for a single registered model version.

ubunye.models.registry.ModelStage

Bases: str, Enum

Lifecycle stage of a model version (e.g. development, staging, production).


Promotion Gates

ubunye.models.gates.PromotionGate

Evaluates a set of metric thresholds before allowing model promotion.

Parameters

gate_config : Dict[str, Any]
    Dict of gate rules. Supported keys:
      • min_<metric>: metric must be >= value
      • max_<metric>: metric must be <= value
      • require_drift_check: boolean; checks metadata["drift_check_passed"]

all_passed

all_passed(metrics, metadata=None)

Return True only if every configured gate passes.

Parameters

metrics : Dict[str, Any]
    Model metrics from :meth:UbunyeModel.train.
metadata : Optional[Dict[str, Any]]
    Optional model metadata from :meth:UbunyeModel.metadata. Default None.

evaluate

evaluate(metrics, metadata=None)

Evaluate all configured gates against the provided metrics.

Parameters

metrics : Dict[str, Any]
    Model metrics dict from :meth:UbunyeModel.train.
metadata : Optional[Dict[str, Any]]
    Optional model metadata dict from :meth:UbunyeModel.metadata. Default None.

Returns

List[GateResult]
    One :class:GateResult per configured gate. Promotion is allowed only if all results have passed=True.

failed_gates

failed_gates(metrics, metadata=None)

Return only the gates that did not pass.
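The min_/max_ rule semantics described above can be sketched in pure Python (check_gates and its return shape are illustrative, not the real PromotionGate API):

```python
def check_gates(gate_config, metrics):
    # rule name -> bool (did the metric satisfy the threshold?)
    results = {}
    for rule, threshold in gate_config.items():
        if rule.startswith("min_"):
            # min_<metric>: metric must be >= value
            results[rule] = metrics.get(rule[4:], float("-inf")) >= threshold
        elif rule.startswith("max_"):
            # max_<metric>: metric must be <= value
            results[rule] = metrics.get(rule[4:], float("inf")) <= threshold
    return results

gate_results = check_gates(
    {"min_auc": 0.90, "max_fpr": 0.05},
    {"auc": 0.94, "fpr": 0.03},
)
```

Promotion would be allowed here since every rule passes; a missing metric fails its min_ rule by defaulting to negative infinity.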

ubunye.models.gates.GateResult dataclass

Result of evaluating a single promotion gate.


Lineage

ubunye.lineage.recorder.LineageRecorder

Monitor plugin that persists run lineage as structured JSON.

Parameters

store : str
    Backend type — "filesystem" (default) or "s3" (stub).
base_dir : str
    Root directory for the FileSystemLineageStore.
sample_fraction : float
    Fraction of rows sampled when hashing DataFrames (0 < value ≤ 1).

task_end

task_end(*, context, config, outputs, status, duration_sec)

Update the run record with final status, duration, and step hashes.

task_start

task_start(*, context, config)

Create a "running" lineage record and persist it immediately.

ubunye.lineage.context.RunContext dataclass

Full lineage record for one task execution.

ubunye.lineage.storage.FileSystemLineageStore

Bases: LineageStore

Stores lineage records as JSON files in a local directory tree.

Layout::

{base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
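For example, one run record's path under the default lineage root would be built like this (the identifier values are illustrative):

```python
import pathlib

base_dir = ".ubunye/lineage"
record_path = (
    pathlib.Path(base_dir)
    / "fraud_detection"   # usecase
    / "ingestion"         # package
    / "claim_etl"         # task_name
    / "01J9ZX.json"       # {run_id}.json
)
```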

Plugins — Readers

ubunye.plugins.readers.rest_api.RestApiReader

Bases: Reader

Read a Spark DataFrame from a REST API endpoint.

Handles pagination (offset, cursor, next_link), authentication (bearer, api_key, basic), rate limiting, and retry with exponential backoff.

read

read(cfg, backend)

Fetch all pages from the API and return a Spark DataFrame.

Parameters

cfg : dict
    Reader configuration. Required key: url. See module docstring for full reference.
backend : SparkBackend
    Ubunye Spark backend exposing .spark.

Returns

pyspark.sql.DataFrame


Plugins — Writers

ubunye.plugins.writers.rest_api.RestApiWriter

Bases: Writer

Write a Spark DataFrame to a REST API endpoint in JSON batches.

Iterates over DataFrame rows, groups them into batches of batch_size, and POSTs each batch as {"records": [...]}. Tracks and logs success/failure counts per batch.
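The batching scheme can be sketched as follows (make_payloads is an illustrative helper, not the plugin's API; the payload shape matches the docstring):

```python
def make_payloads(rows, batch_size):
    # group rows into batch_size chunks, each wrapped as {"records": [...]}
    return [
        {"records": rows[i:i + batch_size]}
        for i in range(0, len(rows), batch_size)
    ]

# 5 rows with batch_size=2 -> three POST payloads (2 + 2 + 1 records)
payloads = make_payloads([{"id": n} for n in range(5)], batch_size=2)
```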

write

write(df, cfg, _backend)

POST DataFrame rows to a REST endpoint in batches.

Parameters

df : pyspark.sql.DataFrame
    Source DataFrame.
cfg : dict
    Writer configuration. Required key: url. See module docstring for full reference.
_backend : SparkBackend
    Ubunye Spark backend (passed for interface consistency; not used directly).

Raises

ValueError
    If url is missing from cfg.


Plugins — Transforms

ubunye.plugins.transforms.model_transform.ModelTransform

Bases: Transform

Transform plugin for ML operations (train / predict).

Implements :class:ubunye.core.interfaces.Transform so it plugs into the existing engine dispatch with zero changes to the core runtime.

apply

apply(inputs, cfg, backend)

Dispatch to train or predict based on cfg["action"].

Parameters

inputs : Dict[str, Any]
    Named DataFrames from the read phase.
cfg : dict
    Transform params from config.yaml (the params sub-dict).
backend : Backend
    Active engine backend (provides Spark session if needed).

Returns

Dict[str, Any]
    Dict of named outputs. For train: {"model_metrics": dict}. For predict: {"predictions": DataFrame}.
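The dispatch on cfg["action"] can be sketched like this (a bare-bones stand-in with placeholder outputs; the real plugin trains or loads a UbunyeModel and runs it against the named inputs):

```python
def apply(inputs, cfg):
    action = cfg["action"]
    if action == "train":
        # real plugin: train the model, return its metrics dict
        return {"model_metrics": {"auc": 0.0}}
    if action == "predict":
        # real plugin: score the input DataFrame with the loaded model
        return {"predictions": inputs["features"]}
    raise ValueError(f"unsupported action: {action!r}")
```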


Internal ML wrappers

Note

These are internal wrappers used by Ubunye's own sklearn/Spark ML adapters. User-defined models should implement UbunyeModel, not these classes.

ubunye.plugins.ml.base.BaseModel

Bases: HasSchema, ABC

Framework-agnostic estimator interface.

Subclasses must implement
  • _fit_core(X, y)
  • _predict_core(X) -> y_pred (and optionally probabilities)
  • _save_core(path)
  • _load_core(path)
  • params property (read-only view)

params property

params

Model hyperparameters (read-only view).

fit

fit(X, y=None)

Fit the model; X can be Spark DF, pandas DF, numpy array, etc.

load

load(path)

Load model and metadata from path.

metrics

metrics()

Optional: training/validation metrics (subclasses may override).

predict

predict(X, proba=False)

Predict on inputs; returns labels or (labels, probabilities) if proba=True.

save

save(path)

Persist model and metadata to a directory path.

ubunye.plugins.ml.sklearn.SklearnModel

Bases: BaseModel

Wrap any sklearn estimator with a unified Ubunye interface.

ubunye.plugins.ml.pysparkml.SparkMLModel

Bases: BaseModel

Wrap a Spark ML Estimator or loaded PipelineModel.