diff --git a/arc/common.py b/arc/common.py
index 98f4916749..8fe0d9a9b4 100644
--- a/arc/common.py
+++ b/arc/common.py
@@ -374,6 +374,31 @@ def from_yaml(yaml_str: str) -> dict | list:
     return yaml.load(stream=yaml_str, Loader=yaml.FullLoader)
 
 
+def append_yaml_document(path: str,
+                         content: list | dict,
+                         ) -> None:
+    """
+    Append a YAML document to a file, separated by the ``---`` document marker.
+    Useful for files that accumulate time-ordered snapshots and should be parsed
+    with ``yaml.safe_load_all`` / ``yaml.load_all``.
+
+    Args:
+        path (str): The YAML file path to append to.
+        content (list, dict): The document to append.
+
+    Raises:
+        InputError: If ``path`` is not a string.
+    """
+    if not isinstance(path, str):
+        raise InputError(f'path must be a string, got {path} which is a {type(path)}')
+    # A non-empty dirname (works for both '/' and '\\' separators) means the file
+    # lives in a sub-directory; create it if missing. exist_ok=True avoids a race
+    # if the directory appears between the check and the makedirs call.
+    if os.path.dirname(path) and not os.path.isdir(os.path.dirname(path)):
+        os.makedirs(os.path.dirname(path), exist_ok=True)
+    yaml_str = to_yaml(py_content=content)
+    with open(path, 'a') as f:
+        f.write('---\n')
+        f.write(yaml_str)
+
+
 def to_yaml(py_content: list | dict) -> str:
     """
     Convert a Python list or dictionary to a YAML string format.
diff --git a/arc/common_test.py b/arc/common_test.py
index 8c0b74ecee..7c76aa6eb6 100644
--- a/arc/common_test.py
+++ b/arc/common_test.py
@@ -13,6 +13,7 @@
 
 import numpy as np
 import pandas as pd
+import yaml
 from random import shuffle
 
 import arc.common as common
@@ -62,6 +63,23 @@ def test_read_yaml_file(self):
         with self.assertRaises(InputError):
             common.read_yaml_file('nopath')
 
+    def test_append_yaml_document(self):
+        """Test the append_yaml_document() function"""
+        path = os.path.join(common.ARC_PATH, 'arc', 'testing', 'append_yaml_document_test.yml')
+        try:
+            common.append_yaml_document(path, {'timestamp': '2026-05-07T14:00:00', 'jobs': {'A': ['j1']}})
+            common.append_yaml_document(path, {'timestamp': '2026-05-07T15:00:00', 'jobs': {'A': ['j1', 'j2']}})
+            with open(path, 'r') as f:
+                docs = list(yaml.safe_load_all(f))
+            self.assertEqual(len(docs), 2)
+            self.assertEqual(docs[0]['timestamp'], '2026-05-07T14:00:00')
+            self.assertEqual(docs[1]['jobs'], {'A': ['j1', 'j2']})
+            with self.assertRaises(InputError):
+                common.append_yaml_document(123, {'foo': 'bar'})
+        finally:
+            if os.path.isfile(path):
+                os.remove(path)
+
     def test_get_git_commit(self):
         """Test the get_git_commit() function"""
         git_commit = common.get_git_commit()
diff --git a/arc/scheduler.py b/arc/scheduler.py
index f32ed2e0d2..9c95cd9538 100644
--- a/arc/scheduler.py
+++ b/arc/scheduler.py
@@ -17,7 +17,8 @@
 from arc import plotter
 from arc.checks.common import get_i_from_job_name, is_conformer_job, sum_time_delta
 from arc.checks.ts import check_imaginary_frequencies, check_ts, check_irc_species_and_rxn
-from arc.common import (extremum_list,
+from arc.common import (append_yaml_document,
+                        extremum_list,
                         get_angle_in_180_range,
                         get_logger,
                         get_number_with_ordinal_indicator,
@@ -312,7 +313,9 @@ def __init__(self,
 
         self.initialize_output_dict()
         self.restart_path = os.path.join(self.project_directory, 'restart.yml')
+        self.running_jobs_snapshot_path = os.path.join(self.project_directory, 'running_jobs.yml')
         self.report_time = time.time()  # init time for reporting status every 1 hr
+        self._last_status_payload: dict | None = None
         self.servers = list()
         self.composite_method = composite_method
         self.conformer_opt_level = conformer_opt_level
@@ -840,14 +843,36 @@ def schedule_jobs(self):
             t = time.time() - self.report_time
             if t > 3600 and (self.running_jobs or self.active_pipes):
                 self.report_time = time.time()
-                if self.running_jobs:
-                    logger.info(f'Currently running jobs:\n{pprint.pformat(self.running_jobs)}')
-                if self.active_pipes:
-                    logger.info(f'Active pipe runs: {list(self.active_pipes.keys())}')
+                self.report_running_jobs_snapshot()
 
         # Generate a TS report:
         self.generate_final_ts_guess_report()
 
+    def report_running_jobs_snapshot(self) -> None:
+        """
+        Append a snapshot of the currently running jobs and active pipes to
+        ``<project_directory>/running_jobs.yml`` (each snapshot is a separate YAML
+        document prefixed with ``---``). If the payload is identical to the
+        previous snapshot, only a one-line heartbeat is logged to ARC.log.
+        """
+        # Copy the job lists, not just the dict: self.running_jobs values are lists
+        # that are mutated in place elsewhere in the scheduler, so a shallow dict
+        # copy stored in self._last_status_payload would alias them and make the
+        # equality check below miss every subsequent in-place change.
+        payload = {'running_jobs': {label: list(jobs) for label, jobs in self.running_jobs.items()},
+                   'active_pipes': list(self.active_pipes.keys())}
+        n_species = len(self.running_jobs)
+        n_jobs = sum(len(v) for v in self.running_jobs.values())
+        n_pipes = len(self.active_pipes)
+        summary = f'{n_species} species / {n_jobs} jobs / {n_pipes} active pipes'
+        if payload == self._last_status_payload:
+            logger.info(f'Status unchanged: {summary}.')
+            return
+        snapshot = {'timestamp': datetime.datetime.now().isoformat(timespec='seconds'),
+                    **payload}
+        append_yaml_document(self.running_jobs_snapshot_path, snapshot)
+        logger.info(f'Status changed: {summary}; snapshot appended to running_jobs.yml.')
+        self._last_status_payload = payload
+
     def run_job(self,
                 job_type: str,
                 conformer: int | None = None,
diff --git a/arc/scheduler_test.py b/arc/scheduler_test.py
index f272478464..598d2b446f 100644
--- a/arc/scheduler_test.py
+++ b/arc/scheduler_test.py
@@ -10,6 +10,8 @@
 import os
 import shutil
 
+import yaml
+
 import arc.parser.parser as parser
 from arc.checks.ts import check_ts
 from arc.common import ARC_PATH, ARC_TESTING_PATH, almost_equal_coords_lists, initialize_job_types, read_yaml_file
@@ -1033,6 +1035,39 @@ def test_run_sp_monoatomic_dlpno(self, mock_run_job):
         self.assertEqual(o_level.method, 'dlpno-ccsd(t)-f12')
         self.assertEqual(o_level.cabs, 'cc-pvtz-f12-cabs')
 
+    def test_report_running_jobs_snapshot(self):
+        """Snapshot is appended on change and skipped (heartbeat only) on no change."""
+        path = self.sched1.running_jobs_snapshot_path
+        if os.path.isfile(path):
+            os.remove(path)
+        self.sched1._last_status_payload = None
+        self.sched1.running_jobs = {'spcA': ['opt_a1234']}
+        self.sched1.active_pipes = {}
+
+        # First call: file doesn't exist, snapshot must be appended.
+        self.sched1.report_running_jobs_snapshot()
+        with open(path, 'r') as f:
+            docs = list(yaml.safe_load_all(f))
+        self.assertEqual(len(docs), 1)
+        self.assertIn('timestamp', docs[0])
+        self.assertEqual(docs[0]['running_jobs'], {'spcA': ['opt_a1234']})
+
+        # Second call with identical payload: must NOT append.
+        self.sched1.report_running_jobs_snapshot()
+        with open(path, 'r') as f:
+            docs = list(yaml.safe_load_all(f))
+        self.assertEqual(len(docs), 1)
+
+        # Third call after an in-place change (job list mutated): must append.
+        self.sched1.running_jobs['spcA'].append('sp_b5678')
+        self.sched1.report_running_jobs_snapshot()
+        with open(path, 'r') as f:
+            docs = list(yaml.safe_load_all(f))
+        self.assertEqual(len(docs), 2)
+        self.assertEqual(docs[1]['running_jobs'], {'spcA': ['opt_a1234', 'sp_b5678']})
+
+        os.remove(path)
+
     @classmethod
     def tearDownClass(cls):
         """