Source code for batter.exec.handlers.equil

"""Slurm-backed equilibration handler."""

from __future__ import annotations

import os
from pathlib import Path
from typing import Any, Dict

from loguru import logger

from batter.exec.slurm_mgr import SlurmJobManager, SlurmJobSpec
from batter.orchestrate.state_registry import register_phase_state
from batter.pipeline.payloads import StepPayload
from batter.pipeline.step import ExecResult, Step
from batter.systems.core import SimSystem
from batter._internal.templates import RUN_FILES_DIR as RUN_FILES_ORIG


def _phase_paths(root: Path) -> dict[str, Path]:
    """Return resolved paths for equilibration artifacts under ``root``."""
    phase_dir = root / "equil"
    return {
        "phase_dir": phase_dir,
        "script": phase_dir / "SLURMM-run",
        "finished": phase_dir / "FINISHED",
        "failed": phase_dir / "FAILED",
        "jobid": phase_dir / "JOBID",
        "stdout": phase_dir / "slurm.out",
        "stderr": phase_dir / "slurm.err",
        "rst7": root / "artifacts" / "equil" / "equil.rst7",
    }


[docs] def equil_handler(step: Step, system: SimSystem, params: Dict[str, Any]) -> ExecResult: """Submit and register the equilibration job with the Slurm manager. Parameters ---------- step : Step Pipeline step metadata (unused but provided for symmetry). system : SimSystem Simulation system descriptor. params : dict Raw handler payload; validated into :class:`StepPayload`. Returns ------- ExecResult Result containing either existing artifacts (when already finished) or the work directory to be monitored by the manager. Raises ------ FileNotFoundError If the expected submission script is missing. RuntimeError When ``payload['job_mgr']`` is not a :class:`SlurmJobManager`. """ payload = StepPayload.model_validate(params) paths = _phase_paths(system.root) lig = system.meta.get("ligand", system.name) stage = payload.get("job_stage") or "equil" finished_rel = paths["finished"].relative_to(system.root).as_posix() failed_rel = paths["failed"].relative_to(system.root).as_posix() register_phase_state( system.root, "equil", required=[[finished_rel], [failed_rel]], success=[[finished_rel]], failure=[[failed_rel]], ) # Fast path: already done if paths["finished"].exists(): logger.info(f"[equil:{lig}] FINISHED detected — skipping submit/monitor.") arts = {"rst7": paths["rst7"], "finished": paths["finished"]} for k in ("stdout", "stderr"): if paths[k].exists(): arts[k] = paths[k] # best-effort job id try: arts["job_id"] = paths["jobid"].read_text().strip() except Exception: pass return ExecResult(job_ids=[], artifacts=arts) # Require the submit script to exist script = paths["script"] if not script.exists(): raise FileNotFoundError(f"[equil:{lig}] SLURM submit script missing: {script}") # Build job spec (submit from equil/ directory; pass partition via sbatch flags) job_name = f"fep_{os.path.abspath(system.root)}_eq" mgr = payload.get("job_mgr") if not isinstance(mgr, SlurmJobManager): raise RuntimeError( "Equilibration handler requires payload['job_mgr'] to be a SlurmJobManager instance" ) spec = SlurmJobSpec( workdir=paths["phase_dir"], script_rel=script.name, finished_name=paths["finished"].name, failed_name=paths["failed"].name, name=job_name, stage=stage, header_name="SLURMM-Am.header", header_template=RUN_FILES_ORIG / "SLURMM-Am.header", header_root=Path(getattr(payload.get("sim"), "slurm_header_dir", Path.home() / ".batter")) if payload.get("sim") else None, ) mgr.add(spec) return ExecResult(job_ids=[], artifacts={"workdir": paths["phase_dir"]})