Skip to content

Commit 8739a6c

Browse files
Remove the logic for skipping existing processing (#735)
When we had the TUI we introduced logic for skipping processing in the case that someone started a fresh murfey with existing data. This no longer makes any sense as murfey can handle switching between tomo and SPA without a fresh instance. Therefore this PR removes all skip_existing_processing and first_loop logic. The case that needs checking is that if processing is disabled, it should not become enabled on reconnection. From the code this looks ok to me, as the start_multigrid_watcher requests to the instrument server read session.process from the database.
1 parent 08b9a0e commit 8739a6c

File tree

7 files changed

+72
-138
lines changed

7 files changed

+72
-138
lines changed

src/murfey/client/analyser.py

Lines changed: 2 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -308,11 +308,7 @@ def _analyse(self):
308308
dc_metadata["acquisition_software"] = (
309309
self._context._acquisition_software
310310
)
311-
self.notify(
312-
{
313-
"form": dc_metadata,
314-
}
315-
)
311+
self.notify(dc_metadata)
316312

317313
# If a file with a CLEM context is identified, immediately post it
318314
elif isinstance(self._context, CLEMContext):
@@ -366,11 +362,7 @@ def _analyse(self):
366362
dc_metadata["acquisition_software"] = (
367363
self._context._acquisition_software
368364
)
369-
self.notify(
370-
{
371-
"form": dc_metadata,
372-
}
373-
)
365+
self.notify(dc_metadata)
374366
elif isinstance(
375367
self._context,
376368
(

src/murfey/client/multigrid_control.py

Lines changed: 66 additions & 101 deletions
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,6 @@ class MultigridController:
4040
finalising: bool = False
4141
dormant: bool = False
4242
multigrid_watcher_active: bool = True
43-
processing_enabled: bool = True
4443
do_transfer: bool = True
4544
dummy_dc: bool = False
4645
force_mdoc_metadata: bool = True
@@ -86,8 +85,6 @@ def __post_init__(self):
8685
for ds in val.values()
8786
for s in ds
8887
]
89-
self._data_collection_form_complete = False
90-
self._register_dc: bool | None = None
9188
self.rsync_processes = self.rsync_processes or {}
9289
self.analysers = self.analysers or {}
9390

@@ -260,7 +257,6 @@ def _start_rsyncer_multigrid(
260257
self._start_rsyncer(
261258
source,
262259
destination,
263-
force_metadata=self.processing_enabled,
264260
analyse=analyse,
265261
remove_files=remove_files,
266262
tag=tag,
@@ -324,7 +320,6 @@ def _start_rsyncer(
324320
source: Path,
325321
destination: str,
326322
visit_path: str = "",
327-
force_metadata: bool = False,
328323
analyse: bool = True,
329324
remove_files: bool = False,
330325
tag: str = "",
@@ -455,12 +450,7 @@ def rsync_result(update: RSyncerUpdate):
455450
force_mdoc_metadata=self.force_mdoc_metadata,
456451
limited=limited,
457452
)
458-
if force_metadata:
459-
self.analysers[source].subscribe(
460-
partial(self._start_dc, from_form=True)
461-
)
462-
else:
463-
self.analysers[source].subscribe(self._data_collection_form)
453+
self.analysers[source].subscribe(self._start_dc)
464454
self.analysers[source].start()
465455
if transfer:
466456
self.rsync_processes[source].subscribe(self.analysers[source].enqueue)
@@ -502,32 +492,13 @@ def _rsync_update_converter(p: Path) -> None:
502492
)
503493
self._environment.watchers[source].start()
504494

505-
def _data_collection_form(self, response: dict):
506-
log.info("data collection form ready")
507-
if self._data_collection_form_complete:
508-
return
509-
if self._register_dc and response.get("form"):
510-
self._form_values = {k: str(v) for k, v in response.get("form", {}).items()}
511-
log.info(
512-
f"gain reference is set to {self._form_values.get('gain_ref')}, {self._environment.gain_ref}"
513-
)
514-
if self._form_values.get("gain_ref") in (None, "None"):
515-
self._form_values["gain_ref"] = self._environment.gain_ref
516-
self._data_collection_form_complete = True
517-
elif self._register_dc is None:
518-
self._data_collection_form_complete = True
519-
520-
def _start_dc(self, metadata_json, from_form: bool = False):
495+
def _start_dc(self, metadata_json):
521496
if self.dummy_dc:
522497
return
523-
# for multigrid the analyser sends the message straight to _start_dc by-passing user input
524-
# it is then necessary to extract the data from the message
525-
if from_form:
526-
metadata_json = metadata_json.get("form", {})
527-
# Safely convert all entries into strings, but leave None as-is
528-
metadata_json = {
529-
k: str(v) if v is not None else None for k, v in metadata_json.items()
530-
}
498+
# Safely convert all entries into strings, but leave None as-is
499+
metadata_json = {
500+
k: str(v) if v is not None else None for k, v in metadata_json.items()
501+
}
531502
self._environment.dose_per_frame = metadata_json.get("dose_per_frame")
532503
self._environment.gain_ref = metadata_json.get("gain_ref")
533504
self._environment.symmetry = metadata_json.get("symmetry")
@@ -601,82 +572,76 @@ def _start_dc(self, metadata_json, from_form: bool = False):
601572
environment=self._environment,
602573
token=self.token,
603574
)
604-
if from_form:
605-
data = {
606-
"voltage": metadata_json["voltage"],
607-
"pixel_size_on_image": metadata_json["pixel_size_on_image"],
608-
"experiment_type": metadata_json["experiment_type"],
609-
"image_size_x": metadata_json["image_size_x"],
610-
"image_size_y": metadata_json["image_size_y"],
611-
"file_extension": metadata_json["file_extension"],
612-
"acquisition_software": metadata_json["acquisition_software"],
613-
"image_directory": str(
614-
self._environment.default_destinations[source]
615-
),
616-
"tag": str(source),
617-
"source": str(source),
618-
"magnification": metadata_json["magnification"],
619-
"total_exposed_dose": metadata_json.get("total_exposed_dose"),
620-
"c2aperture": metadata_json.get("c2aperture"),
621-
"exposure_time": metadata_json.get("exposure_time"),
622-
"slit_width": metadata_json.get("slit_width"),
623-
"phase_plate": metadata_json.get("phase_plate", False),
624-
}
575+
data = {
576+
"voltage": metadata_json["voltage"],
577+
"pixel_size_on_image": metadata_json["pixel_size_on_image"],
578+
"experiment_type": metadata_json["experiment_type"],
579+
"image_size_x": metadata_json["image_size_x"],
580+
"image_size_y": metadata_json["image_size_y"],
581+
"file_extension": metadata_json["file_extension"],
582+
"acquisition_software": metadata_json["acquisition_software"],
583+
"image_directory": str(self._environment.default_destinations[source]),
584+
"tag": str(source),
585+
"source": str(source),
586+
"magnification": metadata_json["magnification"],
587+
"total_exposed_dose": metadata_json.get("total_exposed_dose"),
588+
"c2aperture": metadata_json.get("c2aperture"),
589+
"exposure_time": metadata_json.get("exposure_time"),
590+
"slit_width": metadata_json.get("slit_width"),
591+
"phase_plate": metadata_json.get("phase_plate", False),
592+
}
593+
capture_post(
594+
base_url=str(self._environment.url.geturl()),
595+
router_name="workflow.router",
596+
function_name="start_dc",
597+
token=self.token,
598+
visit_name=self._environment.visit,
599+
session_id=self.session_id,
600+
data=data,
601+
)
602+
for recipe in (
603+
"em-spa-preprocess",
604+
"em-spa-extract",
605+
"em-spa-class2d",
606+
"em-spa-class3d",
607+
"em-spa-refine",
608+
):
625609
capture_post(
626610
base_url=str(self._environment.url.geturl()),
627611
router_name="workflow.router",
628-
function_name="start_dc",
612+
function_name="register_proc",
629613
token=self.token,
630614
visit_name=self._environment.visit,
631615
session_id=self.session_id,
632-
data=data,
633-
)
634-
for recipe in (
635-
"em-spa-preprocess",
636-
"em-spa-extract",
637-
"em-spa-class2d",
638-
"em-spa-class3d",
639-
"em-spa-refine",
640-
):
641-
capture_post(
642-
base_url=str(self._environment.url.geturl()),
643-
router_name="workflow.router",
644-
function_name="register_proc",
645-
token=self.token,
646-
visit_name=self._environment.visit,
647-
session_id=self.session_id,
648-
data={
649-
"tag": str(source),
650-
"source": str(source),
651-
"recipe": recipe,
652-
},
653-
)
654-
log.info(f"Posting SPA processing parameters: {metadata_json}")
655-
response = capture_post(
656-
base_url=str(self._environment.url.geturl()),
657-
router_name="workflow.spa_router",
658-
function_name="register_spa_proc_params",
659-
token=self.token,
660-
session_id=self.session_id,
661616
data={
662-
**{
663-
k: None if v == "None" else v
664-
for k, v in metadata_json.items()
665-
},
666617
"tag": str(source),
618+
"source": str(source),
619+
"recipe": recipe,
667620
},
668621
)
669-
if response and not str(response.status_code).startswith("2"):
670-
log.warning(f"{response.reason}")
671-
capture_post(
672-
base_url=str(self._environment.url.geturl()),
673-
router_name="workflow.spa_router",
674-
function_name="flush_spa_processing",
675-
token=self.token,
676-
visit_name=self._environment.visit,
677-
session_id=self.session_id,
678-
data={"tag": str(source)},
679-
)
622+
log.info(f"Posting SPA processing parameters: {metadata_json}")
623+
response = capture_post(
624+
base_url=str(self._environment.url.geturl()),
625+
router_name="workflow.spa_router",
626+
function_name="register_spa_proc_params",
627+
token=self.token,
628+
session_id=self.session_id,
629+
data={
630+
**{k: None if v == "None" else v for k, v in metadata_json.items()},
631+
"tag": str(source),
632+
},
633+
)
634+
if response and not str(response.status_code).startswith("2"):
635+
log.warning(f"{response.reason}")
636+
capture_post(
637+
base_url=str(self._environment.url.geturl()),
638+
router_name="workflow.spa_router",
639+
function_name="flush_spa_processing",
640+
token=self.token,
641+
visit_name=self._environment.visit,
642+
session_id=self.session_id,
643+
data={"tag": str(source)},
644+
)
680645

681646
def _increment_file_count(
682647
self, observed_files: List[Path], source: str, destination: str

src/murfey/client/watchdir_multigrid.py

Lines changed: 4 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@ def __init__(
1717
self,
1818
path: str | os.PathLike,
1919
machine_config: dict,
20-
skip_existing_processing: bool = False,
2120
):
2221
super().__init__()
2322
self._basepath = Path(path)
@@ -30,7 +29,6 @@ def __init__(
3029
)
3130
# Toggleable settings
3231
self._analyse = True
33-
self._skip_existing_processing = skip_existing_processing
3432
self._stopping = False
3533

3634
def start(self):
@@ -61,21 +59,14 @@ def _handle_metadata(self, directory: Path, extra_directory: str):
6159
)
6260
self._seen_dirs.append(directory)
6361

64-
def _handle_fractions(self, directory: Path, first_loop: bool):
62+
def _handle_fractions(self, directory: Path):
6563
processing_started = False
6664
for d02 in directory.glob("Images-Disc*"):
6765
if d02 not in self._seen_dirs:
68-
# If 'skip_existing_processing' is set, do not process for
69-
# any data directories found on the first loop.
70-
# This allows you to avoid triggering processing again if Murfey is restarted
7166
self.notify(
7267
d02,
7368
remove_files=True,
74-
analyse=(
75-
not (first_loop and self._skip_existing_processing)
76-
if self._analyse
77-
else False
78-
),
69+
analyse=self._analyse,
7970
tag="fractions",
8071
)
8172
self._seen_dirs.append(d02)
@@ -88,17 +79,12 @@ def _handle_fractions(self, directory: Path, first_loop: bool):
8879
):
8980
self.notify(
9081
directory,
91-
analyse=(
92-
not (first_loop and self._skip_existing_processing)
93-
if self._analyse
94-
else False
95-
),
82+
analyse=self._analyse,
9683
tag="fractions",
9784
)
9885
self._seen_dirs.append(directory)
9986

10087
def _process(self):
101-
first_loop = True
10288
while not self._stopping:
10389
for d in self._basepath.glob("*"):
10490
if d.name in self._machine_config["create_directories"]:
@@ -133,18 +119,14 @@ def _process(self):
133119
self._handle_fractions(
134120
sample.parent.parent.parent
135121
/ f"{sample.parent.name}_{sample.name}",
136-
first_loop,
137122
)
138123

139124
else:
140125
if d.is_dir() and d not in self._seen_dirs:
141126
self._handle_metadata(
142127
d, extra_directory=f"metadata_{d.name}"
143128
)
144-
self._handle_fractions(d.parent.parent / d.name, first_loop)
145-
146-
if first_loop:
147-
first_loop = False
129+
self._handle_fractions(d.parent.parent / d.name)
148130
time.sleep(15)
149131

150132
self.notify(final=True)

src/murfey/instrument_server/api.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,6 @@ def setup_multigrid_watcher(
174174
session_id,
175175
murfey_url=_get_murfey_url(),
176176
do_transfer=True,
177-
processing_enabled=not watcher_spec.skip_existing_processing,
178177
_machine_config=machine_config,
179178
token=tokens.get(session_id, "token"),
180179
data_collection_parameters=data_collection_parameters.get(label, {}),
@@ -190,7 +189,6 @@ def setup_multigrid_watcher(
190189
watchers[session_id] = MultigridDirWatcher(
191190
watcher_spec.source,
192191
machine_config,
193-
skip_existing_processing=watcher_spec.skip_existing_processing,
194192
)
195193
watchers[session_id].subscribe(
196194
partial(

src/murfey/server/api/instrument.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,6 @@ async def setup_multigrid_watcher(
154154
"visit": visit,
155155
"label": visit,
156156
"instrument_name": instrument_name,
157-
"skip_existing_processing": watcher_spec.skip_existing_processing,
158157
"destination_overrides": {
159158
str(k): v for k, v in watcher_spec.destination_overrides.items()
160159
},

src/murfey/util/instrument_models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ class MultigridWatcherSpec(BaseModel):
1010
label: str
1111
visit: str
1212
instrument_name: str
13-
skip_existing_processing: bool = False
1413
destination_overrides: Dict[Path, str] = {}
1514
rsync_restarts: List[str] = []
1615
visit_end_time: Optional[datetime] = None

src/murfey/util/models.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,6 @@ class BatchPositionParameters(BaseModel):
203203

204204
class MultigridWatcherSetup(BaseModel):
205205
source: Path
206-
skip_existing_processing: bool = False
207206
destination_overrides: Dict[Path, str] = {}
208207
rsync_restarts: List[str] = []
209208

0 commit comments

Comments
 (0)