API Reference

Auto-generated from docstrings via mkdocstrings.


Python API

The public Python API for running Ubunye tasks without the CLI. Primary use case: Databricks notebooks and jobs where a SparkSession already exists.

import ubunye

# Run a single task
outputs = ubunye.run_task(
    task_dir="pipelines/fraud_detection/ingestion/claim_etl",
    mode="nonprod",
    dt="202510",
)

# Run multiple tasks sequentially
results = ubunye.run_pipeline(
    usecase_dir="pipelines",
    usecase="fraud_detection",
    package="ingestion",
    tasks=["claim_etl", "feature_engineering"],
    mode="nonprod",
    dt="202510",
)

ubunye.api.run_task

run_task(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)

Run a single Ubunye task and return the outputs map.

Parameters

task_dir : str
    Path to the task directory containing config.yaml and transformations.py.
mode : str
    Run mode, used for Spark profile merging. Default "DEV".
dt : str, optional
    Data timestamp, injected as {{ dt }} in Jinja templates.
dtf : str, optional
    Data timestamp format, injected as {{ dtf }}.
spark : SparkSession, optional
    Explicit SparkSession to reuse. If None, auto-detects an active session (Databricks) or creates a new one.
lineage : bool
    Record lineage for this run.
lineage_dir : str
    Root directory for lineage records.
profile : str, optional
    Config profile for validation (passed to load_config).
hooks : iterable of Hook, optional
    Replace the engine's default hooks entirely. Rarely needed; prefer the ubunye.hooks entry point for always-on hooks.

Returns

Dict[str, Any]
    Mapping of output name → DataFrame.

ubunye.api.run_pipeline

run_pipeline(usecase_dir, usecase, package, tasks, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)

Run multiple tasks sequentially and return all outputs.

Parameters

usecase_dir : str
    Root directory for use cases (e.g. "./pipelines").
usecase : str
    Use case name.
package : str
    Package/pipeline name.
tasks : List[str]
    Task names to run in order.
mode, dt, dtf, spark, lineage, lineage_dir, profile, hooks
    Same as run_task.

Returns

Dict[str, Dict[str, Any]]
    Mapping of task name → outputs map.


Notebook API (Interactive)

Step-by-step execution for prototyping in Databricks notebooks. Environment variables referenced via {{ env.VAR }} in config.yaml are auto-resolved from widgets and secrets — no manual os.environ setup.

import ubunye

ctx = ubunye.notebook(
    "/Workspace/pipelines/claims/claim_etl",
    mode="PROD",
    dt="2026-01-01",
)

sources = ctx.read()
sources["raw_claims"].display()

outputs = ctx.transform(sources)
outputs["bronze_claims"].show()

ctx.write(outputs)

ubunye.notebook.notebook

notebook(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, env=None, secrets_scope=None, secrets_map=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, auto_env=True, hooks=None)

Create an interactive notebook context for step-by-step task execution.

All {{ env.VAR }} references in the task's config.yaml are auto-resolved from Databricks widgets and secrets — no manual os.environ setup needed.

ubunye.notebook.NotebookContext

Stateful notebook session for step-by-step Ubunye task execution.

config property

config

The fully resolved config object.

config_dict property

config_dict

The config as a plain dict.

env_vars property

env_vars

The env vars that were auto-resolved and injected.

spark property

spark

Shortcut to the active SparkSession.

task_name property

task_name

The task directory name.

read

read()

Read all inputs defined in config.yaml. Returns {name: DataFrame}.

transform

transform(sources=None)

Apply the task's transform logic.

If sources is None, uses the result of the last read() call (or reads automatically if read() was never called).

write

write(outputs=None)

Write outputs to configured sinks.

If outputs is None, uses the result of the last transform() call.

run

run()

Read, transform, and write in one call (same as ubunye.run_task).

close

close()

Restore env vars and remove task dir from sys.path.


Core Engine

ubunye.core.runtime.Engine

Executes a task by reading inputs, applying one or more transforms, and writing outputs.

Minimal required config structure

cfg[CONFIG][inputs] : mapping of input_name -> reader cfg (must include 'format')
cfg[CONFIG][outputs] : mapping of output_name -> writer cfg (must include 'format')
cfg[CONFIG][transform] : either a single transform dict with 'type', or a list of transform dicts that form a pipeline
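
As a rough illustration of the structure above, the sketch below builds a config mapping and checks the three requirements. The check_minimal_config helper, the format names, and the transform type names are hypothetical and not part of Ubunye; only the CONFIG, inputs, outputs, transform, format, and type keys come from the description above.

```python
from typing import Any, Dict, List


def check_minimal_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the shape looks valid."""
    problems: List[str] = []
    task = cfg.get("CONFIG", {})

    # Every reader and writer entry must declare a 'format'.
    for section in ("inputs", "outputs"):
        for name, io_cfg in task.get(section, {}).items():
            if "format" not in io_cfg:
                problems.append(f"{section}.{name}: missing 'format'")

    # 'transform' is either one dict or a list of dicts (a pipeline).
    transform = task.get("transform")
    steps = transform if isinstance(transform, list) else [transform]
    for i, step in enumerate(steps):
        if not isinstance(step, dict) or "type" not in step:
            problems.append(f"transform[{i}]: missing 'type'")
    return problems


cfg = {
    "CONFIG": {
        "inputs": {"raw_claims": {"format": "parquet"}},    # hypothetical format
        "outputs": {"bronze_claims": {"format": "delta"}},  # hypothetical format
        "transform": [{"type": "step_one"}, {"type": "step_two"}],  # hypothetical types
    }
}
print(check_minimal_config(cfg))
```

An empty list means both connector sections declare a format and every transform step declares a type.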

Observation (logging, metrics, tracing, user monitors) is delegated to ubunye.core.hooks.Hook instances. Pass hooks= to override the default set.

__init__

__init__(backend=None, registry=None, context=None, hooks=None, extra_hooks=None, manage_backend=True)

Parameters

hooks : iterable of Hook, optional
    Replace the default hook set entirely.
extra_hooks : iterable of Hook, optional
    Append these hooks to the default set. Ignored when hooks is also given.
manage_backend : bool, default True
    If True, the engine calls backend.start() and backend.stop(). Set to False when the caller owns the backend lifecycle (e.g. the Python API running multiple tasks on one session).
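
The precedence between hooks and extra_hooks can be sketched as follows. This is an illustration of the documented rule only, not the engine's code: resolve_hooks and DEFAULT_HOOKS are hypothetical names, and plain strings stand in for Hook instances.

```python
from typing import Iterable, List, Optional

DEFAULT_HOOKS = ["logging", "metrics"]  # hypothetical stand-ins for Hook instances


def resolve_hooks(
    hooks: Optional[Iterable[str]] = None,
    extra_hooks: Optional[Iterable[str]] = None,
) -> List[str]:
    # An explicit `hooks` argument replaces the defaults and wins over extra_hooks.
    if hooks is not None:
        return list(hooks)
    # Otherwise the defaults are kept and extra_hooks are appended.
    return DEFAULT_HOOKS + list(extra_hooks or [])


print(resolve_hooks(extra_hooks=["audit"]))
print(resolve_hooks(hooks=["only"], extra_hooks=["audit"]))
```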

apply_transforms

apply_transforms(sources, cfg)

Apply configured transforms to sources.

Returns a dict mapping output name to DataFrame.

read_inputs

read_inputs(cfg)

Read all inputs defined in CONFIG.inputs.

Returns a dict mapping input name to DataFrame, suitable for interactive inspection before calling apply_transforms().

run

run(cfg, *, dry_run=False)

Run a task using the provided config mapping.

Parameters

cfg : dict
    Parsed task configuration (already rendered/validated).
dry_run : bool, default False
    If True, validates and returns without executing readers/writers.

Returns

Optional[Dict[str, Any]]
    The outputs map, or None if dry_run is True.

write_outputs

write_outputs(outputs, cfg)

Write outputs to the sinks defined in CONFIG.outputs.


Interfaces (Protocols)

See the full Interfaces page for design principles, discovery, and auto-detection.

ubunye.interfaces

Formal interfaces (typing.Protocol) for Ubunye's pluggable seams.

Each protocol defines the structural contract that backends must satisfy. Implementations are discovered at runtime via entry-point groups and auto-detect classmethods.

Cross-boundary dataclasses accompany each protocol so that callers never depend on a concrete backend's internal types.
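
To make "structural contract" concrete, here is a minimal sketch using typing.Protocol. The AuthBackend and Credentials definitions are local, simplified stand-ins that mirror the resolve/validate signatures documented in this section; StaticAuth is a hypothetical backend. None of this is imported from ubunye.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, Protocol, runtime_checkable


@dataclass
class Credentials:
    # Simplified stand-in; only auth_type and metadata are modelled here.
    auth_type: str
    metadata: Dict[str, Any] = field(default_factory=dict)


@runtime_checkable
class AuthBackend(Protocol):
    def resolve(self, workspace_url: str, **kwargs: Any) -> Credentials: ...
    def validate(self, credentials: Credentials) -> bool: ...


class StaticAuth:
    """Hypothetical backend: note it does not inherit from AuthBackend."""

    def resolve(self, workspace_url: str, **kwargs: Any) -> Credentials:
        return Credentials(auth_type="token", metadata={"host": workspace_url})

    def validate(self, credentials: Credentials) -> bool:
        return credentials.auth_type == "token"


backend = StaticAuth()
# Structural typing: the check passes because the methods exist, not because of inheritance.
print(isinstance(backend, AuthBackend))
```

A runtime isinstance check against a runtime_checkable protocol only verifies that the methods are present; signature compatibility is left to a static type checker.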

AuthBackend

Bases: Protocol

Structural interface for authentication backends.

Concrete implementations: TokenAuthBackend (built-in), ServicePrincipalAuthBackend (Phase 2).

resolve
resolve(workspace_url, **kwargs)

Resolve credentials for the given workspace.

Parameters

workspace_url: The Databricks (or other) workspace host URL.
**kwargs: Backend-specific hints (e.g. token_env="MY_TOKEN").

Returns

Credentials Populated credentials ready for client construction.

Raises

ubunye.core.errors.AuthNotFoundError When credentials cannot be located (missing env var, etc.).

validate
validate(credentials)

Validate that credentials are accepted by the workspace.

Returns True on success.

Raises

ubunye.core.errors.AuthInvalidError When the workspace rejects the credentials.

Credentials dataclass

Resolved authentication credentials.

The auth_type field tells callers which fields are populated. metadata carries backend-specific extras (e.g. token expiry, scope, tenant ID) without polluting the core fields.

DeployAdapter

Bases: Protocol

Structural interface for deployment backends.

Each adapter is responsible for:

  1. Validating that the target dict contains the fields it needs.
  2. Preparing artifacts (notebooks, config bundles) for the target.
  3. Uploading artifacts and creating/updating the remote job.
  4. Returning a DeployResult with job metadata.

name property
name

Short identifier for this adapter (e.g. "databricks").

deploy
deploy(ctx, target, *, dry_run=False)

Deploy a task to the target environment.

Parameters

ctx: Task metadata, config dict, and filesystem paths.
target: Adapter-specific target configuration (validated by the adapter via validate_target()).
dry_run: If True, build and return the job spec without making any remote API calls.

Returns

DeployResult Job metadata including name, id, spec, and workspace path.

get_job_url
get_job_url(job_id, target)

Return a browser URL to the deployed job.

Parameters

job_id: The job identifier returned by deploy().
target: The same target dict, which typically contains the host URL.

validate_target
validate_target(target)

Validate adapter-specific target config.

Raises

ubunye.core.errors.TargetNotFoundError When required keys are missing or values are invalid.

DeployContext dataclass

Structured metadata for a deploy operation.

Callers construct this from CLI flags and the parsed config; the adapter never derives task identity from filesystem paths.

DeployResult dataclass

Returned by DeployAdapter.deploy().

metadata carries adapter-specific extras (notebook source, cluster id, etc.) without polluting the core fields.

LineageBackend

Bases: Protocol

Structural interface for lineage storage backends.

Concrete implementations: FilesystemLineageBackend (built-in), DeltaLineageBackend (Phase 2).

compact
compact()

Optimize the underlying storage.

For filesystem backends this is a no-op. For Delta backends this triggers OPTIMIZE on the lineage table.

get_run
get_run(run_id)

Load a single run record by ID.

Raises

ubunye.core.errors.LineageRecordNotFoundError When no record exists for run_id.

record
record(record)

Persist a lineage record (non-blocking per Decision 1).

Creates or overwrites the record for the given run_id. Typically called twice per task: once at start (status="running") and once at end (status="success" or "error").

The actual write is dispatched to a background worker thread. On failure, the record is written to a fallback manifest (Decision 2).

Parameters

record: The lineage record to persist. Callers construct this with the current engine version and task metadata.

search
search(*, task=None, usecase=None, pipeline=None, status=None, since=None, limit=100)

Search across recorded runs with optional filters.

Parameters

task: Filter by task name.
usecase: Filter by use case.
pipeline: Filter by pipeline.
status: Filter by status ("success", "error", "running").
since: ISO-8601 datetime; only return runs recorded at or after this time.
limit: Maximum number of records to return.

LineageRecord dataclass

Cross-boundary lineage record (Decision 3 compliant).

Core columns are strict; everything else goes into metadata. Every record stamps engine_version so analysts can filter by writer version.

ModelVersionInfo dataclass

Cross-boundary model version record (Decision 3 compliant).

Core fields are strict; everything else goes into metadata. Every record stamps engine_version so downstream consumers can filter by writer version.

RegistryBackend

Bases: Protocol

Structural interface for model registries.

Concrete implementations: FilesystemRegistryBackend (built-in), MLflowRegistryBackend (Phase 2).

delete
delete(*, use_case, model_name, version)

Remove a version and its artifacts.

demote
demote(*, use_case, model_name, version, to_stage)

Demote a version to a lower stage.

get
get(*, use_case, model_name, version=None, stage=None)

Retrieve a model version by exact version or active stage.

Returns

tuple of (artifact_path, ModelVersionInfo)
    artifact_path is an opaque string: it may be a local path, an MLflow artifact URI, or a cloud storage URL. Callers pass it to ModelClass.load(artifact_path).

Raises

ubunye.core.errors.VersionNotFoundError When no matching version exists.

list_versions
list_versions(*, use_case, model_name)

List all registered versions, newest first.

promote
promote(*, use_case, model_name, version, to_stage, gates=None, promoted_by=None)

Promote a version to a higher lifecycle stage.

If gates are provided, all checks must pass before the transition. On gate failure, raises ubunye.core.errors.PromotionBlockedError. The graceful degradation layer does not catch this error, because auth and gate failures must propagate.

Metadata update is non-blocking per Decision 1.

register
register(*, use_case, model_name, version, model, metrics, metadata=None, lineage_run_id=None)

Register a trained model as a new version.

Artifact writes are synchronous — the method returns only after the model is safely persisted. Index/metadata updates are non-blocking per Decision 1.

Parameters

use_case: Logical namespace (e.g. "fraud_detection").
model_name: Model identifier within the use case.
version: Explicit semver string, or None for auto-increment.
model: A trained model instance whose save()/metadata() methods will be called by the backend.
metrics: Training metrics dict.
metadata: Optional flexible key-value pairs (Decision 3).
lineage_run_id: Optional lineage run ID for cross-referencing.


Config

ubunye.config.schema.UbunyeConfig

Bases: BaseModel

Top-level Ubunye task config (the full contents of a config.yaml).

MODEL and VERSION are optional; they default to etl and "0.0.0-dev" respectively. Set them explicitly in production pipelines where job type or version is load-bearing (lineage records, model registry, orchestrator metadata).

merged_spark_conf

merged_spark_conf(profile=None)

Return base spark_conf merged with the named profile's overrides.
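
The merge described here amounts to a dictionary overlay. A minimal sketch, assuming profile values simply override base values key by key; merge_conf, the profiles mapping, and the fallback to the base configuration for an unknown profile are assumptions of this example, not the schema's actual field names or behaviour.

```python
from typing import Dict, Optional


def merge_conf(
    base: Dict[str, str],
    profiles: Dict[str, Dict[str, str]],
    profile: Optional[str] = None,
) -> Dict[str, str]:
    merged = dict(base)  # copy so the base mapping is never mutated
    if profile is not None:
        # Profile keys override base keys of the same name.
        merged.update(profiles.get(profile, {}))
    return merged


base = {"spark.sql.shuffle.partitions": "200", "spark.executor.memory": "4g"}
profiles = {"DEV": {"spark.sql.shuffle.partitions": "8"}}
print(merge_conf(base, profiles, "DEV"))
```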

resolved_catalog

resolved_catalog(profile=None)

Return the catalog name, with profile override if available.

resolved_schema

resolved_schema(profile=None)

Return the schema name, with profile override if available.

ubunye.config.schema.TaskConfig

Bases: BaseModel

The CONFIG section of a task: inputs, transform, outputs.

ubunye.config.schema.IOConfig

Bases: BaseModel

Input or output connector configuration.

Extra fields are allowed so plugin-specific keys (e.g. rest_api's auth, pagination, headers) pass through to the plugin unchanged via model_dump().

ubunye.config.schema.EngineConfig

Bases: BaseModel

Spark/compute settings with optional per-profile overrides.

ubunye.config.schema.RegistryConfig

Bases: BaseModel

Configuration for model registry integration within a transform.

ubunye.config.schema.ModelTransformParams

Bases: BaseModel

Typed params for transform.type: model — for documentation and validation.


Backends

SparkBackend

Creates and manages a new SparkSession. Use for local development, CI, and non-Databricks environments.

ubunye.backends.spark_backend.SparkBackend

Bases: Backend

Creates and manages a SparkSession for a Ubunye run.

Parameters

app_name : str
    Spark application name (appears in Spark UI/history server).
conf : Optional[Dict[str, str]]
    Spark configuration key-values (e.g., {"spark.master": "yarn"}).

Notes

  • pyspark is imported lazily inside start() to keep installation lightweight.
  • The spark property is only valid after start() (or inside the context manager).

app_name property
app_name

Configured Spark app name.

conf_effective property
conf_effective

The effective Spark configuration currently applied to the session.

Returns an empty dict if the session hasn't been started yet.

conf_input property
conf_input

The configuration dict passed into this backend at construction time.

is_spark property
is_spark

Whether this backend is Spark-based (always True here).

spark property
spark

Return the active SparkSession (after start()).

Raises

RuntimeError If start() has not been called.

start
start()

Create (or reuse) a SparkSession.

Safe to call multiple times; a second call is a no-op if a session already exists.

stop
stop()

Stop the SparkSession if running.

DatabricksBackend

Reuses an active SparkSession instead of creating one. Use on Databricks where a session already exists.

ubunye.backends.databricks_backend.DatabricksBackend

Bases: Backend

Backend that reuses the existing Databricks SparkSession.

Parameters

spark : Optional[SparkSession]
    An explicit session to wrap. If None (the default), the active session is retrieved via SparkSession.getActiveSession().

start
start()

Attach to the active SparkSession if one wasn't provided.

stop
stop()

No-op — we don't own the session, so we never stop it.


Models (UbunyeModel contract)

ubunye.models.base.UbunyeModel

Bases: ABC

Abstract contract every user model must implement.

The engine calls these methods during the pipeline lifecycle. Users implement them with whatever ML library they choose — the engine never imports or knows about the underlying library.

Minimal example::

from ubunye.models.base import UbunyeModel

class FraudRiskModel(UbunyeModel):
    def __init__(self):
        self._clf = None

    def train(self, df):
        from sklearn.ensemble import GradientBoostingClassifier
        # Collect to pandas; assumes the training data fits in driver memory.
        pdf = df.toPandas()
        X, y = pdf.drop("label", axis=1), pdf["label"]
        self._clf = GradientBoostingClassifier().fit(X, y)
        return {"accuracy": self._clf.score(X, y)}

    def predict(self, df):
        from pyspark.sql import SparkSession
        pdf = df.toPandas()
        # Score only the columns seen during training.
        features = pdf[list(self._clf.feature_names_in_)]
        pdf["score"] = self._clf.predict_proba(features)[:, 1]
        # Convert back to Spark using the active session.
        return SparkSession.getActiveSession().createDataFrame(pdf)

    def save(self, path):
        import pathlib, joblib
        pathlib.Path(path).mkdir(parents=True, exist_ok=True)
        joblib.dump(self._clf, f"{path}/clf.joblib")

    @classmethod
    def load(cls, path):
        import joblib
        m = cls()
        m._clf = joblib.load(f"{path}/clf.joblib")
        return m

    def metadata(self):
        import sklearn
        return {
            "library": "sklearn",
            "library_version": sklearn.__version__,
            "features": list(self._clf.feature_names_in_),
            "params": self._clf.get_params(),
        }

load abstractmethod classmethod

load(path)

Deserialize a model from the given directory path.

Parameters:

Name Type Description Default
path str

Directory path where model files were saved.

required

Returns:

Type Description
'UbunyeModel'

An instance of the model ready for prediction.

metadata abstractmethod

metadata()

Return model metadata for the registry.

Should include at minimum:

  • library : str, e.g. "xgboost", "sklearn", "pytorch"
  • library_version : str, e.g. "2.0.3"
  • features : List[str], feature names used during training
  • params : dict, model hyperparameters

Additional keys are allowed and will be stored in the registry.

predict abstractmethod

predict(df)

Generate predictions on the provided DataFrame.

Parameters:

Name Type Description Default
df Any

Spark DataFrame with feature columns.

required

Returns:

Type Description
Any

Spark DataFrame (or DataFrame-like) with prediction columns added.

save abstractmethod

save(path)

Serialize the model to the given directory path.

The user chooses the serialization format (pickle, joblib, ONNX, etc.). The engine only provides the path — it never inspects the saved files.

Parameters:

Name Type Description Default
path str

Directory path where model files should be written.

required

train abstractmethod

train(df)

Train the model on the provided DataFrame.

Parameters:

Name Type Description Default
df Any

Spark DataFrame (or any DataFrame-like) with training data.

required

Returns:

Type Description
Dict[str, Any]

Dict of metrics, e.g. {"auc": 0.94, "f1": 0.87, "accuracy": 0.91}. These metrics are stored in the registry alongside the model version.

validate

validate(df)

Optional: validate model on holdout data.

Returns:

Type Description
Dict[str, Any]

Metrics dict in the same format as train().

Raises:

Type Description
NotImplementedError

if the subclass has not overridden this method.

ubunye.models.loader.load_model_class

load_model_class(task_dir, class_name)

Dynamically load a UbunyeModel subclass.

The class_name is a dotted path like "model.FraudRiskModel" where the first segment is the Python module filename (without .py) and the rest is the class name.

Resolution order:

  1. If task_dir is provided, load <module_segment>.py directly from that directory using importlib.util.spec_from_file_location.
  2. If task_dir is None, attempt a standard importlib.import_module. The module must already be on sys.path; _run_single_task adds the task directory to sys.path before transforms are invoked, so this works automatically when called from the engine.

Parameters:

Name Type Description Default
task_dir Optional[str]

Absolute or relative path to the directory containing the model file (e.g. the pipeline task directory). Pass None to rely on sys.path.

required
class_name str

Dotted path to the class, e.g. "model.FraudRiskModel" or "models.risk.FraudRiskModel".

required

Returns:

Type Description
Type[UbunyeModel]

The model class (not an instance). Callers should do cls() or cls.load(path) as needed.

Raises:

Type Description
FileNotFoundError

The module file does not exist in task_dir.

ImportError

The class name was not found in the module.

TypeError

The class does not subclass ubunye.models.base.UbunyeModel.
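
The first resolution path (loading a module file directly from task_dir) can be sketched with the standard library alone. This is an assumption-laden illustration, not the loader's source: load_class_from_dir is a hypothetical name, it handles only the two-segment form "module.ClassName", and it omits the UbunyeModel subclass check that raises TypeError.

```python
import importlib.util
import tempfile
from pathlib import Path


def load_class_from_dir(task_dir: str, class_name: str) -> type:
    # "model.FraudRiskModel" -> module file "model.py", class "FraudRiskModel".
    module_name, _, attr = class_name.partition(".")
    module_path = Path(task_dir) / f"{module_name}.py"
    if not module_path.is_file():
        raise FileNotFoundError(module_path)

    # Build a module object from the file without touching sys.path.
    spec = importlib.util.spec_from_file_location(module_name, module_path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)

    try:
        return getattr(module, attr)
    except AttributeError as exc:
        raise ImportError(f"{attr!r} not found in {module_path}") from exc


with tempfile.TemporaryDirectory() as task_dir:
    # Write a throwaway model.py so the example is self-contained.
    Path(task_dir, "model.py").write_text("class FraudRiskModel:\n    pass\n")
    cls = load_class_from_dir(task_dir, "model.FraudRiskModel")
    print(cls.__name__)
```

Loading by file location keeps two tasks that both ship a model.py from colliding through sys.path, which is presumably why the directory-based path is tried first.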


Model Registry

ubunye.models.registry.ModelRegistry

Manages model versions and lifecycle transitions.

Parameters:

Name Type Description Default
store_path str

Root directory for all model artifacts and registry JSON files.

required

archive

archive(use_case, model_name, version)

Archive a model version.

Returns:

Type Description
ModelVersion

The updated ModelVersion.

compare_versions

compare_versions(use_case, model_name, version_a, version_b)

Compare metrics between two versions.

Returns:

Type Description
Dict[str, Dict[str, Any]]

Dict mapping metric name → {"a": val_a, "b": val_b, "delta": diff}. Only metrics present in at least one version are included.
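
The shape of the returned mapping can be illustrated with a small stand-alone function. This is a sketch of the documented output format only; compare_metrics is a hypothetical name, and two details are assumptions: delta is computed as b minus a, and it is None when a metric is missing from either version.

```python
from typing import Any, Dict, Optional


def compare_metrics(
    metrics_a: Dict[str, float], metrics_b: Dict[str, float]
) -> Dict[str, Dict[str, Any]]:
    result: Dict[str, Dict[str, Any]] = {}
    # Union of metric names, sorted for a stable order.
    for name in sorted(set(metrics_a) | set(metrics_b)):
        a: Optional[float] = metrics_a.get(name)
        b: Optional[float] = metrics_b.get(name)
        # Assumption: delta is b - a, and None when either side is missing.
        delta = b - a if a is not None and b is not None else None
        result[name] = {"a": a, "b": b, "delta": delta}
    return result


print(compare_metrics({"auc": 0.5, "f1": 0.75}, {"auc": 0.75}))
```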

demote

demote(use_case, model_name, version, to_stage)

Demote a model version to a lower stage.

Parameters:

Name Type Description Default
to_stage ModelStage

Target stage (typically development or staging).

required

Returns:

Type Description
ModelVersion

The updated ModelVersion.

get_model

get_model(use_case, model_name, task_dir=None, version=None, stage=None)

Return the artifact path and version metadata for a model.

Specify either version (exact) or stage (active version in that stage).

Parameters:

Name Type Description Default
use_case str

Use-case grouping.

required
model_name str

Model identifier.

required
task_dir Optional[str]

Task directory containing model.py (passed to caller for loading).

None
version Optional[str]

Exact version string.

None
stage Optional[ModelStage]

Stage to look up active version for.

None

Returns:

Type Description
Tuple[str, ModelVersion]

(artifact_path, model_version). The caller then calls ModelClass.load(artifact_path) to obtain the model instance.

Raises:

Type Description
ValueError

No matching version found.

list_versions

list_versions(use_case, model_name)

List all registered versions for a model (newest first by registered_at).

Raises:

Type Description
FileNotFoundError

Model not found in registry.

promote

promote(use_case, model_name, version, to_stage, promoted_by=None, gates=None)

Promote a model version to a higher stage.

If gates are provided, all gate checks must pass before promotion. If promoting to production, the current production version is automatically archived.

Parameters:

Name Type Description Default
use_case str

Use-case grouping.

required
model_name str

Model identifier.

required
version str

Version string to promote.

required
to_stage ModelStage

Target ModelStage.

required
promoted_by Optional[str]

Optional username.

None
gates Optional[Dict[str, Any]]

Optional dict of promotion gate rules (see PromotionGate).

None

Raises:

Type Description
ValueError

Version not found, or one or more promotion gates failed.

Returns:

Type Description
ModelVersion

The updated ModelVersion.

register

register(use_case, model_name, version, model, metrics, lineage_run_id=None, registered_by=None)

Register a trained model as a new version (stage=development).

  1. Saves model artifacts to versions/<version>/model/.
  2. Writes metadata.json and metrics.json.
  3. Adds a new ModelVersion entry to registry.json.

Parameters:

Name Type Description Default
use_case str

Logical use-case grouping (e.g. "fraud_detection").

required
model_name str

Model identifier (e.g. "FraudRiskModel").

required
version Optional[str]

Semver string. Pass None to auto-generate.

required
model UbunyeModel

A trained UbunyeModel instance.

required
metrics Dict[str, Any]

Metrics dict returned by UbunyeModel.train().

required
lineage_run_id Optional[str]

Optional lineage run_id from the training run.

None
registered_by Optional[str]

Optional username for the audit trail.

None

Returns:

Type Description
ModelVersion

The newly created ModelVersion.

rollback

rollback(use_case, model_name, to_version)

Roll back production to a specific previous version.

  1. Archives the current production version.
  2. Promotes to_version to production.

Parameters:

Name Type Description Default
to_version str

The version string to restore to production.

required

Returns:

Type Description
ModelVersion

The restored ModelVersion (now in production stage).
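
The two rollback steps can be sketched against an in-memory stage table. This is only an illustration of the documented sequence: the stages dict stands in for the registry's JSON files, and the stage strings "production" and "archived" are assumptions about how ModelStage values are spelled.

```python
from typing import Dict

# version -> stage; a tiny in-memory stand-in for the registry index
stages: Dict[str, str] = {
    "1.0.0": "archived",
    "1.1.0": "production",
}


def rollback(stages: Dict[str, str], to_version: str) -> str:
    if to_version not in stages:
        raise ValueError(f"unknown version: {to_version}")
    # Step 1: archive whatever is currently in production.
    for version, stage in stages.items():
        if stage == "production":
            stages[version] = "archived"
    # Step 2: promote the requested version to production.
    stages[to_version] = "production"
    return to_version


rollback(stages, "1.0.0")
print(stages)
```

Archiving before promoting means at most one version holds the production stage at any point in the sequence.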

ubunye.models.registry.ModelVersion dataclass

Metadata for a single registered model version.

to_info

to_info(*, name='', artifact_path='')

Convert to the cross-boundary ModelVersionInfo type.

ubunye.models.registry.ModelStage

Bases: str, Enum


Promotion Gates

ubunye.models.gates.PromotionGate

Evaluates a set of metric thresholds before allowing model promotion.

Parameters:

Name Type Description Default
gate_config Dict[str, Any]

Dict of gate rules. Supported keys:

  • min_<metric>: metric must be >= value
  • max_<metric>: metric must be <= value
  • require_drift_check: boolean; checks metadata["drift_check_passed"]

required
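
The three supported key forms can be sketched as a small evaluator. This is not the PromotionGate implementation: evaluate_gates and GateCheck are hypothetical names, the metric names and thresholds are made up, and two behaviours are assumptions (a missing metric fails its gate, and unknown gate keys fail closed).

```python
from dataclasses import dataclass
from typing import Any, Dict, List, Optional


@dataclass
class GateCheck:  # hypothetical stand-in for GateResult
    gate: str
    passed: bool


def evaluate_gates(
    gate_config: Dict[str, Any],
    metrics: Dict[str, Any],
    metadata: Optional[Dict[str, Any]] = None,
) -> List[GateCheck]:
    metadata = metadata or {}
    results: List[GateCheck] = []
    for key, threshold in gate_config.items():
        if key == "require_drift_check":
            # Assumption: a gate set to False is treated as trivially passing.
            passed = (not threshold) or bool(metadata.get("drift_check_passed"))
        elif key.startswith("min_"):
            value = metrics.get(key[len("min_"):])
            passed = value is not None and value >= threshold
        elif key.startswith("max_"):
            value = metrics.get(key[len("max_"):])
            passed = value is not None and value <= threshold
        else:
            passed = False  # Assumption: unknown gate keys fail closed.
        results.append(GateCheck(gate=key, passed=passed))
    return results


checks = evaluate_gates(
    {"min_auc": 0.9, "max_latency_ms": 50, "require_drift_check": True},
    metrics={"auc": 0.94, "latency_ms": 72},
    metadata={"drift_check_passed": True},
)
print([c.gate for c in checks if not c.passed])
```

Here only the latency gate fails, so a promotion guarded by these rules would be blocked.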

all_passed

all_passed(metrics, metadata=None)

Return True only if every configured gate passes.

Parameters:

Name Type Description Default
metrics Dict[str, Any]

Model metrics from UbunyeModel.train().

required
metadata Optional[Dict[str, Any]]

Optional model metadata from UbunyeModel.metadata().

None

evaluate

evaluate(metrics, metadata=None)

Evaluate all configured gates against the provided metrics.

Parameters:

Name Type Description Default
metrics Dict[str, Any]

Model metrics dict from UbunyeModel.train().

required
metadata Optional[Dict[str, Any]]

Optional model metadata dict from UbunyeModel.metadata().

None

Returns:

Type Description
List[GateResult]

List of GateResult, one per configured gate. Promotion is allowed only if all results have passed=True.

failed_gates

failed_gates(metrics, metadata=None)

Return only the gates that did not pass.

ubunye.models.gates.GateResult dataclass

Result of evaluating a single promotion gate.


Lineage

ubunye.lineage.recorder.LineageRecorder

Monitor plugin that persists run lineage as structured JSON.

Parameters

store: Backend type, either "filesystem" (default) or "s3" (stub).
base_dir: Root directory for the FileSystemLineageStore.
sample_fraction: Fraction of rows sampled when hashing DataFrames (0 < value ≤ 1).

task_end

task_end(*, context, config, outputs, status, duration_sec)

Update the run record with final status, duration, and step hashes.

task_start

task_start(*, context, config)

Create a "running" lineage record and persist it immediately.

ubunye.lineage.context.RunContext dataclass

Full lineage record for one task execution.

ubunye.lineage.storage.FileSystemLineageStore

Bases: LineageStore

Stores lineage records as JSON files in a local directory tree.

Satisfies both the legacy LineageStore ABC and the ubunye.interfaces.LineageBackend protocol.

Layout::

{base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
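
As an illustration of this layout, the sketch below writes one record into a temporary directory and reads it back through a run_id to path index. The record_path helper, the in-memory index dict, and the record fields are assumptions for the example; the store's real index format is not described here.

```python
import json
import tempfile
from pathlib import Path
from typing import Dict


def record_path(base_dir: Path, usecase: str, package: str, task_name: str, run_id: str) -> Path:
    # Mirrors the documented layout.
    return base_dir / usecase / package / task_name / f"{run_id}.json"


index: Dict[str, Path] = {}  # run_id -> path, so lookups avoid a directory scan

with tempfile.TemporaryDirectory() as tmp:
    path = record_path(Path(tmp), "fraud_detection", "ingestion", "claim_etl", "run-001")
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps({"run_id": "run-001", "status": "success"}))
    index["run-001"] = path

    # Lookup by run_id goes straight to the file via the index.
    loaded = json.loads(index["run-001"].read_text())
    relative = path.relative_to(tmp).as_posix()

print(relative)
print(loaded["status"])
```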

compact

compact()

No-op for filesystem backend.

flush

flush(timeout=None)

Drain the background worker queue.

Returns the number of records that went to fallback manifests.

get_run

get_run(run_id)

Load a single run record by run ID.

Uses a run_id→path index for O(1) lookup instead of rglob.

record

record(record)

Persist a lineage record (non-blocking per Decision 1).

Dispatches the write to a background worker thread. On failure, the record is written to a fallback manifest (Decision 2).
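
The non-blocking write with a fallback can be sketched with a queue and a worker thread. This is a hypothetical illustration of the pattern, not the store's code: BackgroundRecorder and flaky_write are made-up names, an in-memory list stands in for the fallback manifest, and flush here takes no timeout.

```python
import queue
import threading
from typing import Any, Callable, Dict, List


class BackgroundRecorder:
    """Hypothetical sketch: queue writes, fall back to a manifest on failure."""

    def __init__(self, write: Callable[[Dict[str, Any]], None]) -> None:
        self._write = write
        self._queue: "queue.Queue[Dict[str, Any]]" = queue.Queue()
        self.fallback: List[Dict[str, Any]] = []  # stand-in for a fallback manifest
        threading.Thread(target=self._worker, daemon=True).start()

    def record(self, rec: Dict[str, Any]) -> None:
        self._queue.put(rec)  # returns immediately; the caller is not blocked

    def _worker(self) -> None:
        while True:
            rec = self._queue.get()
            try:
                self._write(rec)
            except Exception:
                self.fallback.append(rec)  # keep the record instead of losing it
            finally:
                self._queue.task_done()

    def flush(self) -> int:
        self._queue.join()  # wait until every queued record has been handled
        return len(self.fallback)


written: List[Dict[str, Any]] = []


def flaky_write(rec: Dict[str, Any]) -> None:
    # Simulate a storage failure for one kind of record.
    if rec["status"] == "error":
        raise OSError("simulated storage failure")
    written.append(rec)


recorder = BackgroundRecorder(flaky_write)
recorder.record({"run_id": "r1", "status": "running"})
recorder.record({"run_id": "r1", "status": "error"})
print(recorder.flush())
```

Calling flush before process exit matters with a daemon worker, since queued records would otherwise be dropped when the interpreter shuts down.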

search_records

search_records(*, task=None, usecase=None, pipeline=None, status=None, since=None, limit=100)

Protocol-compatible search returning LineageRecord objects.


Plugins — Readers

ubunye.plugins.readers.rest_api.RestApiReader

Bases: Reader

Read a Spark DataFrame from a REST API endpoint.

Handles pagination (offset, cursor, next_link), authentication (bearer, api_key, basic), rate limiting, and retry with exponential backoff.

read

read(cfg, backend)

Fetch all pages from the API and return a Spark DataFrame.

Parameters

cfg : dict
    Reader configuration. Required key: url. See module docstring for full reference.
backend : SparkBackend
    Ubunye Spark backend exposing .spark.

Returns

pyspark.sql.DataFrame


Plugins — Writers

ubunye.plugins.writers.rest_api.RestApiWriter

Bases: Writer

Write a Spark DataFrame to a REST API endpoint in JSON batches.

Iterates over DataFrame rows, groups them into batches of batch_size, and POSTs each batch as {"records": [...]}. Tracks and logs success/failure counts per batch.
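
The batching step can be sketched without Spark or a network call. batched_payloads is a hypothetical helper, and a list of dicts stands in for DataFrame rows; only the batch_size grouping and the {"records": [...]} body shape come from the description above.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def batched_payloads(
    rows: Iterable[Dict[str, Any]], batch_size: int
) -> Iterator[Dict[str, List[Dict[str, Any]]]]:
    it = iter(rows)
    while True:
        # Take up to batch_size rows; the last batch may be shorter.
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield {"records": batch}  # one POST body per batch


rows = [{"claim_id": i} for i in range(5)]  # stand-in for DataFrame rows
payloads = list(batched_payloads(rows, batch_size=2))
print([len(p["records"]) for p in payloads])
```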

write

write(df, cfg, _backend)

POST DataFrame rows to a REST endpoint in batches.

Parameters

df : pyspark.sql.DataFrame
    Source DataFrame.
cfg : dict
    Writer configuration. Required key: url. See module docstring for full reference.
_backend : SparkBackend
    Ubunye Spark backend (passed for interface consistency; not used directly).

Raises

ValueError if url is missing from cfg.


Plugins — Transforms

ubunye.plugins.transforms.model_transform.ModelTransform

Bases: Transform

Transform plugin for ML operations (train / predict).

Implements ubunye.core.interfaces.Transform so it plugs into the existing engine dispatch with zero changes to the core runtime.

apply

apply(inputs, cfg, backend)

Dispatch to train or predict based on cfg["action"].

Parameters:

Name Type Description Default
inputs Dict[str, Any]

Named DataFrames from the read phase.

required
cfg dict

Transform params from config.yaml (the params sub-dict).

required
backend Backend

Active engine backend (provides Spark session if needed).

required

Returns:

Type Description
Dict[str, Any]

Dict of named outputs. For train: {"model_metrics": dict}. For predict: {"predictions": DataFrame}.


Internal ML wrappers

Note

These are internal wrappers used by Ubunye's own sklearn/Spark ML adapters. User-defined models should implement UbunyeModel, not these classes.

ubunye.plugins.ml.base.BaseModel

Bases: HasSchema, ABC

Framework-agnostic estimator interface.

Subclasses must implement
  • _fit_core(X, y)
  • _predict_core(X) -> y_pred (and optionally probabilities)
  • _save_core(path)
  • _load_core(path)
  • params property (read-only view)

params property

params

Model hyperparameters (read-only view).

fit

fit(X, y=None)

Fit the model; X can be Spark DF, pandas DF, numpy array, etc.

load

load(path)

Load model and metadata from path.

metrics

metrics()

Optional: training/validation metrics (subclasses may override).

predict

predict(X, proba=False)

Predict on inputs; returns labels or (labels, probabilities) if proba=True.

save

save(path)

Persist model and metadata to a directory path.

ubunye.plugins.ml.sklearn.SklearnModel

Bases: BaseModel

Wrap any sklearn estimator with a unified Ubunye interface.

ubunye.plugins.ml.pysparkml.SparkMLModel

Bases: BaseModel

Wrap a Spark ML Estimator or loaded PipelineModel.