Source code for batter.api

"""Public API for BATTER.

This module collects the stable entry points intended for external consumption.
They fall into four broad categories:

* **Configuration helpers** – load and dump ``RunConfig`` / ``SimulationConfig`` objects.
* **Execution** – orchestrate complete workflows from a YAML definition.
* **Portable results** – inspect and copy artifacts produced by a run.
* **Utilities** – clone the state of an execution for reproducibility.

Typical usage
-------------

Run a workflow from a top-level YAML::

    from batter.api import run_from_yaml
    run_from_yaml(\"examples/mabfe.yaml\")

Inspect FE records stored in a work directory::

    from batter.api import list_fe_runs, load_fe_run
    runs = list_fe_runs(\"work/adrb2\")
    latest = runs.iloc[-1][\"run_id\"]
    # pass ``ligand`` when the run contains more than one ligand
    record = load_fe_run(\"work/adrb2\", latest, ligand=\"LIG1\")

Run FE analysis on an existing execution::

    from batter.api import run_analysis_from_execution
    run_analysis_from_execution(\"work/adrb2\", latest, ligand=\"LIG1\")

For more examples, refer to ``docs/getting_started.rst`` and the tutorials.
"""

from __future__ import annotations

import json
from pathlib import Path
from typing import Any, TYPE_CHECKING, Sequence, Union

from loguru import logger
from tqdm import tqdm

from ._version import __version__  # semantic version string

# --- Schemas / configs ---
from .config.simulation import SimulationConfig
from .config.run import RunConfig
from .config import (
    load_run_config,
    dump_run_config,
    load_simulation_config as load_sim_config,
    dump_simulation_config as save_sim_config,
)

# --- Orchestration entrypoint ---
from .orchestrate.run import run_from_yaml, save_fe_records

# --- Portable runtime + FE results ---
from .runtime.portable import ArtifactStore
from .runtime.fe_repo import FEResultsRepository, FERecord, WindowResult
from .utils.exec_clone import clone_execution

# --- System descriptor (read-only for users) ---
from .systems.core import SimSystem, SystemMeta

from batter.pipeline.payloads import StepPayload
from batter.pipeline.step import Step

if TYPE_CHECKING:
    import pandas as pd  # type: ignore[assignment]  # for type hints only

__all__ = [
    # version
    "__version__",
    # configs
    "SimulationConfig",
    "RunConfig",
    "load_run_config",
    "dump_run_config",
    "load_sim_config",
    "save_sim_config",
    # orchestration
    "run_from_yaml",
    # portable store + results
    "ArtifactStore",
    "FEResultsRepository",
    "FERecord",
    "WindowResult",
    # system descriptor
    "SimSystem",
    # convenience helpers
    "list_fe_runs",
    "load_fe_run",
    "run_analysis_from_execution",
    # execution cloning
    "clone_execution",
]


[docs] def list_fe_runs(work_dir: Union[str, Path]) -> "pd.DataFrame": """ Return an index of FE runs contained in a portable work directory. Parameters ---------- work_dir : str or Path Path to the root directory of a BATTER execution (portable layout). Returns ------- pandas.DataFrame DataFrame with one row per stored FE run. Columns include ``run_id``, ``ligand``, ``mol_name``, ``system_name``, ``temperature``, ``total_dG``, ``total_se``, ``canonical_smiles``, ``original_name``, ``original_path``, ``protocol``, ``sim_range``, ``status``, ``failure_reason``, and ``created_at``. """ store = ArtifactStore(Path(work_dir)) repo = FEResultsRepository(store) return repo.index()
[docs] def load_fe_run( work_dir: Union[str, Path], run_id: str, ligand: str | None = None ) -> FERecord: """ Load a single FE record by ``run_id`` from a portable work directory. Parameters ---------- work_dir : str or Path Root directory of the BATTER execution. run_id : str Identifier of the FE run to load (as returned by :func:`list_fe_runs`). ligand : str, optional Ligand identifier when multiple ligands were processed in the run. If omitted, the sole ligand is selected automatically or a ValueError is raised when multiple matches exist. Returns ------- FERecord Structured record containing total ΔG, standard error, components, and per-window results. """ store = ArtifactStore(Path(work_dir)) repo = FEResultsRepository(store) if ligand: return repo.load(run_id, ligand) df = repo.index() matches = df[df["run_id"] == run_id] if matches.empty: raise KeyError(f"No FE records found for run_id '{run_id}'.") if len(matches) > 1: raise ValueError( f"Multiple ligands stored for run_id '{run_id}'. " "Call `load_fe_run` with the `ligand` argument or inspect `list_fe_runs`." ) ligand_name = matches.iloc[0]["ligand"] return repo.load(run_id, ligand_name)
[docs] def run_analysis_from_execution( work_dir: Union[str, Path], run_id: str, *, ligand: str | None = None, components: Sequence[str] | None = None, n_workers: int | None = None, sim_range: tuple[int, int] | None = None, raise_on_error: bool = True, ) -> None: """ Run FE analysis for a partially finished/finished execution. Parameters ---------- work_dir : str or Path Root directory containing the portable execution store. run_id : str Identifier of the execution (e.g., ``run-20240101``). ligand : str, optional Ligand identifier to target when only a subset should be analyzed. components : sequence of str, optional Components to include during analysis (overrides ``sim_cfg.components``). n_workers : int, optional Number of worker processes requested for the analysis handler. sim_range : (int, int), optional (start, end) range of lambda windows to analyze. raise_on_error : bool, optional When ``True`` (default) propagate errors raised by the analysis handler. Set to ``False`` to log the failure and continue with other ligands. """ work_root = Path(work_dir) run_dir = work_root / "executions" / run_id if not run_dir.exists(): raise FileNotFoundError(f"Run '{run_id}' does not exist under {work_root}.") config_dir = run_dir / "artifacts" / "config" sim_cfg_path = config_dir / "sim.resolved.yaml" if not sim_cfg_path.exists(): raise FileNotFoundError( f"Simulation configuration missing for run '{run_id}' at {sim_cfg_path}." ) sim_cfg = load_sim_config(sim_cfg_path) run_meta_path = config_dir / "run_meta.json" run_meta: dict[str, Any] = {} if run_meta_path.exists(): run_meta = json.loads(run_meta_path.read_text()) or {} protocol = run_meta.get("protocol", "abfe") system_name = run_meta.get("system_name") or sim_cfg.system_name index_path = run_dir / "artifacts" / "ligand_params" / "index.json" if not index_path.exists(): raise FileNotFoundError( f"Ligand index missing for run '{run_id}' at {index_path}." ) ligands_payload = json.loads(index_path.read_text()) or {} entries = ligands_payload.get("ligands", []) if not entries: raise RuntimeError(f"No ligands recorded for run '{run_id}'.") param_dir_dict = { entry.get("residue_name"): entry.get("store_dir") for entry in entries if entry.get("residue_name") and entry.get("store_dir") } requested: Sequence[str] | None = [ligand] if ligand else None children: list[SimSystem] = [] for entry in entries: lig_name = entry["ligand"] if requested and lig_name not in requested: continue child_root = run_dir / "simulations" / lig_name if not child_root.is_dir(): raise FileNotFoundError( f"Simulation directory for ligand '{lig_name}' was not found at {child_root}." ) meta = SystemMeta( ligand=lig_name, residue_name=entry.get("residue_name"), param_dir_dict=dict(param_dir_dict) if param_dir_dict else {}, ) children.append( SimSystem( name=f"{system_name}:{lig_name}:{run_id}", root=child_root, meta=meta, ) ) if requested and not children: raise KeyError(f"Ligand '{ligand}' not present in run '{run_id}'.") logger.info(f"Running analysis for {len(children)} ligands in run '{run_id}'.") logger.info(f"Number of workers: {n_workers}") payload_data: dict[str, Any] = {"sim": sim_cfg} if components: payload_data["components"] = list(components) if n_workers is not None: payload_data["analysis_n_workers"] = n_workers payload_data["n_workers"] = n_workers if sim_range is not None: payload_data["sim_range"] = sim_range logger.info(f"Lambda window range set to: {sim_range}") else: sim_range_cfg = getattr(sim_cfg, "analysis_fe_range", None) if sim_range_cfg is not None: payload_data["sim_range"] = sim_range_cfg sim_range = sim_range_cfg logger.info(f"Lambda window range loaded: {sim_range}") payload = StepPayload(**payload_data) params = payload.to_mapping() analyze_step = Step(name="analyze") from batter.exec.handlers.fe_analysis import analyze_handler for child in tqdm(children, desc="Running analysis", unit="ligand"): try: analyze_handler(analyze_step, child, params) except Exception as exc: msg = f"Analysis failed for ligand '{child.meta.get('ligand')}' in run '{run_id}': {exc}" if raise_on_error: raise RuntimeError(msg) from exc logger.warning(msg) continue store = ArtifactStore(work_root) repo = FEResultsRepository(store) failures = save_fe_records( run_dir=run_dir, run_id=run_id, children_all=children, sim_cfg_updated=sim_cfg, repo=repo, protocol=protocol, ) if failures: failed = ", ".join( [f"{name} ({status}: {reason})" for name, status, reason in failures] ) logger.warning(f"Analysis recorded issues for run '{run_id}': {failed}") logger.info(f"Analysis complete for run '{run_dir}'.")