Skip to content
14 changes: 12 additions & 2 deletions src/murfey/client/analyser.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from murfey.client.contexts.clem import CLEMContext
from murfey.client.contexts.spa import SPAModularContext
from murfey.client.contexts.spa_metadata import SPAMetadataContext
from murfey.client.contexts.sxt import SXTContext
from murfey.client.contexts.tomo import TomographyContext
from murfey.client.contexts.tomo_metadata import TomographyMetadataContext
from murfey.client.destinations import find_longest_data_directory
Expand Down Expand Up @@ -110,8 +111,8 @@ def _find_extension(self, file_path: Path) -> bool:
if subframe_path := mdoc_data_block.get("SubFramePath"):
self._extension = Path(subframe_path).suffix
return True
# Check for LIF files separately
elif file_path.suffix == ".lif":
# Check for LIF files and TXRM files separately
elif file_path.suffix == ".lif" or file_path.suffix == ".txrm":
self._extension = file_path.suffix
return True
return False
Expand All @@ -138,6 +139,11 @@ def _find_context(self, file_path: Path) -> bool:
self._context = CLEMContext("leica", self._basepath, self._token)
return True

# SXT workflow checks
if file_path.suffix == ".txrm":
self._context = SXTContext("zeiss", self._basepath, self._token)
return True

# Tomography and SPA workflow checks
if "atlas" in file_path.parts:
self._context = AtlasContext(
Expand Down Expand Up @@ -321,6 +327,10 @@ def _analyse(self):
)
self.post_transfer(transferred_file)

elif isinstance(self._context, SXTContext):
logger.debug(f"File {transferred_file.name!r} is an SXT file")
self.post_transfer(transferred_file)

elif isinstance(self._context, AtlasContext):
logger.debug(f"File {transferred_file.name!r} is part of the atlas")
self.post_transfer(transferred_file)
Expand Down
38 changes: 38 additions & 0 deletions src/murfey/client/context.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,44 @@
logger = logging.getLogger("murfey.client.context")


def _file_transferred_to(
    environment: MurfeyInstanceEnvironment, source: Path, file_path: Path, token: str
):
    """Work out the server-side destination path of a transferred file.

    Combines the machine's rsync base path with the default destination
    registered for *source*, inserting the visit name when the destination
    does not already contain it, and appends the file's path relative to
    *source*.
    """
    machine_config = get_machine_config_client(
        str(environment.url.geturl()),
        token,
        instrument_name=environment.instrument_name,
    )
    destination = environment.default_destinations[source]
    base = Path(machine_config.get("rsync_basepath", "")) / Path(destination)
    if environment.visit not in destination:
        # Destination does not already include the visit, so add it
        base = base / environment.visit
    # need to strip out the rsync_module name
    return base / file_path.relative_to(source)


def _get_source(file_path: Path, environment: MurfeyInstanceEnvironment) -> Path | None:
possible_sources = []
for s in environment.sources:
if file_path.is_relative_to(s):
possible_sources.append(s)
if not possible_sources:
return None
elif len(possible_sources) == 1:
return possible_sources[0]
source = possible_sources[0]
for extra_source in possible_sources[1:]:
if extra_source.is_relative_to(source):
source = extra_source
return source


def _atlas_destination(
environment: MurfeyInstanceEnvironment, source: Path, token: str
) -> Path:
Expand Down
3 changes: 1 addition & 2 deletions src/murfey/client/contexts/atlas.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,7 @@

import xmltodict

from murfey.client.context import Context, _atlas_destination
from murfey.client.contexts.spa import _get_source
from murfey.client.context import Context, _atlas_destination, _get_source
from murfey.client.instance_environment import MurfeyInstanceEnvironment
from murfey.util.client import capture_post

Expand Down
45 changes: 6 additions & 39 deletions src/murfey/client/contexts/spa.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,12 @@

import xmltodict

from murfey.client.context import Context, ProcessingParameter
from murfey.client.context import (
Context,
ProcessingParameter,
_file_transferred_to,
_get_source,
)
from murfey.client.destinations import find_longest_data_directory
from murfey.client.instance_environment import (
MovieTracker,
Expand All @@ -26,28 +31,6 @@
logger = logging.getLogger("murfey.client.contexts.spa")


def _file_transferred_to(
environment: MurfeyInstanceEnvironment, source: Path, file_path: Path, token: str
):
machine_config = get_machine_config_client(
str(environment.url.geturl()),
token,
instrument_name=environment.instrument_name,
)
if environment.visit in environment.default_destinations[source]:
return (
Path(machine_config.get("rsync_basepath", ""))
/ Path(environment.default_destinations[source])
/ file_path.relative_to(source) # need to strip out the rsync_module name
)
return (
Path(machine_config.get("rsync_basepath", ""))
/ Path(environment.default_destinations[source])
/ environment.visit
/ file_path.relative_to(source)
)


def _grid_square_metadata_file(
f: Path, data_directories: list[Path], visit: str, grid_square: int
) -> Path:
Expand All @@ -66,22 +49,6 @@ def _grid_square_metadata_file(
return metadata_file


def _get_source(file_path: Path, environment: MurfeyInstanceEnvironment) -> Path | None:
possible_sources = []
for s in environment.sources:
if file_path.is_relative_to(s):
possible_sources.append(s)
if not possible_sources:
return None
elif len(possible_sources) == 1:
return possible_sources[0]
source = possible_sources[0]
for extra_source in possible_sources[1:]:
if extra_source.is_relative_to(source):
source = extra_source
return source


def _get_xml_list_index(key: str, xml_list: list) -> int:
for i, elem in enumerate(xml_list):
if elem["a:Key"] == key:
Expand Down
8 changes: 6 additions & 2 deletions src/murfey/client/contexts/spa_metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@

import xmltodict

from murfey.client.context import Context, ensure_dcg_exists
from murfey.client.contexts.spa import _file_transferred_to, _get_source
from murfey.client.context import (
Context,
_file_transferred_to,
_get_source,
ensure_dcg_exists,
)
from murfey.client.instance_environment import MurfeyInstanceEnvironment
from murfey.util.client import capture_post
from murfey.util.spa_metadata import (
Expand Down
218 changes: 218 additions & 0 deletions src/murfey/client/contexts/sxt.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
import logging
from pathlib import Path
from typing import Any

from txrm2tiff.inspector import Inspector
from txrm2tiff.txrm import open_txrm
from txrm2tiff.txrm_functions.general import read_stream
from txrm2tiff.xradia_properties.enums import XrmDataTypes

from murfey.client.context import (
Context,
_file_transferred_to,
_get_source,
ensure_dcg_exists,
)
from murfey.client.instance_environment import MurfeyInstanceEnvironment
from murfey.util.client import capture_post
from murfey.util.tomo import midpoint

logger = logging.getLogger("murfey.client.contexts.sxt")


class SXTContext(Context):
    """Workflow context for soft X-ray tomography (SXT) data acquired as
    Zeiss .txrm tilt series.

    Acquisition metadata (tilt angles, pixel size, image dimensions, ...)
    is read directly from the OLE streams of each transferred .txrm file
    and used to register the data collection and its processing jobs with
    the Murfey server.
    """

    def __init__(self, acquisition_software: str, basepath: Path, token: str):
        super().__init__("SXT", acquisition_software, token)
        # Root of the source directory being watched for this context
        self._basepath = basepath

    def register_sxt_data_collection(
        self,
        tilt_series: str,
        data_collection_parameters: dict,
        file_extension: str,
        image_directory: str | Path,
        environment: MurfeyInstanceEnvironment | None = None,
    ):
        """Register the data collection group, data collection and
        processing jobs for one SXT tilt series with the Murfey server.

        Args:
            tilt_series: Tag identifying the tilt series (the txrm file stem).
            data_collection_parameters: Metadata extracted from the txrm file.
            file_extension: Suffix of the transferred file (".txrm").
            image_directory: Directory the images were transferred to.
            environment: Murfey client environment. Registration is skipped
                (with an error log) if this is not supplied.
        """
        if not environment:
            # Corrected log message: previously referred to "tomography"
            logger.error("No environment passed to register SXT data collections")
            return
        try:
            # Session metadata lives under the visit directory that sits
            # alongside this context's basepath
            metadata_source = (
                self._basepath.parent / environment.visit / self._basepath.name
            )
            # A data collection group must exist before the data collection
            # itself can be registered
            ensure_dcg_exists(
                collection_type="sxt",
                metadata_source=metadata_source,
                environment=environment,
                token=self._token,
            )

            dc_data: dict[str, Any] = {
                "experiment_type": "sxt",
                "file_extension": file_extension,
                "acquisition_software": self._acquisition_software,
                "image_directory": str(image_directory),
                "data_collection_tag": tilt_series,
                "source": str(self._basepath),
                "tag": tilt_series,
            }

            # Add the acquisition parameters read from the txrm file.
            # NOTE(review): post_transfer stores the pixel size under
            # "pixel_size_microns"; "pixel_size_on_image" is never set, so
            # this currently registers None — confirm intended key and units
            dc_data.update(
                {
                    "pixel_size_on_image": data_collection_parameters.get(
                        "pixel_size_on_image"
                    ),
                    "image_size_x": data_collection_parameters.get("image_size_x"),
                    "image_size_y": data_collection_parameters.get("image_size_y"),
                    "magnification": data_collection_parameters.get("magnification"),
                }
            )
            capture_post(
                base_url=str(environment.url.geturl()),
                router_name="workflow.router",
                function_name="start_dc",
                token=self._token,
                visit_name=environment.visit,
                session_id=environment.murfey_session,
                data=dc_data,
            )

            # Processing jobs that need processing job IDs assigned
            recipes_to_assign_pjids = [
                "sxt-tomo-align",
            ]
            for recipe in recipes_to_assign_pjids:
                capture_post(
                    base_url=str(environment.url.geturl()),
                    router_name="workflow.router",
                    function_name="register_proc",
                    token=self._token,
                    visit_name=environment.visit,
                    session_id=environment.murfey_session,
                    data={
                        "tag": tilt_series,
                        "source": str(self._basepath),
                        "recipe": recipe,
                        "experiment_type": "sxt",
                    },
                )
        except Exception as e:
            logger.error(f"ERROR {e}, {data_collection_parameters}", exc_info=True)

    def post_transfer(
        self,
        transferred_file: Path,
        environment: MurfeyInstanceEnvironment | None = None,
        **kwargs,
    ) -> bool:
        """Handle a newly transferred file.

        For .txrm files, reads the acquisition metadata from the file's OLE
        streams, registers the data collection, and requests processing of
        the tilt series.

        Returns:
            True if a txrm tilt series was registered for processing,
            False otherwise.
        """
        super().post_transfer(
            transferred_file=transferred_file,
            environment=environment,
            **kwargs,
        )

        data_suffixes = [".txrm"]

        if transferred_file.suffix in data_suffixes and environment:
            source = _get_source(transferred_file, environment)
            if not source:
                logger.warning(f"No source found for file {transferred_file}")
                return False

            # Read the tilt angles and pixel size from the txrm
            metadata: dict[str, Any] = {
                "source": str(self._basepath),
                "tilt_series_tag": transferred_file.stem,
            }
            with open_txrm(
                transferred_file, load_images=False, load_reference=False, strict=False
            ) as txrm:
                inspector = Inspector(txrm)
                angles = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/Angles",
                    XrmDataTypes.XRM_FLOAT,
                    strict=True,
                )
                metadata["minimum_angle"] = min(angles)
                metadata["maximum_angle"] = max(angles)
                metadata["pixel_size_microns"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/PixelSize",
                    XrmDataTypes.XRM_FLOAT,
                    strict=True,
                )[0]
                metadata["image_size_x"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/ImageWidth",
                    XrmDataTypes.XRM_INT,
                    strict=True,
                )[0]
                metadata["image_size_y"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/ImageHeight",
                    XrmDataTypes.XRM_INT,
                    strict=True,
                )[0]
                # NOTE(review): unlike the scalar reads above, these two keep
                # the full list returned by read_stream — confirm downstream
                # consumers expect lists here
                metadata["exposure_time"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/ExpTime",
                    XrmDataTypes.XRM_FLOAT,
                    strict=True,
                )
                metadata["magnification"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/XrayMagnification",
                    XrmDataTypes.XRM_FLOAT,
                    strict=True,
                )
                metadata["tilt_count"] = read_stream(
                    inspector.txrm.ole,
                    "ImageInfo/ImagesTaken",
                    XrmDataTypes.XRM_INT,
                    strict=True,
                )[0]

            self.register_sxt_data_collection(
                tilt_series=transferred_file.stem,
                data_collection_parameters=metadata,
                file_extension=transferred_file.suffix,
                # NOTE(review): default_destinations appears to be keyed by
                # source directory — confirm transferred_file.parent is the
                # intended lookup key rather than `source`
                image_directory=environment.default_destinations.get(
                    transferred_file.parent, transferred_file.parent
                ),
                environment=environment,
            )

            logger.info(
                f"The following tilt series will be processed: {transferred_file.stem}"
            )
            file_transferred_to = _file_transferred_to(
                environment, source, transferred_file, self._token
            )
            capture_post(
                base_url=str(environment.url.geturl()),
                router_name="workflow.sxt_router",
                function_name="process_sxt_tilt_series",
                token=self._token,
                visit_name=environment.visit,
                session_id=environment.murfey_session,
                data={
                    "session_id": environment.murfey_session,
                    "tag": transferred_file.stem,
                    "source": str(transferred_file.parent),
                    # Read the key actually written above; this previously
                    # looked up "pixel_size", which is never set, so the
                    # fallback of 100 was always sent
                    "pixel_size": metadata.get("pixel_size_microns", 100),
                    "tilt_offset": midpoint(angles),
                    "txrm": str(file_transferred_to),
                },
            )
            return True
        # Not a txrm file (or no environment): nothing was registered
        return False

    def post_first_transfer(
        self,
        transferred_file: Path,
        environment: MurfeyInstanceEnvironment | None = None,
        **kwargs,
    ):
        """First-transfer hook: identical handling to any other transfer."""
        self.post_transfer(transferred_file, environment=environment, **kwargs)
Loading
Loading