# Hooks: Observing Task Runs
Hooks let you observe a task run without changing `Engine` code. Every task and every step (Reader, Transform, Writer) is wrapped in a hook context manager, so a hook sees the start, the end, and any exception that escapes.
Use hooks for logging, metrics, tracing, alerts, audit records, and drift checks: anything that observes the pipeline without changing it.
## The contract
```python
from contextlib import contextmanager

from ubunye.core.hooks import Hook


class MyHook(Hook):
    @contextmanager
    def task(self, ctx, cfg, state):
        # before the task
        try:
            yield
            # task succeeded
        except Exception:
            # task failed: re-raise or you'll swallow the error
            raise

    @contextmanager
    def step(self, ctx, name, meta):
        # before each Reader/Transform/Writer step
        try:
            yield
            # step succeeded
        except Exception:
            # step failed: always re-raise
            raise
```
Both methods are optional: inherit from `Hook` and override only what you need.
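To make the contract concrete, here is a minimal sketch of how an engine *might* drive these context managers: one task context wrapped around one step context per step. `StubHook`, `RecordingHook`, and `run_task` are hypothetical stand-ins written so the example runs without Ubunye installed; the real `Engine` may differ in detail.

```python
from __future__ import annotations

from contextlib import contextmanager
from types import SimpleNamespace
from typing import Any, Callable


class StubHook:
    """Hypothetical stand-in for Hook: both methods default to no-op context managers."""

    @contextmanager
    def task(self, ctx, cfg, state):
        yield

    @contextmanager
    def step(self, ctx, name, meta):
        yield


class RecordingHook(StubHook):
    """Overrides only step(); task() falls back to the no-op default."""

    def __init__(self) -> None:
        self.events: list[str] = []

    @contextmanager
    def step(self, ctx, name, meta):
        self.events.append(f"start {name}")
        try:
            yield
            self.events.append(f"ok {name}")
        except Exception:
            self.events.append(f"fail {name}")
            raise  # re-raise so the task still fails


def run_task(hook, ctx, cfg, steps: list[tuple[str, dict | None, Callable[[], Any]]]) -> dict:
    """Hypothetical driver: a task context around one step context per step."""
    state: dict = {}
    with hook.task(ctx, cfg, state):
        outputs = {}  # keyed by step name here, purely for simplicity
        for name, meta, fn in steps:
            with hook.step(ctx, name, meta):
                outputs[name] = fn()
        # Set only after every step succeeded, before the task hook exits.
        state["outputs"] = outputs
    return state


ctx = SimpleNamespace(run_id="5b3f91aa", task_name="claim_etl", profile="dev")
hook = RecordingHook()
state = run_task(hook, ctx, {}, [
    ("Reader:hive", {"input": "claims_raw"}, lambda: [1, 2, 3]),
    ("Writer:s3", None, lambda: "written"),
])
print(hook.events)
# ['start Reader:hive', 'ok Reader:hive', 'start Writer:s3', 'ok Writer:s3']
```

If a step raises, `RecordingHook` logs `fail ...`, re-raises, and the exception then passes through the task context and out of `run_task`.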
## What your hook sees
| Arg | Type | Notes |
|---|---|---|
| `ctx` | `EngineContext` | `run_id`, `task_name`, `profile` |
| `cfg` | `dict` | Full resolved task config |
| `state` | `dict` | Shared scratchpad. On task exit, `state["outputs"]` holds the outputs map (success only). |
| `name` | `str` | Step name, e.g. `"Reader:hive"`, `"Transform:noop"`, `"Writer:s3"` |
| `meta` | `dict \| None` | Step-specific metadata, e.g. `{"input": "claims_raw"}` |
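The `name` and `meta` arguments are enough to filter steps. A minimal sketch, assuming step names always follow the `Kind:plugin` pattern shown in the table; `WriterAuditHook` is hypothetical and skips the `Hook` base class so it runs standalone:

```python
from __future__ import annotations

from contextlib import contextmanager
from types import SimpleNamespace


class WriterAuditHook:
    """Hypothetical hook that records which plugin handled each Writer step.

    It does not inherit from Hook so the sketch runs without Ubunye installed.
    """

    def __init__(self) -> None:
        self.writes: list[tuple[str, str, dict]] = []

    @contextmanager
    def step(self, ctx, name, meta):
        # Step names look like "Writer:s3": split into kind and plugin.
        kind, _, plugin = name.partition(":")
        yield
        # Reached only if the step succeeded.
        if kind == "Writer":
            # meta may be None, so fall back to an empty dict.
            self.writes.append((ctx.run_id, plugin, dict(meta or {})))


ctx = SimpleNamespace(run_id="5b3f91aa", task_name="claim_etl", profile="dev")
hook = WriterAuditHook()
for name, meta in [("Reader:hive", {"input": "claims_raw"}), ("Writer:s3", None)]:
    with hook.step(ctx, name, meta):
        pass  # the engine would run the real step here
print(hook.writes)
# [('5b3f91aa', 's3', {})]
```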
## Rules
- Never raise your own errors in `__exit__` (the code after `yield`). Wrap all of your hook code in `try`/`except`. The engine does not swallow hook errors; a crashing hook crashes the task.
- Always re-raise step failures. If you catch an exception in `step()`, re-raise it so the engine can fail fast.
- Hooks must be cheap. They run inline with every step.
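A minimal sketch of a hook that follows the first two rules at once: its own side effects are guarded and logged, while the step's exception always propagates. `SafeMetricsHook` and its `emit` callable are hypothetical; `emit` stands in for any metrics or alerting call that might itself fail.

```python
import logging
from contextlib import contextmanager

log = logging.getLogger("my_hooks")


class SafeMetricsHook:
    """Hypothetical hook: its own errors never escape, the step's always do."""

    def __init__(self, emit):
        self.emit = emit  # e.g. a metrics client call; may itself fail

    def _safe(self, *args) -> None:
        try:
            self.emit(*args)
        except Exception:
            # The hook's own failure is logged, never raised into the task.
            log.exception("metrics hook failed; ignoring")

    @contextmanager
    def step(self, ctx, name, meta):
        self._safe("start", name)
        try:
            yield
        except Exception:
            self._safe("fail", name)
            raise  # the step's failure always propagates
        self._safe("ok", name)


def broken_emit(event, name):
    raise ConnectionError("metrics backend down")


hook = SafeMetricsHook(broken_emit)
with hook.step(None, "Reader:hive", None):
    result = 42  # the step succeeds even though every emit failed
```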
## Registering hooks
### Per-engine (explicit)
```python
from ubunye.core.runtime import Engine

engine = Engine(hooks=[MyHook(), AnotherHook()])
engine.run(cfg)
```
Explicit hooks replace the default set. Add `LegacyMonitorsHook(cfg)`
yourself if you still want user monitors from `CONFIG.monitors`.
### Environment (default set)
With no `hooks=` argument, the engine discovers hooks via the
`ubunye.hooks` Python entry-point group. When `UBUNYE_TELEMETRY=1`,
every discovered hook class is instantiated with no arguments.
`LegacyMonitorsHook(cfg)` is always appended, so user monitors in
`CONFIG.monitors` continue to run regardless of the flag.
Ship your own hook from a separate package:

```toml
# your-package/pyproject.toml
[project.entry-points."ubunye.hooks"]
slack = "your_package.hooks:SlackAlertHook"
```
Once `your-package` is installed (`pip install your-package`) and `UBUNYE_TELEMETRY=1`
is set, Ubunye picks the hook up automatically: no `Engine` changes, no config
changes. Hooks that need constructor arguments should still be passed via
`Engine(hooks=[...])`.
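To enable the discovered hooks for a CLI run, set the flag before calling `ubunye run`:

```bash
export UBUNYE_TELEMETRY=1
export UBUNYE_PROM_PORT=8000
ubunye run -d ./pipelines -u fraud_detection -p ingestion -t claim_etl
```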
## Example: Slack alert on failure
```python
import os
from contextlib import contextmanager

import requests

from ubunye.core.hooks import Hook


class SlackAlertHook(Hook):
    def __init__(self, webhook_url: str | None = None):
        # Fall back to the environment so the hook can be built with no arguments.
        self.url = webhook_url or os.getenv("SLACK_WEBHOOK_URL")

    @contextmanager
    def task(self, ctx, cfg, state):
        try:
            yield
        except Exception as e:
            self._post(f":red_circle: `{ctx.task_name}` failed: `{e!r}`")
            raise  # re-raise so the task still fails

    def _post(self, text: str) -> None:
        if not self.url:
            return
        try:
            requests.post(self.url, json={"text": text}, timeout=5)
        except Exception:
            pass  # a failed alert must never crash the task
```
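Wire it in like any other hook: pass `SlackAlertHook()` to `Engine(hooks=[...])` (alongside `LegacyMonitorsHook(cfg)` if you rely on `CONFIG.monitors`), or ship it through the `ubunye.hooks` entry-point group. Because the constructor falls back to `SLACK_WEBHOOK_URL`, it can be instantiated with no arguments, as entry-point discovery requires.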
## Example: Row-count audit
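```python
from contextlib import contextmanager

from ubunye.core.hooks import Hook


class RowCountHook(Hook):
    def __init__(self):
        self.counts: dict[str, int] = {}

    @contextmanager
    def task(self, ctx, cfg, state):
        yield
        # Only reached on success: a failed task raises out of the yield.
        counts: dict[str, int] = {}
        outputs = state.get("outputs") or {}
        for name, df in outputs.items():
            try:
                counts[name] = df.count()
            except Exception:
                pass  # never let the audit break the task
        self.counts = counts  # keep only the most recent task's counts
        print(f"[{ctx.task_name}] row counts: {counts}")
```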
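The `state["outputs"]` dict is populated only on success, before the hook's
`__exit__` runs. If the outputs are Spark DataFrames, each `count()` launches a
separate job, so this hook is not cheap; enable it deliberately.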
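The "success only" behaviour of `RowCountHook` follows from how `@contextmanager` works: code after a bare `yield` runs only when the `with` body finished without raising, whereas a `finally` block (as in the per-step timing hook that follows) runs either way. A small standalone demonstration using only the standard library; `audit` and `timed` are illustrative names:

```python
from contextlib import contextmanager

calls: list[str] = []


@contextmanager
def audit(label: str):
    yield
    # Reached only if the with-body did not raise.
    calls.append(f"after-yield {label}")


@contextmanager
def timed(label: str):
    try:
        yield
    finally:
        # Reached on success and on failure alike.
        calls.append(f"finally {label}")


with audit("ok"), timed("ok"):
    pass

try:
    with audit("boom"), timed("boom"):
        raise ValueError("step failed")
except ValueError:
    pass

print(calls)
# ['finally ok', 'after-yield ok', 'finally boom']
```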
## Example: Per-step timing
```python
import time
from contextlib import contextmanager

from ubunye.core.hooks import Hook


class TimingHook(Hook):
    @contextmanager
    def step(self, ctx, name, meta):
        t0 = time.perf_counter()
        try:
            yield
        finally:
            # Runs whether the step succeeded or failed.
            dur = time.perf_counter() - t0
            print(f"[{ctx.run_id[:8]}] {name} took {dur:.2f}s")
```
Output during a run:
```text
[5b3f91aa] Reader:hive took 0.42s
[5b3f91aa] Transform:noop took 0.01s
[5b3f91aa] Writer:s3 took 1.37s
```
## Composing hooks
`HookChain` combines multiple hooks. `Engine` uses it internally, but you can
use it yourself for complex setups:
```python
from ubunye.core.hooks import HookChain
from ubunye.telemetry.hooks import EventLoggerHook, PrometheusHook

chain = HookChain([EventLoggerHook(), PrometheusHook(), MyHook()])
```
- Enter order: as listed.
- Exit order: reverse (LIFO), so the outermost hook sees the inner hooks' work.
- Broken hooks: if one raises on `__enter__`, the others still run.
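The three properties above map naturally onto the standard library's `contextlib.ExitStack`. The following is only a sketch under that assumption, not `HookChain`'s actual implementation (which lives in `ubunye/core/hooks.py`); `ChainSketch` and `Named` are hypothetical, and only `step()` is shown.

```python
from __future__ import annotations

from contextlib import ExitStack, contextmanager


class ChainSketch:
    """Hypothetical stand-in for HookChain; NOT its real implementation."""

    def __init__(self, hooks):
        self.hooks = list(hooks)

    @contextmanager
    def step(self, ctx, name, meta):
        with ExitStack() as stack:
            for hook in self.hooks:  # enter in listed order
                try:
                    stack.enter_context(hook.step(ctx, name, meta))
                except Exception:
                    # A hook that fails on enter is skipped; the rest still run.
                    continue
            yield  # ExitStack exits the entered hooks in reverse (LIFO) order


class Named:
    """Hypothetical recording hook; broken=True makes it fail on enter."""

    def __init__(self, label, log, broken=False):
        self.label, self.log, self.broken = label, log, broken

    @contextmanager
    def step(self, ctx, name, meta):
        if self.broken:
            raise RuntimeError(f"{self.label} broke on enter")
        self.log.append(f"enter {self.label}")
        try:
            yield
        finally:
            self.log.append(f"exit {self.label}")


log: list[str] = []
chain = ChainSketch([Named("A", log), Named("B", log, broken=True), Named("C", log)])
with chain.step(None, "Reader:hive", None):
    log.append("step body")
print(log)
# ['enter A', 'enter C', 'step body', 'exit C', 'exit A']
```

Because `ExitStack` registers an exit callback only after `__enter__` succeeds, the broken hook `B` is never exited, and `A`, entered first, is the last to see the step finish.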
## When NOT to use a hook
Hooks observe. They do not:
- Mutate the config or outputs mid-run.
- Replace a Reader/Transform/Writer. Write a plugin for that.
- Decide whether a step runs. That belongs in config.
If you need to change pipeline behavior, write a plugin
(Connectors → Writing a Plugin) or a custom
Transform. Hooks are strictly for observability.
## Source reference
| File | What it contains |
|---|---|
| `ubunye/core/hooks.py` | `Hook` base class, `HookChain` multiplexer |
| `ubunye/telemetry/hooks/events.py` | JSON event logger hook |
| `ubunye/telemetry/hooks/otel.py` | OpenTelemetry spans |
| `ubunye/telemetry/hooks/prometheus.py` | Prometheus metrics |
| `ubunye/telemetry/hooks/monitors.py` | Legacy `CONFIG.monitors` bridge |
| `tests/unit/test_hooks.py` | Reference tests: recording hook, failure propagation, dry-run |