"""Public API for BATTER.
This module collects the stable entry points intended for external consumption.
They fall into four broad categories:
* **Configuration helpers** – load and dump ``RunConfig`` / ``SimulationConfig`` objects.
* **Execution** – orchestrate complete workflows from a YAML definition.
* **Portable results** – inspect and copy artifacts produced by a run.
* **Utilities** – clone the state of an execution for reproducibility.
Typical usage
-------------
Run a workflow from a top-level YAML::
from batter.api import run_from_yaml
run_from_yaml(\"examples/mabfe.yaml\")
Inspect FE records stored in a work directory::
from batter.api import list_fe_runs, load_fe_run
runs = list_fe_runs(\"work/adrb2\")
latest = runs.iloc[-1][\"run_id\"]
# pass ``ligand`` when the run contains more than one ligand
record = load_fe_run(\"work/adrb2\", latest, ligand=\"LIG1\")
Run FE analysis on an existing execution::
from batter.api import run_analysis_from_execution
run_analysis_from_execution(\"work/adrb2\", latest, ligand=\"LIG1\")
For more examples, refer to ``docs/getting_started.rst`` and the tutorials.
"""
from __future__ import annotations
import json
from pathlib import Path
from typing import Any, TYPE_CHECKING, Sequence, Union
from loguru import logger
from tqdm import tqdm
from ._version import __version__ # semantic version string
# --- Schemas / configs ---
from .config.simulation import SimulationConfig
from .config.run import RunConfig
from .config import (
load_run_config,
dump_run_config,
load_simulation_config as load_sim_config,
dump_simulation_config as save_sim_config,
)
# --- Orchestration entrypoint ---
from .orchestrate.run import run_from_yaml, save_fe_records
# --- Portable runtime + FE results ---
from .runtime.portable import ArtifactStore
from .runtime.fe_repo import FEResultsRepository, FERecord, WindowResult
from .utils.exec_clone import clone_execution
# --- System descriptor (read-only for users) ---
from .systems.core import SimSystem, SystemMeta
from batter.pipeline.payloads import StepPayload
from batter.pipeline.step import Step
if TYPE_CHECKING:
import pandas as pd # type: ignore[assignment] # for type hints only
__all__ = [
# version
"__version__",
# configs
"SimulationConfig",
"RunConfig",
"load_run_config",
"dump_run_config",
"load_sim_config",
"save_sim_config",
# orchestration
"run_from_yaml",
# portable store + results
"ArtifactStore",
"FEResultsRepository",
"FERecord",
"WindowResult",
# system descriptor
"SimSystem",
# convenience helpers
"list_fe_runs",
"load_fe_run",
"run_analysis_from_execution",
# execution cloning
"clone_execution",
]
[docs]
def list_fe_runs(work_dir: Union[str, Path]) -> "pd.DataFrame":
"""
Return an index of FE runs contained in a portable work directory.
Parameters
----------
work_dir : str or Path
Path to the root directory of a BATTER execution (portable layout).
Returns
-------
pandas.DataFrame
DataFrame with one row per stored FE run. Columns include ``run_id``,
``ligand``, ``mol_name``, ``system_name``, ``temperature``, ``total_dG``,
``total_se``, ``canonical_smiles``, ``original_name``, ``original_path``,
``protocol``, ``sim_range``, ``status``, ``failure_reason``, and
``created_at``.
"""
store = ArtifactStore(Path(work_dir))
repo = FEResultsRepository(store)
return repo.index()
[docs]
def load_fe_run(
work_dir: Union[str, Path], run_id: str, ligand: str | None = None
) -> FERecord:
"""
Load a single FE record by ``run_id`` from a portable work directory.
Parameters
----------
work_dir : str or Path
Root directory of the BATTER execution.
run_id : str
Identifier of the FE run to load (as returned by :func:`list_fe_runs`).
ligand : str, optional
Ligand identifier when multiple ligands were processed in the run. If omitted,
the sole ligand is selected automatically or a ValueError is raised when
multiple matches exist.
Returns
-------
FERecord
Structured record containing total ΔG, standard error, components, and
per-window results.
"""
store = ArtifactStore(Path(work_dir))
repo = FEResultsRepository(store)
if ligand:
return repo.load(run_id, ligand)
df = repo.index()
matches = df[df["run_id"] == run_id]
if matches.empty:
raise KeyError(f"No FE records found for run_id '{run_id}'.")
if len(matches) > 1:
raise ValueError(
f"Multiple ligands stored for run_id '{run_id}'. "
"Call `load_fe_run` with the `ligand` argument or inspect `list_fe_runs`."
)
ligand_name = matches.iloc[0]["ligand"]
return repo.load(run_id, ligand_name)
[docs]
def run_analysis_from_execution(
work_dir: Union[str, Path],
run_id: str,
*,
ligand: str | None = None,
components: Sequence[str] | None = None,
n_workers: int | None = None,
sim_range: tuple[int, int] | None = None,
raise_on_error: bool = True,
) -> None:
"""
Run FE analysis for a partially finished/finished execution.
Parameters
----------
work_dir : str or Path
Root directory containing the portable execution store.
run_id : str
Identifier of the execution (e.g., ``run-20240101``).
ligand : str, optional
Ligand identifier to target when only a subset should be analyzed.
components : sequence of str, optional
Components to include during analysis (overrides ``sim_cfg.components``).
n_workers : int, optional
Number of worker processes requested for the analysis handler.
sim_range : (int, int), optional
(start, end) range of lambda windows to analyze.
raise_on_error : bool, optional
When ``True`` (default) propagate errors raised by the analysis handler.
Set to ``False`` to log the failure and continue with other ligands.
"""
work_root = Path(work_dir)
run_dir = work_root / "executions" / run_id
if not run_dir.exists():
raise FileNotFoundError(f"Run '{run_id}' does not exist under {work_root}.")
config_dir = run_dir / "artifacts" / "config"
sim_cfg_path = config_dir / "sim.resolved.yaml"
if not sim_cfg_path.exists():
raise FileNotFoundError(
f"Simulation configuration missing for run '{run_id}' at {sim_cfg_path}."
)
sim_cfg = load_sim_config(sim_cfg_path)
run_meta_path = config_dir / "run_meta.json"
run_meta: dict[str, Any] = {}
if run_meta_path.exists():
run_meta = json.loads(run_meta_path.read_text()) or {}
protocol = run_meta.get("protocol", "abfe")
system_name = run_meta.get("system_name") or sim_cfg.system_name
index_path = run_dir / "artifacts" / "ligand_params" / "index.json"
if not index_path.exists():
raise FileNotFoundError(
f"Ligand index missing for run '{run_id}' at {index_path}."
)
ligands_payload = json.loads(index_path.read_text()) or {}
entries = ligands_payload.get("ligands", [])
if not entries:
raise RuntimeError(f"No ligands recorded for run '{run_id}'.")
param_dir_dict = {
entry.get("residue_name"): entry.get("store_dir")
for entry in entries
if entry.get("residue_name") and entry.get("store_dir")
}
requested: Sequence[str] | None = [ligand] if ligand else None
children: list[SimSystem] = []
for entry in entries:
lig_name = entry["ligand"]
if requested and lig_name not in requested:
continue
child_root = run_dir / "simulations" / lig_name
if not child_root.is_dir():
raise FileNotFoundError(
f"Simulation directory for ligand '{lig_name}' was not found at {child_root}."
)
meta = SystemMeta(
ligand=lig_name,
residue_name=entry.get("residue_name"),
param_dir_dict=dict(param_dir_dict) if param_dir_dict else {},
)
children.append(
SimSystem(
name=f"{system_name}:{lig_name}:{run_id}",
root=child_root,
meta=meta,
)
)
if requested and not children:
raise KeyError(f"Ligand '{ligand}' not present in run '{run_id}'.")
logger.info(f"Running analysis for {len(children)} ligands in run '{run_id}'.")
logger.info(f"Number of workers: {n_workers}")
payload_data: dict[str, Any] = {"sim": sim_cfg}
if components:
payload_data["components"] = list(components)
if n_workers is not None:
payload_data["analysis_n_workers"] = n_workers
payload_data["n_workers"] = n_workers
if sim_range is not None:
payload_data["sim_range"] = sim_range
logger.info(f"Lambda window range set to: {sim_range}")
else:
sim_range_cfg = getattr(sim_cfg, "analysis_fe_range", None)
if sim_range_cfg is not None:
payload_data["sim_range"] = sim_range_cfg
sim_range = sim_range_cfg
logger.info(f"Lambda window range loaded: {sim_range}")
payload = StepPayload(**payload_data)
params = payload.to_mapping()
analyze_step = Step(name="analyze")
from batter.exec.handlers.fe_analysis import analyze_handler
for child in tqdm(children, desc="Running analysis", unit="ligand"):
try:
analyze_handler(analyze_step, child, params)
except Exception as exc:
msg = f"Analysis failed for ligand '{child.meta.get('ligand')}' in run '{run_id}': {exc}"
if raise_on_error:
raise RuntimeError(msg) from exc
logger.warning(msg)
continue
store = ArtifactStore(work_root)
repo = FEResultsRepository(store)
failures = save_fe_records(
run_dir=run_dir,
run_id=run_id,
children_all=children,
sim_cfg_updated=sim_cfg,
repo=repo,
protocol=protocol,
)
if failures:
failed = ", ".join(
[f"{name} ({status}: {reason})" for name, status, reason in failures]
)
logger.warning(f"Analysis recorded issues for run '{run_id}': {failed}")
logger.info(f"Analysis complete for run '{run_dir}'.")