Skip to content

Commit f8ebe2a

Browse files
stephen-riggstieneupin
authored andcommitted
Safer registration of data collections (#726)
Ensure that dcg, dc and pjid exist before inserting into murfey db. Add a sleep for the case where they cannot be registered to allow the database to settle.
1 parent b545268 commit f8ebe2a

File tree

8 files changed

+28
-27
lines changed

8 files changed

+28
-27
lines changed

.github/workflows/test.yml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,11 @@ name: Build and test
33
on: [push, pull_request]
44

55
env:
6-
ISPYB_DATABASE_SCHEMA: 4.8.0
6+
ISPYB_DATABASE_SCHEMA: 4.11.0
77
# Installs from GitHub
88
# Versions: https://github.com/DiamondLightSource/ispyb-database/tags
99
# Previous version(s):
10+
# 4.8.0
1011
# 4.2.1 # released 2024-08-19
1112
# 4.1.0 # released 2024-03-26
1213

src/murfey/server/feedback.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2160,7 +2160,7 @@ def feedback_callback(header: dict, message: dict, _db=murfey_db) -> None:
21602160
murfey_db=_db,
21612161
)
21622162
if murfey.server._transport_object:
2163-
if result.get("success", False):
2163+
if result.get("success"):
21642164
murfey.server._transport_object.transport.ack(header)
21652165
else:
21662166
# Send it directly to DLQ without trying to rerun it

src/murfey/workflows/register_data_collection.py

Lines changed: 10 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
import logging
2+
import time
23

34
import ispyb.sqlalchemy._auto_db_schema as ISPyBDB
45
from sqlmodel import select
@@ -37,6 +38,7 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]:
3738
dcgid = dcg[0].id
3839
# flush_data_collections(message["source"], murfey_db)
3940
else:
41+
time.sleep(2)
4042
logger.warning(
4143
"No data collection group ID was found for image directory "
4244
f"{sanitise(message['image_directory'])} and source "
@@ -82,21 +84,20 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]:
8284
else ""
8385
),
8486
).get("return_value", None)
87+
if dcid is None:
88+
time.sleep(2)
89+
logger.error(
90+
"Failed to register the following data collection: \n"
91+
f"{message} \n"
92+
"Requeueing message"
93+
)
94+
return {"success": False, "requeue": True}
8595
murfey_dc = MurfeyDB.DataCollection(
8696
id=dcid,
8797
tag=message.get("tag"),
8898
dcg_id=dcgid,
8999
)
90100
murfey_db.add(murfey_dc)
91101
murfey_db.commit()
92-
dcid = murfey_dc.id
93102
murfey_db.close()
94-
95-
if dcid is None:
96-
logger.error(
97-
"Failed to register the following data collection: \n"
98-
f"{message} \n"
99-
"Requeueing message"
100-
)
101-
return {"success": False, "requeue": True}
102103
return {"success": True}

src/murfey/workflows/register_data_collection_group.py

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -52,6 +52,15 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]:
5252
"return_value", None
5353
)
5454

55+
if dcgid is None:
56+
time.sleep(2)
57+
logger.error(
58+
"Failed to register the following data collection group: \n"
59+
f"{message} \n"
60+
"Requeuing message"
61+
)
62+
return {"success": False, "requeue": True}
63+
5564
atlas_record = ISPyBDB.Atlas(
5665
dataCollectionGroupId=dcgid,
5766
atlasImage=message.get("atlas", ""),
@@ -75,15 +84,6 @@ def run(message: dict, murfey_db: SQLModelSession) -> dict[str, bool]:
7584
murfey_db.commit()
7685
murfey_db.close()
7786

78-
if dcgid is None:
79-
time.sleep(2)
80-
logger.error(
81-
"Failed to register the following data collection group: \n"
82-
f"{message} \n"
83-
"Requeuing message"
84-
)
85-
return {"success": False, "requeue": True}
86-
8787
if dcg_hooks := entry_points(group="murfey.hooks", name="data_collection_group"):
8888
try:
8989
for hook in dcg_hooks:

src/murfey/workflows/register_processing_job.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@ def run(message: dict, murfey_db: SQLModelSession):
6363
pid = _transport_object.do_create_ispyb_job(record).get(
6464
"return_value", None
6565
)
66+
if pid is None:
67+
return {"success": False, "requeue": True}
6668
murfey_pj = MurfeyDB.ProcessingJob(
6769
id=pid, recipe=message["recipe"], dc_id=_dcid
6870
)
@@ -71,9 +73,6 @@ def run(message: dict, murfey_db: SQLModelSession):
7173
pid = murfey_pj.id
7274
murfey_db.close()
7375

74-
if pid is None:
75-
return {"success": False, "requeue": True}
76-
7776
# Update Prometheus counter for preprocessed movies
7877
prom.preprocessed_movies.labels(processing_job=pid)
7978

tests/workflows/test_register_data_collection.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,6 @@ def test_run(
9999
assert result == {"success": False, "requeue": True}
100100
else:
101101
mock_transport_object.do_insert_data_collection.assert_not_called()
102-
assert result == {"success": False, "requeue": True}
102+
assert result == {"success": True}
103103
else:
104104
assert result == {"success": True}

tests/workflows/test_register_data_collection_group.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -77,10 +77,10 @@ def test_run(
7777
else:
7878
if ispyb_session_id is not None:
7979
mock_transport_object.do_insert_data_collection_group.assert_called_once()
80-
mock_transport_object.do_insert_atlas.assert_called_once()
8180
if insert_dcg is not None:
81+
mock_transport_object.do_insert_atlas.assert_called_once()
8282
assert result == {"success": True}
8383
else:
8484
assert result == {"success": False, "requeue": True}
8585
else:
86-
assert result == {"success": False, "requeue": True}
86+
assert result == {"success": True}

tests/workflows/test_processing_job.py renamed to tests/workflows/test_register_processing_job.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,6 @@ def test_run(
104104
else:
105105
assert result == {"success": False, "requeue": True}
106106
else:
107-
assert result == {"success": False, "requeue": True}
107+
assert result == {"success": True}
108108
else:
109109
assert result == {"success": False, "requeue": True}

0 commit comments

Comments
 (0)