API Reference¶
Auto-generated from docstrings via mkdocstrings.
Python API¶
The public Python API for running Ubunye tasks without the CLI. Primary use case: Databricks notebooks and jobs where a SparkSession already exists.
import ubunye

# Run a single task
outputs = ubunye.run_task(
    task_dir="pipelines/fraud_detection/ingestion/claim_etl",
    mode="nonprod",
    dt="202510",
)

# Run multiple tasks sequentially
results = ubunye.run_pipeline(
    usecase_dir="pipelines",
    usecase="fraud_detection",
    package="ingestion",
    tasks=["claim_etl", "feature_engineering"],
    mode="nonprod",
    dt="202510",
)
ubunye.api.run_task ¶
run_task(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)
Run a single Ubunye task and return the outputs map.
Parameters¶
task_dir : str
Path to the task directory containing config.yaml and
transformations.py.
mode : str
Run mode, used for Spark profile merging. Default "DEV".
dt : str, optional
Data timestamp, injected as {{ dt }} in Jinja templates.
dtf : str, optional
Data timestamp format, injected as {{ dtf }}.
spark : SparkSession, optional
Explicit SparkSession to reuse. If None, auto-detects an active
session (Databricks) or creates a new one.
lineage : bool
Record lineage for this run.
lineage_dir : str
Root directory for lineage records.
profile : str, optional
Config profile for validation (passed to load_config).
hooks : iterable of Hook, optional
Replace the engine's default hooks entirely. Rarely needed; prefer
the ubunye.hooks entry point for always-on hooks.
Returns¶
Dict[str, Any]
Mapping of output name → DataFrame.
ubunye.api.run_pipeline ¶
run_pipeline(usecase_dir, usecase, package, tasks, *, mode='DEV', dt=None, dtf=None, spark=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, hooks=None)
Run multiple tasks sequentially and return all outputs.
Parameters¶
usecase_dir : str
Root directory for use cases (e.g. "./pipelines").
usecase : str
Use case name.
package : str
Package/pipeline name.
tasks : List[str]
Task names to run in order.
mode, dt, dtf, spark, lineage, lineage_dir, profile, hooks
Same as :func:run_task.
Returns¶
Dict[str, Dict[str, Any]]
Mapping of task name → outputs map.
Notebook API (Interactive)¶
Step-by-step execution for prototyping in Databricks notebooks.
Environment variables referenced via {{ env.VAR }} in config.yaml are
auto-resolved from widgets and secrets — no manual os.environ setup.
import ubunye
ctx = ubunye.notebook(
    "/Workspace/pipelines/claims/claim_etl",
    mode="PROD",
    dt="2026-01-01",
)
sources = ctx.read()
sources["raw_claims"].display()
outputs = ctx.transform(sources)
outputs["bronze_claims"].show()
ctx.write(outputs)
ubunye.notebook.notebook ¶
notebook(task_dir, *, mode='DEV', dt=None, dtf=None, spark=None, env=None, secrets_scope=None, secrets_map=None, lineage=False, lineage_dir='.ubunye/lineage', profile=None, auto_env=True, hooks=None)
Create an interactive notebook context for step-by-step task execution.
All {{ env.VAR }} references in the task's config.yaml are
auto-resolved from Databricks widgets and secrets — no manual
os.environ setup needed.
ubunye.notebook.NotebookContext ¶
Stateful notebook session for step-by-step Ubunye task execution.
Core Engine¶
ubunye.core.runtime.Engine ¶
Executes a task by reading inputs, applying one or more transforms, and writing outputs.
Minimal required config structure
cfg[CONFIG][inputs] : mapping input_name -> reader cfg (must include 'format')
cfg[CONFIG][outputs] : mapping output_name -> writer cfg (must include 'format')
cfg[CONFIG][transform]: EITHER a single transform dict with 'type',
OR a list of transform dicts to form a pipeline.
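As a rough illustration of the structure above, the sketch below builds a minimal config mapping and checks the three required sections. The `check_minimal_config` helper and the `"hive"`, `"delta"`, and `"noop"` values are hypothetical placeholders chosen for the example; they are not taken from Ubunye itself.

```python
from typing import Any, Dict, List


def check_minimal_config(cfg: Dict[str, Any]) -> List[str]:
    """Return a list of problems; an empty list means the shape is valid."""
    problems: List[str] = []
    task = cfg.get("CONFIG", {})

    # Every reader and writer config must declare a 'format'.
    for section in ("inputs", "outputs"):
        entries = task.get(section)
        if not isinstance(entries, dict):
            problems.append(f"CONFIG.{section} must be a mapping")
            continue
        for name, io_cfg in entries.items():
            if not isinstance(io_cfg, dict) or "format" not in io_cfg:
                problems.append(f"CONFIG.{section}.{name} is missing 'format'")

    # 'transform' is either one dict or a list of dicts, each with a 'type'.
    transform = task.get("transform")
    steps = transform if isinstance(transform, list) else [transform]
    for i, step in enumerate(steps):
        if not isinstance(step, dict) or "type" not in step:
            problems.append(f"CONFIG.transform[{i}] is missing 'type'")
    return problems


# Placeholder format/type values, for illustration only.
cfg = {
    "CONFIG": {
        "inputs": {"raw_claims": {"format": "hive"}},
        "transform": {"type": "noop"},
        "outputs": {"bronze_claims": {"format": "delta"}},
    }
}
problems = check_minimal_config(cfg)
```

The same check accepts a list under `transform`, which corresponds to the pipeline form described above.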
Observation (logging, metrics, tracing, user monitors) is delegated to
:class:ubunye.core.hooks.Hook instances. Pass hooks= to override the
default set.
__init__ ¶
__init__(backend=None, registry=None, context=None, hooks=None, extra_hooks=None, manage_backend=True)
Parameters¶
hooks : iterable of Hook, optional
Replace the default hook set entirely.
extra_hooks : iterable of Hook, optional
Append these hooks to the default set. Ignored when hooks is
also given.
manage_backend : bool, default True
If True (default), the engine calls backend.start() and
backend.stop(). Set to False when the caller owns the backend
lifecycle (e.g. Python API running multiple tasks on one session).
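The precedence between `hooks` and `extra_hooks` can be sketched as follows. This is an illustration of the documented rule only: `resolve_hooks` and the string stand-ins for Hook objects are hypothetical, and the engine's real default hook set is not shown here.

```python
from typing import Iterable, List, Optional

# Stand-ins for the engine's default Hook instances (names are hypothetical).
DEFAULT_HOOKS: List[str] = ["logging", "metrics"]


def resolve_hooks(
    hooks: Optional[Iterable[str]] = None,
    extra_hooks: Optional[Iterable[str]] = None,
) -> List[str]:
    """Mirror the documented precedence between hooks and extra_hooks."""
    if hooks is not None:
        # An explicit hook set replaces the defaults; extra_hooks is ignored.
        return list(hooks)
    # Otherwise the defaults are kept and extra_hooks are appended.
    return DEFAULT_HOOKS + list(extra_hooks or [])


only_custom = resolve_hooks(hooks=["audit"], extra_hooks=["tracing"])
defaults_plus = resolve_hooks(extra_hooks=["tracing"])
```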
apply_transforms ¶
Apply configured transforms to sources.
Returns a dict mapping output name to DataFrame.
read_inputs ¶
Read all inputs defined in CONFIG.inputs.
Returns a dict mapping input name to DataFrame — suitable for
interactive inspection before calling :meth:apply_transforms.
run ¶
Run a task using the provided config mapping.
Parameters¶
cfg : dict
Parsed task configuration (already rendered/validated).
dry_run : bool, default False
If True, validates and returns without executing readers/writers.
Returns¶
Optional[Dict[str, Any]]
The outputs map, or None if dry_run is True.
Interfaces (Protocols)¶
See the full Interfaces page for design principles, discovery, and auto-detection.
ubunye.interfaces ¶
Formal interfaces (typing.Protocol) for Ubunye's pluggable seams.
Each protocol defines the structural contract that backends must satisfy. Implementations are discovered at runtime via entry-point groups and auto-detect classmethods.
Cross-boundary dataclasses accompany each protocol so that callers never depend on a concrete backend's internal types.
AuthBackend ¶
Bases: Protocol
Structural interface for authentication backends.
Concrete implementations: TokenAuthBackend (built-in),
ServicePrincipalAuthBackend (Phase 2).
resolve ¶
Resolve credentials for the given workspace.
Parameters¶
workspace_url:
The Databricks (or other) workspace host URL.
**kwargs:
Backend-specific hints (e.g. token_env="MY_TOKEN").
Returns¶
Credentials
Populated credentials ready for client construction.
Raises¶
ubunye.core.errors.AuthNotFoundError
When credentials cannot be located (missing env var, etc.).
Credentials dataclass ¶
Resolved authentication credentials.
The auth_type field tells callers which fields are populated.
metadata carries backend-specific extras (e.g. token expiry,
scope, tenant ID) without polluting the core fields.
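To illustrate what a structural contract means in practice, here is a minimal sketch of an authentication backend that satisfies a protocol without inheriting from it. Only `resolve`, `workspace_url`, `token_env`, `auth_type`, and `metadata` come from the text above; the `token` field, the `EnvTokenAuth` class, and the use of `LookupError` in place of `AuthNotFoundError` are assumptions made for the example.

```python
import os
from dataclasses import dataclass, field
from typing import Any, Dict, Optional, Protocol, runtime_checkable


@dataclass
class Credentials:
    auth_type: str                  # tells callers which fields are populated
    token: Optional[str] = None     # assumed field, for illustration only
    metadata: Dict[str, Any] = field(default_factory=dict)


@runtime_checkable
class AuthBackend(Protocol):
    def resolve(self, workspace_url: str, **kwargs: Any) -> Credentials: ...


class EnvTokenAuth:
    """Hypothetical backend: no base class, it only matches the method shape."""

    def resolve(self, workspace_url: str, **kwargs: Any) -> Credentials:
        env_name = kwargs.get("token_env", "MY_TOKEN")
        token = os.environ.get(env_name)
        if token is None:
            # Stand-in for ubunye.core.errors.AuthNotFoundError.
            raise LookupError(f"environment variable {env_name} is not set")
        return Credentials(
            auth_type="token",
            token=token,
            metadata={"host": workspace_url},
        )
```

Because the protocol is structural, `isinstance(EnvTokenAuth(), AuthBackend)` holds even though `EnvTokenAuth` never names `AuthBackend` as a base.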
DeployAdapter ¶
Bases: Protocol
Structural interface for deployment backends.
Each adapter is responsible for:
- Validating that the target dict contains the fields it needs.
- Preparing artifacts (notebooks, config bundles) for the target.
- Uploading artifacts and creating/updating the remote job.
- Returning a :class:DeployResult with job metadata.
deploy ¶
Deploy a task to the target environment.
Parameters¶
ctx:
Task metadata, config dict, and filesystem paths.
target:
Adapter-specific target configuration (validated by the
adapter via :meth:validate_target).
dry_run:
If True, build and return the job spec without making
any remote API calls.
Returns¶
DeployResult
Job metadata including name, id, spec, and workspace path.
DeployContext dataclass ¶
Structured metadata for a deploy operation.
Callers construct this from CLI flags and the parsed config; the adapter never derives task identity from filesystem paths.
DeployResult dataclass ¶
Returned by :meth:DeployAdapter.deploy.
metadata carries adapter-specific extras (notebook source, cluster
id, etc.) without polluting the core fields.
LineageBackend ¶
Bases: Protocol
Structural interface for lineage storage backends.
Concrete implementations: FilesystemLineageBackend (built-in),
DeltaLineageBackend (Phase 2).
compact ¶
Optimize the underlying storage.
For filesystem backends this is a no-op. For Delta backends
this triggers OPTIMIZE on the lineage table.
get_run ¶
Load a single run record by ID.
Raises¶
ubunye.core.errors.LineageRecordNotFoundError
When no record exists for run_id.
record ¶
Persist a lineage record (non-blocking per Decision 1).
Creates or overwrites the record for the given run_id.
Typically called twice per task: once at start
(status="running") and once at end (status="success"
or "error").
The actual write is dispatched to a background worker thread. On failure, the record is written to a fallback manifest (Decision 2).
Parameters¶
record:
The lineage record to persist. Callers construct this with the
current engine version and task metadata.
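The non-blocking write with a fallback can be sketched with a queue and a single worker thread. This is an assumption-laden illustration, not the backend's actual code: `BackgroundRecorder`, the in-memory `fallback` list (standing in for a fallback manifest), and the record fields are hypothetical.

```python
import json
import queue
import threading
from typing import Any, Callable, Dict, List


class BackgroundRecorder:
    """Queue records and persist them on a worker thread."""

    def __init__(self, write: Callable[[Dict[str, Any]], None]) -> None:
        self._write = write
        self.fallback: List[str] = []  # stand-in for a fallback manifest file
        self._queue = queue.Queue()
        threading.Thread(target=self._drain, daemon=True).start()

    def record(self, rec: Dict[str, Any]) -> None:
        self._queue.put(rec)  # returns immediately; the caller is not blocked

    def _drain(self) -> None:
        while True:
            rec = self._queue.get()
            try:
                self._write(rec)
            except Exception:
                # The primary write failed: keep the record in the fallback.
                self.fallback.append(json.dumps(rec))
            finally:
                self._queue.task_done()

    def flush(self) -> int:
        self._queue.join()  # wait until every queued record was handled
        return len(self.fallback)


store: Dict[str, Dict[str, Any]] = {}


def write(rec: Dict[str, Any]) -> None:
    if rec["run_id"] == "r2":
        raise OSError("simulated storage failure")
    store[rec["run_id"]] = rec  # a later record overwrites the earlier one


recorder = BackgroundRecorder(write)
recorder.record({"run_id": "r1", "status": "running"})
recorder.record({"run_id": "r1", "status": "success"})
recorder.record({"run_id": "r2", "status": "running"})
failed = recorder.flush()
```

A single worker keeps writes in submission order, so the second record for `r1` replaces the first, matching the create-or-overwrite behaviour described above.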
search ¶
Search across recorded runs with optional filters.
Parameters¶
task:
Filter by task name.
usecase:
Filter by use case.
pipeline:
Filter by pipeline.
status:
Filter by status ("success", "error", "running").
since:
ISO-8601 datetime; only return runs recorded at or after.
limit:
Maximum number of records to return.
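A minimal sketch of how these filters might combine, assuming records are plain dicts. The `search_runs` helper and the `recorded_at` field name are hypothetical; a real backend returns LineageRecord objects and may order results differently.

```python
from datetime import datetime
from typing import Any, Dict, List, Optional


def search_runs(
    records: List[Dict[str, Any]],
    *,
    task: Optional[str] = None,
    usecase: Optional[str] = None,
    pipeline: Optional[str] = None,
    status: Optional[str] = None,
    since: Optional[str] = None,
    limit: Optional[int] = None,
) -> List[Dict[str, Any]]:
    """Apply every filter that was provided; None means 'do not filter'."""
    exact = {"task": task, "usecase": usecase, "pipeline": pipeline, "status": status}
    cutoff = datetime.fromisoformat(since) if since else None

    matches = []
    for rec in records:
        if any(v is not None and rec.get(k) != v for k, v in exact.items()):
            continue
        # 'recorded_at' is an assumed field holding an ISO-8601 timestamp.
        if cutoff and datetime.fromisoformat(rec["recorded_at"]) < cutoff:
            continue
        matches.append(rec)
    return matches if limit is None else matches[:limit]


runs = [
    {"task": "claim_etl", "status": "success", "recorded_at": "2025-10-01T08:00:00"},
    {"task": "claim_etl", "status": "error", "recorded_at": "2025-10-02T08:00:00"},
    {"task": "feature_engineering", "status": "success", "recorded_at": "2025-10-03T08:00:00"},
]
recent_ok = search_runs(runs, status="success", since="2025-10-02T00:00:00")
```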
LineageRecord dataclass ¶
Cross-boundary lineage record (Decision 3 compliant).
Core columns are strict; everything else goes into metadata.
Every record stamps engine_version so analysts can filter by
writer version.
ModelVersionInfo dataclass ¶
Cross-boundary model version record (Decision 3 compliant).
Core fields are strict; everything else goes into metadata.
Every record stamps engine_version so downstream consumers can
filter by writer version.
RegistryBackend ¶
Bases: Protocol
Structural interface for model registries.
Concrete implementations: FilesystemRegistryBackend (built-in),
MLflowRegistryBackend (Phase 2).
get ¶
Retrieve a model version by exact version or active stage.
Returns¶
tuple of (artifact_path, ModelVersionInfo)
artifact_path is an opaque string — it may be a local
path, an MLflow artifact URI, or a cloud storage URL.
Callers pass it to ModelClass.load(artifact_path).
Raises¶
ubunye.core.errors.VersionNotFoundError
When no matching version exists.
promote ¶
Promote a version to a higher lifecycle stage.
If gates are provided, all checks must pass before the
transition. On gate failure, raises
:class:~ubunye.core.errors.PromotionBlockedError — this is
NOT caught by the graceful degradation layer (auth and gate
failures must propagate).
Metadata update is non-blocking per Decision 1.
register ¶
Register a trained model as a new version.
Artifact writes are synchronous — the method returns only after the model is safely persisted. Index/metadata updates are non-blocking per Decision 1.
Parameters¶
use_case:
Logical namespace (e.g. "fraud_detection").
model_name:
Model identifier within the use case.
version:
Explicit semver string, or None for auto-increment.
model:
A trained model instance whose save()/metadata()
methods will be called by the backend.
metrics:
Training metrics dict.
metadata:
Optional flexible key-value pairs (Decision 3).
lineage_run_id:
Optional lineage run ID for cross-referencing.
Config¶
ubunye.config.schema.UbunyeConfig ¶
Bases: BaseModel
Top-level Ubunye task config (the full contents of a config.yaml).
MODEL and VERSION are optional; they default to etl and
"0.0.0-dev" respectively. Set them explicitly in production pipelines
where job type or version is load-bearing (lineage records, model
registry, orchestrator metadata).
ubunye.config.schema.TaskConfig ¶
Bases: BaseModel
The CONFIG section of a task: inputs, transform, outputs.
ubunye.config.schema.IOConfig ¶
Bases: BaseModel
Input or output connector configuration.
Extra fields are allowed so plugin-specific keys (e.g. rest_api's
auth, pagination, headers) pass through to the plugin
unchanged via model_dump().
ubunye.config.schema.EngineConfig ¶
Bases: BaseModel
Spark/compute settings with optional per-profile overrides.
ubunye.config.schema.RegistryConfig ¶
Bases: BaseModel
Configuration for model registry integration within a transform.
ubunye.config.schema.ModelTransformParams ¶
Bases: BaseModel
Typed params for transform.type: model — for documentation and validation.
Backends¶
SparkBackend¶
Creates and manages a new SparkSession. Use for local development, CI, and non-Databricks environments.
ubunye.backends.spark_backend.SparkBackend ¶
Bases: Backend
Creates and manages a SparkSession for a Ubunye run.
Parameters¶
app_name : str
Spark application name (appears in Spark UI/history server).
conf : Optional[Dict[str, str]]
Spark configuration key-values (e.g., {"spark.master": "yarn"}).
Notes¶
- pyspark is imported lazily inside start() to keep installation lightweight.
- The spark property is only valid after start() (or inside the context manager).
conf_effective property ¶
The effective Spark configuration currently applied to the session.
Returns an empty dict if the session hasn't been started yet.
conf_input property ¶
The configuration dict passed into this backend at construction time.
spark property ¶
Return the active SparkSession (after start()).
Raises¶
RuntimeError
If start() has not been called.
start ¶
Create (or reuse) a SparkSession.
Safe to call multiple times; a second call is a no-op if a session already exists.
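The lifecycle rules above (lazy creation, idempotent start, a guarded property, and context-manager use) can be sketched without Spark. `LazyBackend` and its `factory` argument are hypothetical stand-ins; a plain `object()` takes the place of a SparkSession.

```python
from typing import Any, Callable, Optional


class LazyBackend:
    """Create a session on first start() and reuse it afterwards."""

    def __init__(self, factory: Callable[[], Any]) -> None:
        self._factory = factory          # called lazily, not at construction
        self._session: Optional[Any] = None

    def start(self) -> None:
        if self._session is None:        # a second call is a no-op
            self._session = self._factory()

    def stop(self) -> None:
        self._session = None

    @property
    def session(self) -> Any:
        if self._session is None:
            raise RuntimeError("start() has not been called")
        return self._session

    def __enter__(self) -> "LazyBackend":
        self.start()
        return self

    def __exit__(self, *exc_info: Any) -> None:
        self.stop()


created = []


def make_session() -> object:
    created.append(1)                    # count how often a session is built
    return object()


with LazyBackend(make_session) as backend:
    first = backend.session
    backend.start()                      # already started: nothing happens
    same_session = backend.session is first
```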
DatabricksBackend¶
Reuses an active SparkSession instead of creating one. Use on Databricks where a session already exists.
ubunye.backends.databricks_backend.DatabricksBackend ¶
Bases: Backend
Backend that reuses the existing Databricks SparkSession.
Parameters¶
spark : Optional[SparkSession]
An explicit session to wrap. If None (the default), the active
session is retrieved via SparkSession.getActiveSession().
Models (UbunyeModel contract)¶
ubunye.models.base.UbunyeModel ¶
Bases: ABC
Abstract contract every user model must implement.
The engine calls these methods during the pipeline lifecycle. Users implement them with whatever ML library they choose — the engine never imports or knows about the underlying library.
Minimal example::
    import pathlib

    import joblib
    from pyspark.sql import SparkSession

    from ubunye.models.base import UbunyeModel


    class FraudRiskModel(UbunyeModel):
        def __init__(self):
            self._clf = None

        def train(self, df):
            from sklearn.ensemble import GradientBoostingClassifier

            # Collect to pandas; assumes the data fits in driver memory.
            pdf = df.toPandas()
            X, y = pdf.drop("label", axis=1), pdf["label"]
            self._clf = GradientBoostingClassifier().fit(X, y)
            return {"accuracy": self._clf.score(X, y)}

        def predict(self, df):
            pdf = df.toPandas()
            # Score only the columns the classifier was trained on.
            features = pdf[list(self._clf.feature_names_in_)]
            pdf["score"] = self._clf.predict_proba(features)[:, 1]
            # Convert back to Spark using the session that is already active.
            spark = SparkSession.getActiveSession()
            return spark.createDataFrame(pdf)

        def save(self, path):
            pathlib.Path(path).mkdir(parents=True, exist_ok=True)
            joblib.dump(self._clf, f"{path}/clf.joblib")

        @classmethod
        def load(cls, path):
            m = cls()
            m._clf = joblib.load(f"{path}/clf.joblib")
            return m

        def metadata(self):
            import sklearn

            return {
                "library": "sklearn",
                "library_version": sklearn.__version__,
                "features": list(self._clf.feature_names_in_),
                "params": self._clf.get_params(),
            }
load abstractmethod classmethod ¶
Deserialize a model from the given directory path.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Directory path where model files were saved. | required |
Returns:
| Type | Description |
|---|---|
| 'UbunyeModel' | An instance of the model ready for prediction. |
metadata abstractmethod ¶
Return model metadata for the registry.
Should include at minimum:
- library: str — e.g. "xgboost", "sklearn", "pytorch"
- library_version: str — e.g. "2.0.3"
- features: List[str] — feature names used during training
- params: dict — model hyperparameters
Additional keys are allowed and will be stored in the registry.
predict abstractmethod ¶
Generate predictions on the provided DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | Any | Spark DataFrame with feature columns. | required |
Returns:
| Type | Description |
|---|---|
| Any | Spark DataFrame (or DataFrame-like) with prediction columns added. |
save abstractmethod ¶
Serialize the model to the given directory path.
The user chooses the serialization format (pickle, joblib, ONNX, etc.). The engine only provides the path and never inspects the saved files.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| path | str | Directory path where model files should be written. | required |
train abstractmethod ¶
Train the model on the provided DataFrame.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| df | Any | Spark DataFrame (or any DataFrame-like) with training data. | required |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dict of metrics e.g. |
| Dict[str, Any] | These metrics are stored in the registry alongside the model version. |
validate ¶
Optional: validate model on holdout data.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Metrics dict in the same format as :meth: |
Raises:
| Type | Description |
|---|---|
| NotImplementedError | If the subclass has not overridden this method. |
ubunye.models.loader.load_model_class ¶
Dynamically load a UbunyeModel subclass.
The class_name is a dotted path like "model.FraudRiskModel" where the
first segment is the Python module filename (without .py) and the rest is
the class name.
Resolution order:
1. If task_dir is provided → load <module_segment>.py directly from
that directory using importlib.util.spec_from_file_location.
2. If task_dir is None → attempt a standard importlib.import_module
(the module must already be on sys.path; _run_single_task adds the
task directory to sys.path before transforms are invoked, so this works
automatically when called from the engine).
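The two resolution paths can be sketched with the standard `importlib` machinery. This is an illustration under assumptions, not the library's implementation: `load_class` is a hypothetical name, and the subclass check that would raise TypeError is left out so the example runs without Ubunye installed.

```python
import importlib
import importlib.util
import tempfile
from pathlib import Path
from typing import Optional


def load_class(class_name: str, task_dir: Optional[str] = None) -> type:
    # "model.FraudRiskModel" -> module segment "model", class "FraudRiskModel".
    module_name, _, attr = class_name.partition(".")

    if task_dir is not None:
        # Path 1: load <module_segment>.py straight from the task directory.
        path = Path(task_dir) / f"{module_name}.py"
        if not path.is_file():
            raise FileNotFoundError(path)
        spec = importlib.util.spec_from_file_location(module_name, path)
        module = importlib.util.module_from_spec(spec)
        spec.loader.exec_module(module)
    else:
        # Path 2: the module must already be importable via sys.path.
        module = importlib.import_module(module_name)

    try:
        return getattr(module, attr)
    except AttributeError as exc:
        raise ImportError(f"{attr} not found in module {module_name}") from exc


# Write a throwaway model.py and load the class from it.
with tempfile.TemporaryDirectory() as task_dir:
    Path(task_dir, "model.py").write_text("class FraudRiskModel:\n    pass\n")
    model_cls = load_class("model.FraudRiskModel", task_dir)
```

Loading from an explicit file path avoids touching `sys.path`, which is one reason a loader might prefer the first branch when the task directory is known.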
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| task_dir | Optional[str] | Absolute or relative path to the directory containing the model file (e.g. the pipeline task directory). Pass | required |
| class_name | str | Dotted path to the class, e.g. | required |
Returns:
| Type | Description |
|---|---|
| Type[UbunyeModel] | The model class (not an instance). Callers should do |
| Type[UbunyeModel] | |
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | The module file does not exist in |
| ImportError | The class name was not found in the module. |
| TypeError | The class does not subclass :class: |
Model Registry¶
ubunye.models.registry.ModelRegistry ¶
Manages model versions and lifecycle transitions.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| store_path | str | Root directory for all model artifacts and registry JSON files. | required |
archive ¶
compare_versions ¶
Compare metrics between two versions.
Returns:
| Type | Description |
|---|---|
| Dict[str, Dict[str, Any]] | Dict mapping metric name → |
| Dict[str, Dict[str, Any]] | Only metrics present in at least one version are included. |
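One plausible shape for such a comparison is sketched below. The inner keys (`version_a`, `version_b`, `delta`) and the `compare_metrics` helper are assumptions made for the example; the registry's actual per-metric payload is not specified in this section.

```python
from typing import Any, Dict


def compare_metrics(
    metrics_a: Dict[str, Any], metrics_b: Dict[str, Any]
) -> Dict[str, Dict[str, Any]]:
    """Compare two metric dicts over the union of their metric names."""
    comparison: Dict[str, Dict[str, Any]] = {}
    for name in sorted(set(metrics_a) | set(metrics_b)):
        a, b = metrics_a.get(name), metrics_b.get(name)
        # A delta only makes sense when both versions report a number.
        numeric = all(isinstance(v, (int, float)) for v in (a, b))
        comparison[name] = {
            "version_a": a,
            "version_b": b,
            "delta": (b - a) if numeric else None,
        }
    return comparison


diff = compare_metrics({"auc": 0.5, "f1": 0.75}, {"auc": 0.75})
```

A metric reported by only one version still appears in the result, with `None` for the missing side.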
demote ¶
Demote a model version to a lower stage.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| to_stage | ModelStage | Target stage (typically | required |
Returns:
| Name | Type | Description |
|---|---|---|
| Updated | ModelVersion | class: |
get_model ¶
Return the artifact path and version metadata for a model.
Specify either version (exact) or stage (active version in that stage).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Use-case grouping. | required |
| model_name | str | Model identifier. | required |
| task_dir | Optional[str] | Task directory containing model.py (passed to caller for loading). | None |
| version | Optional[str] | Exact version string. | None |
| stage | Optional[ModelStage] | Stage to look up active version for. | None |
Returns:
| Type | Description |
|---|---|
| str | |
| ModelVersion | |
Raises:
| Type | Description |
|---|---|
| ValueError | No matching version found. |
list_versions ¶
List all registered versions for a model (newest first by registered_at).
Raises:
| Type | Description |
|---|---|
| FileNotFoundError | Model not found in registry. |
promote ¶
Promote a model version to a higher stage.
If gates are provided, all gate checks must pass before promotion.
If promoting to production, the current production version is automatically
archived.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Use-case grouping. | required |
| model_name | str | Model identifier. | required |
| version | str | Version string to promote. | required |
| to_stage | ModelStage | Target :class: | required |
| promoted_by | Optional[str] | Optional username. | None |
| gates | Optional[Dict[str, Any]] | Optional dict of promotion gate rules (see :class: | None |
Raises:
| Type | Description |
|---|---|
| ValueError | Version not found, or one or more promotion gates failed. |
Returns:
| Name | Type | Description |
|---|---|---|
| Updated | ModelVersion | class: |
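The automatic archiving on promotion to production can be sketched with an in-memory mapping of version to stage. Everything here is illustrative: the `Stage` enum values (in particular `"archived"`), the `promote` helper, and the dict-based store are assumptions, and gate checks are omitted.

```python
from enum import Enum
from typing import Dict


class Stage(str, Enum):
    # Assumed stage names; the real ModelStage members may differ.
    DEVELOPMENT = "development"
    PRODUCTION = "production"
    ARCHIVED = "archived"


def promote(stages: Dict[str, Stage], version: str, to_stage: Stage) -> None:
    """Move one version to a new stage, keeping a single production version."""
    if version not in stages:
        raise ValueError(f"version {version!r} not found")
    if to_stage is Stage.PRODUCTION:
        # Archive whichever other version currently holds production.
        for other, stage in stages.items():
            if stage is Stage.PRODUCTION and other != version:
                stages[other] = Stage.ARCHIVED
    stages[version] = to_stage


stages = {"1.0.0": Stage.PRODUCTION, "1.1.0": Stage.DEVELOPMENT}
promote(stages, "1.1.0", Stage.PRODUCTION)
```

Under the same assumptions, a rollback is the same operation pointed at an older version: promoting `"1.0.0"` back to production archives `"1.1.0"`.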
register ¶
Register a trained model as a new version (stage=development).
- Saves model artifacts to versions/<version>/model/.
- Writes metadata.json and metrics.json.
- Adds a new :class:ModelVersion entry to registry.json.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| use_case | str | Logical use-case grouping (e.g. | required |
| model_name | str | Model identifier (e.g. | required |
| version | Optional[str] | Semver string. Pass | required |
| model | UbunyeModel | A trained :class: | required |
| metrics | Dict[str, Any] | Metrics dict returned by :meth: | required |
| lineage_run_id | Optional[str] | Optional lineage | None |
| registered_by | Optional[str] | Optional username for the audit trail. | None |
Returns:
| Type | Description |
|---|---|
| ModelVersion | The newly created :class: |
rollback ¶
Roll back production to a specific previous version.
- Archives the current production version.
- Promotes to_version to production.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| to_version | str | The version string to restore to production. | required |
Returns:
| Type | Description |
|---|---|
| ModelVersion | The restored :class: |
ubunye.models.registry.ModelVersion dataclass ¶
Metadata for a single registered model version.
to_info ¶
Convert to the cross-boundary :class:ModelVersionInfo type.
ubunye.models.registry.ModelStage ¶
Bases: str, Enum
Promotion Gates¶
ubunye.models.gates.PromotionGate ¶
Evaluates a set of metric thresholds before allowing model promotion.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| gate_config | Dict[str, Any] | Dict of gate rules. Supported keys: - | required |
all_passed ¶
Return True only if every configured gate passes.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metrics | Dict[str, Any] | Model metrics from :meth: | required |
| metadata | Optional[Dict[str, Any]] | Optional model metadata from :meth: | None |
evaluate ¶
Evaluate all configured gates against the provided metrics.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| metrics | Dict[str, Any] | Model metrics dict from :meth: | required |
| metadata | Optional[Dict[str, Any]] | Optional model metadata dict from :meth: | None |
Returns:
| Type | Description |
|---|---|
| List[GateResult] | List of :class: |
| List[GateResult] | Promotion is allowed only if all results have |
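The relationship between evaluating gates and the all-or-nothing check can be sketched as follows. The rule shape used here (metric name mapped to a minimum value) is a hypothetical schema chosen for the example, since the supported gate keys are not listed in this section; the `GateResult` fields are likewise assumed.

```python
from dataclasses import dataclass
from typing import Any, Dict, List


@dataclass
class GateResult:
    name: str      # assumed fields, for illustration only
    passed: bool
    detail: str


def evaluate(minimums: Dict[str, float], metrics: Dict[str, Any]) -> List[GateResult]:
    """Check each metric against its minimum; a missing metric fails."""
    results = []
    for metric, minimum in minimums.items():
        value = metrics.get(metric)
        passed = value is not None and value >= minimum
        detail = f"{metric}={value}, required >= {minimum}"
        results.append(GateResult(metric, passed, detail))
    return results


def all_passed(minimums: Dict[str, float], metrics: Dict[str, Any]) -> bool:
    # Promotion is allowed only when every individual gate passes.
    return all(result.passed for result in evaluate(minimums, metrics))


gates = {"accuracy": 0.9, "recall": 0.8}
results = evaluate(gates, {"accuracy": 0.95, "recall": 0.7})
ok = all_passed(gates, {"accuracy": 0.95, "recall": 0.7})
```

Returning one result per gate, rather than a single boolean, lets a caller report exactly which threshold blocked a promotion.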
ubunye.models.gates.GateResult dataclass ¶
Result of evaluating a single promotion gate.
Lineage¶
ubunye.lineage.recorder.LineageRecorder ¶
Monitor plugin that persists run lineage as structured JSON.
Parameters¶
store:
Backend type — "filesystem" (default) or "s3" (stub).
base_dir:
Root directory for the FileSystemLineageStore.
sample_fraction:
Fraction of rows sampled when hashing DataFrames (0 < value ≤ 1).
ubunye.lineage.context.RunContext dataclass ¶
Full lineage record for one task execution.
ubunye.lineage.storage.FileSystemLineageStore ¶
Bases: LineageStore
Stores lineage records as JSON files in a local directory tree.
Satisfies both the legacy :class:LineageStore ABC and the
:class:~ubunye.interfaces.LineageBackend protocol.
Layout::
{base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
flush ¶
Drain the background worker queue.
Returns the number of records that went to fallback manifests.
get_run ¶
Load a single run record by run ID.
Uses a run_id→path index for O(1) lookup instead of rglob.
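A minimal sketch of the idea, assuming the index is an in-memory dict updated on every write. `IndexedStore` is a hypothetical class that follows the directory layout shown above; how the real store builds or persists its index is not described here.

```python
import json
import tempfile
from pathlib import Path
from typing import Any, Dict


class IndexedStore:
    """Write JSON records to a directory tree and remember where each went."""

    def __init__(self, base_dir: str) -> None:
        self.base_dir = Path(base_dir)
        self._index: Dict[str, Path] = {}  # run_id -> file path

    def write(self, usecase: str, package: str, task_name: str,
              run_id: str, payload: Dict[str, Any]) -> Path:
        # Layout: {base_dir}/{usecase}/{package}/{task_name}/{run_id}.json
        path = self.base_dir / usecase / package / task_name / f"{run_id}.json"
        path.parent.mkdir(parents=True, exist_ok=True)
        path.write_text(json.dumps(payload))
        self._index[run_id] = path
        return path

    def get_run(self, run_id: str) -> Dict[str, Any]:
        # One dict lookup instead of walking the whole tree with rglob.
        try:
            path = self._index[run_id]
        except KeyError:
            raise LookupError(f"no record for run_id {run_id!r}") from None
        return json.loads(path.read_text())


with tempfile.TemporaryDirectory() as base_dir:
    store = IndexedStore(base_dir)
    written = store.write("fraud_detection", "ingestion", "claim_etl",
                          "run-001", {"status": "success"})
    relative = written.relative_to(base_dir).as_posix()
    loaded = store.get_run("run-001")
```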
record ¶
Persist a lineage record (non-blocking per Decision 1).
Dispatches the write to a background worker thread. On failure, the record is written to a fallback manifest (Decision 2).
search_records ¶
Protocol-compatible search returning LineageRecord objects.
Plugins — Readers¶
ubunye.plugins.readers.rest_api.RestApiReader ¶
Bases: Reader
Read a Spark DataFrame from a REST API endpoint.
Handles pagination (offset, cursor, next_link), authentication (bearer, api_key, basic), rate limiting, and retry with exponential backoff.
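Retry with exponential backoff, in its simplest form, doubles the wait after each failed attempt. The sketch below is generic and does not reflect the reader's actual configuration keys: `with_retries`, `max_attempts`, and `base_delay` are hypothetical names, and only `ConnectionError` is treated as retryable.

```python
import time
from typing import Callable, List, TypeVar

T = TypeVar("T")


def with_retries(
    call: Callable[[], T],
    *,
    max_attempts: int = 4,
    base_delay: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call `call` until it succeeds, doubling the delay between attempts."""
    if max_attempts < 1:
        raise ValueError("max_attempts must be at least 1")
    for attempt in range(max_attempts):
        try:
            return call()
        except ConnectionError:
            if attempt == max_attempts - 1:
                raise  # out of attempts: surface the last error
            sleep(base_delay * 2**attempt)  # base_delay, then 2x, 4x, ...
    raise AssertionError("unreachable")


attempts = {"count": 0}


def flaky_fetch() -> str:
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise ConnectionError("temporary failure")
    return "page-1"


delays: List[float] = []
# Record the delays instead of sleeping so the example runs instantly.
result = with_retries(flaky_fetch, sleep=delays.append)
```

Passing `sleep` as a parameter keeps the function testable; production code would leave it at the default `time.sleep`.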
Plugins — Writers¶
ubunye.plugins.writers.rest_api.RestApiWriter ¶
Bases: Writer
Write a Spark DataFrame to a REST API endpoint in JSON batches.
Iterates over DataFrame rows, groups them into batches of batch_size,
and POSTs each batch as {"records": [...]}. Tracks and logs success/
failure counts per batch.
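The batching step can be sketched independently of Spark and HTTP. Here plain dicts stand in for DataFrame rows, and `iter_batches` is a hypothetical helper; the actual POST, and the success and failure counting, are left out.

```python
import json
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List


def iter_batches(rows: Iterable[Dict[str, Any]], batch_size: int) -> Iterator[List[Dict[str, Any]]]:
    """Yield consecutive lists of at most batch_size rows."""
    iterator = iter(rows)
    while True:
        batch = list(islice(iterator, batch_size))
        if not batch:
            return
        yield batch


# Stand-in rows; with Spark these would come from the DataFrame being written.
rows = [{"claim_id": i} for i in range(5)]

# Each batch becomes one request body of the form {"records": [...]}.
payloads = [json.dumps({"records": batch}) for batch in iter_batches(rows, 2)]
```

Consuming the rows through an iterator means only one batch is held in memory at a time, which matters when the source is large.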
write ¶
POST DataFrame rows to a REST endpoint in batches.
Parameters¶
df : pyspark.sql.DataFrame
Source DataFrame.
cfg : dict
Writer configuration. Required key: url.
See module docstring for full reference.
backend : SparkBackend
Ubunye Spark backend (passed for interface consistency; not used directly).
Raises¶
ValueError if url is missing from cfg.
Plugins — Transforms¶
ubunye.plugins.transforms.model_transform.ModelTransform ¶
Bases: Transform
Transform plugin for ML operations (train / predict).
Implements :class:ubunye.core.interfaces.Transform so it plugs into the
existing engine dispatch with zero changes to the core runtime.
apply ¶
Dispatch to train or predict based on cfg["action"].
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| inputs | Dict[str, Any] | Named DataFrames from the read phase. | required |
| cfg | dict | Transform params from | required |
| backend | Backend | Active engine backend (provides Spark session if needed). | required |
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dict of named outputs. For |
| Dict[str, Any] | For |
Internal ML wrappers¶
Note
These are internal wrappers used by Ubunye's own sklearn/Spark ML adapters.
User-defined models should implement UbunyeModel, not these classes.
ubunye.plugins.ml.base.BaseModel ¶
Bases: HasSchema, ABC
Framework-agnostic estimator interface.
Subclasses must implement
- _fit_core(X, y)
- _predict_core(X) -> y_pred (and optionally probabilities)
- _save_core(path)
- _load_core(path)
- params property (read-only view)
predict ¶
Predict on inputs; returns labels or (labels, probabilities) if proba=True.