diff --git a/.gitignore b/.gitignore index bee2d5e0ec..a87e6b816e 100644 --- a/.gitignore +++ b/.gitignore @@ -69,5 +69,9 @@ build/* *.log *.xml -# AI Agent files +# AI Agent files and folders AGENTS.md +.claude/* +.vexb/* + +ARC.egg-info/* diff --git a/ARC.py b/ARC.py index 0707ec2130..bba6da4c2c 100644 --- a/ARC.py +++ b/ARC.py @@ -11,6 +11,8 @@ from arc.common import read_yaml_file from arc.main import ARC +from arc.tckdb.config import TCKDBConfig +from arc.tckdb.sweep import run_upload_sweep def parse_command_line_arguments(command_line_args=None): @@ -59,8 +61,32 @@ def main(): input_dict['verbose'] = input_dict['verbose'] if 'verbose' in input_dict else verbose if 'project_directory' not in input_dict or not input_dict['project_directory']: input_dict['project_directory'] = project_directory + + tckdb_config = TCKDBConfig.from_dict(input_dict.pop('tckdb', None)) + arc_object = ARC(**input_dict) - arc_object.execute() + arc_object.tckdb_config = tckdb_config + if tckdb_config is not None: + print(f'TCKDB integration enabled: {tckdb_config.base_url}') + + # Persistent SSH pool lives for the duration of the run; close it + # explicitly on every exit path (success, error, ctrl-C) so we don't + # leave paramiko Transports orphaned. Lazily instantiated on first + # remote-queue job, so this is a no-op for fully-local runs. + try: + arc_object.execute() + + if tckdb_config is not None: + from arc.tckdb.adapter import TCKDBAdapter + adapter = TCKDBAdapter(tckdb_config, project_directory=arc_object.project_directory) + run_upload_sweep( + adapter=adapter, + project_directory=arc_object.project_directory, + tckdb_config=tckdb_config, + ) + finally: + from arc.job.ssh_pool import reset_default_pool + reset_default_pool() if __name__ == '__main__': diff --git a/ARC_test.py b/ARC_test.py new file mode 100644 index 0000000000..dea1f49964 --- /dev/null +++ b/ARC_test.py @@ -0,0 +1,355 @@ +"""Tests for the ARC.py end-of-run TCKDB upload sweep dispatcher. + +These tests focus on the wiring between ``tckdb.upload_mode`` and the +adapter method that gets called per species. They use a stub adapter so +no network or live ARC objects are required. 
+""" + +import os +import shutil +import tempfile +import unittest +from pathlib import Path +from types import SimpleNamespace +from unittest import mock + +import yaml + +from arc.tckdb.adapter import UploadOutcome +from arc.tckdb.config import TCKDBConfig +from arc.tckdb.sweep import _resolve_artifact_path, run_upload_sweep + + +# -------------------------------------------------------------------------- +# Test doubles +# -------------------------------------------------------------------------- + + +class _StubAdapter: + """Records which adapter method was called per species, no network.""" + + def __init__(self, *, conformer_outcome=None, bundle_outcome=None, + conformer_raises=None, bundle_raises=None): + self.conformer_calls = [] + self.bundle_calls = [] + self.artifact_calls = [] + self._conformer_outcome = conformer_outcome + self._bundle_outcome = bundle_outcome + self._conformer_raises = conformer_raises + self._bundle_raises = bundle_raises + + def submit_from_output(self, *, output_doc, species_record): + self.conformer_calls.append(species_record.get("label")) + if self._conformer_raises is not None: + raise self._conformer_raises + return self._conformer_outcome + + def submit_computed_species_from_output(self, *, output_doc, species_record): + self.bundle_calls.append(species_record.get("label")) + if self._bundle_raises is not None: + raise self._bundle_raises + return self._bundle_outcome + + def submit_artifacts_for_calculation(self, **kwargs): + self.artifact_calls.append(kwargs) + return None + + +def _outcome(status, *, label="ethanol", error=None, + primary=None, additional=None): + """Build a stand-in UploadOutcome with the fields the sweep reads.""" + return UploadOutcome( + status=status, + payload_path=Path(f"/tmp/{label}.payload.json"), + sidecar_path=Path(f"/tmp/{label}.meta.json"), + idempotency_key=f"arc:test:{label}:k:abc1234567890def", + error=error, + primary_calculation=primary, + additional_calculations=additional or [], + ) + + +# -------------------------------------------------------------------------- +# Fixtures +# -------------------------------------------------------------------------- + + +def _write_output_yml(project_dir: str, *, species_labels=("CCO",), with_ts=False): + """Write a minimal ``output.yml`` matching what the sweep reads.""" + out_dir = os.path.join(project_dir, "output") + os.makedirs(out_dir, exist_ok=True) + doc = { + "schema_version": "1.0", + "project": "test_project", + "arc_version": "0.0.0", + "opt_level": {"method": "wb97xd", "basis": "def2-tzvp", "software": "gaussian"}, + "species": [ + { + "label": label, + "smiles": "CCO", + "charge": 0, + "multiplicity": 1, + "is_ts": False, + "converged": True, + "xyz": "C 0.0 0.0 0.0\nH 1.0 0.0 0.0", + "opt_n_steps": 12, + "opt_final_energy_hartree": -154.0, + "ess_versions": {"opt": "Gaussian 16, Revision A.03"}, + } + for label in species_labels + ], + "transition_states": [ + {"label": "TS0", "is_ts": True, "converged": True} + ] if with_ts else [], + } + with open(os.path.join(out_dir, "output.yml"), "w") as f: + yaml.safe_dump(doc, f) + return doc + + +# -------------------------------------------------------------------------- +# Dispatch behavior +# -------------------------------------------------------------------------- + + +class TestRunTckdbUploadSweepDispatch(unittest.TestCase): + """Wiring tests: which adapter method gets called per upload_mode.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp(prefix="arc-sweep-test-") + self.addCleanup(shutil.rmtree, self.tmp, 
ignore_errors=True) + _write_output_yml(self.tmp) + self.arc_object = SimpleNamespace(project_directory=self.tmp) + + def _cfg(self, **overrides): + defaults = dict( + enabled=True, + base_url="http://localhost:8000/api/v1", + api_key_env="X_TCKDB_API_KEY", + ) + defaults.update(overrides) + return TCKDBConfig(**defaults) + + # ---------------- 1: missing upload_mode → conformer (default) + def test_default_mode_uses_legacy_conformer_path(self): + cfg = self._cfg() # upload_mode defaults to "conformer" + adapter = _StubAdapter(conformer_outcome=_outcome("uploaded")) + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + self.assertEqual(adapter.conformer_calls, ["CCO"]) + self.assertEqual(adapter.bundle_calls, []) + + # ---------------- 2: explicit conformer + def test_explicit_conformer_mode_uses_legacy_path(self): + cfg = self._cfg(upload_mode="conformer") + adapter = _StubAdapter(conformer_outcome=_outcome("uploaded")) + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + self.assertEqual(adapter.conformer_calls, ["CCO"]) + self.assertEqual(adapter.bundle_calls, []) + + # ---------------- 3: computed_species → bundle path + def test_computed_species_mode_dispatches_bundle(self): + cfg = self._cfg(upload_mode="computed_species") + adapter = _StubAdapter(bundle_outcome=_outcome("uploaded")) + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + self.assertEqual(adapter.bundle_calls, ["CCO"]) + self.assertEqual(adapter.conformer_calls, []) + + # ---------------- 4: bundle path never calls legacy + def test_computed_species_does_not_call_legacy_submit(self): + # Multiple species so we'd notice any leak across iterations. + _write_output_yml(self.tmp, species_labels=("CCO", "CO", "CC")) + cfg = self._cfg(upload_mode="computed_species") + adapter = _StubAdapter(bundle_outcome=_outcome("uploaded")) + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + self.assertEqual(adapter.bundle_calls, ["CCO", "CO", "CC"]) + self.assertEqual(adapter.conformer_calls, []) + # And no per-artifact sweep call: bundles inline artifacts. + self.assertEqual(adapter.artifact_calls, []) + + # ---------------- 5: failure in bundle mode is recorded; sweep continues + def test_computed_species_failure_continues_to_next_species(self): + _write_output_yml(self.tmp, species_labels=("CCO", "CO")) + cfg = self._cfg(upload_mode="computed_species") + # First species: outcome with status=failed (non-strict path). + # Second species: outcome with status=uploaded. + # We achieve "different per call" by mutating the stub's outcome + # mid-sweep, since the stub returns the same outcome each call by + # default. Use a side-effect via a wrapper instead. + outcomes = iter([ + _outcome("failed", label="CCO", error="HTTP 503"), + _outcome("uploaded", label="CO"), + ]) + adapter = _StubAdapter() + adapter.submit_computed_species_from_output = ( + lambda *, output_doc, species_record: ( + adapter.bundle_calls.append(species_record.get("label")) + or next(outcomes) + ) + ) + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + # Both species processed; first failed, second uploaded. 
+ self.assertEqual(adapter.bundle_calls, ["CCO", "CO"]) + + # ---------------- 5b: an unhandled exception in bundle mode is caught + def test_computed_species_exception_is_caught_and_logged(self): + _write_output_yml(self.tmp, species_labels=("CCO", "CO")) + cfg = self._cfg(upload_mode="computed_species") + # Simulate an unhandled exception on the FIRST species; second + # should still be attempted (matches conformer-mode behavior). + call_log = [] + def fake_submit(*, output_doc, species_record): + label = species_record.get("label") + call_log.append(label) + if label == "CCO": + raise RuntimeError("boom") + return _outcome("uploaded", label=label) + adapter = _StubAdapter() + adapter.submit_computed_species_from_output = fake_submit + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + self.assertEqual(call_log, ["CCO", "CO"]) + + # ---------------- 6: sidecar written before live upload failure (bundle) + def test_bundle_mode_sidecar_written_before_upload_failure(self): + # This is fundamentally an adapter-level guarantee, but we verify + # the wiring preserves it: a "failed" outcome carrying real + # payload_path and sidecar_path values means the sweep still + # passes those upward to the user. + cfg = self._cfg(upload_mode="computed_species") + sentinel_payload = Path("/tmp/sentinel.payload.json") + sentinel_sidecar = Path("/tmp/sentinel.meta.json") + outcome = UploadOutcome( + status="failed", + payload_path=sentinel_payload, + sidecar_path=sentinel_sidecar, + idempotency_key="arc:t:CCO:c:abc1234567890def", + error="HTTP 503", + ) + adapter = _StubAdapter(bundle_outcome=outcome) + # Capture stdout to confirm the failure summary is printed + # (don't assert on exact text — assert on key tokens). 
+ with mock.patch("builtins.print") as mock_print: + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + printed = "\n".join(str(c.args[0]) for c in mock_print.call_args_list) + self.assertIn("computed-species bundle", printed) + self.assertIn("failed: 1", printed) + self.assertIn("HTTP 503", printed) + + +# -------------------------------------------------------------------------- +# Summary-print mode awareness +# -------------------------------------------------------------------------- + + +class TestSweepSummaryByMode(unittest.TestCase): + """The summary line names the mode; bundle mode omits the artifact line.""" + + def setUp(self): + self.tmp = tempfile.mkdtemp(prefix="arc-sweep-summary-") + self.addCleanup(shutil.rmtree, self.tmp, ignore_errors=True) + _write_output_yml(self.tmp) + self.arc_object = SimpleNamespace(project_directory=self.tmp) + + def _run_with_mode(self, *, upload_mode, artifacts_upload=False): + from arc.tckdb.config import TCKDBArtifactConfig + cfg = TCKDBConfig( + enabled=True, base_url="http://x", api_key_env="X", + upload_mode=upload_mode, + artifacts=TCKDBArtifactConfig(upload=artifacts_upload), + ) + adapter = _StubAdapter( + conformer_outcome=_outcome("uploaded"), + bundle_outcome=_outcome("uploaded"), + ) + with mock.patch("builtins.print") as mock_print: + run_upload_sweep(adapter=adapter, project_directory=self.arc_object.project_directory, tckdb_config=cfg) + return "\n".join(str(c.args[0]) for c in mock_print.call_args_list) + + def test_conformer_mode_summary_says_conformer(self): + out = self._run_with_mode(upload_mode="conformer") + self.assertIn("conformer/calculation", out) + self.assertNotIn("computed-species bundle", out) + + def test_bundle_mode_summary_says_bundle(self): + out = self._run_with_mode(upload_mode="computed_species") + self.assertIn("computed-species bundle", out) + self.assertNotIn("conformer/calculation", out) + + def test_bundle_mode_omits_artifact_line_even_when_enabled(self): + # Inline artifacts mean the standalone artifact tally would mislead. 
+ out = self._run_with_mode(upload_mode="computed_species", artifacts_upload=True) + self.assertNotIn("artifacts: uploaded", out) + + def test_conformer_mode_emits_artifact_line_when_enabled(self): + out = self._run_with_mode(upload_mode="conformer", artifacts_upload=True) + self.assertIn("artifacts:", out) + + +# -------------------------------------------------------------------------- +# _resolve_artifact_path: prefer recorded _input over derivation +# -------------------------------------------------------------------------- + + +class TestResolveArtifactPath(unittest.TestCase): + """The legacy artifact sweep prefers ``output.yml``'s ``_input`` + field, falling back to settings-based derivation only when absent.""" + + def test_input_kind_prefers_recorded_field(self): + """When ``opt_input`` is on the record, it wins over the derived path.""" + species_record = { + "opt_log": "calcs/CH4/opt/input.log", + "opt_input": "calcs/CH4/opt/explicit_input.gjf", # NEW field + } + output_doc = {"opt_level": {"software": "gaussian"}} + path = _resolve_artifact_path( + kind="input", calc_type="opt", + species_record=species_record, output_doc=output_doc, + ) + self.assertEqual(path, "calcs/CH4/opt/explicit_input.gjf") + + def test_input_kind_falls_back_to_settings_when_field_absent(self): + """Older output.yml without ``_input`` still resolves via settings.""" + species_record = {"opt_log": "/abs/calcs/CH4/opt/input.log"} + output_doc = {"opt_level": {"software": "gaussian"}} + path = _resolve_artifact_path( + kind="input", calc_type="opt", + species_record=species_record, output_doc=output_doc, + ) + # Derived sibling: input.gjf next to the log. + self.assertEqual(path, "/abs/calcs/CH4/opt/input.gjf") + + def test_input_kind_falls_back_when_recorded_field_is_none(self): + """Explicit ``None`` in the record (deck wasn't kept) → fallback.""" + species_record = { + "opt_log": "/abs/calcs/CH4/opt/input.log", + "opt_input": None, + } + output_doc = {"opt_level": {"software": "gaussian"}} + path = _resolve_artifact_path( + kind="input", calc_type="opt", + species_record=species_record, output_doc=output_doc, + ) + self.assertEqual(path, "/abs/calcs/CH4/opt/input.gjf") + + def test_input_kind_per_job_picks_correct_recorded_field(self): + """Different calcs hit different ``_input`` fields, not all opt's.""" + species_record = { + "opt_log": "/abs/opt.log", "opt_input": "/abs/opt_deck.gjf", + "freq_log": "/abs/freq.log", "freq_input": "/abs/freq_deck.gjf", + "sp_log": "/abs/sp.log", "sp_input": "/abs/sp_deck.in", # cross-software run + } + output_doc = {"opt_level": {"software": "gaussian"}} + for calc, expected in ( + ("opt", "/abs/opt_deck.gjf"), + ("freq", "/abs/freq_deck.gjf"), + ("sp", "/abs/sp_deck.in"), # NOT input.gjf — sp uses its own software + ): + path = _resolve_artifact_path( + kind="input", calc_type=calc, + species_record=species_record, output_doc=output_doc, + ) + self.assertEqual(path, expected, + msg=f"{calc}: expected {expected}, got {path}") + + +if __name__ == "__main__": + unittest.main() diff --git a/arc/imports.py b/arc/imports.py index ba385c3770..ff4be11939 100644 --- a/arc/imports.py +++ b/arc/imports.py @@ -10,13 +10,30 @@ from arc.settings.submit import incore_commands, pipe_submit, submit_scripts +def _local_overlays_disabled() -> bool: + """Return True when ~/.arc/{settings,submit,inputs}.py overlays should be skipped. + + The repo's arc/settings/settings.py is always loaded as the baseline. 
This + guard only controls whether a user's personal ~/.arc/*.py files are layered + on top — we want them ignored under pytest (so tests see the same defaults + CI does) and overridable explicitly via env var. + + Triggered by either pytest being loaded (`'pytest' in sys.modules`, true + from collection onward) or an explicit ARC_IGNORE_LOCAL_SETTINGS=1 env var. + """ + if os.environ.get('ARC_IGNORE_LOCAL_SETTINGS') == '1': + return True + return 'pytest' in sys.modules + + # Common imports where the user can optionally put a modified copy of settings.py or submit.py file under ~/.arc home = os.getenv("HOME") or os.path.expanduser("~") local_arc_path = os.path.join(home, '.arc') +_skip_local = _local_overlays_disabled() local_arc_settings_path = os.path.join(local_arc_path, 'settings.py') settings = {key: val for key, val in vars(arc_settings).items() if '__' not in key} -if os.path.isfile(local_arc_settings_path): +if not _skip_local and os.path.isfile(local_arc_settings_path): local_settings = dict() if local_arc_path not in sys.path: sys.path.insert(1, local_arc_path) @@ -32,7 +49,7 @@ if 'global_ess_settings' in local_settings_dict and local_settings_dict['global_ess_settings'] else None local_arc_submit_path = os.path.join(local_arc_path, 'submit.py') -if os.path.isfile(local_arc_submit_path): +if not _skip_local and os.path.isfile(local_arc_submit_path): local_incore_commands, local_pipe_submit, local_submit_scripts = dict(), dict(), dict() if local_arc_path not in sys.path: sys.path.insert(1, local_arc_path) @@ -56,7 +73,7 @@ submit_scripts.update(local_submit_scripts) local_arc_inputs_path = os.path.join(local_arc_path, 'inputs.py') -if os.path.isfile(local_arc_inputs_path): +if not _skip_local and os.path.isfile(local_arc_inputs_path): local_input_files = dict() if local_arc_path not in sys.path: sys.path.insert(1, local_arc_path) diff --git a/arc/job/adapter.py b/arc/job/adapter.py index 040c9920fe..1198fd1fef 100644 --- a/arc/job/adapter.py +++ b/arc/job/adapter.py @@ -218,9 +218,82 @@ def execute(self): with an HDF5 file that contains specific directions. The output is returned within the HDF5 file. The new ARC instance, representing a single worker, will run all of its jobs incore. + + Connection sharing: for remote-queue jobs we lease one + :class:`SSHClient` from the process-global pool + (:mod:`arc.job.ssh_pool`) and reuse it for both file upload and + qsub/sbatch submission within this call. Across an entire ARC + run, every remote job for a given server reuses the *same* + pooled client — 100 TS guess opts share one paramiko Transport + instead of opening 200. Pipe mode currently can't bundle these + (``should_use_pipe`` refuses non-``local`` servers, see + ``arc/job/pipe/pipe_coordinator.py:77``); the pool is the + leverage available short of full remote-pipe support. """ - self.upload_files() execution_type = JobExecutionTypeEnum(self.execution_type) + use_shared_ssh = ( + execution_type == JobExecutionTypeEnum.queue + and self.server is not None + and self.server != 'local' + and not self.testing + ) + if use_shared_ssh: + from arc.job.ssh_pool import get_default_pool + with get_default_pool().borrow(self.server) as ssh: + self._shared_ssh = ssh + try: + self._dispatch_execution(execution_type) + finally: + # Pool retains the SSHClient; clearing the attr + # just prevents a later code path on this adapter + # from grabbing a stale reference if the pool + # subsequently reaps and reopens the connection. 
+ self._shared_ssh = None + else: + self._dispatch_execution(execution_type) + if not self.restarted: + self._write_initiated_job_to_csv_file() + + def _open_or_borrow_ssh(self): + """Yield an :class:`SSHClient` for ``self.server``, in priority order: + + 1. ``self._shared_ssh`` if set — the per-call client opened by + :meth:`execute`. Available within the upload+submit window. + 2. The process-global pool (:mod:`arc.job.ssh_pool`) — keeps + one client alive across jobs for the run's lifetime, so the + hot status-poll loop reuses connections. + 3. A fresh ``SSHClient`` opened just for this call — only hit + when the pool can't construct one (testing, exotic env). + + Returns a context manager that does NOT close the underlying + client on exit; the pool retains ownership in case (2), and + case (3) opens-and-closes inline. + """ + from contextlib import contextmanager + shared = getattr(self, '_shared_ssh', None) + if shared is not None: + @contextmanager + def _shared_cm(): + yield shared + return _shared_cm() + try: + from arc.job.ssh_pool import get_default_pool + return get_default_pool().borrow(self.server) + except Exception: + # Pool refused (e.g., factory failed). Fall back to a + # one-shot client so we degrade gracefully — the caller + # gets correctness at the cost of one connection. + logger.debug("ssh pool unavailable; opening one-shot client", exc_info=True) + @contextmanager + def _fresh_cm(): + with SSHClient(self.server) as fresh: + yield fresh + return _fresh_cm() + + def _dispatch_execution(self, execution_type: 'JobExecutionTypeEnum') -> None: + """Inner body of :meth:`execute`, factored out so the SSH-share + wrapper around it stays small and readable.""" + self.upload_files() if execution_type == JobExecutionTypeEnum.incore: self.initial_time = datetime.datetime.now() self.job_status[0] = 'running' @@ -235,19 +308,25 @@ def execute(self): raise ValueError('Pipe execution is handled at the Scheduler level. ' 'JobAdapters inside a pipe must be executed by the worker ' "with execution_type='incore'.") - if not self.restarted: - self._write_initiated_job_to_csv_file() - def legacy_queue_execution(self): + def legacy_queue_execution(self, ssh: 'SSHClient | None' = None): """ Execute a job to the server's queue. The server could be either "local" or remote. + + ``ssh`` is an explicitly-passed shared connection. When ``None`` + we route through :meth:`_open_or_borrow_ssh` which prefers + ``self._shared_ssh`` (set by :meth:`execute`), then the + process-global pool, then opens fresh. """ self._log_job_execution() # Submit to queue, differentiate between local (same machine using its queue) and remote servers. if self.server != 'local': - with SSHClient(self.server) as ssh: + if ssh is not None: self.job_status[0], self.job_id = ssh.submit_job(remote_path=self.remote_path) + else: + with self._open_or_borrow_ssh() as borrowed: + self.job_status[0], self.job_id = borrowed.submit_job(remote_path=self.remote_path) else: # submit to the local queue self.job_status[0], self.job_id = submit_job(path=self.local_path) @@ -363,26 +442,24 @@ def set_file_paths(self): self.set_additional_file_paths() - def upload_files(self): + def upload_files(self, ssh: 'SSHClient | None' = None): """ Upload the relevant files for the job. + + ``ssh`` is an explicitly-passed shared connection. When ``None`` + we route through :meth:`_open_or_borrow_ssh` which prefers + ``self._shared_ssh`` (set by :meth:`execute`), then the + process-global pool, then opens fresh. 
""" if not self.testing: if self.execution_type != 'incore' and self.server != 'local': # If the job execution type is incore, then no need to upload any files. # Also, even if the job is submitted to the que, no need to upload files if the server is local. - with SSHClient(self.server) as ssh: - for up_file in self.files_to_upload: - logger.debug(f"Uploading {up_file['file_name']} source {up_file['source']} to {self.server}") - if up_file['source'] == 'path': - ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=up_file['local']) - elif up_file['source'] == 'input_files': - ssh.upload_file(remote_file_path=up_file['remote'], file_string=up_file['local']) - else: - raise ValueError(f"Unclear file source for {up_file['file_name']}. Should either be 'path' or " - f"'input_files', got: {up_file['source']}") - if up_file['make_x']: - ssh.change_mode(mode='+x', file_name=up_file['file_name'], remote_path=self.remote_path) + if ssh is not None: + self._upload_with_ssh(ssh) + else: + with self._open_or_borrow_ssh() as borrowed: + self._upload_with_ssh(borrowed) else: # running locally, just copy the check file, if exists, to the job folder for up_file in self.files_to_upload: @@ -393,6 +470,25 @@ def upload_files(self): pass self.initial_time = datetime.datetime.now() + def _upload_with_ssh(self, ssh) -> None: + """SFTP-put every entry in ``self.files_to_upload`` over an open client. + + Factored out of :meth:`upload_files` so the with-shared vs. + with-new code paths share one body — adding a future per-file + knob (compression, retry, throttle) lands in one place. + """ + for up_file in self.files_to_upload: + logger.debug(f"Uploading {up_file['file_name']} source {up_file['source']} to {self.server}") + if up_file['source'] == 'path': + ssh.upload_file(remote_file_path=up_file['remote'], local_file_path=up_file['local']) + elif up_file['source'] == 'input_files': + ssh.upload_file(remote_file_path=up_file['remote'], file_string=up_file['local']) + else: + raise ValueError(f"Unclear file source for {up_file['file_name']}. Should either be 'path' or " + f"'input_files', got: {up_file['source']}") + if up_file['make_x']: + ssh.change_mode(mode='+x', file_name=up_file['file_name'], remote_path=self.remote_path) + def download_files(self): """ Download the relevant files. @@ -401,7 +497,7 @@ def download_files(self): if self.execution_type != 'incore' and self.server != 'local': # If the job execution type is incore, then no need to download any files. # Also, even if the job is submitted to the que, no need to download files if the server is local. - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: for dl_file in self.files_to_download: ssh.download_file(remote_file_path=dl_file['remote'], local_file_path=dl_file['local']) self.set_initial_and_final_times(ssh=ssh) @@ -409,6 +505,16 @@ def download_files(self): self.set_initial_and_final_times() self.final_time = self.final_time or datetime.datetime.now() + def remove_remote_files(self): + """ + Remove the job's remote work directory after a successful run, to keep cluster quota in check. + No-op for local servers or when no remote_path is set. + """ + if self.server is None or self.server == 'local' or not self.remote_path: + return + with self._open_or_borrow_ssh() as ssh: + ssh.remove_dir(remote_path=self.remote_path) + def set_initial_and_final_times(self, ssh: SSHClient | None = None): """ Set the end time of the job. 
@@ -701,7 +807,7 @@ def delete(self): logger.debug(f'Deleting job {self.job_name} for {self.species_label}') if self.server != 'local': logger.debug(f'deleting job on {self.server}...') - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: ssh.delete_job(self.job_id) else: logger.debug('deleting job locally...') @@ -764,20 +870,20 @@ def _get_additional_job_info(self): content = '' cluster_soft = servers[self.server]['cluster_soft'].lower() if cluster_soft in ['oge', 'sge', 'slurm', 'pbs', 'htcondor']: + # job.log is HTCondor's native event log; other clusters don't produce one. + include_job_log = cluster_soft == 'htcondor' local_file_path_1 = os.path.join(self.local_path, 'out.txt') local_file_path_2 = os.path.join(self.local_path, 'err.txt') - local_file_path_3 = os.path.join(self.local_path, 'job.log') + local_file_path_3 = os.path.join(self.local_path, 'job.log') if include_job_log else None if self.server != 'local' and self.remote_path is not None and not self.testing: - remote_file_path_1 = os.path.join(self.remote_path, 'out.txt') - remote_file_path_2 = os.path.join(self.remote_path, 'err.txt') - remote_file_path_3 = os.path.join(self.remote_path, 'job.log') - with SSHClient(self.server) as ssh: - for local_file_path, remote_file_path in zip([local_file_path_1, - local_file_path_2, - local_file_path_3], - [remote_file_path_1, - remote_file_path_2, - remote_file_path_3]): + remote_paths = [os.path.join(self.remote_path, 'out.txt'), + os.path.join(self.remote_path, 'err.txt')] + local_paths = [local_file_path_1, local_file_path_2] + if include_job_log: + remote_paths.append(os.path.join(self.remote_path, 'job.log')) + local_paths.append(local_file_path_3) + with self._open_or_borrow_ssh() as ssh: + for local_file_path, remote_file_path in zip(local_paths, remote_paths): try: ssh.download_file(remote_file_path=remote_file_path, local_file_path=local_file_path) @@ -787,7 +893,7 @@ def _get_additional_job_info(self): f'flags with stdout and stderr of out.txt and err.txt, respectively ' f'(e.g., "#SBATCH -o out.txt"). Error message:') logger.warning(e) - for local_file_path in [local_file_path_1, local_file_path_2, local_file_path_3]: + for local_file_path in filter(None, [local_file_path_1, local_file_path_2, local_file_path_3]): if os.path.isfile(local_file_path): with open(local_file_path, 'r') as f: lines = f.readlines() @@ -803,7 +909,7 @@ def _check_job_server_status(self) -> str: Possible statuses: ``initializing``, ``running``, ``errored on node xx``, ``done``. """ if self.server != 'local' and not self.testing: - with SSHClient(self.server) as ssh: + with self._open_or_borrow_ssh() as ssh: return ssh.check_job_status(self.job_id) else: return check_job_status(self.job_id) diff --git a/arc/job/adapter_test.py b/arc/job/adapter_test.py index dd1a520620..7cc267f7b5 100644 --- a/arc/job/adapter_test.py +++ b/arc/job/adapter_test.py @@ -89,6 +89,11 @@ def setUpClass(cls): A method that is run before all unit tests in this class. """ cls.maxDiff = None + # Register project-dir cleanups before any fixture creation so they + # still fire if a constructor below raises mid-setUpClass — that's + # how leftover scratch files end up committed to the repo. 
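+        # (unittest invokes registered class cleanups even when setUpClass itself
+        #  errors; tearDownClass, by contrast, is skipped in that case, which is
+        #  why registering the cleanups up front is the safer pattern.)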
+ for subdir in ('test_JobAdapter', 'test_JobAdapter_scan', 'test_JobAdapter_ServerTimeLimit'): + cls.addClassCleanup(shutil.rmtree, os.path.join(ARC_TESTING_PATH, subdir), ignore_errors=True) cls.job_1 = GaussianAdapter(execution_type='queue', job_type='conf_opt', level=Level(method='cbs-qb3'), @@ -341,18 +346,6 @@ def test_troubleshoot_queue(self): self.assertIn('middle_queue', self.job_6.attempted_queues) - - @classmethod - def tearDownClass(cls): - """ - A function that is run ONCE after all unit tests in this class. - Delete all project directories created during these unit tests - """ - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter'), ignore_errors=True) - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_scan'), ignore_errors=True) - shutil.rmtree(os.path.join(ARC_TESTING_PATH, 'test_JobAdapter_ServerTimeLimit'), ignore_errors=True) - - class TestRotateCSV(unittest.TestCase): """ Contains unit tests for the CSV rotation logic. @@ -410,5 +403,374 @@ def test_multiple_rotations(self): self.assertEqual(len(archives), 2) +# --------------------------------------------------------------------------- +# SSH connection sharing & pooling (Options 1 + 2). +# +# Option 1 (per-job share): one SSHClient covers both upload and submit +# inside a single execute() call — collapses 2N connections to N. +# Option 2 (process-lifetime pool): the SSHClient for a given server is +# kept alive across jobs — collapses N to a small constant. +# --------------------------------------------------------------------------- + + +class _SSHClientStub: + """In-memory SSHClient lookalike for the pool to hand out. + + Records every upload/submit so tests can assert which calls landed + on which (shared) client. The pool calls ``connect()`` after + instantiation; we no-op that since there's no real socket. + """ + + def __init__(self, server): + self.server = server + self.uploaded = [] + self.submits = [] + self.downloaded = [] + self._closed = False + # Mimic SSHClient's ``_ssh`` attribute so ssh_pool._is_alive() + # finds an active fake-Transport. + self._ssh = _FakeParamikoSSH() + + def connect(self): + pass # the real one opens TCP+auth; we no-op for tests + + def close(self): + self._closed = True + self._ssh = None + + def __enter__(self): + return self + + def __exit__(self, *a): + return False + + def upload_file(self, *, remote_file_path, local_file_path=None, file_string=None): + self.uploaded.append(remote_file_path) + + def submit_job(self, remote_path, recursion=False): + self.submits.append(remote_path) + return 'initializing', 12345 + + def change_mode(self, *, mode, file_name, remote_path): + pass + + # Methods that the post-submit lifecycle paths exercise. + def check_job_status(self, job_id): + return 'running' + + def download_file(self, *, remote_file_path, local_file_path): + self.downloaded.append(remote_file_path) + + def remove_dir(self, *, remote_path): + pass + + def delete_job(self, job_id): + pass + + +class _FakeParamikoSSH: + """Stand-in for paramiko.SSHClient — _is_alive checks Transport.is_active().""" + def get_transport(self): + return _FakeTransport() + + +class _FakeTransport: + def is_active(self): + return True + + +class _StubFactoryPool: + """A pool whose factory builds _SSHClientStub instead of real SSHClient. + + Wraps the production ``SSHConnectionPool`` so reuse + lifecycle + semantics are exactly the production behavior — only the + underlying object is faked. 
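+    ``opens`` / ``borrows`` below proxy counters the production pool is
+    assumed to expose: how many clients the factory constructed versus how
+    many times ``borrow()`` handed one out.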
+ """ + + def __init__(self): + from arc.job.ssh_pool import SSHConnectionPool + self.created = [] # log of every server name we built a client for + def factory(server): + client = _SSHClientStub(server) + self.created.append(server) + return client + self._inner = SSHConnectionPool(factory=factory) + + def borrow(self, server): + return self._inner.borrow(server) + + def close_all(self): + self._inner.close_all() + + @property + def opens(self): + return self._inner.opens + + @property + def borrows(self): + return self._inner.borrows + + +class _MinimalAdapter(JobAdapter): + """Concrete JobAdapter with just enough state to exercise execute(). + + Skips the heavyweight construction the GaussianAdapter does — we + only need ``server``, ``execution_type``, ``files_to_upload``, + ``remote_path``, and ``testing=False`` for the SSH-share path. + """ + + job_adapter = 'mockter' + + def __init__(self, *, server, execution_type='queue'): + # Bypass JobAdapter.__init__ entirely — all of its real work + # (file paths, settings, csv setup) is unrelated to the SSH + # share contract we're testing here. + self.server = server + self.execution_type = execution_type + self.testing = False + self.restarted = True # skip _write_initiated_job_to_csv_file + self.files_to_upload = [ + {'file_name': 'input.gjf', 'source': 'path', + 'local': '/local/input.gjf', 'remote': '/remote/input.gjf', 'make_x': False}, + {'file_name': 'submit.sh', 'source': 'path', + 'local': '/local/submit.sh', 'remote': '/remote/submit.sh', 'make_x': True}, + ] + self.remote_path = '/remote' + self.local_path = '/local' + self.job_status = ['initializing', {'status': 'initializing'}] + self.job_id = 0 + self.initial_time = None + self.final_time = None + self.job_name = 'job_test' + self.species_label = 'spc_test' + + # JobAdapter requires these abstracts; trivial bodies are fine. + def execute_incore(self): pass + def execute_queue(self): self.legacy_queue_execution() + def write_input_file(self): pass + def set_files(self): pass + def set_additional_file_paths(self): pass + def set_input_file_memory(self): pass + def upload_during_execution(self): pass + def _log_job_execution(self): pass + + +class TestSSHConnectionSharing(unittest.TestCase): + """``execute()`` shares one SSHClient per remote-queue job, and the + pool reuses it across jobs.""" + + def setUp(self): + # Inject a pool whose factory builds stubs, so the test never + # tries to open a real SSH connection to a server that isn't + # in this user's settings (e.g., 'server2'). + import arc.job.ssh_pool as _pool + self._stub_pool = _StubFactoryPool() + _pool.set_default_pool(self._stub_pool) + # Also stub the legacy-direct path: bare + # ``legacy_queue_execution()`` (called outside execute()) uses + # the SSHClient class in ``arc.job.adapter`` directly, so patch + # that name with a context-manager wrapper around our stub. + self._direct_patch = patch( + 'arc.job.adapter.SSHClient', + lambda server: _SSHClientStub(server), + ) + self._direct_patch.start() + + def tearDown(self): + import arc.job.ssh_pool as _pool + _pool.set_default_pool(None) + self._direct_patch.stop() + + def test_remote_queue_opens_one_ssh_per_job(self): + """Upload + submit share a single SSHClient inside one execute().""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + # One SSHClient created (the pool's first borrow), one borrow. 
+ self.assertEqual(self._stub_pool.opens, 1) + self.assertEqual(self._stub_pool.borrows, 1) + + def test_remote_queue_clears_shared_ssh_after_dispatch(self): + """``self._shared_ssh`` is None after execute() returns.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + self.assertIsNone(getattr(adapter, '_shared_ssh', None)) + + def test_local_server_opens_no_ssh(self): + """local-server queue jobs use the host's queue, no SSH at all.""" + adapter = _MinimalAdapter(server='local', execution_type='queue') + with patch('arc.job.adapter.submit_job', return_value=('initializing', 99)): + adapter.execute() + self.assertEqual(self._stub_pool.opens, 0) + self.assertEqual(self._stub_pool.borrows, 0) + + def test_incore_opens_no_ssh(self): + """incore execution runs in-process — never touches SSH.""" + adapter = _MinimalAdapter(server='server2', execution_type='incore') + adapter.execute() + self.assertEqual(self._stub_pool.opens, 0) + + def test_legacy_queue_execution_routes_through_pool_when_called_directly(self): + """Even when called bare (outside execute()), legacy_queue_execution + now reuses the pool — that's Option 2's payoff for adapter + ``execute_queue`` overrides that call ``self.legacy_queue_execution()`` + from inside their own custom flow. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.legacy_queue_execution() # bare — no execute() wrapper + self.assertEqual(self._stub_pool.opens, 1) + self.assertEqual(self._stub_pool.borrows, 1) + + def test_shared_ssh_carries_uploads_and_submit(self): + """The pooled SSHClient sees both upload calls AND the submit call.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + # Inspect the stub the pool kept. + self.assertEqual(self._stub_pool.opens, 1) + client = self._stub_pool._inner._clients['server2'] + self.assertEqual(len(client.uploaded), 2) + self.assertEqual(len(client.submits), 1) + + +class TestSSHConnectionPoolReuse(unittest.TestCase): + """The process-lifetime pool reuses one SSHClient across many jobs.""" + + def setUp(self): + import arc.job.ssh_pool as _pool + self._stub_pool = _StubFactoryPool() + _pool.set_default_pool(self._stub_pool) + + def tearDown(self): + import arc.job.ssh_pool as _pool + _pool.set_default_pool(None) + + def test_one_open_for_many_jobs_same_server(self): + """100 jobs against one server → 1 SSHClient, 100 borrows.""" + for _ in range(100): + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.execute() + self.assertEqual(self._stub_pool.opens, 1, "should reuse the same client") + self.assertEqual(self._stub_pool.borrows, 100) + + def test_separate_clients_per_distinct_server(self): + """Different servers → different clients, each opened once.""" + for _ in range(5): + _MinimalAdapter(server='server2', execution_type='queue').execute() + for _ in range(3): + _MinimalAdapter(server='server3', execution_type='queue').execute() + self.assertEqual(self._stub_pool.opens, 2) + self.assertEqual(self._stub_pool.borrows, 8) + self.assertEqual(sorted(self._stub_pool._inner._clients.keys()), + ['server2', 'server3']) + + def test_dead_client_is_reaped_and_reopened(self): + """If the underlying Transport reports inactive, pool reopens.""" + # First borrow → opens stub #1. + _MinimalAdapter(server='server2', execution_type='queue').execute() + client1 = self._stub_pool._inner._clients['server2'] + # Simulate a dead Transport (remote rebooted, etc.). 
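+        # Clearing ``_ssh`` is how the stub reports "dead": the pool's liveness
+        # probe (assumed to check get_transport().is_active() on the wrapped
+        # paramiko object, per _FakeParamikoSSH above) then finds nothing active
+        # and reaps the entry.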
+ client1._ssh = None + # Next borrow should detect the dead client and open a fresh one. + _MinimalAdapter(server='server2', execution_type='queue').execute() + client2 = self._stub_pool._inner._clients['server2'] + self.assertIs(client1._closed, True, "stale client should be closed before reopen") + self.assertIsNot(client1, client2) + self.assertEqual(self._stub_pool.opens, 2) + + def test_close_all_closes_every_pooled_client(self): + for srv in ('server2', 'server3'): + _MinimalAdapter(server=srv, execution_type='queue').execute() + clients = list(self._stub_pool._inner._clients.values()) + self._stub_pool.close_all() + self.assertEqual(self._stub_pool._inner._clients, {}) + for c in clients: + self.assertTrue(c._closed) + + def test_close_all_is_idempotent(self): + _MinimalAdapter(server='server2', execution_type='queue').execute() + self._stub_pool.close_all() + # Second call must not raise or mutate state. + self._stub_pool.close_all() + self.assertEqual(self._stub_pool._inner._clients, {}) + + def test_status_poll_reuses_pooled_client(self): + """The hot path: hundreds of status checks open exactly one client. + + ARC polls a job's queue status every poll cycle for the entire + duration of the job. Pre-pool, each call opened a fresh + SSHClient. After Option 2, all polls reuse the pool's client + for that server — the dominant SSH-cost reducer in a real run. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + # Simulate 200 poll cycles (~1.5 hour run at 30s polling). + for _ in range(200): + adapter._check_job_server_status() + self.assertEqual(self._stub_pool.opens, 1, "pool should reuse one client") + self.assertEqual(self._stub_pool.borrows, 200) + + def test_download_files_reuses_pooled_client(self): + """download_files (called once per finished job) uses the pool too.""" + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.files_to_download = [ + {'remote': '/r/output.log', 'local': '/l/output.log'}, + ] + # set_initial_and_final_times reads file mtimes — stub it. + adapter.set_initial_and_final_times = lambda ssh=None: None + adapter.download_files() + client = self._stub_pool._inner._clients['server2'] + self.assertIn('/r/output.log', client.downloaded) + self.assertEqual(self._stub_pool.opens, 1) + + def test_full_lifecycle_one_open_per_server(self): + """Submit + many polls + download + cleanup all share one pooled client. + + End-to-end view of one job's life: this collapses what was + previously ~(2 + N_polls + 1 + 1) ≈ N+4 individual SSH + connections into a single reused client. + """ + adapter = _MinimalAdapter(server='server2', execution_type='queue') + adapter.files_to_download = [{'remote': '/r/o.log', 'local': '/l/o.log'}] + adapter.set_initial_and_final_times = lambda ssh=None: None + + adapter.execute() # upload + submit (1 borrow) + for _ in range(50): # 50 status polls + adapter._check_job_server_status() + adapter.download_files() # 1 download borrow + adapter.remove_remote_files() # 1 cleanup borrow + adapter.delete() # 1 delete borrow + + # All phases share the same pooled client. + self.assertEqual(self._stub_pool.opens, 1) + # 1 execute + 50 polls + 1 download + 1 cleanup + 1 delete = 54 borrows. 
+ self.assertEqual(self._stub_pool.borrows, 54) + + +class TestSSHPoolDefaultLifecycle(unittest.TestCase): + """The module-level default pool is lazy and resettable.""" + + def setUp(self): + import arc.job.ssh_pool as _pool + _pool.reset_default_pool() + self._pool_module = _pool + + def tearDown(self): + self._pool_module.reset_default_pool() + + def test_get_default_pool_is_idempotent(self): + p1 = self._pool_module.get_default_pool() + p2 = self._pool_module.get_default_pool() + self.assertIs(p1, p2) + + def test_reset_default_pool_drops_the_instance(self): + p1 = self._pool_module.get_default_pool() + self._pool_module.reset_default_pool() + p2 = self._pool_module.get_default_pool() + self.assertIsNot(p1, p2) + + def test_set_default_pool_replaces_instance(self): + replacement = _StubFactoryPool() + self._pool_module.set_default_pool(replacement) + self.assertIs(self._pool_module.get_default_pool(), replacement) + + if __name__ == '__main__': unittest.main(testRunner=unittest.TextTestRunner(verbosity=2)) diff --git a/arc/job/adapters/gaussian.py b/arc/job/adapters/gaussian.py index 314e796641..71e7ca2456 100644 --- a/arc/job/adapters/gaussian.py +++ b/arc/job/adapters/gaussian.py @@ -306,10 +306,14 @@ def write_input_file(self) -> None: if input_dict['trsh']: input_dict['trsh'] += ' ' input_dict['trsh'] += 'scf=(tight,direct)' + # 'no_tight' is set by trsh_keyword_loose_disp when a previous attempt hit + # MaxOptCycles with forces converged but displacement criteria unreachable. + drop_tight = 'no_tight' in self.ess_trsh_methods + fine_opt = [] if drop_tight else ['tight'] if self.is_ts: - keywords.extend(['tight', 'maxstep=5']) + keywords.extend(fine_opt + ['maxstep=5']) else: - keywords.extend(['tight', 'maxstep=5', f'maxcycle={max_c}']) + keywords.extend(fine_opt + ['maxstep=5', f'maxcycle={max_c}']) input_dict['job_type_1'] = "opt" if self.level.method_type not in ['dft', 'composite', 'wavefunction']\ else f"opt=({', '.join(key for key in keywords)})" diff --git a/arc/job/adapters/scripts/xtb_gsm/ograd b/arc/job/adapters/scripts/xtb_gsm/ograd index d208a7502f..f5c01f4f3a 100644 --- a/arc/job/adapters/scripts/xtb_gsm/ograd +++ b/arc/job/adapters/scripts/xtb_gsm/ograd @@ -24,3 +24,21 @@ tm2orca.py $basename rm xtbrestart cd .. +# ── Per-node provenance preservation (TCKDB path_search_result.points) ── +# tm2orca.py renames the xTB-generated Turbomole-format ``energy`` +# and ``gradient`` files (xTB writes its --grad output in Turbomole's +# on-disk text format; the calculation provenance is xTB, not Turbomole) +# to ``.energy`` and ``.gradient`` +# inside scratch/. The GSM binary then consumes the ORCA-shaped +# ``.engrad`` and may overwrite or remove the per-node files on +# subsequent calls. Copy them (plus the captured xtb stdout) into a +# stable side-effect directory at the run root so the TCKDB adapter's +# parser can recover per-node electronic energies and gradient metrics +# later. The copies are not consumed by GSM — the original scratch/ +# files stay in place unchanged for the algorithm. 
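+# ``$1`` is assumed to be the per-node tag GSM passes when it invokes this
+# wrapper; the preserved copies are keyed by it so a later sweep can map each
+# energy/gradient/stdout file back to its string node, e.g. (hypothetical layout):
+#   gsm_node_outputs/<node_label>.energy
+#   gsm_node_outputs/<node_label>.gradient
+#   gsm_node_outputs/<node_label>.xtbout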
+node_label="$1" +preserve_dir="gsm_node_outputs" +mkdir -p "$preserve_dir" +[ -f "scratch/${basename}.energy" ] && cp -p "scratch/${basename}.energy" "$preserve_dir/${node_label}.energy" +[ -f "scratch/${basename}.gradient" ] && cp -p "scratch/${basename}.gradient" "$preserve_dir/${node_label}.gradient" +[ -f "scratch/${ofile}.xtbout" ] && cp -p "scratch/${ofile}.xtbout" "$preserve_dir/${node_label}.xtbout" diff --git a/arc/job/adapters/torch_ani_test.py b/arc/job/adapters/torch_ani_test.py index c34e47a4ef..b45f49da71 100644 --- a/arc/job/adapters/torch_ani_test.py +++ b/arc/job/adapters/torch_ani_test.py @@ -12,11 +12,13 @@ from arc.common import almost_equal_coords, almost_equal_lists, read_yaml_file from arc.job.adapters.torch_ani import TorchANIAdapter -from arc.settings.settings import tani_default_options_dict +from arc.settings.settings import TANI_PYTHON, tani_default_options_dict from arc.species import ARCSpecies from arc.species.vectors import calculate_distance, calculate_angle, calculate_dihedral_angle +@unittest.skipUnless(TANI_PYTHON is not None, + "tani_env conda environment not found; TorchANI adapter tests require it.") class TestTorchANIAdapter(unittest.TestCase): """ Contains unit tests for the TorchANIAdapter class. diff --git a/arc/job/adapters/ts/gcn_test.py b/arc/job/adapters/ts/gcn_test.py index 8524b00db2..60004ae723 100644 --- a/arc/job/adapters/ts/gcn_test.py +++ b/arc/job/adapters/ts/gcn_test.py @@ -12,6 +12,7 @@ from arc.common import ARC_TESTING_PATH import arc.job.adapters.ts.gcn_ts as ts_gcn from arc.reaction import ARCReaction +from arc.settings.settings import TS_GCN_PYTHON from arc.species.converter import str_to_xyz from arc.species.species import ARCSpecies, TSGuess @@ -67,6 +68,8 @@ def test_write_sdf_files(self): self.assertEqual(r_atoms, expected_r_atoms) self.assertEqual(p_atoms, expected_p_atoms) + @unittest.skipUnless(TS_GCN_PYTHON is not None, + "ts_gcn conda environment not found; GCN subprocess test requires it.") def test_run_subprocess_locally(self): """Test the run_subprocess_locally() function""" self.assertFalse(os.path.isfile(self.ts_path)) diff --git a/arc/job/adapters/ts/orca_neb.py b/arc/job/adapters/ts/orca_neb.py index 0647fd3169..a51c20f46f 100644 --- a/arc/job/adapters/ts/orca_neb.py +++ b/arc/job/adapters/ts/orca_neb.py @@ -39,15 +39,15 @@ %%maxcore ${memory} %%pal nprocs ${cpus} end -%%neb +%%neb Interpolation ${interpolation} NImages ${nnodes} PrintLevel 3 PreOpt ${preopt} - NEB_END_XYZFILE "${abs_path}/product.xyz" + NEB_END_XYZFILE "product.xyz" END -* XYZFILE ${charge} ${multiplicity} ${abs_path}/reactant.xyz +* XYZFILE ${charge} ${multiplicity} reactant.xyz """ @@ -222,7 +222,6 @@ def write_input_file(self) -> None: input_dict['cpus'] = self.cpu_cores input_dict['charge'] = self.charge input_dict['multiplicity'] = self.multiplicity - input_dict['abs_path'] = self.local_path # NEB specific parameters neb_settings = orca_neb_settings.get('keyword', {}) diff --git a/arc/job/adapters/ts/xtb_gsm.py b/arc/job/adapters/ts/xtb_gsm.py index bcf7d73bda..57d7c71a73 100644 --- a/arc/job/adapters/ts/xtb_gsm.py +++ b/arc/job/adapters/ts/xtb_gsm.py @@ -306,6 +306,13 @@ def set_additional_file_paths(self) -> None: self.tm2orca_path = os.path.join(self.local_path, 'tm2orca.py') self.scratch_initial0000_path = os.path.join(self.local_path, 'scratch', 'initial0000.xyz') self.stringfile_path = os.path.join(self.local_path, 'stringfile.xyz0000') + # Side-effect directory written by the patched ``ograd`` wrapper. + # Holds per-node ``