From 260d72cffc1d1adb1a39915e8f1abb62732cbe86 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Thu, 29 Jan 2026 15:16:52 +0100 Subject: [PATCH 1/6] Small fix --- .github/workflows/ci.yaml | 2 +- src/openfe_analysis/rmsd.py | 135 +++++++++++++++---------- src/openfe_analysis/tests/conftest.py | 25 ++++- src/openfe_analysis/tests/test_rmsd.py | 42 +++++++- 4 files changed, 140 insertions(+), 64 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 767401f..36d52f4 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -60,7 +60,7 @@ jobs: ~/.cache/openfe_analysis # macOS cache location ~/Library/Caches/openfe_analysis - key: pooch-${{ matrix.os }}-v2 + key: pooch-${{ matrix.os }}-v3 - name: "Download Zenodo data" run: | diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 166a00d..ded82b0 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -15,22 +15,42 @@ from .transformations import Aligner, ClosestImageShift, NoJump -def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Universe: - """Makes a Universe and applies some transformations +def make_Universe( + top: pathlib.Path, + trj: nc.Dataset, + state: int, + ligand_resname: str = "UNK", + protein_selection: str = "protein and name CA", +) -> mda.Universe: + """ + Creates a Universe and applies transformations for protein and ligands. - Identifies two AtomGroups: - - protein, defined as having standard amino acid names, then filtered - down to CA - - ligand, defined as resname UNK + Parameters + ---------- + top : pathlib.Path + Path to the topology file. + trj : nc.Dataset + Trajectory dataset. + state : int + State index in the trajectory. + ligand_resname : str, default 'UNK' + Residue name(s) for ligands. Supports multiple ligands. + protein_selection : str, default 'protein and name CA' + MDAnalysis selection string for the protein atoms to consider. - Then applies some transformations. + Returns + ------- + mda.Universe + Universe with transformations applied. + Notes + ----- If a protein is present: - prevents the protein from jumping between periodic images - moves the ligand to the image closest to the protein - aligns the entire system to minimise the protein RMSD - If only a ligand: + If only a ligand is present: - prevents the ligand from jumping between periodic images """ u = mda.Universe( @@ -39,8 +59,8 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers state_id=state, format=FEReader, ) - prot = u.select_atoms("protein and name CA") - ligand = u.select_atoms("resname UNK") + prot = u.select_atoms(protein_selection) + ligands = [res.atoms for res in u.residues if res.resname == ligand_resname] if prot: # Unwrap all atoms @@ -48,7 +68,7 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers # Shift chains + ligand chains = [seg.atoms for seg in prot.segments] - shift = ClosestImageShift(chains[0], [*chains[1:], ligand]) + shift = ClosestImageShift(chains[0], [*chains[1:], *ligands]) # Make each protein chain whole for frag in prot.fragments: make_whole(frag, reference_atom=frag[0]) @@ -62,21 +82,20 @@ def make_Universe(top: pathlib.Path, trj: nc.Dataset, state: int) -> mda.Univers ) else: # if there's no protein - # - make the ligand not jump periodic images between frames - # - align the ligand to minimise its RMSD - nope = NoJump(ligand) - align = Aligner(ligand) - - u.trajectory.add_transformations( - nope, - align, - ) + # - make the ligands not jump periodic images between frames + # - align the ligands to minimise its RMSD + for lig in ligands: + u.trajectory.add_transformations(NoJump(lig), Aligner(lig)) return u def gather_rms_data( - pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None + pdb_topology: pathlib.Path, + dataset: pathlib.Path, + skip: Optional[int] = None, + ligand_resname: str = "UNK", + protein_selection: str = "protein and name CA", ) -> dict[str, list[float]]: """Generate structural analysis of RBFE simulation @@ -89,6 +108,10 @@ def gather_rms_data( skip : int, optional step at which to progress through the trajectory. by default, selects a step that produces roughly 500 frames of analysis per replicate + ligand_resname : str, default 'UNK' + Residue name for ligand(s). Supports multiple ligands. + protein_selection : str, default 'protein and name CA' + MDAnalysis selection string for the protein atoms to consider. Produces, for each lambda state: - 1D protein RMSD timeseries 'protein_RMSD' @@ -126,31 +149,35 @@ def gather_rms_data( for i in range(n_lambda): # cheeky, but we can read the PDB topology once and reuse per universe # this then only hits the PDB file once for all replicas - u = make_Universe(u_top._topology, ds, state=i) - - prot = u.select_atoms("protein and name CA") - ligand = u.select_atoms("resname UNK") - - # save coordinates for 2D RMSD matrix - # TODO: Some smart guard to avoid allocating a silly amount of memory? - prot2d = np.empty((len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32) - - # Would this copy be safer? - prot_start = prot.positions.copy() - ligand_start = ligand.positions.copy() - ligand_initial_com = ligand.center_of_mass() - ligand_weights = ligand.masses / np.mean(ligand.masses) + u = make_Universe( + u_top._topology, + ds, + state=i, + ligand_resname=ligand_resname, + protein_selection=protein_selection, + ) + + prot = u.select_atoms(protein_selection) + ligands = [res.atoms for res in u.residues if res.resname == ligand_resname] + + # Prepare storage + if prot: + prot_positions = np.empty( + (len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32 + ) + prot_start = prot.positions.copy() + prot_rmsd = [] - this_protein_rmsd = [] - this_ligand_rmsd = [] - this_ligand_wander = [] + lig_starts = [lig.positions.copy() for lig in ligands] + lig_initial_coms = [lig.center_of_mass() for lig in ligands] + lig_rmsd: list[list[float]] = [[] for _ in ligands] + lig_wander: list[list[float]] = [[] for _ in ligands] for ts_i, ts in enumerate(u.trajectory[::skip]): pb.update() - if prot: - prot2d[ts_i, :, :] = prot.positions - this_protein_rmsd.append( + prot_positions[ts_i, :, :] = prot.positions + prot_rmsd.append( rms.rmsd( prot.positions, prot_start, @@ -159,37 +186,37 @@ def gather_rms_data( superposition=False, ) ) - if ligand: - this_ligand_rmsd.append( + for i, lig in enumerate(ligands): + lig_rmsd[i].append( rms.rmsd( - ligand.positions, - ligand_start, - ligand_weights, + lig.positions, + lig_starts[i], + lig.masses / np.mean(lig.masses), center=False, superposition=False, ) ) - this_ligand_wander.append( + lig_wander[i].append( # distance between start and current ligand position # ignores PBC, but we've already centered the traj - mda.lib.distances.calc_bonds(ligand.center_of_mass(), ligand_initial_com) + mda.lib.distances.calc_bonds(lig.center_of_mass(), lig_initial_coms[i]) ) if prot: # can ignore weights here as it's all Ca - rmsd2d = twoD_RMSD(prot2d, w=None) # prot_weights) - output["protein_RMSD"].append(this_protein_rmsd) + rmsd2d = twoD_RMSD(prot_positions, w=None) # prot_weights) + output["protein_RMSD"].append(prot_rmsd) output["protein_2D_RMSD"].append(rmsd2d) - if ligand: - output["ligand_RMSD"].append(this_ligand_rmsd) - output["ligand_wander"].append(this_ligand_wander) + if ligands: + output["ligand_RMSD"].append(lig_rmsd) + output["ligand_wander"].append(lig_wander) output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt) return output -def twoD_RMSD(positions, w: Optional[npt.NDArray]) -> list[float]: +def twoD_RMSD(positions: np.ndarray, w: Optional[npt.NDArray]) -> list[float]: """2 dimensions RMSD Parameters diff --git a/src/openfe_analysis/tests/conftest.py b/src/openfe_analysis/tests/conftest.py index 851668c..210059b 100644 --- a/src/openfe_analysis/tests/conftest.py +++ b/src/openfe_analysis/tests/conftest.py @@ -4,11 +4,12 @@ import pooch import pytest -ZENODO_DOI = "doi:10.5281/zenodo.18378051" +ZENODO_DOI = "doi:10.5281/zenodo.18415326" ZENODO_FILES = { - "openfe_analysis_simulation_output.tar.gz": "md5:7f0babaac3dc8f7dd2db63cb79dff00f", + "openfe_analysis_full.tar.gz": "md5:7f0babaac3dc8f7dd2db63cb79dff00f", "openfe_analysis_skipped.tar.gz": "md5:ac42219bde9da3641375adf3a9ddffbf", + "openfe_analysis_septop.tar.gz": "md5:4b47198c57025bd6e0c6cf76f864370a", } POOCH_CACHE = pathlib.Path(pooch.os_cache("openfe_analysis")) @@ -35,8 +36,8 @@ def _fetch_and_untar_once(filename: str) -> pathlib.Path: @pytest.fixture(scope="session") def rbfe_output_data_dir() -> pathlib.Path: - untar_dir = _fetch_and_untar_once("openfe_analysis_simulation_output.tar.gz") - return untar_dir / "openfe_analysis_simulation_output" + untar_dir = _fetch_and_untar_once("openfe_analysis_full.tar.gz") + return untar_dir / "openfe_analysis_full" @pytest.fixture(scope="session") @@ -45,6 +46,12 @@ def rbfe_skipped_data_dir() -> pathlib.Path: return untar_dir / "openfe_analysis_skipped" +@pytest.fixture(scope="session") +def rbfe_septop_data_dir() -> pathlib.Path: + untar_dir = _fetch_and_untar_once("openfe_analysis_septop.tar.gz") + return untar_dir / "openfe_analysis_septop" + + @pytest.fixture(scope="session") def simulation_nc(rbfe_output_data_dir) -> pathlib.Path: return rbfe_output_data_dir / "simulation.nc" @@ -55,6 +62,16 @@ def simulation_skipped_nc(rbfe_skipped_data_dir) -> pathlib.Path: return rbfe_skipped_data_dir / "simulation.nc" +@pytest.fixture(scope="session") +def simulation_nc_septop(rbfe_septop_data_dir) -> pathlib.Path: + return rbfe_septop_data_dir / "complex.nc" + + +@pytest.fixture(scope="session") +def system_septop(rbfe_septop_data_dir) -> pathlib.Path: + return rbfe_septop_data_dir / "alchemical_system.pdb" + + @pytest.fixture(scope="session") def hybrid_system_pdb(rbfe_output_data_dir) -> pathlib.Path: return rbfe_output_data_dir / "hybrid_system.pdb" diff --git a/src/openfe_analysis/tests/test_rmsd.py b/src/openfe_analysis/tests/test_rmsd.py index 6183791..4e808c3 100644 --- a/src/openfe_analysis/tests/test_rmsd.py +++ b/src/openfe_analysis/tests/test_rmsd.py @@ -47,13 +47,13 @@ def test_gather_rms_data_regression(simulation_nc, hybrid_system_pdb): ) assert len(output["ligand_RMSD"]) == 3 assert_allclose( - output["ligand_RMSD"][0], + output["ligand_RMSD"][0][0], [0.0, 0.9094, 1.0398, 0.9774, 1.9108, 1.2149], rtol=1e-3, ) assert len(output["ligand_wander"]) == 3 assert_allclose( - output["ligand_wander"][0], + output["ligand_wander"][0][0], [0.0, 0.5458, 0.8364, 0.4914, 1.1939, 0.7587], rtol=1e-3, ) @@ -67,6 +67,38 @@ def test_gather_rms_data_regression(simulation_nc, hybrid_system_pdb): ) +def test_gather_rms_data_septop(simulation_nc_septop, system_septop): + output = gather_rms_data( + system_septop, + simulation_nc_septop, + skip=100, + ) + + assert_allclose(output["time(ps)"], [0.0, 10000.0]) + assert len(output["protein_RMSD"]) == 19 + assert len(output["ligand_RMSD"]) == 19 + # Check that we have two lists, one for each ligand + assert len(output["ligand_RMSD"][0]) == 2 + assert len(output["ligand_wander"]) == 19 + # Check that we have two lists, one for each ligand + assert len(output["ligand_wander"][0]) == 2 + + +def test_make_universe_two_ligands(simulation_nc_septop, system_septop): + # Create Universe + u = make_Universe(top=system_septop, trj=simulation_nc_septop, state=0) + + # Select protein and ligands + prot = u.select_atoms("protein and name CA") + ligands = [res.atoms for res in u.residues if res.resname == "UNK"] + + # Check that we have two ligands + assert len(ligands) == 2, f"Expected 2 ligands, got {len(ligands)}" + + # Check protein is present + assert len(prot) > 0 + + def test_gather_rms_data_regression_skippednc(simulation_skipped_nc, hybrid_system_skipped_pdb): output = gather_rms_data( hybrid_system_skipped_pdb, @@ -84,13 +116,13 @@ def test_gather_rms_data_regression_skippednc(simulation_skipped_nc, hybrid_syst ) assert len(output["ligand_RMSD"]) == 11 assert_allclose( - output["ligand_RMSD"][0][:6], + output["ligand_RMSD"][0][0][:6], [0.0, 1.092039, 0.839234, 1.228383, 1.533331, 1.276798], rtol=1e-3, ) assert len(output["ligand_wander"]) == 11 assert_allclose( - output["ligand_wander"][0][:6], + output["ligand_wander"][0][0][:6], [0.0, 0.908097, 0.674262, 0.971328, 0.909263, 1.101882], rtol=1e-3, ) @@ -158,7 +190,7 @@ def test_rmsd_reference_is_first_frame(mda_universe): u = mda_universe prot = u.select_atoms("protein") - ts = next(iter(u.trajectory)) # SAFE + _ = next(iter(u.trajectory)) # SAFE ref = prot.positions.copy() rmsd = np.sqrt(((prot.positions - ref) ** 2).mean()) From 5e7a0376395dc39627798f620b26720d7c2043d8 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Thu, 29 Jan 2026 15:28:57 +0100 Subject: [PATCH 2/6] fix --- .github/workflows/ci.yaml | 2 +- src/openfe_analysis/tests/conftest.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/ci.yaml b/.github/workflows/ci.yaml index 36d52f4..9b202af 100644 --- a/.github/workflows/ci.yaml +++ b/.github/workflows/ci.yaml @@ -60,7 +60,7 @@ jobs: ~/.cache/openfe_analysis # macOS cache location ~/Library/Caches/openfe_analysis - key: pooch-${{ matrix.os }}-v3 + key: pooch-${{ matrix.os }}-v1 - name: "Download Zenodo data" run: | diff --git a/src/openfe_analysis/tests/conftest.py b/src/openfe_analysis/tests/conftest.py index 210059b..70672c1 100644 --- a/src/openfe_analysis/tests/conftest.py +++ b/src/openfe_analysis/tests/conftest.py @@ -1,5 +1,4 @@ import pathlib -from importlib import resources import pooch import pytest From 4fac0a0d5b29e6728b7a5d24cb13bebeb746a808 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Thu, 29 Jan 2026 15:51:26 +0100 Subject: [PATCH 3/6] Use ligand selection string instead of resname --- src/openfe_analysis/rmsd.py | 36 +++++++++++++++++++++++++----------- 1 file changed, 25 insertions(+), 11 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index ded82b0..3ef4b95 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -15,11 +15,26 @@ from .transformations import Aligner, ClosestImageShift, NoJump +def select_protein_and_ligands( + u: mda.Universe, + protein_selection: str, + ligand_selection: str, +): + prot = u.select_atoms(protein_selection) + + lig_atoms = u.select_atoms(ligand_selection) + + # split into individual ligands by residue + ligands = [res.atoms for res in lig_atoms.residues] + + return prot, ligands + + def make_Universe( top: pathlib.Path, trj: nc.Dataset, state: int, - ligand_resname: str = "UNK", + ligand_selection: str = "resname UNK", protein_selection: str = "protein and name CA", ) -> mda.Universe: """ @@ -33,8 +48,8 @@ def make_Universe( Trajectory dataset. state : int State index in the trajectory. - ligand_resname : str, default 'UNK' - Residue name(s) for ligands. Supports multiple ligands. + ligand_selection : str, default 'resname UNK' + MDAnalysis selection string for ligands. Supports multiple ligands. protein_selection : str, default 'protein and name CA' MDAnalysis selection string for the protein atoms to consider. @@ -59,8 +74,8 @@ def make_Universe( state_id=state, format=FEReader, ) - prot = u.select_atoms(protein_selection) - ligands = [res.atoms for res in u.residues if res.resname == ligand_resname] + + prot, ligands = select_protein_and_ligands(u, protein_selection, ligand_selection) if prot: # Unwrap all atoms @@ -94,7 +109,7 @@ def gather_rms_data( pdb_topology: pathlib.Path, dataset: pathlib.Path, skip: Optional[int] = None, - ligand_resname: str = "UNK", + ligand_selection: str = "resname UNK", protein_selection: str = "protein and name CA", ) -> dict[str, list[float]]: """Generate structural analysis of RBFE simulation @@ -108,8 +123,8 @@ def gather_rms_data( skip : int, optional step at which to progress through the trajectory. by default, selects a step that produces roughly 500 frames of analysis per replicate - ligand_resname : str, default 'UNK' - Residue name for ligand(s). Supports multiple ligands. + ligand_selection: str = "resname UNK", + MDAnalysis selection string for ligand(s). Supports multiple ligands. protein_selection : str, default 'protein and name CA' MDAnalysis selection string for the protein atoms to consider. @@ -153,12 +168,11 @@ def gather_rms_data( u_top._topology, ds, state=i, - ligand_resname=ligand_resname, + ligand_resname=ligand_selection, protein_selection=protein_selection, ) - prot = u.select_atoms(protein_selection) - ligands = [res.atoms for res in u.residues if res.resname == ligand_resname] + prot, ligands = select_protein_and_ligands(u, protein_selection, ligand_selection) # Prepare storage if prot: From c329ba9b36dc5978e85536d31a39be8af17ee68e Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 30 Jan 2026 10:24:07 +0100 Subject: [PATCH 4/6] Update test files --- src/openfe_analysis/rmsd.py | 2 +- src/openfe_analysis/tests/conftest.py | 30 +++++++++++---------------- 2 files changed, 13 insertions(+), 19 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 3ef4b95..2971b07 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -168,7 +168,7 @@ def gather_rms_data( u_top._topology, ds, state=i, - ligand_resname=ligand_selection, + ligand_selection=ligand_selection, protein_selection=protein_selection, ) diff --git a/src/openfe_analysis/tests/conftest.py b/src/openfe_analysis/tests/conftest.py index 70672c1..3795682 100644 --- a/src/openfe_analysis/tests/conftest.py +++ b/src/openfe_analysis/tests/conftest.py @@ -3,10 +3,10 @@ import pooch import pytest -ZENODO_DOI = "doi:10.5281/zenodo.18415326" +ZENODO_DOI = "doi:10.5281/zenodo.17916321" ZENODO_FILES = { - "openfe_analysis_full.tar.gz": "md5:7f0babaac3dc8f7dd2db63cb79dff00f", + "openfe_analysis_full.tar.gz": "md5:a51b1f8d98b91ab1a69a6f55508d07db", "openfe_analysis_skipped.tar.gz": "md5:ac42219bde9da3641375adf3a9ddffbf", "openfe_analysis_septop.tar.gz": "md5:4b47198c57025bd6e0c6cf76f864370a", } @@ -21,34 +21,28 @@ ) -def _fetch_and_untar_once(filename: str) -> pathlib.Path: - # If already untarred, reuse it - untar_dir = POOCH_CACHE / f"{filename}.untar" - if untar_dir.exists(): - return untar_dir - - # Otherwise fetch + untar - paths = ZENODO_RBFE_DATA.fetch(filename, processor=pooch.Untar()) - - return pathlib.Path(paths[0]).parent +def _fetch_and_untar(dirname: str) -> pathlib.Path: + ZENODO_RBFE_DATA.fetch(f"{dirname}.tar.gz", processor=pooch.Untar()) + cached_dir = pathlib.Path(f"{POOCH_CACHE}/{dirname}.tar.gz.untar/{dirname}") + return cached_dir @pytest.fixture(scope="session") def rbfe_output_data_dir() -> pathlib.Path: - untar_dir = _fetch_and_untar_once("openfe_analysis_full.tar.gz") - return untar_dir / "openfe_analysis_full" + cached_dir = _fetch_and_untar("openfe_analysis_full") + return cached_dir @pytest.fixture(scope="session") def rbfe_skipped_data_dir() -> pathlib.Path: - untar_dir = _fetch_and_untar_once("openfe_analysis_skipped.tar.gz") - return untar_dir / "openfe_analysis_skipped" + cached_dir = _fetch_and_untar("openfe_analysis_skipped") + return cached_dir @pytest.fixture(scope="session") def rbfe_septop_data_dir() -> pathlib.Path: - untar_dir = _fetch_and_untar_once("openfe_analysis_septop.tar.gz") - return untar_dir / "openfe_analysis_septop" + cached_dir = _fetch_and_untar("openfe_analysis_septop") + return cached_dir @pytest.fixture(scope="session") From b5cc7e08e1f1d885ad1a2717872fd1521130c3f6 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 30 Jan 2026 10:36:47 +0100 Subject: [PATCH 5/6] Rename ligand_wander to ligand_COM_drift --- src/openfe_analysis/rmsd.py | 10 +++++----- src/openfe_analysis/tests/test_rmsd.py | 12 ++++++------ 2 files changed, 11 insertions(+), 11 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 2971b07..418d362 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -131,13 +131,13 @@ def gather_rms_data( Produces, for each lambda state: - 1D protein RMSD timeseries 'protein_RMSD' - ligand RMSD timeseries - - ligand COM motion 'ligand_wander' + - ligand COM motion 'ligand_COM_drift' - 2D protein RMSD plot """ output = { "protein_RMSD": [], "ligand_RMSD": [], - "ligand_wander": [], + "ligand_COM_drift": [], "protein_2D_RMSD": [], } @@ -185,7 +185,7 @@ def gather_rms_data( lig_starts = [lig.positions.copy() for lig in ligands] lig_initial_coms = [lig.center_of_mass() for lig in ligands] lig_rmsd: list[list[float]] = [[] for _ in ligands] - lig_wander: list[list[float]] = [[] for _ in ligands] + lig_com_drift: list[list[float]] = [[] for _ in ligands] for ts_i, ts in enumerate(u.trajectory[::skip]): pb.update() @@ -210,7 +210,7 @@ def gather_rms_data( superposition=False, ) ) - lig_wander[i].append( + lig_com_drift[i].append( # distance between start and current ligand position # ignores PBC, but we've already centered the traj mda.lib.distances.calc_bonds(lig.center_of_mass(), lig_initial_coms[i]) @@ -223,7 +223,7 @@ def gather_rms_data( output["protein_2D_RMSD"].append(rmsd2d) if ligands: output["ligand_RMSD"].append(lig_rmsd) - output["ligand_wander"].append(lig_wander) + output["ligand_COM_drift"].append(lig_com_drift) output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt) diff --git a/src/openfe_analysis/tests/test_rmsd.py b/src/openfe_analysis/tests/test_rmsd.py index 4e808c3..26d93af 100644 --- a/src/openfe_analysis/tests/test_rmsd.py +++ b/src/openfe_analysis/tests/test_rmsd.py @@ -51,9 +51,9 @@ def test_gather_rms_data_regression(simulation_nc, hybrid_system_pdb): [0.0, 0.9094, 1.0398, 0.9774, 1.9108, 1.2149], rtol=1e-3, ) - assert len(output["ligand_wander"]) == 3 + assert len(output["ligand_COM_drift"]) == 3 assert_allclose( - output["ligand_wander"][0][0], + output["ligand_COM_drift"][0][0], [0.0, 0.5458, 0.8364, 0.4914, 1.1939, 0.7587], rtol=1e-3, ) @@ -79,9 +79,9 @@ def test_gather_rms_data_septop(simulation_nc_septop, system_septop): assert len(output["ligand_RMSD"]) == 19 # Check that we have two lists, one for each ligand assert len(output["ligand_RMSD"][0]) == 2 - assert len(output["ligand_wander"]) == 19 + assert len(output["ligand_COM_drift"]) == 19 # Check that we have two lists, one for each ligand - assert len(output["ligand_wander"][0]) == 2 + assert len(output["ligand_COM_drift"][0]) == 2 def test_make_universe_two_ligands(simulation_nc_septop, system_septop): @@ -120,9 +120,9 @@ def test_gather_rms_data_regression_skippednc(simulation_skipped_nc, hybrid_syst [0.0, 1.092039, 0.839234, 1.228383, 1.533331, 1.276798], rtol=1e-3, ) - assert len(output["ligand_wander"]) == 11 + assert len(output["ligand_COM_drift"]) == 11 assert_allclose( - output["ligand_wander"][0][0][:6], + output["ligand_COM_drift"][0][0][:6], [0.0, 0.908097, 0.674262, 0.971328, 0.909263, 1.101882], rtol=1e-3, ) From 56b0e619a316f0ecf5b09e7b3873e6e229889030 Mon Sep 17 00:00:00 2001 From: hannahbaumann Date: Fri, 30 Jan 2026 13:54:01 +0100 Subject: [PATCH 6/6] split out per state analysis into its own function --- src/openfe_analysis/rmsd.py | 241 +++++++++++++++++++++--------------- 1 file changed, 143 insertions(+), 98 deletions(-) diff --git a/src/openfe_analysis/rmsd.py b/src/openfe_analysis/rmsd.py index 418d362..fa55a4a 100644 --- a/src/openfe_analysis/rmsd.py +++ b/src/openfe_analysis/rmsd.py @@ -15,19 +15,17 @@ from .transformations import Aligner, ClosestImageShift, NoJump -def select_protein_and_ligands( +def _select_protein_and_ligands( u: mda.Universe, protein_selection: str, ligand_selection: str, -): - prot = u.select_atoms(protein_selection) - +) -> tuple[mda.core.groups.AtomGroup, list[mda.core.groups.AtomGroup]]: + protein = u.select_atoms(protein_selection) lig_atoms = u.select_atoms(ligand_selection) - # split into individual ligands by residue ligands = [res.atoms for res in lig_atoms.residues] - return prot, ligands + return protein, ligands def make_Universe( @@ -75,20 +73,20 @@ def make_Universe( format=FEReader, ) - prot, ligands = select_protein_and_ligands(u, protein_selection, ligand_selection) + protein, ligands = _select_protein_and_ligands(u, protein_selection, ligand_selection) - if prot: + if protein: # Unwrap all atoms - unwrap_tr = unwrap(prot) + unwrap_tr = unwrap(protein) # Shift chains + ligand - chains = [seg.atoms for seg in prot.segments] + chains = [seg.atoms for seg in protein.segments] shift = ClosestImageShift(chains[0], [*chains[1:], *ligands]) # Make each protein chain whole - for frag in prot.fragments: + for frag in protein.fragments: make_whole(frag, reference_atom=frag[0]) - align = Aligner(prot) + align = Aligner(protein) u.trajectory.add_transformations( unwrap_tr, @@ -105,6 +103,121 @@ def make_Universe( return u +def twoD_RMSD(positions: np.ndarray, w: Optional[npt.NDArray]) -> list[float]: + """2 dimensions RMSD + + Parameters + ---------- + positions : np.ndarray + the protein positions for the entire trajectory + w : np.ndarray, optional + weights array + + Returns + ------- + rmsd_matrix : list + Flattened list of RMSD values between all frame pairs. + """ + nframes, _, _ = positions.shape + + output = [] + + for i, j in itertools.combinations(range(nframes), 2): + posi, posj = positions[i], positions[j] + + rmsd = rms.rmsd(posi, posj, w, center=True, superposition=True) + + output.append(rmsd) + + return output + + +def analyze_state( + u: mda.Universe, + prot: Optional[mda.core.groups.AtomGroup], + ligands: list[mda.core.groups.AtomGroup], + skip: int, +) -> tuple[ + Optional[list[float]], + Optional[np.ndarray], + Optional[list[list[float]]], + Optional[list[list[float]]], +]: + """ + Compute RMSD and COM drift for a single lambda state. + + Parameters + ---------- + u : mda.Universe + Universe containing the trajectory. + protein : AtomGroup or None + Protein atoms to compute RMSD for. + ligands : list of AtomGroups + Ligands to compute RMSD and COM drift for. + skip : int + Step size to skip frames (e.g., every `skip`-th frame). + + Returns + ------- + protein_rmsd : list[float] or None + RMSD of protein per frame, if protein is present. + protein_2D_rmsd : list[float] or None + Flattened 2D RMSD between all protein frames. + ligand_rmsd : list of list[float] or None + RMSD of each ligand per frame. + ligand_com_drift : list of list[float] or None + COM drift of each ligand per frame. + """ + traj_slice = u.trajectory[::skip] + # Prepare storage + if prot: + prot_positions = np.empty((len(traj_slice), len(prot), 3), dtype=np.float32) + prot_start = prot.positions.copy() + prot_rmsd = [] + else: + prot_positions = None + prot_rmsd = None + + lig_starts = [lig.positions.copy() for lig in ligands] + lig_initial_coms = [lig.center_of_mass() for lig in ligands] + lig_rmsd: list[list[float]] = [[] for _ in ligands] + lig_com_drift: list[list[float]] = [[] for _ in ligands] + + for ts_i, ts in enumerate(traj_slice): + if prot: + prot_positions[ts_i, :, :] = prot.positions + prot_rmsd.append( + rms.rmsd( + prot.positions, + prot_start, + None, # prot_weights, + center=False, + superposition=False, + ) + ) + for i, lig in enumerate(ligands): + lig_rmsd[i].append( + rms.rmsd( + lig.positions, + lig_starts[i], + lig.masses / np.mean(lig.masses), + center=False, + superposition=False, + ) + ) + lig_com_drift[i].append( + # distance between start and current ligand position + # ignores PBC, but we've already centered the traj + mda.lib.distances.calc_bonds(lig.center_of_mass(), lig_initial_coms[i]) + ) + + if prot: + # can ignore weights here as it's all Ca + rmsd2d = twoD_RMSD(prot_positions, w=None) # prot_weights) + + return prot_rmsd, rmsd2d, lig_rmsd, lig_com_drift + + def gather_rms_data( pdb_topology: pathlib.Path, dataset: pathlib.Path, @@ -123,16 +236,19 @@ def gather_rms_data( skip : int, optional step at which to progress through the trajectory. by default, selects a step that produces roughly 500 frames of analysis per replicate - ligand_selection: str = "resname UNK", - MDAnalysis selection string for ligand(s). Supports multiple ligands. - protein_selection : str, default 'protein and name CA' - MDAnalysis selection string for the protein atoms to consider. + ligand_selection : str, optional + MDAnalysis selection string for ligands (default "resname UNK"). + protein_selection : str, optional + MDAnalysis selection string for protein (default "protein and name CA"). - Produces, for each lambda state: - - 1D protein RMSD timeseries 'protein_RMSD' - - ligand RMSD timeseries - - ligand COM motion 'ligand_COM_drift' - - 2D protein RMSD plot + Returns + ------- + output : dict[str, list] + Dictionary containing: + - 'protein_RMSD': list of protein RMSD per state + - 'protein_2D_RMSD': list of 2D RMSD per state + - 'ligand_RMSD': list of ligand RMSD per state + - 'ligand_COM_drift': list of ligand COM drift per state """ output = { "protein_RMSD": [], @@ -161,99 +277,28 @@ def gather_rms_data( u_top = mda.Universe(pdb_topology) - for i in range(n_lambda): + for state in range(n_lambda): # cheeky, but we can read the PDB topology once and reuse per universe # this then only hits the PDB file once for all replicas u = make_Universe( u_top._topology, ds, - state=i, + state=state, ligand_selection=ligand_selection, protein_selection=protein_selection, ) - - prot, ligands = select_protein_and_ligands(u, protein_selection, ligand_selection) - - # Prepare storage - if prot: - prot_positions = np.empty( - (len(u.trajectory[::skip]), len(prot), 3), dtype=np.float32 - ) - prot_start = prot.positions.copy() - prot_rmsd = [] - - lig_starts = [lig.positions.copy() for lig in ligands] - lig_initial_coms = [lig.center_of_mass() for lig in ligands] - lig_rmsd: list[list[float]] = [[] for _ in ligands] - lig_com_drift: list[list[float]] = [[] for _ in ligands] - - for ts_i, ts in enumerate(u.trajectory[::skip]): - pb.update() - if prot: - prot_positions[ts_i, :, :] = prot.positions - prot_rmsd.append( - rms.rmsd( - prot.positions, - prot_start, - None, # prot_weights, - center=False, - superposition=False, - ) - ) - for i, lig in enumerate(ligands): - lig_rmsd[i].append( - rms.rmsd( - lig.positions, - lig_starts[i], - lig.masses / np.mean(lig.masses), - center=False, - superposition=False, - ) - ) - lig_com_drift[i].append( - # distance between start and current ligand position - # ignores PBC, but we've already centered the traj - mda.lib.distances.calc_bonds(lig.center_of_mass(), lig_initial_coms[i]) - ) + prot, ligands = _select_protein_and_ligands(u, protein_selection, ligand_selection) + prot_rmsd, rmsd2d, lig_rmsd, lig_com_drift = analyze_state(u, prot, ligands, skip) if prot: - # can ignore weights here as it's all Ca - rmsd2d = twoD_RMSD(prot_positions, w=None) # prot_weights) output["protein_RMSD"].append(prot_rmsd) output["protein_2D_RMSD"].append(rmsd2d) + if ligands: output["ligand_RMSD"].append(lig_rmsd) output["ligand_COM_drift"].append(lig_com_drift) output["time(ps)"] = list(np.arange(len(u.trajectory))[::skip] * u.trajectory.dt) - - return output - - -def twoD_RMSD(positions: np.ndarray, w: Optional[npt.NDArray]) -> list[float]: - """2 dimensions RMSD - - Parameters - ---------- - positions : np.ndarray - the protein positions for the entire trajectory - w : np.ndarray, optional - weights array - - Returns - ------- - rmsd_matrix : list - a flattened version of the 2d - """ - nframes, _, _ = positions.shape - - output = [] - - for i, j in itertools.combinations(range(nframes), 2): - posi, posj = positions[i], positions[j] - - rmsd = rms.rmsd(posi, posj, w, center=True, superposition=True) - - output.append(rmsd) + pb.update(len(u.trajectory[::skip])) return output