From fb12789ee30ee48e64e1c4f24f6f51bee4b32177 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Fri, 27 Feb 2026 17:06:04 +0000 Subject: [PATCH 1/2] Replace stored program with direct api call --- requirements.conda.txt | 2 +- src/dlstbx/services/ispybsvc.py | 31 +++++++++++++++++-------------- 2 files changed, 18 insertions(+), 15 deletions(-) diff --git a/requirements.conda.txt b/requirements.conda.txt index c371fe235..91f49d501 100644 --- a/requirements.conda.txt +++ b/requirements.conda.txt @@ -1,6 +1,6 @@ drmaa hdf5plugin -ispyb>=11.1.0 +ispyb>=11.1.2 junit-xml>=1.9 marshmallow-sqlalchemy minio>=7.1.0 diff --git a/src/dlstbx/services/ispybsvc.py b/src/dlstbx/services/ispybsvc.py index 7f23a4a97..1d5dc7e01 100644 --- a/src/dlstbx/services/ispybsvc.py +++ b/src/dlstbx/services/ispybsvc.py @@ -4,6 +4,7 @@ import os.path import pathlib import time +from datetime import datetime from typing import List import ispyb.sqlalchemy @@ -323,7 +324,9 @@ def do_store_dimple_failure(self, parameters, **kwargs): ) return False - def do_register_processing(self, parameters, **kwargs): + def do_register_processing( + self, parameters, session: sqlalchemy.orm.Session, **kwargs + ): program = parameters("program") cmdline = parameters("cmdline") environment = parameters("environment") or "" @@ -335,25 +338,25 @@ def do_register_processing(self, parameters, **kwargs): ) environment = environment[: min(255, len(environment))] rpid = parameters("rpid") + parent_autoprocprogramid = parameters("parent_autoprocprogramid") or None if rpid and not rpid.isdigit(): self.log.error("Invalid processing id '%s'", rpid) return False try: - result = self.ispyb.mx_processing.upsert_program_ex( - job_id=rpid, - name=program, - command=cmdline, - environment=environment, - pipeline_id=processingpipelineid, + new_app = ispyb.sqlalchemy.AutoProcProgram( + processingJobId=rpid, + processingPrograms=program, + processingCommandLine=cmdline, + processingEnvironment=environment, + processingPipelineId=processingpipelineid, + parentAutoProcProgramId=parent_autoprocprogramid, + recordTimeStamp=datetime.now(), ) + session.add(new_app) + session.commit() + result = new_app.autoProcProgramId self.log.info( - "Registered new program '%s' for processing id '%s' with command line '%s' and environment '%s' and pipeline id '%s' with result '%s'.", - program, - rpid, - cmdline, - environment, - processingpipelineid, - result, + f"Registered new program '{program}' for processing id '{rpid}' with command line '{cmdline}' and environment '{environment}', pipeline id '{processingpipelineid}' and parent program id '{parent_autoprocprogramid}' with result '{result}'.", ) return {"success": True, "return_value": result} except ispyb.ISPyBException as e: From bd74e63d64977b92832cf8a8d93d4d8b5a637c68 Mon Sep 17 00:00:00 2001 From: Phil Blowey Date: Wed, 18 Mar 2026 16:39:45 +0000 Subject: [PATCH 2/2] Get parent_appid from scaling_id --- src/dlstbx/crud.py | 16 ++++++++++++++++ src/dlstbx/services/ispybsvc.py | 15 +++++++++++---- 2 files changed, 27 insertions(+), 4 deletions(-) diff --git a/src/dlstbx/crud.py b/src/dlstbx/crud.py index d350e85c6..84340d81f 100644 --- a/src/dlstbx/crud.py +++ b/src/dlstbx/crud.py @@ -368,6 +368,22 @@ def get_ssx_events_for_dcid( return query.all() +def get_app_id_for_scaling_id( + session: sqlalchemy.orm.session.Session, + scaling_id: int, +) -> int: + query = ( + session.query(models.AutoProc.autoProcProgramId) + .join( + models.AutoProcScaling, + models.AutoProcScaling.autoProcId == models.AutoProc.autoProcId, + ) + .filter(models.AutoProcScaling.autoProcScalingId == scaling_id) + ) + result = query.first() + return result.autoProcProgramId + + def insert_xray_centring( xrc: schemas.XrayCentring, session: sqlalchemy.orm.session.Session, diff --git a/src/dlstbx/services/ispybsvc.py b/src/dlstbx/services/ispybsvc.py index 1d5dc7e01..9a03e99b3 100644 --- a/src/dlstbx/services/ispybsvc.py +++ b/src/dlstbx/services/ispybsvc.py @@ -329,8 +329,16 @@ def do_register_processing( ): program = parameters("program") cmdline = parameters("cmdline") - environment = parameters("environment") or "" + environment = parameters("environment") or {} upstream_source = parameters("upstream_source") or "" + scaling_id = parameters("scaling_id") or environment.get("scaling_id") + if isinstance(scaling_id, list): + scaling_id = scaling_id[0] + parent_appid = ( + crud.get_app_id_for_scaling_id(session, int(scaling_id)) + if scaling_id + else None + ) processingpipelineid = self.get_pipeline_id(program, upstream_source) if isinstance(environment, dict): environment = ", ".join( @@ -338,7 +346,6 @@ def do_register_processing( ) environment = environment[: min(255, len(environment))] rpid = parameters("rpid") - parent_autoprocprogramid = parameters("parent_autoprocprogramid") or None if rpid and not rpid.isdigit(): self.log.error("Invalid processing id '%s'", rpid) return False @@ -349,14 +356,14 @@ def do_register_processing( processingCommandLine=cmdline, processingEnvironment=environment, processingPipelineId=processingpipelineid, - parentAutoProcProgramId=parent_autoprocprogramid, + parentAutoProcProgramId=parent_appid, recordTimeStamp=datetime.now(), ) session.add(new_app) session.commit() result = new_app.autoProcProgramId self.log.info( - f"Registered new program '{program}' for processing id '{rpid}' with command line '{cmdline}' and environment '{environment}', pipeline id '{processingpipelineid}' and parent program id '{parent_autoprocprogramid}' with result '{result}'.", + f"Registered new program '{program}' for processing id '{rpid}' with command line '{cmdline}' and environment '{environment}', pipeline id '{processingpipelineid}' and parent program id '{parent_appid}' with result '{result}'.", ) return {"success": True, "return_value": result} except ispyb.ISPyBException as e: