API Reference¶
Auto-generated from docstrings via mkdocstrings.
Python API¶
The public Python API for running Ubunye tasks without the CLI. Primary use case: Databricks notebooks and jobs where a SparkSession already exists.
    import ubunye

    # Run a single task
    outputs = ubunye.run_task(
        task_dir="pipelines/fraud_detection/ingestion/claim_etl",
        mode="nonprod",
        dt="202510",
    )

    # Run multiple tasks sequentially
    results = ubunye.run_pipeline(
        usecase_dir="pipelines",
        usecase="fraud_detection",
        package="ingestion",
        tasks=["claim_etl", "feature_engineering"],
        mode="nonprod",
        dt="202510",
    )
ubunye.api.run_task ¶
run_task(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None)
Run a single Ubunye task and return the outputs map.
Parameters¶
task_dir : str
Path to the task directory containing config.yaml and
transformations.py.
mode : str
Run mode, used for Spark profile merging. Default "DEV".
dt : str, optional
Data timestamp, injected as {{ dt }} in Jinja templates.
dtf : str, optional
Data timestamp format, injected as {{ dtf }}.
spark : SparkSession, optional
Explicit SparkSession to reuse. If None, auto-detects an active
session (Databricks) or creates a new one.
lineage : bool
Record lineage for this run.
lineage_dir : str
Root directory for lineage records.
profile : str, optional
Config profile for validation (passed to load_config).
Returns¶
Dict[str, Any]
    Mapping of output name → DataFrame.
ubunye.api.run_pipeline ¶
run_pipeline(usecase_dir, usecase, package, tasks, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None)
Run multiple tasks sequentially and return all outputs.
Parameters¶
usecase_dir : str
Root directory for use cases (e.g. "./pipelines").
usecase : str
Use case name.
package : str
Package/pipeline name.
tasks : List[str]
Task names to run in order.
mode, dt, dtf, spark, lineage, lineage_dir, profile
Same as `run_task`.
Returns¶
Dict[str, Dict[str, Any]]
    Mapping of task name → outputs map.
Core Engine¶
ubunye.core.runtime.Engine ¶
Executes a task by reading inputs, applying one or more transforms, and writing outputs.
Minimal required config structure
cfg[CONFIG][inputs] : mapping input_name -> reader cfg (must include 'format')
cfg[CONFIG][outputs] : mapping output_name -> writer cfg (must include 'format')
cfg[CONFIG][transform]: EITHER a single transform dict with 'type',
OR a list of transform dicts to form a pipeline.
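A minimal `cfg` mapping matching this structure might look like the following sketch (the connector names, paths, formats, and the transform type are hypothetical illustrations, not a reference):

```python
# Illustrative minimal task config as a Python mapping.
# "CONFIG" is the section key; all names/formats below are made up.
cfg = {
    "CONFIG": {
        "inputs": {
            # every reader cfg must include 'format'
            "claims": {"format": "parquet", "path": "/data/claims"},
        },
        # a single transform dict; a list of such dicts forms a pipeline
        "transform": {"type": "sql", "query": "SELECT * FROM claims"},
        "outputs": {
            # every writer cfg must include 'format'
            "clean_claims": {"format": "delta", "path": "/data/clean"},
        },
    }
}
```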
Notes¶
- This class is intentionally small; orchestration, telemetry, and retries can be layered around it without changing plugin contracts.
run ¶
Run a task using the provided config mapping.
Parameters¶
cfg : dict
    Parsed task configuration (already rendered/validated).
dry_run : bool, default False
    If True, validates and returns without executing readers/writers.
Returns¶
Optional[Dict[str, Any]]
    The outputs map, or None if dry_run.
Interfaces¶
ubunye.core.interfaces ¶
Interfaces for Ubunye Engine components.
These abstract base classes define the contracts for backends, readers, writers, transforms, and user-defined tasks.
Config¶
ubunye.config.schema.UbunyeConfig ¶
Bases: BaseModel
Top-level Ubunye task config (the full contents of a config.yaml).
ubunye.config.schema.TaskConfig ¶
Bases: BaseModel
The CONFIG section of a task: inputs, transform, outputs.
ubunye.config.schema.IOConfig ¶
Bases: BaseModel
Input or output connector configuration.
Extra fields are allowed so plugin-specific keys (e.g. `rest_api`'s `auth`, `pagination`, `headers`) pass through to the plugin unchanged via `model_dump()`.
ubunye.config.schema.EngineConfig ¶
Bases: BaseModel
Spark/compute settings with optional per-profile overrides.
ubunye.config.schema.RegistryConfig ¶
Bases: BaseModel
Configuration for model registry integration within a transform.
ubunye.config.schema.ModelTransformParams ¶
Bases: BaseModel
Typed params for transform.type: model — for documentation and validation.
Backends¶
SparkBackend¶
Creates and manages a new SparkSession. Use for local development, CI, and non-Databricks environments.
ubunye.backends.spark_backend.SparkBackend ¶
Bases: Backend
Creates and manages a SparkSession for a Ubunye run.
Parameters¶
app_name : str
    Spark application name (appears in Spark UI/history server).
conf : Optional[Dict[str, str]]
    Spark configuration key-values (e.g., {"spark.master": "yarn"}).
Notes¶
- `pyspark` is imported lazily inside `start()` to keep installation lightweight.
- The `spark` property is only valid after `start()` (or inside the context manager).
conf_effective property ¶
The effective Spark configuration currently applied to the session.
Returns an empty dict if the session hasn't been started yet.
conf_input property ¶
The configuration dict passed into this backend at construction time.
spark property ¶
Return the active SparkSession (after start()).
Raises¶
RuntimeError
If start() has not been called.
start ¶
Create (or reuse) a SparkSession.
Safe to call multiple times; a second call is a no-op if a session already exists.
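The idempotent-start contract can be sketched with a stand-in session object (illustrative only; the real backend builds or reuses a SparkSession):

```python
class IdempotentBackend:
    """Sketch of start() semantics: a second call is a no-op."""

    def __init__(self):
        self._session = None

    def start(self):
        if self._session is None:      # only create once
            self._session = object()   # stand-in for a real SparkSession
        return self._session
```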
DatabricksBackend¶
Reuses an active SparkSession instead of creating one. Use on Databricks where a session already exists.
ubunye.backends.databricks_backend.DatabricksBackend ¶
Models (UbunyeModel contract)¶
ubunye.models.base.UbunyeModel ¶
Bases: ABC
Abstract contract every user model must implement.
The engine calls these methods during the pipeline lifecycle. Users implement them with whatever ML library they choose — the engine never imports or knows about the underlying library.
Minimal example::

    from ubunye.models.base import UbunyeModel

    class FraudRiskModel(UbunyeModel):
        def __init__(self):
            self._clf = None

        def train(self, df):
            from sklearn.ensemble import GradientBoostingClassifier
            pdf = df.toPandas()
            X, y = pdf.drop("label", axis=1), pdf["label"]
            self._clf = GradientBoostingClassifier().fit(X, y)
            return {"accuracy": self._clf.score(X, y)}

        def predict(self, df):
            pdf = df.toPandas()
            pdf["score"] = self._clf.predict_proba(pdf)[:, 1]
            # use the incoming DataFrame's session rather than an undefined global
            return df.sparkSession.createDataFrame(pdf)

        def save(self, path):
            import pathlib
            import joblib
            pathlib.Path(path).mkdir(parents=True, exist_ok=True)
            joblib.dump(self._clf, f"{path}/clf.joblib")

        @classmethod
        def load(cls, path):
            import joblib
            m = cls()
            m._clf = joblib.load(f"{path}/clf.joblib")
            return m

        def metadata(self):
            import sklearn
            return {
                "library": "sklearn",
                "library_version": sklearn.__version__,
                "features": list(self._clf.feature_names_in_),
                "params": self._clf.get_params(),
            }
load abstractmethod classmethod ¶

Deserialize a model from the given directory path.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Directory path where model files were saved. | required |

Returns:

| Type | Description |
|---|---|
| 'UbunyeModel' | An instance of the model ready for prediction. |
metadata abstractmethod ¶
Return model metadata for the registry.
Should include at minimum:
- library: str — e.g. "xgboost", "sklearn", "pytorch"
- library_version: str — e.g. "2.0.3"
- features: List[str] — feature names used during training
- params: dict — model hyperparameters
Additional keys are allowed and will be stored in the registry.
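A return value satisfying this minimum might look like the following (all values here are illustrative, not taken from a real model):

```python
def example_metadata():
    """Illustrative metadata() return value meeting the minimum contract."""
    return {
        "library": "sklearn",
        "library_version": "1.4.2",                       # illustrative version
        "features": ["claim_amount", "claim_age_days"],   # hypothetical features
        "params": {"n_estimators": 100, "learning_rate": 0.1},
        "notes": "extra keys are allowed",                # stored as-is in the registry
    }
```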
predict abstractmethod ¶

Generate predictions on the provided DataFrame.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | Any | Spark DataFrame with feature columns. | required |

Returns:

| Type | Description |
|---|---|
| Any | Spark DataFrame (or DataFrame-like) with prediction columns added. |
save abstractmethod ¶

Serialize the model to the given directory path.

The user chooses the serialization format (pickle, joblib, ONNX, etc.). The engine only provides the path — it never inspects the saved files.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Directory path where model files should be written. | required |
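The save/load pair is symmetric by design. A minimal standalone sketch using pickle (one possible format; `save_model`, `load_model`, and the `model.pkl` filename are illustrative, not part of the contract):

```python
import pickle
from pathlib import Path

def save_model(obj, path):
    """Write an estimator into the given directory (pickle chosen here)."""
    Path(path).mkdir(parents=True, exist_ok=True)
    with open(Path(path) / "model.pkl", "wb") as f:
        pickle.dump(obj, f)

def load_model(path):
    """Read the estimator back from the same directory."""
    with open(Path(path) / "model.pkl", "rb") as f:
        return pickle.load(f)
```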
train abstractmethod ¶

Train the model on the provided DataFrame.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| df | Any | Spark DataFrame (or any DataFrame-like) with training data. | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dict of metrics, e.g. … These metrics are stored in the registry alongside the model version. |
validate ¶

Optional: validate model on holdout data.

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Metrics dict in the same format as `train()`. |

Raises:

| Type | Description |
|---|---|
| NotImplementedError | If the subclass has not overridden this method. |
ubunye.models.loader.load_model_class ¶
Dynamically load a UbunyeModel subclass.
The class_name is a dotted path like "model.FraudRiskModel" where the
first segment is the Python module filename (without .py) and the rest is
the class name.
Resolution order:
1. If task_dir is provided → load <module_segment>.py directly from
that directory using importlib.util.spec_from_file_location.
2. If task_dir is None → attempt a standard importlib.import_module
(the module must already be on sys.path; _run_single_task adds the
task directory to sys.path before transforms are invoked, so this works
automatically when called from the engine).
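Resolution step 1 can be sketched with the standard library; `load_class_from_dir` is a hypothetical helper showing the mechanism, not Ubunye's actual loader (which also enforces the UbunyeModel contract):

```python
import importlib.util
from pathlib import Path

def load_class_from_dir(task_dir, class_name):
    """Load '<module>.<Class>' from a .py file located in task_dir.

    Sketch of resolution step 1 using importlib.util.spec_from_file_location.
    """
    module_name, _, cls_name = class_name.partition(".")
    path = Path(task_dir) / f"{module_name}.py"
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)      # executes the module file
    return getattr(module, cls_name)     # the class itself, not an instance
```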
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| task_dir | Optional[str] | Absolute or relative path to the directory containing the model file (e.g. the pipeline task directory). Pass … | required |
| class_name | str | Dotted path to the class, e.g. … | required |

Returns:

| Type | Description |
|---|---|
| Type[UbunyeModel] | The model class (not an instance). Callers should do … |

Raises:

| Type | Description |
|---|---|
| FileNotFoundError | The module file does not exist in … |
| ImportError | The class name was not found in the module. |
| TypeError | The class does not subclass `UbunyeModel`. |
Model Registry¶
ubunye.models.registry.ModelRegistry ¶
Manages model versions and lifecycle transitions.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| store_path | str | Root directory for all model artifacts and registry JSON files. | required |
archive ¶
compare_versions ¶
Compare metrics between two versions.
Returns:

| Type | Description |
|---|---|
| Dict[str, Dict[str, Any]] | Dict mapping metric name → … Only metrics present in at least one version are included. |
demote ¶
Demote a model version to a lower stage.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| to_stage | ModelStage | Target stage (typically …). | required |

Returns:

| Type | Description |
|---|---|
| ModelVersion | The updated `ModelVersion`. |
get_model ¶
Return the artifact path and version metadata for a model.
Specify either version (exact) or stage (active version in that stage).
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Use-case grouping. | required |
| model_name | str | Model identifier. | required |
| task_dir | Optional[str] | Task directory containing model.py (passed to caller for loading). | None |
| version | Optional[str] | Exact version string. | None |
| stage | Optional[ModelStage] | Stage to look up the active version for. | None |

Returns:

| Type | Description |
|---|---|
| str | Artifact path. |
| ModelVersion | Version metadata. |

Raises:

| Type | Description |
|---|---|
| ValueError | No matching version found. |
list_versions ¶
List all registered versions for a model (newest first by registered_at).
Raises:

| Type | Description |
|---|---|
| FileNotFoundError | Model not found in registry. |
promote ¶
Promote a model version to a higher stage.
If gates are provided, all gate checks must pass before promotion.
If promoting to production, the current production version is automatically
archived.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Use-case grouping. | required |
| model_name | str | Model identifier. | required |
| version | str | Version string to promote. | required |
| to_stage | ModelStage | Target `ModelStage`. | required |
| promoted_by | Optional[str] | Optional username. | None |
| gates | Optional[Dict[str, Any]] | Optional dict of promotion gate rules (see `PromotionGate`). | None |

Raises:

| Type | Description |
|---|---|
| ValueError | Version not found, or one or more promotion gates failed. |

Returns:

| Type | Description |
|---|---|
| ModelVersion | The updated `ModelVersion`. |
register ¶
Register a trained model as a new version (stage=development).
- Saves model artifacts to `versions/<version>/model/`.
- Writes `metadata.json` and `metrics.json`.
- Adds a new `ModelVersion` entry to `registry.json`.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Logical use-case grouping (e.g. …). | required |
| model_name | str | Model identifier (e.g. …). | required |
| version | Optional[str] | Semver string. Pass … | required |
| model | UbunyeModel | A trained `UbunyeModel` instance. | required |
| metrics | Dict[str, Any] | Metrics dict returned by `train()`. | required |
| lineage_run_id | Optional[str] | Optional lineage run id. | None |
| registered_by | Optional[str] | Optional username for the audit trail. | None |

Returns:

| Type | Description |
|---|---|
| ModelVersion | The newly created `ModelVersion`. |
rollback ¶
Roll back production to a specific previous version.
- Archives the current production version.
- Promotes `to_version` to production.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| to_version | str | The version string to restore to production. | required |

Returns:

| Type | Description |
|---|---|
| ModelVersion | The restored `ModelVersion`. |
ubunye.models.registry.ModelVersion dataclass ¶
Metadata for a single registered model version.
ubunye.models.registry.ModelStage ¶
Bases: str, Enum
Promotion Gates¶
ubunye.models.gates.PromotionGate ¶
Evaluates a set of metric thresholds before allowing model promotion.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| gate_config | Dict[str, Any] | Dict of gate rules. Supported keys: … | required |
all_passed ¶
Return True only if every configured gate passes.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metrics | Dict[str, Any] | Model metrics from `train()`. | required |
| metadata | Optional[Dict[str, Any]] | Optional model metadata from `metadata()`. | None |
evaluate ¶
Evaluate all configured gates against the provided metrics.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| metrics | Dict[str, Any] | Model metrics dict from `train()`. | required |
| metadata | Optional[Dict[str, Any]] | Optional model metadata dict from `metadata()`. | None |

Returns:

| Type | Description |
|---|---|
| List[GateResult] | List of `GateResult`, one per configured gate. Promotion is allowed only if all results have … |
ubunye.models.gates.GateResult dataclass ¶
Result of evaluating a single promotion gate.
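Since the supported gate keys are truncated above, here is a generic sketch of threshold-gate evaluation, under the assumption that rules map metric names to required minimums. The function names and rule shape are illustrative only, not the `PromotionGate` API:

```python
def evaluate_min_gates(gate_config, metrics):
    """Check metrics against minimum thresholds.

    Assumes gate_config maps metric name -> required minimum (one plausible
    rule shape; see PromotionGate for the real supported keys).
    """
    results = []
    for name, minimum in gate_config.items():
        value = metrics.get(name)
        passed = value is not None and value >= minimum
        results.append({"gate": name, "value": value, "passed": passed})
    return results

def all_gates_passed(results):
    """Promotion is allowed only if every gate passed."""
    return all(r["passed"] for r in results)
```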
Lineage¶
ubunye.lineage.recorder.LineageRecorder ¶
Monitor plugin that persists run lineage as structured JSON.
Parameters¶
store:
Backend type — "filesystem" (default) or "s3" (stub).
base_dir:
Root directory for the FileSystemLineageStore.
sample_fraction:
Fraction of rows sampled when hashing DataFrames (0 < value ≤ 1).
ubunye.lineage.context.RunContext dataclass ¶
Full lineage record for one task execution.
ubunye.lineage.storage.FileSystemLineageStore ¶
Bases: LineageStore
Stores lineage records as JSON files in a local directory tree.
Layout::

    {base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
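Mapping the layout to a concrete path is straightforward; `lineage_path` is a hypothetical helper shown for illustration, not part of the public API:

```python
from pathlib import Path

def lineage_path(base_dir, usecase, package, task_name, run_id):
    """Build the JSON record path for one run, per the layout above."""
    return Path(base_dir) / usecase / package / task_name / f"{run_id}.json"
```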
Plugins — Readers¶
ubunye.plugins.readers.rest_api.RestApiReader ¶
Bases: Reader
Read a Spark DataFrame from a REST API endpoint.
Handles pagination (offset, cursor, next_link), authentication (bearer, api_key, basic), rate limiting, and retry with exponential backoff.
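The offset strategy can be sketched in isolation; `fetch` here is a hypothetical page-fetching callable standing in for the HTTP request, and this is not the plugin's actual implementation:

```python
def paginate_offset(fetch, limit=100):
    """Collect all records from an offset-paginated endpoint.

    fetch(offset, limit) returns one page of records; a short or empty
    page signals the end. Cursor and next_link strategies work analogously,
    advancing by a token or URL instead of an offset.
    """
    records, offset = [], 0
    while True:
        page = fetch(offset, limit)
        records.extend(page)
        if len(page) < limit:      # last page reached
            return records
        offset += limit
```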
Plugins — Writers¶
ubunye.plugins.writers.rest_api.RestApiWriter ¶
Bases: Writer
Write a Spark DataFrame to a REST API endpoint in JSON batches.
Iterates over DataFrame rows, groups them into batches of `batch_size`, and POSTs each batch as `{"records": [...]}`. Tracks and logs success/failure counts per batch.
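Independently of Spark, the batching step amounts to chunking an iterator. This sketch (not the plugin's code) shows the grouping; each yielded batch would then be POSTed as `{"records": batch}`:

```python
from itertools import islice

def batches(rows, batch_size):
    """Yield successive lists of at most batch_size rows."""
    it = iter(rows)
    while True:
        batch = list(islice(it, batch_size))
        if not batch:      # iterator exhausted
            return
        yield batch
```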
write ¶
POST DataFrame rows to a REST endpoint in batches.
Parameters¶
df : pyspark.sql.DataFrame
Source DataFrame.
cfg : dict
Writer configuration. Required key: url.
See module docstring for full reference.
backend : SparkBackend
Ubunye Spark backend (passed for interface consistency; not used directly).
Raises¶
ValueError
    If `url` is missing from cfg.
Plugins — Transforms¶
ubunye.plugins.transforms.model_transform.ModelTransform ¶
Bases: Transform
Transform plugin for ML operations (train / predict).
Implements `ubunye.core.interfaces.Transform` so it plugs into the existing engine dispatch with zero changes to the core runtime.
apply ¶
Dispatch to train or predict based on cfg["action"].
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Dict[str, Any] | Named DataFrames from the read phase. | required |
| cfg | dict | Transform params from … | required |
| backend | Backend | Active engine backend (provides Spark session if needed). | required |

Returns:

| Type | Description |
|---|---|
| Dict[str, Any] | Dict of named outputs. For `train`, … For `predict`, … |
Internal ML wrappers¶
Note
These are internal wrappers used by Ubunye's own sklearn/Spark ML adapters.
User-defined models should implement UbunyeModel, not these classes.
ubunye.plugins.ml.base.BaseModel ¶
Bases: HasSchema, ABC
Framework-agnostic estimator interface.
Subclasses must implement
- _fit_core(X, y)
- _predict_core(X) -> y_pred (and optionally probabilities)
- _save_core(path)
- _load_core(path)
- params property (read-only view)
predict ¶
Predict on inputs; returns labels or (labels, probabilities) if proba=True.