From f7c4b2617b46bc1bf00791edc2505c201ba8e764 Mon Sep 17 00:00:00 2001 From: Ritwik Date: Mon, 20 Apr 2026 17:47:12 +0200 Subject: [PATCH] Suspend destination folder on systemic errors (disk full, stale NFS) When a systemic infrastructure error occurs during mass transfer (e.g. ENOSPC, stale NFS handle, read-only filesystem), the processor now: 1. Stops processing remaining series immediately instead of grinding through them all 2. Sets DicomFolder.suspended = True on the destination 3. Sends an alert email to admins 4. Returns FAILURE for the current task Subsequent tasks for the same destination check the suspended flag at the start of process() and return WARNING without touching the PACS, preventing thousands of wasted queries. Non-systemic errors (bad DICOM data, conversion failures) continue to be handled per-series as before. --- .../migrations/0018_dicomfolder_suspended.py | 18 +++ adit/core/models.py | 4 + adit/mass_transfer/processors.py | 90 ++++++++++-- adit/mass_transfer/tests/test_processor.py | 138 ++++++++++++++++++ 4 files changed, 241 insertions(+), 9 deletions(-) create mode 100644 adit/core/migrations/0018_dicomfolder_suspended.py diff --git a/adit/core/migrations/0018_dicomfolder_suspended.py b/adit/core/migrations/0018_dicomfolder_suspended.py new file mode 100644 index 00000000..d6c42814 --- /dev/null +++ b/adit/core/migrations/0018_dicomfolder_suspended.py @@ -0,0 +1,18 @@ +# Generated by Django 6.0.3 on 2026-04-20 14:32 + +from django.db import migrations, models + + +class Migration(migrations.Migration): + + dependencies = [ + ('core', '0017_review_fixes'), + ] + + operations = [ + migrations.AddField( + model_name='dicomfolder', + name='suspended', + field=models.BooleanField(default=False, help_text='Suspended destinations skip processing (e.g. disk full).'), + ), + ] diff --git a/adit/core/models.py b/adit/core/models.py index 1a1e88d4..6bf4aeaa 100644 --- a/adit/core/models.py +++ b/adit/core/models.py @@ -177,6 +177,10 @@ class DicomFolder(DicomNode): blank=True, help_text="When to warn the admins by Email (used space in GB).", ) + suspended = models.BooleanField( + default=False, + help_text="Suspended destinations skip processing (e.g. disk full).", + ) objects: DicomNodeManager["DicomFolder"] = DicomNodeManager["DicomFolder"]() diff --git a/adit/mass_transfer/processors.py b/adit/mass_transfer/processors.py index d2830090..317e29d6 100644 --- a/adit/mass_transfer/processors.py +++ b/adit/mass_transfer/processors.py @@ -1,5 +1,6 @@ from __future__ import annotations +import errno import json import logging import secrets @@ -71,6 +72,21 @@ def from_dict(cls, d: dict) -> "FilterSpec": _MIN_SPLIT_WINDOW = timedelta(minutes=30) _DELAY_BETWEEN_STUDIES = 0.5 # seconds between studies to avoid overwhelming the PACS +_SYSTEMIC_ERRNOS = (errno.ENOSPC, errno.ESTALE, errno.EROFS, errno.EIO) + + +def _is_systemic_error(err: Exception) -> bool: + """Return True if the error indicates a systemic infrastructure problem. + + Systemic errors (disk full, stale NFS handle, read-only filesystem, I/O error) + affect all remaining series equally, so continuing is pointless. + """ + if isinstance(err, OSError) and err.errno in _SYSTEMIC_ERRNOS: + return True + if isinstance(err, DicomError) and "Out of disk space" in str(err): + return True + return False + # Deterministic pseudonyms use 14 characters. Random pseudonyms use 15 so the # two modes can be distinguished by length. @@ -257,9 +273,19 @@ def process(self): "log": "Task skipped because the mass transfer app is suspended.", } + destination_node = self.mass_task.destination + if ( + destination_node.node_type == DicomNode.NodeType.FOLDER + and destination_node.dicomfolder.suspended + ): + return { + "status": MassTransferTask.Status.WARNING, + "message": f"Destination '{destination_node.name}' is suspended.", + "log": "Task skipped because the destination folder is suspended.", + } + job = self.mass_task.job source_node = self.mass_task.source - destination_node = self.mass_task.destination if source_node.node_type != DicomNode.NodeType.SERVER: raise DicomError("Mass transfer source must be a DICOM server.") @@ -312,18 +338,62 @@ def process(self): grouped_volumes = self._group_volumes(volumes) # Transfer: fetch series grouped by study - return self._transfer_grouped_series( - operator, - grouped_volumes, - job, - pseudonymizer, - output_base, - dest_operator, - ) + try: + return self._transfer_grouped_series( + operator, + grouped_volumes, + job, + pseudonymizer, + output_base, + dest_operator, + ) + except Exception as err: + if _is_systemic_error(err): + self._suspend_destination(destination_node, job, err) + return { + "status": MassTransferTask.Status.FAILURE, + "message": f"Destination suspended: {err}", + "log": ( + "Systemic error detected. Destination folder suspended.\n" + f"{err}" + ), + } + raise finally: if dest_operator: dest_operator.close() + def _suspend_destination( + self, + destination_node: DicomNode, + job: MassTransferJob, + err: Exception, + ) -> None: + """Suspend a folder destination after a systemic error (e.g. disk full).""" + if destination_node.node_type != DicomNode.NodeType.FOLDER: + return + + folder = destination_node.dicomfolder + folder.suspended = True + folder.save(update_fields=["suspended"]) + + logger.critical( + "Destination folder '%s' suspended due to systemic error (job %d): %s", + destination_node.name, + job.pk, + err, + ) + + from adit.core.utils.mail import send_mail_to_admins + + send_mail_to_admins( + f"Mass transfer destination '{destination_node.name}' suspended", + f"Destination folder '{destination_node.name}' was automatically suspended " + f"due to a systemic error during mass transfer job {job.pk}.\n\n" + f"Error: {err}\n\n" + f"Please fix the underlying issue and unsuspend the folder in the admin panel.", + ) + def _create_pending_volumes( self, discovered: list[DiscoveredSeries], @@ -535,6 +605,8 @@ def _transfer_single_series( ) volume.status = MassTransferVolume.Status.ERROR volume.log = str(err) + if _is_systemic_error(err): + raise finally: if volume.status == MassTransferVolume.Status.PENDING: logger.error( diff --git a/adit/mass_transfer/tests/test_processor.py b/adit/mass_transfer/tests/test_processor.py index f0063e88..5046319c 100644 --- a/adit/mass_transfer/tests/test_processor.py +++ b/adit/mass_transfer/tests/test_processor.py @@ -1,3 +1,4 @@ +import errno import json from datetime import date, datetime, timedelta from pathlib import Path @@ -29,6 +30,7 @@ _birth_date_range, _destination_base_dir, _dicom_match, + _is_systemic_error, _parse_int, _series_folder_name, _study_datetime, @@ -520,6 +522,7 @@ def _make_process_env( processor.mass_task.source.dicomserver = mocker.MagicMock() processor.mass_task.destination.node_type = DicomNode.NodeType.FOLDER processor.mass_task.destination.dicomfolder.path = str(tmp_path / "output") + processor.mass_task.destination.dicomfolder.suspended = False processor.mass_task.pk = 42 processor.mass_task.partition_key = "20240101" @@ -1989,3 +1992,138 @@ def fake_export(op, s, path, subject_id, pseudonymizer): # The path should contain the job-identifying folder expected_prefix = f"adit_mass_transfer_{job.pk}_{job.created.strftime('%Y%m%d')}_researcher" assert expected_prefix in str(export_paths[0]) + + +# --------------------------------------------------------------------------- +# Systemic error detection tests +# --------------------------------------------------------------------------- + + +class TestIsSystemicError: + def test_enospc(self): + assert _is_systemic_error(OSError(errno.ENOSPC, "No space left on device")) + + def test_estale(self): + assert _is_systemic_error(OSError(errno.ESTALE, "Stale file handle")) + + def test_erofs(self): + assert _is_systemic_error(OSError(errno.EROFS, "Read-only file system")) + + def test_eio(self): + assert _is_systemic_error(OSError(errno.EIO, "Input/output error")) + + def test_disk_space_dicom_error(self): + assert _is_systemic_error(DicomError("Out of disk space while trying to save 'foo.dcm'.")) + + def test_regular_oserror(self): + assert not _is_systemic_error(OSError(errno.ENOENT, "No such file")) + + def test_regular_exception(self): + assert not _is_systemic_error(ValueError("bad value")) + + def test_regular_dicom_error(self): + assert not _is_systemic_error(DicomError("Something else went wrong")) + + +@pytest.mark.django_db +def test_process_stops_on_systemic_error(mocker: MockerFixture, mass_transfer_env): + """When export raises ENOSPC, the task stops immediately and the destination is suspended.""" + env = mass_transfer_env + series = [ + _make_discovered(patient_id="PAT1", series_uid="series-1"), + _make_discovered(patient_id="PAT1", series_uid="series-2"), + ] + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object( + processor, + "_export_series", + side_effect=DicomError("Out of disk space while trying to save 'test.dcm'."), + ) + mocker.patch("adit.core.utils.mail.send_mail_to_admins") + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.FAILURE + assert "suspended" in result["message"].lower() or "suspended" in result["log"].lower() + + env.destination.refresh_from_db() + assert env.destination.suspended is True + + # Only the first series should have been attempted (second skipped due to early exit) + vols = MassTransferVolume.objects.filter(job=env.job) + error_vols = vols.filter(status=MassTransferVolume.Status.ERROR) + assert error_vols.count() == 1 + + +@pytest.mark.django_db +def test_process_continues_on_regular_error(mocker: MockerFixture, mass_transfer_env): + """Regular errors (non-systemic) mark the volume as ERROR but continue to the next series.""" + env = mass_transfer_env + series = [ + _make_discovered(patient_id="PAT1", series_uid="series-1"), + _make_discovered(patient_id="PAT1", series_uid="series-2"), + ] + + call_count = 0 + + def export_first_fails(*args, **kwargs): + nonlocal call_count + call_count += 1 + if call_count == 1: + raise DicomError("Bad DICOM data") + return (1, "", "") + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object(processor, "_export_series", side_effect=export_first_fails) + + result = processor.process() + + # Should be WARNING (one succeeded, one failed) — NOT FAILURE + assert result["status"] == MassTransferTask.Status.WARNING + + env.destination.refresh_from_db() + assert env.destination.suspended is False + + # Both series were attempted + assert call_count == 2 + + +@pytest.mark.django_db +def test_process_skips_when_destination_suspended(mocker: MockerFixture, mass_transfer_env): + """When the destination folder is suspended, process() returns WARNING immediately.""" + env = mass_transfer_env + env.destination.suspended = True + env.destination.save() + + processor = MassTransferTaskProcessor(env.task) + discover_mock = mocker.patch.object(processor, "_discover_series") + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.WARNING + assert "suspended" in result["message"].lower() + discover_mock.assert_not_called() + + +@pytest.mark.django_db +def test_process_does_not_suspend_on_regular_error(mocker: MockerFixture, mass_transfer_env): + """Non-systemic errors do NOT suspend the destination.""" + env = mass_transfer_env + series = [_make_discovered(patient_id="PAT1", series_uid="series-1")] + + processor = MassTransferTaskProcessor(env.task) + mocker.patch.object(processor, "_discover_series", return_value=series) + mocker.patch("adit.mass_transfer.processors.DicomOperator") + mocker.patch.object(processor, "_export_series", side_effect=DicomError("Export failed")) + + result = processor.process() + + assert result["status"] == MassTransferTask.Status.FAILURE + + env.destination.refresh_from_db() + assert env.destination.suspended is False