Skip to content

Commit 8687bba

Browse files
authored
Enhanced functionality of cross-instrument file transfer API endpoints (#680)
* Moved common correlative workflow functions into 'murfey.server.api.shared' and updated the function names used in 'session_info' and 'session_control'
* Renamed module 'shared' to 'session_shared' to reflect which modules import from it
* Changed the 'upstream_data_directories' field in MachineConfig into a dictionary, and updated the functions that use it
* Changed 'find_upstream_visits' to return a nested dictionary in which visits are grouped by instrument name
* Added a new MachineConfig field, 'upstream_data_search_strings', to store file search strings for different upstream instruments
* Added FastAPI endpoints to the instrument and backend servers to request general file downloads from different upstream instruments
* Renamed 'murfey.server.api.shared' to 'murfey.server.api.session_shared', since there are multiple modules named 'shared'
* Added some unit tests for the upstream file searching functions
1 parent 528a1fe commit 8687bba

11 files changed

Lines changed: 639 additions & 121 deletions

File tree

src/murfey/client/tui/screens.py

Lines changed: 21 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -685,15 +685,23 @@ def on_button_pressed(self, event: Button.Pressed):
685685
self.app.push_screen("launcher")
686686

687687
if machine_data.get("upstream_data_directories"):
688-
upstream_downloads = capture_get(
688+
upstream_downloads: dict[str, dict[str, Path]] = capture_get(
689689
base_url=str(self.app._environment.url.geturl()),
690690
router_name="session_control.correlative_router",
691691
function_name="find_upstream_visits",
692692
token=token,
693693
session_id=self.app._environment.murfey_session,
694694
).json()
695+
# Pass flattened dict for backwards compatibility
695696
self.app.install_screen(
696-
UpstreamDownloads(upstream_downloads), "upstream-downloads"
697+
UpstreamDownloads(
698+
{
699+
visit_name: visit_dir
700+
for _, upstream_visits in upstream_downloads.items()
701+
for visit_name, visit_dir in upstream_visits.items()
702+
}
703+
),
704+
"upstream-downloads",
697705
)
698706
self.app.push_screen("upstream-downloads")
699707

@@ -759,15 +767,23 @@ def on_button_pressed(self, event: Button.Pressed):
759767
self.app.push_screen("directory-select")
760768

761769
if machine_data.get("upstream_data_directories"):
762-
upstream_downloads = capture_get(
770+
upstream_downloads: dict[str, dict[str, Path]] = capture_get(
763771
base_url=str(self.app._environment.url.geturl()),
764772
router_name="session_control.correlative_router",
765773
function_name="find_upstream_visits",
766774
token=token,
767775
session_id=self.app._environment.murfey_session,
768776
).json()
777+
# Pass a flattened dict for backwards compatibility
769778
self.app.install_screen(
770-
UpstreamDownloads(upstream_downloads), "upstream-downloads"
779+
UpstreamDownloads(
780+
{
781+
visit_name: visit_dir
782+
for _, upstream_visits in upstream_downloads.items()
783+
for visit_name, visit_dir in upstream_visits.items()
784+
}
785+
),
786+
"upstream-downloads",
771787
)
772788
self.app.push_screen("upstream-downloads")
773789

@@ -817,7 +833,7 @@ def on_button_pressed(self, event: Button.Pressed):
817833
stream_response = capture_get(
818834
base_url=str(self.app._environment.url.geturl()),
819835
router_name="session_control.correlative_router",
820-
function_name="get_tiff",
836+
function_name="get_tiff_file",
821837
token=token,
822838
visit_name=event.button.label,
823839
session_id=self.app._environment.murfey_session,

src/murfey/instrument_server/api.py

Lines changed: 81 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -469,13 +469,91 @@ def upload_gain_reference(
469469
return {"success": True}
470470

471471

472-
class UpstreamTiffInfo(BaseModel):
472+
# Request body for the "upstream_file_data_request" endpoint of the instrument
# server (consumed by 'gather_upstream_files').
class UpstreamFileDownloadInfo(BaseModel):
    # Local directory the downloaded files are written into
    download_dir: Path
    # Name of the upstream instrument; forwarded to the backend when asking for
    # the list of files
    upstream_instrument: str
    # Visit directory on the upstream instrument; saved files keep their path
    # relative to this directory
    upstream_visit_path: Path
476+
477+
478+
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_download: UpstreamFileDownloadInfo,
):
    """
    Instrument server endpoint that queries the backend for the files in the
    chosen upstream visit directory and downloads each of them.

    Files are written beneath 'download_dir', keeping the path they have
    relative to 'upstream_visit_path'.

    Returns '{"success": True}' once every listed file has been written, or
    '{"success": False, "detail": ...}' if the visit name contains forbidden
    characters.
    """
    # Check for forbidden characters
    if any(c in visit_name for c in ("/", "\\", ":", ";")):
        logger.error(
            f"Forbidden characters are present in visit name {sanitise(visit_name)}"
        )
        # NOTE: this key was previously misspelt as "succss", so callers
        # checking "success" never saw the failure flag
        return {
            "success": False,
            "detail": "Forbidden characters present in visit name",
        }

    # Sanitise inputs
    download_dir = secure_path(upstream_file_download.download_dir)
    upstream_instrument = sanitise(upstream_file_download.upstream_instrument)
    upstream_visit_path = secure_path(upstream_file_download.upstream_visit_path)

    # Get the list of files to download
    murfey_url = urlparse(_get_murfey_url(), allow_fragments=False)
    base_url = murfey_url.geturl()  # Invariant across all requests below
    sanitised_visit_name = sanitise_nonpath(visit_name)
    url_path = url_path_for(
        "session_control.correlative_router",
        "gather_upstream_files",
        session_id=session_id,
        visit_name=sanitised_visit_name,
    )
    upstream_files: list[str] = requests.get(
        f"{base_url}{url_path}",
        headers={"Authorization": f"Bearer {tokens[session_id]}"},
        json={
            "upstream_instrument": upstream_instrument,
            "upstream_visit_path": str(upstream_visit_path),
        },
    ).json()

    # Make the download directory and download gathered files
    download_dir.mkdir(exist_ok=True)
    for upstream_file in upstream_files:
        url_path = url_path_for(
            "session_control.correlative_router",
            "get_upstream_file",
            session_id=session_id,
            visit_name=sanitised_visit_name,
            upstream_file_path=upstream_file,
        )
        # Stream the response so the file is written to disk chunk by chunk
        file_data = requests.get(
            f"{base_url}{url_path}",
            headers={"Authorization": f"Bearer {tokens[session_id]}"},
            stream=True,
        )
        # Recreate the file's location relative to the upstream visit directory
        upstream_file_relative_path = secure_path(
            Path(upstream_file).relative_to(upstream_visit_path)
        )
        save_file = download_dir / upstream_file_relative_path
        save_file.parent.mkdir(parents=True, exist_ok=True)
        with open(save_file, "wb") as f:
            for chunk in file_data.iter_content(chunk_size=32 * 1024**2):
                f.write(chunk)
        logger.info(f"Saved file to {str(save_file)!r}")
    return {"success": True}
546+
547+
548+
# Request body for the "upstream_tiff_data_request" endpoint of the instrument
# server (consumed by 'gather_upstream_tiffs').
class UpstreamTiffDownloadInfo(BaseModel):
    # Download directory supplied by the caller
    download_dir: Path
474550

475551

476552
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_tiff_data_request")
477553
def gather_upstream_tiffs(
478-
visit_name: str, session_id: MurfeySessionID, upstream_tiff_info: UpstreamTiffInfo
554+
visit_name: str,
555+
session_id: MurfeySessionID,
556+
upstream_tiff_info: UpstreamTiffDownloadInfo,
479557
):
480558
sanitised_visit_name = sanitise_nonpath(visit_name)
481559
assert not any(c in visit_name for c in ("/", "\\", ":", ";"))
@@ -490,7 +568,7 @@ def gather_upstream_tiffs(
490568
)
491569
for tiff_path in upstream_tiff_paths:
492570
tiff_data = requests.get(
493-
f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}",
571+
f"{murfey_url.geturl()}{url_path_for('session_control.correlative_router', 'get_tiff_file', session_id=session_id, visit_name=sanitised_visit_name, tiff_path=tiff_path)}",
494572
stream=True,
495573
headers={"Authorization": f"Bearer {tokens[session_id]}"},
496574
)

src/murfey/server/api/instrument.py

Lines changed: 59 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
from murfey.util.api import url_path_for
2828
from murfey.util.config import get_machine_config
2929
from murfey.util.db import RsyncInstance, Session, SessionProcessingParameters
30-
from murfey.util.models import File, MultigridWatcherSetup
30+
from murfey.util.models import File, MultigridWatcherSetup, UpstreamFileRequestInfo
3131

3232
# Create APIRouter class object
3333
router = APIRouter(
@@ -396,6 +396,64 @@ async def request_upstream_tiff_data_download(
396396
return data
397397

398398

399+
@router.post("/visits/{visit_name}/sessions/{session_id}/upstream_file_data_request")
async def request_upstream_file_data_download(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_request: UpstreamFileRequestInfo,
    db=murfey_db,
):
    """
    Relays an upstream file download request to the instrument server.

    The machine config of the instrument associated with the session supplies
    the download directory and the instrument server URL. If either is not
    set, a '{"success": False, "detail": ...}' dictionary is returned instead.
    Otherwise, the JSON body of the instrument server's response is returned.
    """
    # Look up the machine config of the instrument this session belongs to
    session_entry = db.exec(select(Session).where(Session.id == session_id)).one()
    instrument_name = session_entry.instrument_name
    all_configs = get_machine_config(instrument_name=instrument_name)
    machine_config = all_configs[instrument_name]

    # Bail out early when the config lacks what is needed to forward the request
    if not machine_config.upstream_data_download_directory:
        log.error("No download directory was configured for this instrument")
        return {
            "success": False,
            "detail": "No download directory configured",
        }
    if not machine_config.instrument_server_url:
        log.error("Couldn't find instrument server URL to post request to")
        return {
            "success": False,
            "detail": "No instrument server URL",
        }

    # The sanitised visit name is used for both the directory and the URL
    safe_visit_name = secure_filename(visit_name)
    download_dir = str(
        machine_config.upstream_data_download_directory / safe_visit_name
    )

    # Forward the download request and hand back the response
    async with aiohttp.ClientSession() as clientsession:
        url_path = url_path_for(
            "api.router",
            "gather_upstream_files",
            visit_name=safe_visit_name,
            session_id=session_id,
        )
        access_token = instrument_server_tokens[session_id]["access_token"]
        payload = {
            "download_dir": download_dir,
            "upstream_instrument": upstream_file_request.upstream_instrument,
            "upstream_visit_path": str(upstream_file_request.upstream_visit_path),
        }
        async with clientsession.post(
            f"{machine_config.instrument_server_url}{url_path}",
            headers={"Authorization": f"Bearer {access_token}"},
            json=payload,
        ) as resp:
            data = await resp.json()
    return data
455+
456+
399457
class RsyncerSource(BaseModel):
400458
source: str
401459

src/murfey/server/api/session_control.py

Lines changed: 46 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -9,27 +9,30 @@
99
from pydantic import BaseModel
1010
from sqlalchemy import func
1111
from sqlmodel import select
12-
from werkzeug.utils import secure_filename
1312

1413
import murfey.server.prometheus as prom
1514
from murfey.server import _transport_object
1615
from murfey.server.api.auth import (
1716
MurfeySessionIDInstrument as MurfeySessionID,
1817
validate_instrument_token,
1918
)
20-
from murfey.server.api.shared import (
19+
from murfey.server.api.session_shared import (
20+
find_upstream_visits as _find_upstream_visits,
21+
gather_upstream_files as _gather_upstream_files,
22+
gather_upstream_tiffs as _gather_upstream_tiffs,
2123
get_foil_hole as _get_foil_hole,
2224
get_foil_holes_from_grid_square as _get_foil_holes_from_grid_square,
2325
get_grid_squares as _get_grid_squares,
2426
get_grid_squares_from_dcg as _get_grid_squares_from_dcg,
2527
get_machine_config_for_instrument,
26-
get_upstream_tiff_dirs,
28+
get_tiff_file as _get_tiff_file,
29+
get_upstream_file as _get_upstream_file,
2730
remove_session_by_id,
2831
)
2932
from murfey.server.ispyb import DB as ispyb_db, get_all_ongoing_visits
3033
from murfey.server.murfey_db import murfey_db
3134
from murfey.util import sanitise
32-
from murfey.util.config import MachineConfig, get_machine_config
35+
from murfey.util.config import MachineConfig
3336
from murfey.util.db import (
3437
AutoProcProgram,
3538
ClientEnvironment,
@@ -49,6 +52,7 @@
4952
GridSquareParameters,
5053
RsyncerInfo,
5154
SearchMapParameters,
55+
UpstreamFileRequestInfo,
5256
Visit,
5357
)
5458
from murfey.workflows.spa.atlas import atlas_jpg_from_mrc
@@ -418,62 +422,53 @@ def register_batch_position(
418422

419423
@correlative_router.get("/sessions/{session_id}/upstream_visits")
420424
async def find_upstream_visits(session_id: MurfeySessionID, db=murfey_db):
421-
murfey_session = db.exec(select(Session).where(Session.id == session_id)).one()
422-
visit_name = murfey_session.visit
423-
instrument_name = murfey_session.instrument_name
424-
machine_config = get_machine_config(instrument_name=instrument_name)[
425-
instrument_name
426-
]
427-
upstream_visits = {}
428-
# Iterates through provided upstream directories
429-
for p in machine_config.upstream_data_directories:
430-
# Looks for visit name in file path
431-
for v in Path(p).glob(f"{visit_name.split('-')[0]}-*"):
432-
upstream_visits[v.name] = v / machine_config.processed_directory_name
433-
return upstream_visits
425+
return _find_upstream_visits(session_id=session_id, db=db)
426+
427+
428+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file_paths"
)
async def gather_upstream_files(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_request: UpstreamFileRequestInfo,
    db=murfey_db,
):
    """
    Endpoint wrapper around 'session_shared.gather_upstream_files'.

    Unpacks the request body and returns whatever the shared function returns.
    'visit_name' is part of the route but is not passed on.
    """
    instrument = upstream_file_request.upstream_instrument
    visit_path = upstream_file_request.upstream_visit_path
    return _gather_upstream_files(
        session_id=session_id,
        upstream_instrument=instrument,
        upstream_visit_path=visit_path,
        db=db,
    )
443+
444+
445+
@correlative_router.get(
    "/visits/{visit_name}/sessions/{session_id}/upstream_file/{upstream_file_path:path}"
)
async def get_upstream_file(
    visit_name: str,
    session_id: MurfeySessionID,
    upstream_file_path: str,
    db=murfey_db,
):
    """
    Endpoint wrapper around 'session_shared.get_upstream_file'.

    Returns a FileResponse for the value the shared function returns, or None
    if it returns None.
    """
    upstream_file = _get_upstream_file(upstream_file_path)
    if upstream_file is None:
        return None
    return FileResponse(path=upstream_file)
434458

435459

436460
@correlative_router.get(
437461
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff_paths"
438462
)
439463
async def gather_upstream_tiffs(visit_name: str, session_id: int, db=murfey_db):
440-
"""
441-
Looks for TIFF files associated with the current session in the permitted storage
442-
servers, and returns their relative file paths as a list.
443-
"""
444-
instrument_name = (
445-
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
446-
)
447-
upstream_tiff_paths = []
448-
tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name)
449-
if not tiff_dirs:
450-
return None
451-
for tiff_dir in tiff_dirs:
452-
for f in tiff_dir.glob("**/*.tiff"):
453-
upstream_tiff_paths.append(str(f.relative_to(tiff_dir)))
454-
for f in tiff_dir.glob("**/*.tif"):
455-
upstream_tiff_paths.append(str(f.relative_to(tiff_dir)))
456-
return upstream_tiff_paths
464+
return _gather_upstream_tiffs(visit_name=visit_name, session_id=session_id, db=db)
457465

458466

459467
@correlative_router.get(
460468
"/visits/{visit_name}/sessions/{session_id}/upstream_tiff/{tiff_path:path}"
461469
)
462-
async def get_tiff(visit_name: str, session_id: int, tiff_path: str, db=murfey_db):
463-
instrument_name = (
464-
db.exec(select(Session).where(Session.id == session_id)).one().instrument_name
470+
async def get_tiff_file(visit_name: str, session_id: int, tiff_path: str, db=murfey_db):
471+
tiff_file = _get_tiff_file(
472+
visit_name=visit_name, session_id=session_id, tiff_path=tiff_path, db=db
465473
)
466-
tiff_dirs = get_upstream_tiff_dirs(visit_name, instrument_name)
467-
if not tiff_dirs:
468-
return None
469-
470-
tiff_path = "/".join(secure_filename(p) for p in tiff_path.split("/"))
471-
for tiff_dir in tiff_dirs:
472-
test_path = tiff_dir / tiff_path
473-
if test_path.is_file():
474-
break
475-
else:
476-
logger.warning(f"TIFF {tiff_path} not found")
477-
return None
478-
479-
return FileResponse(path=test_path)
474+
return FileResponse(path=tiff_file) if tiff_file is not None else tiff_file

0 commit comments

Comments
 (0)