Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions arc/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,6 +374,28 @@ def from_yaml(yaml_str: str) -> dict | list:
return yaml.load(stream=yaml_str, Loader=yaml.FullLoader)


def append_yaml_document(path: str,
                         content: list | dict,
                         ) -> None:
    """
    Append a YAML document to a file, separated by the ``---`` document marker.
    Useful for files that accumulate time-ordered snapshots and should be parsed
    with ``yaml.safe_load_all`` / ``yaml.load_all``.

    Args:
        path (str): The YAML file path to append to.
        content (list, dict): The document to append.

    Raises:
        InputError: If ``path`` is not a string.
    """
    if not isinstance(path, str):
        raise InputError(f'path must be a string, got {path} which is a {type(path)}')
    # os.path.dirname() handles both path separators portably (a '/' substring
    # test misses Windows '\\' paths), and exist_ok=True avoids a race if the
    # directory is created concurrently between the check and the makedirs call.
    dir_name = os.path.dirname(path)
    if dir_name:
        os.makedirs(dir_name, exist_ok=True)
    yaml_str = to_yaml(py_content=content)
    with open(path, 'a') as f:
        f.write('---\n')
        f.write(yaml_str)


def to_yaml(py_content: list | dict) -> str:
"""
Convert a Python list or dictionary to a YAML string format.
Expand Down
18 changes: 18 additions & 0 deletions arc/common_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

import numpy as np
import pandas as pd
import yaml
from random import shuffle

import arc.common as common
Expand Down Expand Up @@ -62,6 +63,23 @@ def test_read_yaml_file(self):
with self.assertRaises(InputError):
common.read_yaml_file('nopath')

def test_append_yaml_document(self):
    """Test the append_yaml_document() function"""
    path = os.path.join(common.ARC_PATH, 'arc', 'testing', 'append_yaml_document_test.yml')
    first_doc = {'timestamp': '2026-05-07T14:00:00', 'jobs': {'A': ['j1']}}
    second_doc = {'timestamp': '2026-05-07T15:00:00', 'jobs': {'A': ['j1', 'j2']}}
    try:
        # Append two documents, then parse them back as a multi-document stream.
        for doc in (first_doc, second_doc):
            common.append_yaml_document(path, doc)
        with open(path, 'r') as f:
            loaded = list(yaml.safe_load_all(f))
        self.assertEqual(len(loaded), 2)
        self.assertEqual(loaded[0]['timestamp'], '2026-05-07T14:00:00')
        self.assertEqual(loaded[1]['jobs'], {'A': ['j1', 'j2']})
        # A non-string path must be rejected.
        with self.assertRaises(InputError):
            common.append_yaml_document(123, {'foo': 'bar'})
    finally:
        if os.path.isfile(path):
            os.remove(path)

def test_get_git_commit(self):
"""Test the get_git_commit() function"""
git_commit = common.get_git_commit()
Expand Down
32 changes: 27 additions & 5 deletions arc/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
from arc import plotter
from arc.checks.common import get_i_from_job_name, is_conformer_job, sum_time_delta
from arc.checks.ts import check_imaginary_frequencies, check_ts, check_irc_species_and_rxn
from arc.common import (extremum_list,
from arc.common import (append_yaml_document,
extremum_list,
get_angle_in_180_range,
get_logger,
get_number_with_ordinal_indicator,
Expand Down Expand Up @@ -312,7 +313,9 @@ def __init__(self,
self.initialize_output_dict()

self.restart_path = os.path.join(self.project_directory, 'restart.yml')
self.running_jobs_snapshot_path = os.path.join(self.project_directory, 'running_jobs.yml')
self.report_time = time.time() # init time for reporting status every 1 hr
self._last_status_payload: dict | None = None
self.servers = list()
self.composite_method = composite_method
self.conformer_opt_level = conformer_opt_level
Expand Down Expand Up @@ -840,14 +843,33 @@ def schedule_jobs(self):
t = time.time() - self.report_time
if t > 3600 and (self.running_jobs or self.active_pipes):
self.report_time = time.time()
if self.running_jobs:
logger.info(f'Currently running jobs:\n{pprint.pformat(self.running_jobs)}')
if self.active_pipes:
logger.info(f'Active pipe runs: {list(self.active_pipes.keys())}')
self.report_running_jobs_snapshot()

# Generate a TS report:
self.generate_final_ts_guess_report()

def report_running_jobs_snapshot(self) -> None:
    """
    Append a snapshot of the currently running jobs and active pipes to
    ``<project>/running_jobs.yml`` (each snapshot is a separate YAML
    document prefixed with ``---``). If the payload is identical to the
    previous snapshot, only a one-line heartbeat is logged to ARC.log.
    """
    # Copy the inner job lists as well: a shallow dict() copy shares the list
    # objects with self.running_jobs, so an in-place append to a job list would
    # also mutate the stored _last_status_payload — the equality check below
    # would then report "unchanged" and the change would never be snapshotted.
    payload = {'running_jobs': {label: list(jobs) for label, jobs in self.running_jobs.items()},
               'active_pipes': list(self.active_pipes.keys())}
    n_species = len(self.running_jobs)
    n_jobs = sum(len(v) for v in self.running_jobs.values())
    n_pipes = len(self.active_pipes)
    summary = f'{n_species} species / {n_jobs} jobs / {n_pipes} active pipes'
    if payload == self._last_status_payload:
        # Heartbeat only — no file I/O when nothing changed.
        logger.info(f'Status unchanged: {summary}.')
        return
    snapshot = {'timestamp': datetime.datetime.now().isoformat(timespec='seconds'),
                **payload}
    append_yaml_document(self.running_jobs_snapshot_path, snapshot)
    logger.info(f'Status changed: {summary}; snapshot appended to running_jobs.yml.')
    self._last_status_payload = payload

def run_job(self,
job_type: str,
conformer: int | None = None,
Expand Down
35 changes: 35 additions & 0 deletions arc/scheduler_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@
import os
import shutil

import yaml

import arc.parser.parser as parser
from arc.checks.ts import check_ts
from arc.common import ARC_PATH, ARC_TESTING_PATH, almost_equal_coords_lists, initialize_job_types, read_yaml_file
Expand Down Expand Up @@ -1033,6 +1035,39 @@ def test_run_sp_monoatomic_dlpno(self, mock_run_job):
self.assertEqual(o_level.method, 'dlpno-ccsd(t)-f12')
self.assertEqual(o_level.cabs, 'cc-pvtz-f12-cabs')

def test_report_running_jobs_snapshot(self):
    """Snapshot is appended on change and skipped (heartbeat only) on no change."""
    path = self.sched1.running_jobs_snapshot_path
    if os.path.isfile(path):
        os.remove(path)
    self.sched1._last_status_payload = None
    self.sched1.running_jobs = {'spcA': ['opt_a1234']}
    self.sched1.active_pipes = {}

    def load_docs():
        # Parse the file as a multi-document YAML stream.
        with open(path, 'r') as f:
            return list(yaml.safe_load_all(f))

    # First call: file doesn't exist, snapshot must be appended.
    self.sched1.report_running_jobs_snapshot()
    docs = load_docs()
    self.assertEqual(len(docs), 1)
    self.assertIn('timestamp', docs[0])
    self.assertEqual(docs[0]['running_jobs'], {'spcA': ['opt_a1234']})

    # Second call with identical payload: must NOT append.
    self.sched1.report_running_jobs_snapshot()
    self.assertEqual(len(load_docs()), 1)

    # Third call after a change: must append a new document.
    self.sched1.running_jobs = {'spcA': ['opt_a1234', 'sp_b5678']}
    self.sched1.report_running_jobs_snapshot()
    docs = load_docs()
    self.assertEqual(len(docs), 2)
    self.assertEqual(docs[1]['running_jobs'], {'spcA': ['opt_a1234', 'sp_b5678']})

    os.remove(path)

@classmethod
def tearDownClass(cls):
"""
Expand Down
Loading