Source code for batter.pipeline.step
from __future__ import annotations
from dataclasses import dataclass, field, replace
from typing import Any, List, Mapping
__all__ = ["Step", "ExecResult"]
[docs]
@dataclass(frozen=True, slots=True)
class Step:
"""
One unit of work in the pipeline.
Parameters
----------
name : str
Unique step name (e.g., ``"prepare_fe"``).
requires : list[str]
Names of steps that must complete before this step can run.
payload : Any, optional
Typed payload consumed by the backend. Typically a :class:`~batter.pipeline.payloads.StepPayload`.
Notes
-----
- Steps are **immutable** descriptors. Execution is handled by a backend.
- The backend decides how to interpret ``params`` (e.g., templates, flags).
"""
name: str
requires: List[str] = field(default_factory=list)
payload: Any = None
@property
def params(self) -> Any:
"""Backwards-compatible alias for ``payload``."""
return self.payload
[docs]
def replace(self, **updates: Any) -> "Step":
"""
Return a new :class:`Step` with selected attributes updated.
Parameters
----------
**updates
Keyword overrides for any of the dataclass fields (``name``,
``requires``, or ``payload``).
Returns
-------
Step
Fresh step instance containing the requested updates.
"""
return replace(self, **updates)
[docs]
@dataclass(slots=True)
class ExecResult:
"""
Execution result returned by a backend.
Parameters
----------
job_ids : list[str]
Scheduler or process identifiers (may be empty for local runs).
artifacts : Mapping[str, Any]
Named outputs (paths, metrics, small JSON blobs).
"""
job_ids: List[str] = field(default_factory=list)
artifacts: Mapping[str, Any] = field(default_factory=dict)