"""Ligand parameterisation helpers for GAFF/GAFF2 and OpenFF workflows."""
from __future__ import annotations
import hashlib
import json
import os
import re
import shutil
import tempfile
import warnings
from abc import ABC, abstractmethod
from pathlib import Path
from typing import (
Any,
Dict,
Iterable,
List,
Mapping,
MutableMapping,
Optional,
Sequence,
Tuple,
Union,
Literal,
)
import numpy as np
from gufe import SmallMoleculeComponent
from loguru import logger
from MDAnalysis import Universe
from openff.toolkit.topology import Molecule
from openff.toolkit.typing.engines.smirnoff.forcefield import get_available_force_fields
from rdkit import Chem
warnings.filterwarnings("ignore", category=UserWarning, module="gufe")
from batter.utils import (
antechamber,
parmchk2,
run_with_log,
tleap,
)
# type helpers
LigandPath = Union[str, Path]
LigandPathMap = Mapping[str, LigandPath]
LigandPathSeq = Sequence[LigandPath]
OnFailureMode = Optional[Literal["prune", "retry", "raise"]]
__all__ = [
"LigandProcessing",
"PDB_LigandProcessing",
"SDF_LigandProcessing",
"MOL2_LigandProcessing",
"LigandFactory",
"batch_ligand_process",
]
# --------------------------------------------------------------------------- #
# helpers #
# --------------------------------------------------------------------------- #
# forbidden molecule names in MDA selection language and AMBER residue names
FORBIDDEN_MOL_NAMES = {"add", "all", "and", "any", "not", "set"}
_SCIENTIFIC_NOTATION_RESNAME_RE = re.compile(r"^\d+e\d+$", re.IGNORECASE)
def _base26_triplet(n: int) -> str:
"""Map nonnegative int → 3 lowercase letters (base-26, a..z)."""
n = n % (26**3)
a = n // (26**2)
b = (n // 26) % 26
c = n % 26
return chr(a + 97) + chr(b + 97) + chr(c + 97)
def _stable_hash_int(s: str) -> int:
"""Deterministic int from string (md5, not Python's salted hash)."""
h = hashlib.md5(s.encode("utf-8")).hexdigest()
return int(h, 16)
def _looks_like_scientific_notation_resname(name: str) -> bool:
"""Return True for residue-name candidates like ``8e3``."""
return bool(_SCIENTIFIC_NOTATION_RESNAME_RE.fullmatch(name))
def _convert_mol_name_to_unique(
mol_name: str,
ind: int,
smiles: str,
exist_mol_names: set[str],
) -> str:
"""
Convert a molecule name to a unique 3-char identifier (letters/digits),
deterministic across runs, and never all digits.
Parameters
----------
mol_name
Suggested name.
ind
Index used as a deterministic tiebreaker.
smiles
Canonical SMILES used to derive a stable hash on collision.
exist_mol_names
Set of names already taken.
Returns
-------
str
Unique 3-character residue name.
"""
base = re.sub(r"[^a-zA-Z0-9]", "", (mol_name or "")).lower()[:3]
if len(base) < 3:
base = (base + _base26_triplet(ind))[:3]
if not base:
base = _base26_triplet(ind)
if len(base) == 3 and base.isdigit():
base = "l" + base[:2]
if _looks_like_scientific_notation_resname(base):
base = ""
if base and base not in exist_mol_names and base not in FORBIDDEN_MOL_NAMES:
return base
seed = _stable_hash_int(smiles or mol_name or "lig")
for attempt in range(200):
candidate = _base26_triplet(seed + attempt)
if candidate not in exist_mol_names and candidate not in FORBIDDEN_MOL_NAMES:
return candidate
for attempt in range(200):
candidate = _base26_triplet(ind + attempt)
if candidate not in exist_mol_names and candidate not in FORBIDDEN_MOL_NAMES:
return candidate
raise ValueError(
f"Could not derive unique 3-char name for molecule {mol_name!r} (smiles: {smiles})"
)
def _ensure_sdf_internal_name(sdf_path: Union[str, Path], name: str) -> None:
"""Normalize the ``_Name`` property of an SDF file.
Parameters
----------
sdf_path
Input SDF file.
name
Value written to the ``_Name`` header field.
"""
p = str(sdf_path)
suppl = Chem.SDMolSupplier(p, removeHs=False)
mols = [m for m in suppl if m is not None]
if not mols:
logger.warning(f"[ligand] Could not read SDF to set name: {p}")
return
mol = mols[0]
try:
mol.SetProp("_Name", name)
except Exception as e:
logger.warning(f"[ligand] Could not set _Name for {p}: {e}")
return
w = Chem.SDWriter(p)
w.write(mol)
w.close()
logger.debug(f"[ligand] Set SDF _Name to '{name}' in {Path(p).name}")
def _rdkit_load(path: Union[str, Path], retain_h: bool) -> Chem.Mol:
"""
Load a ligand file with RDKit (SDF/MOL2). Raise on failure.
Parameters
----------
path
Input file path.
retain_h
Whether to preserve explicit hydrogens from the file.
Returns
-------
Chem.Mol
RDKit molecule.
"""
p = str(path)
mol: Optional[Chem.Mol] = None
if p.lower().endswith(".sdf"):
suppl = Chem.SDMolSupplier(p, removeHs=not retain_h)
mol = suppl[0] if suppl and len(suppl) > 0 else None
elif p.lower().endswith(".mol2"):
mol = Chem.MolFromMol2File(p, removeHs=not retain_h)
else:
raise ValueError(
f"Unsupported ligand format for hashing: {p} (use .sdf or .mol2)"
)
if mol is None:
raise ValueError(f"RDKit failed to load {p}")
# Normalize: ensure a consistent H state for hashing
if not retain_h:
mol = Chem.RemoveHs(mol)
mol = Chem.AddHs(mol, addCoords=True)
return mol
def _canonical_payload(mol: Chem.Mol) -> str:
"""
Canonical text used for hashing: canonical SMILES with explicit Hs.
Parameters
----------
mol
RDKit molecule (with Hs added).
Returns
-------
str
Canonical SMILES (isomeric).
"""
return Chem.MolToSmiles(mol, isomericSmiles=True)
def _hash_id(payload: str, ligand_ff: str, retain_h: bool) -> str:
"""
Build a short, stable content hash for (mol, ff, retain flag).
The final id is 12 hex chars of SHA256.
"""
h = hashlib.sha256()
h.update(payload.encode("utf-8"))
h.update(f"|ff={ligand_ff}|retain={int(retain_h)}".encode("utf-8"))
return h.hexdigest()[:12]
# --------------------------------------------------------------------------- #
# core #
# --------------------------------------------------------------------------- #
[docs]
class LigandProcessing(ABC):
"""
Base class for ligand processing and parameterization.
It loads a ligand, determines a unique residue/name, estimates the charge,
and generates AMBER/OpenFF parameters.
Parameters
----------
ligand_file
Input ligand path (SDF/MOL2/PDB depending on subclass).
index
1-based index used for stable name generation.
output_dir
Output folder for generated files.
ligand_name
Optional preferred name; will be uniquified to 3 chars.
charge
Charge method for OpenFF pre-charge or quick estimate (e.g., ``"am1bcc"``).
retain_lig_prot
If ``True``, keep hydrogen atoms from input.
ligand_ff
One of ``"gaff"`` or ``"gaff2"`` or an OpenFF release like ``"openff-2.2.0"``.
unique_mol_names
Existing names to avoid collisions.
Attributes
----------
ligand_object : SmallMoleculeComponent
openff_molecule : Molecule
ligand_charge : float
Estimated total charge (integer).
atomnames : list[str]
Atom names extracted from generated PDB (AMBER path).
"""
def __init__(
self,
ligand_file: Union[str, Path],
index: int,
output_dir: Union[str, Path],
ligand_name: Optional[str] = None,
charge: str = "am1bcc",
retain_lig_prot: bool = True,
ligand_ff: str = "gaff2",
unique_mol_names: Optional[List[str]] = None,
) -> None:
self.ligand_file = str(ligand_file)
self.index = index
Path(output_dir).mkdir(parents=True, exist_ok=True)
self.output_dir = str(Path(output_dir).absolute())
self._name = ligand_name.lower() if ligand_name is not None else None
self.charge = charge
self.retain_lig_prot = retain_lig_prot
self.ligand_ff = ligand_ff
available_amber_ff = ["gaff", "gaff2"]
available_openff_ff = [
ff.removesuffix(".offxml")
for ff in get_available_force_fields()
if "openff" in ff
]
if ligand_ff in available_amber_ff:
self.force_field = "amber"
elif ligand_ff in available_openff_ff:
self.force_field = "openff"
else:
raise ValueError(
f"Unsupported force field: {ligand_ff}. "
f"Supported: {available_amber_ff + available_openff_ff}"
)
logger.debug(
"Using force field {} for ligand {}", self.force_field, ligand_file
)
self.unique_mol_names = set(unique_mol_names or [])
ligand_rdkit = self._load_ligand()
self._cano_smiles = Chem.MolToSmiles(ligand_rdkit, canonical=True)
if ligand_name is None:
ligand = SmallMoleculeComponent(ligand_rdkit)
else:
# skip warning
ligand = SmallMoleculeComponent(ligand_rdkit, name=ligand_name)
self.ligand_object = ligand
self.openff_molecule = ligand.to_openff()
self._generate_unique_name()
# -------------------- name / io helpers --------------------
def _generate_unique_name(self) -> None:
"""Derive a unique residue name for the ligand."""
self._get_mol_name()
self._name = _convert_mol_name_to_unique(
self._name, self.index, self.smiles, self.unique_mol_names
)
logger.debug("Ligand {}: {}", self.index, self.name)
self.openff_molecule.name = self.name.lower()
# needed for residue construction
self.openff_molecule.add_default_hierarchy_schemes()
self.openff_molecule.residues[0].residue_name = self.name.lower()
self.openff_molecule.to_file(self.ligand_sdf_path, file_format="sdf")
[docs]
def to_dict(self) -> Dict[str, Any]:
return {
"ligand_file": self.ligand_file,
"charge_type": self.charge,
"smiles": self.smiles,
"ligand_sdf_path": self.ligand_sdf_path,
"ligand_charge": getattr(self, "ligand_charge", None),
"ligand_ff": self.ligand_ff,
"ligand_name": self.name,
"retain_lig_prot": self.retain_lig_prot,
}
@abstractmethod
def _load_ligand(self) -> Chem.Mol:
"""Subclasses must load and return an RDKit molecule."""
raise NotImplementedError
def _get_mol_name(self) -> None:
"""Populate ``self._name`` from user input or the source file."""
if self._name is not None:
return
if self.ligand_object.name is not None:
mol_name = self.ligand_object.name
else:
mol_name = Path(self.ligand_file).stem.lower()
self._name = mol_name
@property
def name(self) -> str:
"""str: Three-character residue name used for generated artifacts."""
return str(self._name)
@property
def smiles(self) -> str:
"""str: Canonical SMILES with explicit hydrogens."""
return self._cano_smiles
@property
def ligand_sdf_path(self) -> str:
"""str: Path to the canonicalised SDF stored on disk."""
return str(Path(self.output_dir) / f"{self.name}.sdf")
# -------------------- parameterization --------------------
def _calculate_partial_charge(self) -> None:
"""Estimate the net charge using the configured partial-charge method."""
molecule = self.openff_molecule
charge_method = self.charge if self.force_field == "openff" else "gasteiger"
molecule.assign_partial_charges(partial_charge_method=charge_method)
ligand_charge = np.round(
np.sum([charge._magnitude for charge in molecule.partial_charges])
)
self.ligand_charge = float(ligand_charge)
logger.debug(
"Net charge of ligand {} in {} is {}.",
self.name,
self.ligand_file,
self.ligand_charge,
)
[docs]
def prepare_ligand_parameters(self) -> None:
"""
Generate parameters using either AMBER (GAFF/GAFF2) or OpenFF path.
Notes
-----
- OpenFF path first creates AMBER artifacts for tleap-based system build.
- Writes a ``<name>.json`` metadata file to the output folder.
"""
if self.force_field == "openff":
self.prepare_ligand_parameters_openff()
elif self.force_field == "amber":
self.prepare_ligand_parameters_amberff()
else:
raise ValueError("Unsupported force field; expected 'amber' or 'openff'.")
metadata = self.to_dict()
json_path = Path(self.output_dir) / f"{self.name}.json"
with json_path.open("w") as f:
json.dump(metadata, f, indent=4)
[docs]
def prepare_ligand_parameters_openff(self) -> None:
"""
Prepare ligand parameters using OpenFF toolkit (and AMBER bootstrap).
Behavior
--------
- Runs a **fast** AMBER bootstrap (GAFF2 + gas charges) so tleap artifacts exist.
- Generates an OpenFF `prmtop` for downstream if you prefer OpenMM/OpenFF.
"""
# Bootstrap via AMBER with fast charges
ligand_ff_openff = self.ligand_ff
self.ligand_ff = "gaff2"
self.prepare_ligand_parameters_amberff(charge_method="gas")
self.ligand_ff = ligand_ff_openff
from openff.toolkit import ForceField, Topology
mol = self.name
logger.debug(
"Preparing ligand {} parameters with OpenFF force field {}.",
mol,
self.ligand_ff,
)
openff_ff = ForceField(f"{self.ligand_ff}.offxml")
topology = Topology()
topology.add_molecule(self.openff_molecule)
interchange = openff_ff.create_interchange(
topology, charge_from_molecules=[self.openff_molecule]
)
# ensure residue + atom names are present
for residue in interchange.topology.hierarchy_iterator("residues"):
residue.residue_name = mol.lower()
for atom, atn in zip(interchange.topology.atoms, self.atomnames):
atom.name = atn
interchange.to_prmtop(f"{self.output_dir}/{mol}.prmtop")
logger.debug(
f"Ligand {mol} OpenFF parameters prepared: {self.output_dir}/{mol}.prmtop",
)
[docs]
def prepare_ligand_parameters_amberff(self, charge_method: str = "bcc") -> None:
"""
Prepare ligand parameters using AMBER (GAFF/GAFF2): mol2/frcmod/lib/prmtop.
Parameters
----------
charge_method
Antechamber charge method (e.g., ``"bcc"`` or ``"gas"``).
"""
mol = self.name
logger.debug(
"Preparing ligand {} parameters with AMBER force field {}.",
mol,
self.ligand_ff,
)
self._calculate_partial_charge()
abspath_sdf = str(Path(self.ligand_sdf_path).absolute())
ante_cmd = (
f"{antechamber} -i {abspath_sdf} -fi sdf -o {self.output_dir}/{mol}_ante.mol2 "
f"-fo mol2 -c {charge_method} -s 2 -at {self.ligand_ff} -nc {int(self.ligand_charge)} "
f"-rn {mol} -dr no"
)
with tempfile.TemporaryDirectory() as tmpdir:
run_with_log(ante_cmd, working_dir=tmpdir)
shutil.copy(
f"{self.output_dir}/{mol}_ante.mol2", f"{self.output_dir}/{mol}.mol2"
)
self._ligand_mol2_path = f"{self.output_dir}/{mol}.mol2"
if self.ligand_ff == "gaff":
run_with_log(
f"{parmchk2} -i {self.output_dir}/{mol}_ante.mol2 -f mol2 -o {self.output_dir}/{mol}.frcmod -s 1"
)
elif self.ligand_ff == "gaff2":
run_with_log(
f"{parmchk2} -i {self.output_dir}/{mol}_ante.mol2 -f mol2 -o {self.output_dir}/{mol}.frcmod -s 2"
)
with tempfile.TemporaryDirectory() as tmpdir:
run_with_log(
f"{antechamber} -i {self.ligand_sdf_path} -fi sdf -o {self.output_dir}/{mol}_ante.pdb -fo pdb -rn {mol} -dr no",
working_dir=tmpdir,
)
shutil.copy(f"{self.output_dir}/{mol}_ante.pdb", f"{self.output_dir}/{mol}.pdb")
tleap_script = f"""
source leaprc.protein.ff14SB
source leaprc.{self.ligand_ff}
lig = loadmol2 {self.output_dir}/{mol}.mol2
loadamberparams {self.output_dir}/{mol}.frcmod
saveoff lig {self.output_dir}/{mol}.lib
saveamberparm lig {self.output_dir}/{mol}.prmtop {self.output_dir}/{mol}.inpcrd
quit
"""
with tempfile.TemporaryDirectory() as tmpdir:
p = Path(tmpdir) / "tleap.in"
p.write_text(tleap_script)
run_with_log(f"{tleap} -f {p.name}", working_dir=tmpdir)
# atom names from generated pdb
lig_u = Universe(f"{self.output_dir}/{mol}.pdb")
self.atomnames = list(lig_u.atoms.names)
logger.debug(
"Ligand {} AMBER parameters prepared: {}/{}.lib", mol, self.output_dir, mol
)
# -------------------- DB lookup --------------------
[docs]
def fetch_from_existing_db(self, database: Union[str, Path]) -> bool:
"""
Search and copy ligand artifacts from a local database.
Parameters
----------
database
Directory containing ``<name>.(frcmod|lib|prmtop|inpcrd|mol2|pdb|json|sdf)``.
Returns
-------
bool
``True`` if a full, matching entry was found and copied.
"""
db = Path(database)
if not db.exists():
return False
db_files = [f for f in db.iterdir() if f.suffix == ".frcmod"]
db_names = [f.stem for f in db_files]
if self.name not in db_names:
return False
ligand_info = json.loads((db / f"{self.name}.json").read_text())
if ligand_info.get("charge_type") != self.charge:
logger.warning(
"Ligand {} found in DB {}, but charge mismatch: {} vs {}. Will re-parameterize.",
self.name,
db,
ligand_info.get("charge_type"),
self.charge,
)
return False
if ligand_info.get("ligand_ff") != self.ligand_ff:
logger.warning(
"Ligand {} found in DB {}, but FF mismatch: {} vs {}. Will re-parameterize.",
self.name,
db,
ligand_info.get("ligand_ff"),
self.ligand_ff,
)
return False
if ligand_info.get("smiles") != self.smiles:
logger.warning(
"Ligand {} found in DB {}, but SMILES mismatch: {} vs {}. Will re-parameterize.",
self.name,
db,
ligand_info.get("smiles"),
self.smiles,
)
return False
if not self.retain_lig_prot and ligand_info.get("retain_lig_prot"):
logger.warning(
"Ligand {} found in DB {}, but retain_lig_prot mismatch: {} vs {}. Will re-parameterize.",
self.name,
db,
ligand_info.get("retain_lig_prot"),
self.retain_lig_prot,
)
return False
suffixes = ["frcmod", "lib", "prmtop", "inpcrd", "mol2", "pdb", "json", "sdf"]
for suffix in suffixes:
if not (db / f"{self.name}.{suffix}").exists():
logger.warning(
"Ligand {} found in DB {}, but missing {}. Will re-parameterize.",
self.name,
db,
f"{self.name}.{suffix}",
)
return False
for suffix in suffixes:
shutil.copy(
db / f"{self.name}.{suffix}",
Path(self.output_dir) / f"{self.name}.{suffix}",
)
logger.debug(
"Ligand {} found in database {}. Files copied to {}.",
self.name,
db,
self.output_dir,
)
return True
# --------------------------------------------------------------------------- #
# concrete loaders #
# --------------------------------------------------------------------------- #
[docs]
class PDB_LigandProcessing(LigandProcessing):
def _load_ligand(self) -> Chem.Mol:
lig_u = Universe(self.ligand_file)
self._ligand_u = lig_u
ligand_rdkit = lig_u.atoms.convert_to("RDKIT")
if ligand_rdkit is None:
raise ValueError(
f"Failed to load ligand from {self.ligand_file} with RDKit. Check the input."
)
if not self.retain_lig_prot:
ligand_rdkit = Chem.RemoveHs(ligand_rdkit)
ligand_rdkit = Chem.AddHs(ligand_rdkit, addCoords=True)
return ligand_rdkit
def _get_mol_name(self) -> None:
if self._name is not None:
return
self._name = self._ligand_u.atoms.resnames[0] # type: ignore[attr-defined]
[docs]
class SDF_LigandProcessing(LigandProcessing):
def _load_ligand(self) -> Chem.Mol:
supplier = Chem.SDMolSupplier(
self.ligand_file, removeHs=not self.retain_lig_prot
)
ligand_rdkit = supplier[0] if supplier and len(supplier) > 0 else None
if not self.retain_lig_prot and ligand_rdkit is not None:
ligand_rdkit = Chem.RemoveHs(ligand_rdkit)
ligand_rdkit = Chem.AddHs(ligand_rdkit, addCoords=True)
else:
if (
ligand_rdkit is not None
and ligand_rdkit.GetNumAtoms() == ligand_rdkit.GetNumHeavyAtoms()
):
logger.warning(
"Probably no explicit H in {} (SMILES: {}). retain_lig_prot=True may cause issues.",
self.ligand_file,
Chem.MolToSmiles(ligand_rdkit) if ligand_rdkit else "<none>",
)
if ligand_rdkit is None:
raise ValueError(
f"Failed to load ligand from {self.ligand_file} with RDKit. Check the input."
)
return ligand_rdkit
[docs]
class MOL2_LigandProcessing(LigandProcessing):
def _load_ligand(self) -> Chem.Mol:
ligand_rdkit = Chem.MolFromMol2File(
self.ligand_file, removeHs=not self.retain_lig_prot
)
if not self.retain_lig_prot and ligand_rdkit is not None:
ligand_rdkit = Chem.RemoveHs(ligand_rdkit)
ligand_rdkit = Chem.AddHs(ligand_rdkit)
if ligand_rdkit is None:
raise ValueError(
f"Failed to load ligand from {self.ligand_file} with RDKit. Check the input."
)
return ligand_rdkit
# --------------------------------------------------------------------------- #
# factory + batch #
# --------------------------------------------------------------------------- #
[docs]
class LigandFactory:
"""
Factory that chooses the appropriate loader/processor by file extension.
"""
[docs]
def create_ligand(
self,
ligand_file: Union[str, Path],
index: int,
output_dir: Union[str, Path],
ligand_name: Optional[str] = None,
charge: str = "am1bcc",
retain_lig_prot: bool = True,
ligand_ff: str = "gaff2",
unique_mol_names: Optional[List[str]] = None,
) -> LigandProcessing:
"""Instantiate a concrete :class:`LigandProcessing` subclass.
Parameters
----------
ligand_file, index, output_dir, ligand_name, charge, retain_lig_prot,
ligand_ff, unique_mol_names
Forwarded to the underlying processor.
Returns
-------
LigandProcessing
Processor configured for the detected file type.
Raises
------
ValueError
If the file extension is unsupported.
"""
path = str(ligand_file).lower()
if path.endswith(".pdb"):
raise ValueError("PDB ligand input is not supported. Use SDF or MOL2.")
if path.endswith(".sdf"):
return SDF_LigandProcessing(
ligand_file=ligand_file,
index=index,
output_dir=output_dir,
ligand_name=ligand_name,
charge=charge,
retain_lig_prot=retain_lig_prot,
ligand_ff=ligand_ff,
unique_mol_names=unique_mol_names or [],
)
if path.endswith(".mol2"):
return MOL2_LigandProcessing(
ligand_file=ligand_file,
index=index,
output_dir=output_dir,
ligand_name=ligand_name,
charge=charge,
retain_lig_prot=retain_lig_prot,
ligand_ff=ligand_ff,
unique_mol_names=unique_mol_names or [],
)
raise ValueError(f"Unsupported ligand file format: {ligand_file!r}")
[docs]
def batch_ligand_process(
ligand_paths: Union[LigandPathSeq, LigandPathMap],
output_path: Union[str, Path],
retain_lig_prot: bool = True,
ligand_ph: float = 7.0,
ligand_ff: str = "gaff2",
charge_method: str = "am1bcc",
overwrite: bool = False,
run_with_slurm: bool = False,
max_slurm_jobs: int = 50,
run_with_slurm_kwargs: Optional[Dict[str, Any]] = None,
job_extra_directives: Optional[List[str]] = None,
on_failure: OnFailureMode = None,
) -> Tuple[List[str], Dict[str, Tuple[str, str]]]:
"""Parameterise ligands into a content-addressed store.
Artifacts for each ligand are written under::
<output_path>/<hash_id>/*
where ``hash_id = sha256(canonical_smiles + ligand_ff + retain).hexdigest()[:12]``.
Parameters
----------
ligand_paths
List of file paths or mapping {alias: path}. Only the file path affects hashing.
output_path
Output directory for the content-addressed store.
retain_lig_prot
Whether to retain hydrogens from inputs.
ligand_ph
Target protonation pH (reserved for future use).
ligand_ff
Force field ('gaff'/'gaff2' or a valid OpenFF release name).
charge_method
Charge method for ligand.
overwrite
If True, re-parameterize even if <hash_id> already exists.
run_with_slurm
If True, distribute parametrization with Dask+SLURM (same behavior as before).
max_slurm_jobs, run_with_slurm_kwargs, job_extra_directives
SLURM/Dask configuration.
Returns
-------
list of str
Hash identifiers in processing order (duplicates preserved).
dict
Mapping from the provided input path to ``(hash_id, canonical_smiles)``.
"""
# --- normalize inputs ---
def _norm(p: LigandPath) -> str:
return str(Path(p))
if isinstance(ligand_paths, (list, tuple)):
lig_map: Dict[str, str] = {f"lig{i}": _norm(p) for i, p in enumerate(ligand_paths)}
else:
lig_map = {k: _norm(v) for k, v in ligand_paths.items()}
for lp in lig_map.values():
if not Path(lp).exists():
raise FileNotFoundError(f"Ligand file not found: {lp}")
out_root = Path(output_path)
out_root.mkdir(parents=True, exist_ok=True)
available_amber_ff = ["gaff", "gaff2"]
available_openff_ff = [
ff.removesuffix(".offxml")
for ff in get_available_force_fields()
if "openff" in ff
]
if ligand_ff not in (available_amber_ff + available_openff_ff):
raise ValueError(
f"Unsupported force field: {ligand_ff}. "
f"Supported: {available_amber_ff + available_openff_ff}"
)
# --- compute content hashes for unique physical inputs ---
# key: path (string) → (hash_id, canonical_smiles)
unique: Dict[str, Tuple[str, str]] = {}
for alias, path in lig_map.items():
mol = _rdkit_load(path, retain_h=retain_lig_prot)
smi = _canonical_payload(mol)
hid = _hash_id(smi, ligand_ff=ligand_ff, retain_h=retain_lig_prot)
unique[path] = (hid, smi)
# order by first appearance of path in input list (stable)
ordered_paths = []
seen = set()
ligand_names = []
hash_order = []
for name, p in lig_map.items():
if p not in seen:
ordered_paths.append(p)
ligand_names.append(name)
seen.add(p)
hash_order.append(unique[p][0])
# --- build LigandProcessing objects for each unique hash ---
to_prepare: List[Tuple[str, str, "LigandProcessing", str]] = []
success_paths: set[str] = set()
factory = LigandFactory()
for idx, p in enumerate(ordered_paths, start=1):
lig_name = ligand_names[idx - 1]
hid, smi = unique[p]
target_dir = out_root / hid
target_dir.mkdir(parents=True, exist_ok=True)
lig = factory.create_ligand(
ligand_file=p,
index=idx,
output_dir=target_dir.as_posix(),
# set to a generic name
ligand_name="lig",
retain_lig_prot=retain_lig_prot,
charge=charge_method,
ligand_ff=ligand_ff,
unique_mol_names=[],
)
# Dump metadata for traceability
meta = {
"hash_id": hid,
"input_path": str(Path(p).resolve()),
"aliases": [name for name, path in lig_map.items() if path == p],
"canonical_smiles": smi,
"retain_lig_prot": bool(retain_lig_prot),
"ligand_ff": ligand_ff,
"prepared_base": lig_name,
}
(target_dir / "metadata.json").write_text(json.dumps(meta, indent=2))
# Skip if artifacts already present and not overwriting
marker_any = any(
(target_dir / f"{lig.name}.{ext}").exists()
for ext in ("frcmod", "lib", "prmtop", "xml")
)
if not overwrite and marker_any:
logger.info("Reusing cached ligand @ {} ({})", hid, meta["prepared_base"])
# treat cached ligands as successful so they propagate downstream
success_paths.add(p)
else:
to_prepare.append((lig_name, hid, lig, p))
# --- perform parametrization (local or SLURM) ---
mode_lower = (on_failure or "").lower()
if to_prepare:
if run_with_slurm:
raise NotImplementedError("Not implemented yet.")
else:
for lig_name, hid, lig, path in to_prepare:
logger.info(f"Preparing {lig_name} in {lig.output_dir}")
try:
lig.prepare_ligand_parameters()
success_paths.add(path)
except Exception as exc:
if mode_lower in {"prune", "retry"}:
logger.error(
"[param_ligands] failed to prepare %s (hash=%s): %s — skipping due to on_failure=%s",
lig_name,
hid,
exc,
mode_lower,
)
continue
raise
# filter outputs to successful ligands only
if not success_paths:
if mode_lower in {"prune", "retry"}:
logger.warning(f"[param_ligands] No ligands prepared successfully (on_failure={mode_lower}).")
return [], {}
logger.warning("[param_ligands] No ligands prepared successfully.")
return [], {}
unique_filtered: Dict[str, Tuple[str, str]] = {
p: unique[p] for p in success_paths if p in unique
}
hash_order_filtered: List[str] = []
for name, p in lig_map.items():
if p in success_paths and p in unique_filtered:
hash_order_filtered.append(unique_filtered[p][0])
skipped = len(lig_map) - len(success_paths)
logger.success(
f"Prepared {len(success_paths)} ligands into {out_root}"
f"{f' (skipped {skipped})' if skipped else ''}"
)
return hash_order_filtered, unique_filtered