API Reference¶
Auto-generated from docstrings via mkdocstrings.
Python API¶
The public Python API for running Ubunye tasks without the CLI. Primary use case: Databricks notebooks and jobs where a SparkSession already exists.
```python
import ubunye

# Run a single task
outputs = ubunye.run_task(
    task_dir="pipelines/fraud_detection/ingestion/claim_etl",
    mode="nonprod",
    dt="202510",
)

# Run multiple tasks sequentially
results = ubunye.run_pipeline(
    usecase_dir="pipelines",
    usecase="fraud_detection",
    package="ingestion",
    tasks=["claim_etl", "feature_engineering"],
    mode="nonprod",
    dt="202510",
)
```
ubunye.api.run_task ¶
run_task(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)
Run a single Ubunye task and return the outputs map.
Parameters¶
task_dir : str
Path to the task directory containing config.yaml and
transformations.py.
mode : str
Run mode, used for Spark profile merging. Default "DEV".
dt : str, optional
Data timestamp, injected as {{ dt }} in Jinja templates.
dtf : str, optional
Data timestamp format, injected as {{ dtf }}.
spark : SparkSession, optional
Explicit SparkSession to reuse. If None, auto-detects an active
session (Databricks) or creates a new one.
lineage : bool
Record lineage for this run.
lineage_dir : str
Root directory for lineage records.
profile : str, optional
Config profile for validation (passed to load_config).
hooks : iterable of Hook, optional
Replace the engine's default hooks entirely. Rarely needed; prefer
the ubunye.hooks entry point for always-on hooks.
Returns¶
Dict[str, Any]
    Mapping of output name → DataFrame.
ubunye.api.run_pipeline ¶
run_pipeline(usecase_dir, usecase, package, tasks, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)
Run multiple tasks sequentially and return all outputs.
Parameters¶
usecase_dir : str
Root directory for use cases (e.g. "./pipelines").
usecase : str
Use case name.
package : str
Package/pipeline name.
tasks : List[str]
Task names to run in order.
mode, dt, dtf, spark, lineage, lineage_dir, profile, hooks
Same as :func:run_task.
Returns¶
Dict[str, Dict[str, Any]]
    Mapping of task name → outputs map.
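For example, a downstream cell can index into the nested map. The output name `cleaned_claims` below is hypothetical — output names come from each task's `config.yaml`:

```python
import ubunye

results = ubunye.run_pipeline(
    usecase_dir="pipelines",
    usecase="fraud_detection",
    package="ingestion",
    tasks=["claim_etl"],
    mode="nonprod",
    dt="202510",
)

# "cleaned_claims" is illustrative; the real key is whatever the task's
# config.yaml declares under outputs.
claims_df = results["claim_etl"]["cleaned_claims"]
claims_df.show(5)
```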
Core Engine¶
ubunye.core.runtime.Engine ¶
Executes a task by reading inputs, applying one or more transforms, and writing outputs.
Minimal required config structure:

    cfg[CONFIG][inputs]   : mapping input_name -> reader cfg (must include 'format')
    cfg[CONFIG][outputs]  : mapping output_name -> writer cfg (must include 'format')
    cfg[CONFIG][transform]: EITHER a single transform dict with 'type',
                            OR a list of transform dicts to form a pipeline.
Observation (logging, metrics, tracing, user monitors) is delegated to
:class:ubunye.core.hooks.Hook instances. Pass hooks= to override the
default set.
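As a sketch, that minimal structure as a parsed config dict might look like the following. The `parquet` format, `sql` transform type, and `path`/`query` keys are illustrative assumptions — real configs are rendered from a task's `config.yaml`:

```python
cfg = {
    "MODEL": "etl",          # optional; defaults documented under UbunyeConfig
    "VERSION": "0.0.0-dev",
    "CONFIG": {
        # Every reader/writer cfg must include 'format'.
        "inputs": {"claims": {"format": "parquet", "path": "/data/claims"}},
        # Either a single transform dict with 'type', or a list of them.
        "transform": {"type": "sql", "query": "SELECT * FROM claims"},
        "outputs": {"cleaned": {"format": "parquet", "path": "/data/cleaned"}},
    },
}
```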
__init__ ¶
__init__(backend=None, registry=None, context=None, hooks=None, extra_hooks=None, manage_backend=True)
Parameters¶
hooks : iterable of Hook, optional
Replace the default hook set entirely.
extra_hooks : iterable of Hook, optional
Append these hooks to the default set. Ignored when hooks is
also given.
manage_backend : bool, default True
If True (default), the engine calls backend.start() and
backend.stop(). Set to False when the caller owns the backend
lifecycle (e.g. Python API running multiple tasks on one session).
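A sketch of that caller-owned lifecycle — one session shared across several tasks; `task_configs` is a placeholder for parsed task configs (e.g. produced by load_config):

```python
from ubunye.backends.spark_backend import SparkBackend
from ubunye.core.runtime import Engine

backend = SparkBackend(app_name="multi-task")
backend.start()
try:
    # manage_backend=False: the engine will not start/stop the backend itself.
    engine = Engine(backend=backend, manage_backend=False)
    for cfg in task_configs:  # placeholder: a list of parsed task configs
        engine.run(cfg)
finally:
    backend.stop()
```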
run ¶
Run a task using the provided config mapping.
Parameters¶
cfg : dict
    Parsed task configuration (already rendered/validated).
dry_run : bool, default False
    If True, validates and returns without executing readers/writers.
Returns¶
Optional[Dict[str, Any]]
    The outputs map (None if dry_run).
Interfaces¶
ubunye.core.interfaces ¶
Interfaces for Ubunye Engine components.
These abstract base classes define the contracts for backends, readers, writers, transforms, and user-defined tasks.
Config¶
ubunye.config.schema.UbunyeConfig ¶
Bases: BaseModel
Top-level Ubunye task config (the full contents of a config.yaml).
MODEL and VERSION are optional; they default to `etl` and
`"0.0.0-dev"` respectively. Set them explicitly in production pipelines
where job type or version is load-bearing (lineage records, model
registry, orchestrator metadata).
ubunye.config.schema.TaskConfig ¶
Bases: BaseModel
The CONFIG section of a task: inputs, transform, outputs.
ubunye.config.schema.IOConfig ¶
Bases: BaseModel
Input or output connector configuration.
Extra fields are allowed so plugin-specific keys (e.g. rest_api's
auth, pagination, headers) pass through to the plugin
unchanged via model_dump().
ubunye.config.schema.EngineConfig ¶
Bases: BaseModel
Spark/compute settings with optional per-profile overrides.
ubunye.config.schema.RegistryConfig ¶
Bases: BaseModel
Configuration for model registry integration within a transform.
ubunye.config.schema.ModelTransformParams ¶
Bases: BaseModel
Typed params for transform.type: model — for documentation and validation.
Backends¶
SparkBackend¶
Creates and manages a new SparkSession. Use for local development, CI, and non-Databricks environments.
ubunye.backends.spark_backend.SparkBackend ¶
Bases: Backend
Creates and manages a SparkSession for a Ubunye run.
Parameters¶
app_name : str
    Spark application name (appears in the Spark UI/history server).
conf : Optional[Dict[str, str]]
    Spark configuration key-values (e.g. {"spark.master": "yarn"}).
Notes¶
- `pyspark` is imported lazily inside `start()` to keep installation lightweight.
- The `spark` property is only valid after `start()` (or inside the context manager).
conf_effective property ¶
The effective Spark configuration currently applied to the session.
Returns an empty dict if the session hasn't been started yet.
conf_input property ¶
The configuration dict passed into this backend at construction time.
spark property ¶
Return the active SparkSession (after start()).
Raises¶
RuntimeError
If start() has not been called.
start ¶
Create (or reuse) a SparkSession.
Safe to call multiple times; a second call is a no-op if a session already exists.
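A local-development sketch; the `local[2]` master setting is illustrative:

```python
from ubunye.backends.spark_backend import SparkBackend

backend = SparkBackend(app_name="local-dev", conf={"spark.master": "local[2]"})
backend.start()                                    # idempotent; see above
print(backend.conf_effective.get("spark.master"))  # effective session config
backend.spark.range(5).show()
backend.stop()
```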
DatabricksBackend¶
Reuses an active SparkSession instead of creating one. Use on Databricks where a session already exists.
ubunye.backends.databricks_backend.DatabricksBackend ¶
Models (UbunyeModel contract)¶
ubunye.models.base.UbunyeModel ¶
Bases: ABC
Abstract contract every user model must implement.
The engine calls these methods during the pipeline lifecycle. Users implement them with whatever ML library they choose — the engine never imports or knows about the underlying library.
Minimal example:

```python
from ubunye.models.base import UbunyeModel
import joblib


class FraudRiskModel(UbunyeModel):
    def __init__(self):
        self._clf = None

    def train(self, df):
        from sklearn.ensemble import GradientBoostingClassifier

        pdf = df.toPandas()
        X, y = pdf.drop("label", axis=1), pdf["label"]
        self._clf = GradientBoostingClassifier().fit(X, y)
        return {"accuracy": self._clf.score(X, y)}

    def predict(self, df):
        # Reuse the active session to convert back to a Spark DataFrame.
        from pyspark.sql import SparkSession

        spark = SparkSession.getActiveSession()
        pdf = df.toPandas()
        pdf["score"] = self._clf.predict_proba(pdf)[:, 1]
        return spark.createDataFrame(pdf)

    def save(self, path):
        import pathlib

        pathlib.Path(path).mkdir(parents=True, exist_ok=True)
        joblib.dump(self._clf, f"{path}/clf.joblib")

    @classmethod
    def load(cls, path):
        m = cls()
        m._clf = joblib.load(f"{path}/clf.joblib")
        return m

    def metadata(self):
        import sklearn

        return {
            "library": "sklearn",
            "library_version": sklearn.__version__,
            "features": list(self._clf.feature_names_in_),
            "params": self._clf.get_params(),
        }
```
load abstractmethod classmethod ¶
Deserialize a model from the given directory path.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Directory path where model files were saved. | required |

Returns:

| Type | Description |
|---|---|
| `'UbunyeModel'` | An instance of the model ready for prediction. |
metadata abstractmethod ¶
Return model metadata for the registry.
Should include at minimum:
- library: str — e.g. "xgboost", "sklearn", "pytorch"
- library_version: str — e.g. "2.0.3"
- features: List[str] — feature names used during training
- params: dict — model hyperparameters
Additional keys are allowed and will be stored in the registry.
predict abstractmethod ¶
Generate predictions on the provided DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `Any` | Spark DataFrame with feature columns. | required |

Returns:

| Type | Description |
|---|---|
| `Any` | Spark DataFrame (or DataFrame-like) with prediction columns added. |
save abstractmethod ¶
Serialize the model to the given directory path.
The user chooses the serialization format (pickle, joblib, ONNX, etc.). The engine only provides the path — it never inspects the saved files.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `path` | `str` | Directory path where model files should be written. | required |
train abstractmethod ¶
Train the model on the provided DataFrame.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `df` | `Any` | Spark DataFrame (or any DataFrame-like) with training data. | required |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dict of metrics, e.g. `{"accuracy": 0.95}`. These metrics are stored in the registry alongside the model version. |
validate ¶
Optional: validate model on holdout data.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Metrics dict in the same format as :meth:`train`. |

Raises:

| Type | Description |
|---|---|
| `NotImplementedError` | If the subclass has not overridden this method. |
ubunye.models.loader.load_model_class ¶
Dynamically load a UbunyeModel subclass.
The class_name is a dotted path like "model.FraudRiskModel" where the
first segment is the Python module filename (without .py) and the rest is
the class name.
Resolution order:
1. If task_dir is provided → load <module_segment>.py directly from
that directory using importlib.util.spec_from_file_location.
2. If task_dir is None → attempt a standard importlib.import_module
(the module must already be on sys.path; _run_single_task adds the
task directory to sys.path before transforms are invoked, so this works
automatically when called from the engine).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `task_dir` | `Optional[str]` | Absolute or relative path to the directory containing the model file (e.g. the pipeline task directory). Pass None to fall back to a standard import from sys.path. | required |
| `class_name` | `str` | Dotted path to the class, e.g. `"model.FraudRiskModel"`. | required |

Returns:

| Type | Description |
|---|---|
| `Type[UbunyeModel]` | The model class (not an instance); callers instantiate it themselves. |

Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | The module file does not exist in task_dir. |
| `ImportError` | The class name was not found in the module. |
| `TypeError` | The class does not subclass :class:`UbunyeModel`. |
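A usage sketch of both resolution paths; the task directory and dotted class name mirror examples elsewhere on this page:

```python
from ubunye.models.loader import load_model_class

# 1. Load model.py directly from a task directory.
ModelCls = load_model_class(
    task_dir="pipelines/fraud_detection/ingestion/claim_etl",
    class_name="model.FraudRiskModel",
)
model = ModelCls()  # callers instantiate the returned class themselves

# 2. Or rely on sys.path (as the engine does, since _run_single_task adds
#    the task directory to sys.path before transforms are invoked).
ModelCls = load_model_class(task_dir=None, class_name="model.FraudRiskModel")
```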
Model Registry¶
ubunye.models.registry.ModelRegistry ¶
Manages model versions and lifecycle transitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `store_path` | `str` | Root directory for all model artifacts and registry JSON files. | required |
archive ¶
compare_versions ¶
Compare metrics between two versions.
Returns:

| Type | Description |
|---|---|
| `Dict[str, Dict[str, Any]]` | Dict mapping metric name → the value in each version. Only metrics present in at least one version are included. |
demote ¶
Demote a model version to a lower stage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `to_stage` | `ModelStage` | Target stage (typically a lower one). | required |

Returns:

| Type | Description |
|---|---|
| `ModelVersion` | The updated :class:`ModelVersion` record. |
get_model ¶
Return the artifact path and version metadata for a model.
Specify either version (exact) or stage (active version in that stage).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `use_case` | `str` | Use-case grouping. | required |
| `model_name` | `str` | Model identifier. | required |
| `task_dir` | `Optional[str]` | Task directory containing model.py (passed to caller for loading). | None |
| `version` | `Optional[str]` | Exact version string. | None |
| `stage` | `Optional[ModelStage]` | Stage to look up the active version for. | None |

Returns:

| Type | Description |
|---|---|
| `str` | The artifact path. |
| `ModelVersion` | The version metadata. |

Raises:

| Type | Description |
|---|---|
| `ValueError` | No matching version found. |
list_versions ¶
List all registered versions for a model (newest first by registered_at).
Raises:

| Type | Description |
|---|---|
| `FileNotFoundError` | Model not found in registry. |
promote ¶
Promote a model version to a higher stage.
If gates are provided, all gate checks must pass before promotion.
If promoting to production, the current production version is automatically
archived.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `use_case` | `str` | Use-case grouping. | required |
| `model_name` | `str` | Model identifier. | required |
| `version` | `str` | Version string to promote. | required |
| `to_stage` | `ModelStage` | Target :class:`ModelStage`. | required |
| `promoted_by` | `Optional[str]` | Optional username. | None |
| `gates` | `Optional[Dict[str, Any]]` | Optional dict of promotion gate rules (see :class:`PromotionGate`). | None |

Raises:

| Type | Description |
|---|---|
| `ValueError` | Version not found, or one or more promotion gates failed. |

Returns:

| Type | Description |
|---|---|
| `ModelVersion` | The updated :class:`ModelVersion` record. |
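A promotion sketch. The store path and model name are illustrative, and `ModelStage.PRODUCTION` assumes the enum member's name — check :class:`ModelStage` for the actual values. Gate rules are omitted here because their keys are specific to your :class:`PromotionGate` config:

```python
from ubunye.models.registry import ModelRegistry, ModelStage

registry = ModelRegistry(store_path=".ubunye/models")  # illustrative path
registry.promote(
    use_case="fraud_detection",
    model_name="fraud_risk",         # hypothetical model identifier
    version="1.0.0",
    to_stage=ModelStage.PRODUCTION,  # assumed member name
    promoted_by="alice",
)
```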
register ¶
Register a trained model as a new version (stage=development).
- Saves model artifacts to `versions/<version>/model/`.
- Writes `metadata.json` and `metrics.json`.
- Adds a new :class:`ModelVersion` entry to `registry.json`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `use_case` | `str` | Logical use-case grouping (e.g. "fraud_detection"). | required |
| `model_name` | `str` | Model identifier. | required |
| `version` | `Optional[str]` | Semver string. | required |
| `model` | `UbunyeModel` | A trained :class:`UbunyeModel` instance. | required |
| `metrics` | `Dict[str, Any]` | Metrics dict returned by :meth:`UbunyeModel.train`. | required |
| `lineage_run_id` | `Optional[str]` | Optional lineage run_id linking this version to the run that produced it. | None |
| `registered_by` | `Optional[str]` | Optional username for the audit trail. | None |

Returns:

| Type | Description |
|---|---|
| `ModelVersion` | The newly created :class:`ModelVersion`. |
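An end-to-end registration sketch, assuming the FraudRiskModel from the UbunyeModel example above, a training DataFrame `train_df`, and an illustrative store path and model name:

```python
from ubunye.models.registry import ModelRegistry

registry = ModelRegistry(store_path=".ubunye/models")  # illustrative path
model = FraudRiskModel()           # from the UbunyeModel example above
metrics = model.train(train_df)    # train_df: your training DataFrame
mv = registry.register(
    use_case="fraud_detection",
    model_name="fraud_risk",       # hypothetical identifier
    version="1.0.0",
    model=model,
    metrics=metrics,
)
# mv is the newly created ModelVersion (stage=development)
```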
rollback ¶
Roll back production to a specific previous version.
- Archives the current production version.
- Promotes `to_version` to production.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `to_version` | `str` | The version string to restore to production. | required |

Returns:

| Type | Description |
|---|---|
| `ModelVersion` | The restored :class:`ModelVersion`. |
ubunye.models.registry.ModelVersion dataclass ¶
Metadata for a single registered model version.
ubunye.models.registry.ModelStage ¶
Bases: str, Enum
Promotion Gates¶
ubunye.models.gates.PromotionGate ¶
Evaluates a set of metric thresholds before allowing model promotion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `gate_config` | `Dict[str, Any]` | Dict of gate rules. | required |
all_passed ¶
Return True only if every configured gate passes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metrics` | `Dict[str, Any]` | Model metrics from :meth:`UbunyeModel.train`. | required |
| `metadata` | `Optional[Dict[str, Any]]` | Optional model metadata from :meth:`UbunyeModel.metadata`. | None |
evaluate ¶
Evaluate all configured gates against the provided metrics.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `metrics` | `Dict[str, Any]` | Model metrics dict from :meth:`UbunyeModel.train`. | required |
| `metadata` | `Optional[Dict[str, Any]]` | Optional model metadata dict from :meth:`UbunyeModel.metadata`. | None |

Returns:

| Type | Description |
|---|---|
| `List[GateResult]` | List of :class:`GateResult`, one per configured gate. Promotion is allowed only if all results pass. |
ubunye.models.gates.GateResult dataclass ¶
Result of evaluating a single promotion gate.
Lineage¶
ubunye.lineage.recorder.LineageRecorder ¶
Monitor plugin that persists run lineage as structured JSON.
Parameters¶
store:
Backend type — "filesystem" (default) or "s3" (stub).
base_dir:
Root directory for the FileSystemLineageStore.
sample_fraction:
Fraction of rows sampled when hashing DataFrames (0 < value ≤ 1).
ubunye.lineage.context.RunContext dataclass ¶
Full lineage record for one task execution.
ubunye.lineage.storage.FileSystemLineageStore ¶
Bases: LineageStore
Stores lineage records as JSON files in a local directory tree.
Layout:

    {base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
Plugins — Readers¶
ubunye.plugins.readers.rest_api.RestApiReader ¶
Bases: Reader
Read a Spark DataFrame from a REST API endpoint.
Handles pagination (offset, cursor, next_link), authentication (bearer, api_key, basic), rate limiting, and retry with exponential backoff.
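A sketch of a rest_api input cfg as the parsed dict the reader would receive. Only `format` and the `auth`/`pagination`/`headers` pass-through (see IOConfig above) are documented; the endpoint and every nested key name here are assumptions:

```python
claims_input = {
    "format": "rest_api",
    "url": "https://api.example.com/claims",     # hypothetical endpoint
    "auth": {"type": "bearer", "token": "..."},  # bearer / api_key / basic
    "pagination": {"type": "offset"},            # offset / cursor / next_link
    "headers": {"Accept": "application/json"},
}
```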
Plugins — Writers¶
ubunye.plugins.writers.rest_api.RestApiWriter ¶
Bases: Writer
Write a Spark DataFrame to a REST API endpoint in JSON batches.
Iterates over DataFrame rows, groups them into batches of `batch_size`,
and POSTs each batch as `{"records": [...]}`. Tracks and logs
success/failure counts per batch.
write ¶
POST DataFrame rows to a REST endpoint in batches.
Parameters¶
df : pyspark.sql.DataFrame
Source DataFrame.
cfg : dict
Writer configuration. Required key: url.
See module docstring for full reference.
backend : SparkBackend
Ubunye Spark backend (passed for interface consistency; not used directly).
Raises¶
ValueError
    If url is missing from cfg.
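A minimal writer cfg sketch; `url` is the one documented required key, while `batch_size` is an assumed key name mirroring the batching behaviour described above:

```python
writer_cfg = {
    "format": "rest_api",
    "url": "https://api.example.com/ingest",  # required; hypothetical endpoint
    "batch_size": 500,                        # assumed key name
}
```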
Plugins — Transforms¶
ubunye.plugins.transforms.model_transform.ModelTransform ¶
Bases: Transform
Transform plugin for ML operations (train / predict).
Implements :class:ubunye.core.interfaces.Transform so it plugs into the
existing engine dispatch with zero changes to the core runtime.
apply ¶
Dispatch to train or predict based on cfg["action"].
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `inputs` | `Dict[str, Any]` | Named DataFrames from the read phase. | required |
| `cfg` | `dict` | Transform params from the task's transform config. | required |
| `backend` | `Backend` | Active engine backend (provides Spark session if needed). | required |

Returns:

| Type | Description |
|---|---|
| `Dict[str, Any]` | Dict of named outputs; contents depend on whether action is "train" or "predict". |
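A sketch of a transform cfg that routes to this plugin. `type: "model"` and the `action` key are documented above; `class_name` is an assumed illustration of how the UbunyeModel class might be referenced (dotted path per load_model_class) — see ModelTransformParams for the actual typed params:

```python
transform_cfg = {
    "type": "model",                       # dispatches to ModelTransform
    "action": "train",                     # or "predict"
    "class_name": "model.FraudRiskModel",  # assumed key; see ModelTransformParams
}
```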
Internal ML wrappers¶
Note
These are internal wrappers used by Ubunye's own sklearn/Spark ML adapters.
User-defined models should implement UbunyeModel, not these classes.
ubunye.plugins.ml.base.BaseModel ¶
Bases: HasSchema, ABC
Framework-agnostic estimator interface.
Subclasses must implement:
- _fit_core(X, y)
- _predict_core(X) -> y_pred (and optionally probabilities)
- _save_core(path)
- _load_core(path)
- params property (read-only view)
predict ¶
Predict on inputs; returns labels or (labels, probabilities) if proba=True.