From 46c12bd16039fe82dee7586d48051300b52a65bf Mon Sep 17 00:00:00 2001 From: Bryon Lewis Date: Fri, 1 May 2026 15:52:25 -0400 Subject: [PATCH] S3 asset store importing for girder 5 --- server/S3_IMPORT_EXPLAINER.md | 110 ++++++++++++++++++++++ server/bucket_notifications/views.py | 13 +-- server/dive_server/__init__.py | 25 +---- server/dive_server/event.py | 83 +++++++++------- server/dive_tasks/__init__.py | 7 +- server/dive_tasks/local_tasks.py | 62 ++++++++++++ server/dive_tasks/worker_girder_events.py | 46 +++++++++ 7 files changed, 280 insertions(+), 66 deletions(-) create mode 100644 server/S3_IMPORT_EXPLAINER.md create mode 100644 server/dive_tasks/local_tasks.py create mode 100644 server/dive_tasks/worker_girder_events.py diff --git a/server/S3_IMPORT_EXPLAINER.md b/server/S3_IMPORT_EXPLAINER.md new file mode 100644 index 000000000..28a757571 --- /dev/null +++ b/server/S3_IMPORT_EXPLAINER.md @@ -0,0 +1,110 @@ +# S3 / assetstore import behavior: Girder 3 vs Girder 5 (and DIVE) + +This document explains why DIVE’s assetstore import integration had to change when moving to **Girder 5**, and how the current design works. It is aimed at maintainers debugging imports, batch postprocess, or bucket notifications. + +## Girder 3 (older): import ran in the web server process + +In typical Girder 3 deployments, **importing existing data from an S3-backed assetstore** was driven from the **same process that served the Girder API** (CherryPy / WSGI). Roughly: + +1. A client called the assetstore import REST endpoint. +2. The request handler eventually called **`Assetstore().importData(...)`** in that process. +3. The S3 adapter listed keys and created items/files, firing events such as **`s3_assetstore_imported`** for each object. +4. Girder **plugins** (including DIVE’s `dive_server`) were loaded in that same process, so any **`events.bind(...)`** in `GirderPlugin.load()` saw those events. + +So for DIVE on Girder 3, it was enough to register handlers in **`dive_server`** for: + +- **`s3_assetstore_imported`** / **`filesystem_assetstore_imported`** (per-item metadata, folder typing, etc.) +- Optional REST hooks around **`rest.post.assetstore/:id/import`** if you needed request-scoped behavior + +Everything ran **co-located** with the plugin loader. + +## Girder 5: import runs on a Celery worker (`local` queue) + +In **Girder 5**, the assetstore import REST handler **does not** call `Assetstore().importData()` in the web process. It enqueues a Celery task, e.g. **`importDataTask`**, on the **`local`** queue (see Girder’s `girder/api/v1/assetstore.py` and `girder/tasks.py`). + +Implications: + +| Concern | Girder 3 (typical) | Girder 5 | +|--------|---------------------|----------| +| Where `importData` runs | Web server process | **Celery worker** (`local`) | +| Where `s3_assetstore_imported` fires | Same process as `dive_server` | **Worker process** (no `dive_server` plugin load) | +| REST `import` handler duration | Could be long (sync work) | Short (enqueue + return) | +| When `rest.post...import.after` runs | After import if sync | **Right after enqueue**, before import finishes | + +So **plugin `events.bind` in `dive_server` only applies to the Girder web process**. It does **not** run inside the Celery worker that executes `importDataTask`. Handlers bound only in `dive_server` **never see** per-file import events for the normal admin import path. + +That is the core regression class: **same event names, different process**. + +## REST `import.before` / `import.after` are the wrong lifecycle for async import + +Girder still wraps REST routes with events like: + +- `rest.post.assetstore/:id/import.before` +- `rest.post.assetstore/:id/import.after` + +For Girder 5’s async import, **`.after` fires when the HTTP handler returns**, i.e. after **`importDataTask.delay(...)`**, not after the worker finishes importing. Any “post-import” work tied only to REST `.after` runs **too early** (empty or partial tree). + +Model-level events are a better match for “import finished”: + +- **`assetstore_import.after`** — triggered at the end of **`Assetstore().importData`** in **`girder/models/assetstore.py`**, in **whichever process** ran import (here: the worker). + +DIVE’s post-import steps (dangling annotations, batch postprocess kickoff) are wired to **`assetstore_import.after`** on the worker, not to REST `.after`. + +## Why `dive_tasks/worker_girder_events.py` exists + +**`girder_worker`** discovers task modules via entry points (`girder_worker_plugins`); it does **not** load full Girder server plugins. + +DIVE registers a small bootstrap module that runs when the worker starts: + +- Binds **`s3_assetstore_imported`**, **`filesystem_assetstore_imported`**, **`assetstore_import.after`** to the same functions as before (`dive_server/event.py`). +- Binds **`jobs.schedule`** → **`scheduleLocal`** from **`girder_jobs`**, because **`Job().scheduleJob`** only *emits an event*; without the Jobs plugin loaded in the worker, **local jobs would never start** if something called `scheduleJob` from a task. + +This file is loaded early via **`dive_tasks/__init__.py`** → **`task_imports`**. + +## Batch postprocess: local job + daemon thread vs Celery task + +Historically DIVE used **`Job().createLocalJob`** + **`Job().scheduleJob`**, with an entrypoint that starts a **daemon thread** (same pattern as **Slicer CLI Web** batch jobs on the **web server**). + +Problems when the **parent import** already runs on a **`local` Celery worker**: + +1. **`scheduleLocal`** was only guaranteed on the **web** process unless also bound on the worker (see above). +2. Even with **`scheduleLocal`**, starting a **daemon thread** at the tail of **`importDataTask`** is fragile: the import task returns, worker lifecycle / threading makes it easy for the job to **never reach RUNNING** or for work to be cut off. + +The robust fix is to enqueue **`run_batch_postprocess_job`** on the **`local`** queue (**`dive_tasks/local_tasks.py`**): a normal Celery task loads the Girder job document and runs **`batch_postprocess_task`** synchronously inside that task. + +## Bucket notifications (GCS) and `force_recursive` + +**`bucket_notifications`** previously called **`Assetstore().importData(..., force_recursive=False)`** **inside the web handler**. That is correct for **incremental** object notifications (avoid re-walking huge subtrees on every push) but **blocks** the HTTP thread and risks Pub/Sub timeouts. + +Girder’s stock **`importDataTask`** does **not** pass **`force_recursive`**; the S3 adapter defaults to **`force_recursive=True`**, which would **change semantics** for incremental notifications. + +DIVE therefore uses **`import_assetstore_path_async`** (**`local_tasks.py`**), which forwards **`force_recursive=False`** (and enqueues on **`local`** so the push handler returns quickly). + +## What was removed from `dive_server` and why + +With **all** DIVE-driven **`importData`** paths going through **`local`** workers (admin **`importDataTask`** + **`import_assetstore_path_async`**), events fire only there. Duplicate **`events.bind`** in **`dive_server/__init__.py`** for assetstore imports was redundant and confusing, so it was removed. **`model.user.save.created`** and the rest of the plugin are unchanged. + +If you add **new** code that calls **`Assetstore().importData`** **in the Girder web process**, you must either: + +- Route that work through **`import_assetstore_path_async`** (or another **`local`** task), **or** +- Re-bind the assetstore handlers in **`dive_server`** for that path. + +## File map (quick reference) + +| Piece | Role | +|-------|------| +| `dive_tasks/worker_girder_events.py` | Register Girder **`events.bind`** in Celery worker processes | +| `dive_tasks/local_tasks.py` | **`local`** queue: batch postprocess job runner, bucket **`importData`** with **`force_recursive`** | +| `dive_server/event.py` | **`process_s3_import`**, **`process_fs_import`**, **`run_post_assetstore_import`** (handlers; bound from worker) | +| `bucket_notifications/views.py` | Enqueues **`import_assetstore_path_async.delay(...)`** | +| `dive_server/__init__.py` | No assetstore import binds (by design); plugins + routes only | + +## Summary + +- **Girder 5** moved heavy **`importData`** work off the web server onto **`importDataTask`** (**`local`** queue). +- **Server plugins** do not load in that worker, so **worker-side event registration** is required for DIVE metadata and post-import behavior. +- **REST import `.after`** is not a substitute for **`assetstore_import.after`** when import is async. +- **Batch postprocess** should run as a **dedicated `local` Celery task**, not only as a daemon thread off **`importDataTask`**. +- **Bucket notifications** should enqueue import on **`local`** and preserve **`force_recursive=False`**, not use raw **`importDataTask`**. + +This set of changes aligns DIVE with Girder 5’s process model while preserving the same product behavior as on Girder 3. diff --git a/server/bucket_notifications/views.py b/server/bucket_notifications/views.py index 83d741dd5..0e7fa670b 100644 --- a/server/bucket_notifications/views.py +++ b/server/bucket_notifications/views.py @@ -11,6 +11,7 @@ from girder.models.folder import Folder from girder.models.user import User +from dive_tasks.local_tasks import import_assetstore_path_async from dive_utils.types import AssetstoreModel, GirderModel from .constants import AssetstoreRuleMarker @@ -73,14 +74,14 @@ def processNotification(store: AssetstoreModel, rootFolder: GirderModel, importP # All the chain of parent directories exist realImportPath = importPath - Assetstore().importData( - store, - target, + import_assetstore_path_async.delay( + str(store['_id']), + str(target['_id']), 'folder', - {'importPath': realImportPath}, - None, - owner, + realImportPath, + str(owner['_id']), force_recursive=False, + leaf_folders_as_items=False, ) @access.admin diff --git a/server/dive_server/__init__.py b/server/dive_server/__init__.py index f6b69c460..37fa0fc58 100644 --- a/server/dive_server/__init__.py +++ b/server/dive_server/__init__.py @@ -14,7 +14,7 @@ from dive_utils import constants from .crud_annotation import GroupItem, RevisionLogItem, TrackItem -from .event import DIVES3Imports, process_fs_import, process_s3_import, send_new_user_email +from .event import send_new_user_email from .views_annotation import AnnotationResource from .views_configuration import ConfigurationResource from .views_dataset import DatasetResource @@ -69,29 +69,6 @@ def load(self, info): }, } info['serverRoot'].mount(None, '', conf) - - diveS3Import = DIVES3Imports() - events.bind( - "rest.post.assetstore/:id/import.before", - "process_s3_import_before", - diveS3Import.process_s3_import_before, - ) - - events.bind( - "rest.post.assetstore/:id/import.after", - "process_s3_import_after", - diveS3Import.process_s3_import_after, - ) - events.bind( - "filesystem_assetstore_imported", - "process_fs_import", - process_fs_import, - ) - events.bind( - "s3_assetstore_imported", - "process_s3_import", - process_s3_import, - ) events.bind( 'model.user.save.created', 'send_new_user_email', diff --git a/server/dive_server/event.py b/server/dive_server/event.py index c0c1a8ebd..fabb02507 100644 --- a/server/dive_server/event.py +++ b/server/dive_server/event.py @@ -1,11 +1,10 @@ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import logging import os from bson.objectid import ObjectId import cherrypy from girder.api.rest import getApiUrl -from girder.models.collection import Collection from girder.models.folder import Folder from girder.models.item import Item from girder.models.setting import Setting @@ -82,8 +81,13 @@ def process_assetstore_import(event, meta: dict): foldername = base_name # resuse existing folder if it already exists with same name dest = Folder().createFolder(parentFolder, foldername, creator=user, reuseExisting=True) - now = datetime.now() - if now - dest['created'] > timedelta(hours=1): + created = dest['created'] + if getattr(created, 'tzinfo', None) is None: + created = created.replace(tzinfo=timezone.utc) + else: + created = created.astimezone(timezone.utc) + now = datetime.now(timezone.utc) + if now - created > timedelta(hours=1): # Remove the old referenced item, replace it with the new one. oldItem = Item().findOne({'folderId': dest['_id'], 'name': item['name']}) if oldItem is not None: @@ -180,6 +184,14 @@ def process_dangling_annotation_files(folder, user): process_dangling_annotation_files(child, user) +def _job_cherrypy_callback_url() -> str: + """REST handlers have a CherryPy request; Celery workers do not.""" + try: + return cherrypy.url() + except Exception: + return getWorkerApiUrl() + + def convert_video_recursive(folder, user): token = Token().createToken(user=user, days=2) @@ -198,43 +210,44 @@ def convert_video_recursive(folder, user): job = Job().createLocalJob( module='dive_tasks.dive_batch_postprocess', function='batchPostProcessingTaskLauncher', - kwargs={'params': dive_batch_postprocess_task_params, 'url': cherrypy.url()}, + kwargs={'params': dive_batch_postprocess_task_params, 'url': _job_cherrypy_callback_url()}, title='Batch process Dive Batch Postprocess', type='DIVE Batch Postprocess', user=user, public=True, asynchronous=True, ) - job = Job().save(job) - Job().scheduleJob(job) - - -class DIVES3Imports: - destinationId = None - destinationType = None - - def process_s3_import_before(self, event): - self.destinationId = event.info.get('params', {}).get('destinationId') - self.destinationType = event.info.get('params', {}).get('destinationType') - - def process_s3_import_after(self, event): - if self.destinationType == 'folder' and self.destinationId is not None: - # go through all sub folders and add a new script to convert - destinationFolder = Folder().findOne({"_id": ObjectId(self.destinationId)}) - userId = destinationFolder['creatorId'] or destinationFolder['baseParentId'] - user = User().findOne({'_id': ObjectId(userId)}) - process_dangling_annotation_files(destinationFolder, user) - convert_video_recursive(destinationFolder, user) - if self.destinationType == 'collection' and self.destinationId is not None: - destinationCollection = Collection().findOne({"_id": ObjectId(self.destinationId)}) - userId = destinationCollection['creatorId'] or destinationCollection['baseParentId'] - user = User().findOne({'_id': ObjectId(userId)}) - child_folders = Folder().find({'parentId': ObjectId(self.destinationId)}) - for child in child_folders: - process_dangling_annotation_files(child, user) - convert_video_recursive(child, user) - self.destinationId = None - self.destinationType = None + # Run on the ``local`` Celery queue instead of scheduleLocal + daemon thread. + # importDataTask completes right after this; a daemon thread is often killed or + # never updates the job, so the document stays INACTIVE. + from dive_tasks.local_tasks import run_batch_postprocess_job + + run_batch_postprocess_job.delay(str(job['_id'])) + + +def run_post_assetstore_import(event): + """ + Run after Assetstore.importData completes. + + Bound on Celery ``local`` workers via ``dive_tasks.worker_girder_events``. + Admin and bucket-notification imports both run import in that process, not on Girder web. + """ + info = event.info + parent = info.get('parent') + parentType = info.get('parentType') + user = info.get('user') + if not parent or not parentType or not user: + return + userId = parent['creatorId'] or parent['baseParentId'] + owner = User().findOne({'_id': ObjectId(userId)}) + if parentType == 'folder': + process_dangling_annotation_files(parent, owner) + convert_video_recursive(parent, owner) + elif parentType in ('collection', 'user'): + child_folders = Folder().find({'parentId': parent['_id']}) + for child in child_folders: + process_dangling_annotation_files(child, owner) + convert_video_recursive(child, owner) def process_fs_import(event): diff --git a/server/dive_tasks/__init__.py b/server/dive_tasks/__init__.py index 761a91a48..48e53f38f 100644 --- a/server/dive_tasks/__init__.py +++ b/server/dive_tasks/__init__.py @@ -12,4 +12,9 @@ def __init__(self, app, *args, **kwargs): def task_imports(self): # Return a list of python importable paths to the # plugin's path directory - return ["dive_tasks.tasks"] + # worker_girder_events first: bind Girder handlers before any task module loads. + return [ + 'dive_tasks.worker_girder_events', + 'dive_tasks.tasks', + 'dive_tasks.local_tasks', + ] diff --git a/server/dive_tasks/local_tasks.py b/server/dive_tasks/local_tasks.py new file mode 100644 index 000000000..b60e58842 --- /dev/null +++ b/server/dive_tasks/local_tasks.py @@ -0,0 +1,62 @@ +""" +Celery tasks bound to the ``local`` queue (same queue as Girder's ``importDataTask``). + +Kept separate from ``tasks.py`` so the main worker task module stays focused on +GPU/default-queue work. +""" + +from girder_worker.app import app + + +@app.task(queue='local', acks_late=True, ignore_result=True) +def run_batch_postprocess_job(job_id: str): + """ + Run DIVE batch postprocess for an existing Girder job document. + + Scheduled on ``local`` so this runs in a normal Celery task instead of a + daemon thread spawned from ``scheduleLocal``. A thread started when the + import task is finishing is unreliable and often leaves the parent job + stuck in INACTIVE. + """ + from girder_jobs.models.job import Job + + from dive_tasks.dive_batch_postprocess import batch_postprocess_task + + job = Job().load(job_id, force=True) + batch_postprocess_task(job) + + +@app.task(queue='local', acks_late=True, ignore_result=True) +def import_assetstore_path_async( + assetstore_id: str, + parent_id: str, + parent_type: str, + import_path: str, + user_id: str, + *, + force_recursive: bool = False, + leaf_folders_as_items: bool = False, +): + """ + Run ``Assetstore.importData`` on the ``local`` queue (e.g. GCS bucket notifications). + + Keeps ``force_recursive=False`` for incremental object notifications; Girder's + ``importDataTask`` does not forward that flag and defaults to full recursion. + """ + from girder.models.assetstore import Assetstore + from girder.models.user import User + from girder.utility.model_importer import ModelImporter + + user = User().load(user_id, force=True) + assetstore = Assetstore().load(assetstore_id) + parent = ModelImporter.model(parent_type).load(parent_id, force=True) + Assetstore().importData( + assetstore, + parent, + parent_type, + {'importPath': import_path}, + None, + user, + force_recursive=force_recursive, + leafFoldersAsItems=leaf_folders_as_items, + ) diff --git a/server/dive_tasks/worker_girder_events.py b/server/dive_tasks/worker_girder_events.py new file mode 100644 index 000000000..cd30e890f --- /dev/null +++ b/server/dive_tasks/worker_girder_events.py @@ -0,0 +1,46 @@ +""" +Register Girder model events on the Celery worker. + +Assetstore imports run in ``importDataTask`` and ``import_assetstore_path_async`` (queue +``local``). Those processes do not load Girder server plugins, so per-file and post-import +handlers are registered here instead of in ``dive_server``. + +``run_post_assetstore_import`` schedules local Girder jobs (e.g. batch postprocess). Those +jobs only leave ``INACTIVE`` when ``jobs.schedule`` is handled; the Jobs plugin registers +that on the web server, so we bind the same handler here. +""" + +from girder import events +from girder_jobs import scheduleLocal + +from dive_server.event import ( + process_fs_import, + process_s3_import, + run_post_assetstore_import, +) + + +def _register(): + events.bind( + 's3_assetstore_imported', + 'dive_worker_s3_assetstore_imported', + process_s3_import, + ) + events.bind( + 'filesystem_assetstore_imported', + 'dive_worker_filesystem_assetstore_imported', + process_fs_import, + ) + events.bind( + 'assetstore_import.after', + 'dive_worker_assetstore_import_after', + run_post_assetstore_import, + ) + events.bind( + 'jobs.schedule', + 'dive_worker_jobs_schedule_local', + scheduleLocal, + ) + + +_register()