110 changes: 110 additions & 0 deletions server/S3_IMPORT_EXPLAINER.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
# S3 / assetstore import behavior: Girder 3 vs Girder 5 (and DIVE)

This document explains why DIVE’s assetstore import integration had to change when moving to **Girder 5**, and how the current design works. It is aimed at maintainers debugging imports, batch postprocess, or bucket notifications.

## Girder 3 (older): import ran in the web server process

In typical Girder 3 deployments, **importing existing data from an S3-backed assetstore** was driven from the **same process that served the Girder API** (CherryPy / WSGI). Roughly:

1. A client called the assetstore import REST endpoint.
2. The request handler eventually called **`Assetstore().importData(...)`** in that process.
3. The S3 adapter listed keys and created items/files, firing events such as **`s3_assetstore_imported`** for each object.
4. Girder **plugins** (including DIVE’s `dive_server`) were loaded in that same process, so any **`events.bind(...)`** in `GirderPlugin.load()` saw those events.

So for DIVE on Girder 3, it was enough to register handlers in **`dive_server`** for:

- **`s3_assetstore_imported`** / **`filesystem_assetstore_imported`** (per-item metadata, folder typing, etc.)
- Optional REST hooks around **`rest.post.assetstore/:id/import`** if you needed request-scoped behavior

Everything ran **co-located** with the plugin loader.
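
Co-location can be illustrated with a minimal stand-in for an in-process event bus (a toy sketch, **not** the real `girder.events` API; `bind`/`trigger` and the payload shape are simplified):

```python
# Toy in-process event bus (NOT the real girder.events API). In Girder 3,
# importData and the plugin's bindings lived in one process, so handlers
# always saw the per-object import events.
handlers = {}

def bind(name, fn):
    handlers.setdefault(name, []).append(fn)

def trigger(name, info):
    for fn in handlers.get(name, []):
        fn(info)

seen = []
# "Plugin load": bind a handler for per-object S3 import events.
bind('s3_assetstore_imported', lambda info: seen.append(info['importPath']))

# "importData": fires one event per S3 object, in the same process.
for key in ('bucket/video.mp4', 'bucket/annotations.csv'):
    trigger('s3_assetstore_imported', {'importPath': key})

assert seen == ['bucket/video.mp4', 'bucket/annotations.csv']
```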

## Girder 5: import runs on a Celery worker (`local` queue)

In **Girder 5**, the assetstore import REST handler **does not** call `Assetstore().importData()` in the web process. It enqueues a Celery task, e.g. **`importDataTask`**, on the **`local`** queue (see Girder’s `girder/api/v1/assetstore.py` and `girder/tasks.py`).

Implications:

| Concern | Girder 3 (typical) | Girder 5 |
|--------|---------------------|----------|
| Where `importData` runs | Web server process | **Celery worker** (`local`) |
| Where `s3_assetstore_imported` fires | Same process as `dive_server` | **Worker process** (no `dive_server` plugin load) |
| REST `import` handler duration | Could be long (sync work) | Short (enqueue + return) |
| When `rest.post...import.after` runs | After import if sync | **Right after enqueue**, before import finishes |

So **plugin `events.bind` in `dive_server` only applies to the Girder web process**. It does **not** run inside the Celery worker that executes `importDataTask`. Handlers bound only in `dive_server` **never see** per-file import events for the normal admin import path.

That is the core regression class: **same event names, different process**.
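
A toy multiprocessing sketch makes the failure mode concrete (again a simplified registry, not Girder's event system): a handler bound in the parent process simply does not exist in a freshly spawned worker.

```python
import multiprocessing as mp

# Per-process handler registry. A spawned worker re-imports this module and
# gets a fresh, empty copy -- just like a Celery worker that never ran the
# dive_server plugin's events.bind calls.
handlers = {}

def bind(name, fn):
    handlers.setdefault(name, []).append(fn)

def fire(name):
    return [fn() for fn in handlers.get(name, [])]

def worker(q):
    # "Celery worker": same event name, different process, no handlers.
    q.put(fire('s3_assetstore_imported'))

if __name__ == '__main__':
    bind('s3_assetstore_imported', lambda: 'handled')  # "web process" bind
    assert fire('s3_assetstore_imported') == ['handled']

    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=worker, args=(q,))
    p.start()
    assert q.get() == []  # same event name, different process: nothing fires
    p.join()
```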

## REST `import.before` / `import.after` are the wrong lifecycle for async import

Girder still wraps REST routes with events like:

- `rest.post.assetstore/:id/import.before`
- `rest.post.assetstore/:id/import.after`

For Girder 5’s async import, **`.after` fires when the HTTP handler returns**, i.e. right after **`importDataTask.delay(...)`**, not after the worker finishes importing. Any “post-import” work tied only to REST `.after` therefore runs **too early**, against an empty or partially imported tree.
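
The ordering can be sketched with a plain queue and thread standing in for Celery (everything here is a toy; the event names are just strings):

```python
import queue
import threading

task_queue = queue.Queue()
order = []

def rest_import_handler():
    # Girder 5's REST handler: enqueue the import and return immediately.
    task_queue.put('importDataTask')
    order.append('rest.post...import.after')  # fires on return; import not done

def celery_worker():
    task_queue.get()                          # worker picks up importDataTask...
    order.append('assetstore_import.after')   # ...and this fires when done

rest_import_handler()
t = threading.Thread(target=celery_worker)
t.start()
t.join()

# REST .after always precedes the model-level "import finished" event:
assert order == ['rest.post...import.after', 'assetstore_import.after']
```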

Model-level events are a better match for “import finished”:

- **`assetstore_import.after`** — triggered at the end of **`Assetstore().importData`** in **`girder/models/assetstore.py`**, in **whichever process** ran the import (here: the worker).

DIVE’s post-import steps (dangling annotations, batch postprocess kickoff) are wired to **`assetstore_import.after`** on the worker, not to REST `.after`.

## Why `dive_tasks/worker_girder_events.py` exists

**`girder_worker`** discovers task modules via entry points (`girder_worker_plugins`); it does **not** load full Girder server plugins.

DIVE registers a small bootstrap module that runs when the worker starts:

- Binds **`s3_assetstore_imported`**, **`filesystem_assetstore_imported`**, **`assetstore_import.after`** to the same functions as before (`dive_server/event.py`).
- Binds **`jobs.schedule`** → **`scheduleLocal`** from **`girder_jobs`**, because **`Job().scheduleJob`** only *emits an event*; without the Jobs plugin loaded in the worker, **local jobs would never start** if something called `scheduleJob` from a task.

This file is loaded early via **`dive_tasks/__init__.py`** → **`task_imports`**.
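
The bootstrap is roughly shaped like this (a sketch reconstructed from the description above, not the file's verbatim contents; the exact import path of `scheduleLocal` inside `girder_jobs` is an assumption):

```python
# dive_tasks/worker_girder_events.py -- sketch of the worker-side bootstrap.
# Listed first in task_imports so bindings exist before any task can run.
from girder import events
from girder_jobs import scheduleLocal  # import location is an assumption

from dive_server.event import (
    process_fs_import,
    process_s3_import,
    run_post_assetstore_import,
)

events.bind('s3_assetstore_imported', 'dive_worker', process_s3_import)
events.bind('filesystem_assetstore_imported', 'dive_worker', process_fs_import)
events.bind('assetstore_import.after', 'dive_worker', run_post_assetstore_import)

# Job().scheduleJob only emits jobs.schedule; without this binding, local
# jobs scheduled from worker-side code would never start.
events.bind('jobs.schedule', 'dive_worker', scheduleLocal)
```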

## Batch postprocess: local job + daemon thread vs Celery task

Historically DIVE used **`Job().createLocalJob`** + **`Job().scheduleJob`**, with an entrypoint that starts a **daemon thread** (same pattern as **Slicer CLI Web** batch jobs on the **web server**).

Problems when the **parent import** already runs on a **`local` Celery worker**:

1. **`scheduleLocal`** was only guaranteed on the **web** process unless also bound on the worker (see above).
2. Even with **`scheduleLocal`**, starting a **daemon thread** at the tail of **`importDataTask`** is fragile: once the import task returns, worker lifecycle and threading make it easy for the job to **never reach RUNNING** or for its work to be cut off.

The robust fix is to enqueue **`run_batch_postprocess_job`** on the **`local`** queue (**`dive_tasks/local_tasks.py`**): a normal Celery task loads the Girder job document and runs **`batch_postprocess_task`** synchronously inside that task.

## Bucket notifications (GCS) and `force_recursive`

**`bucket_notifications`** previously called **`Assetstore().importData(..., force_recursive=False)`** **inside the web handler**. That is correct for **incremental** object notifications (avoid re-walking huge subtrees on every push) but **blocks** the HTTP thread and risks Pub/Sub timeouts.

Girder’s stock **`importDataTask`** does **not** pass **`force_recursive`**; the S3 adapter defaults to **`force_recursive=True`**, which would **change semantics** for incremental notifications.

DIVE therefore uses **`import_assetstore_path_async`** (**`local_tasks.py`**), which forwards **`force_recursive=False`** (and enqueues on **`local`** so the push handler returns quickly).

## What was removed from `dive_server` and why

With **all** DIVE-driven **`importData`** paths going through **`local`** workers (admin **`importDataTask`** + **`import_assetstore_path_async`**), events fire only there. The duplicate **`events.bind`** calls in **`dive_server/__init__.py`** for assetstore imports were redundant and confusing, so they were removed. **`model.user.save.created`** and the rest of the plugin are unchanged.

If you add **new** code that calls **`Assetstore().importData`** **in the Girder web process**, you must either:

- Route that work through **`import_assetstore_path_async`** (or another **`local`** task), **or**
- Re-bind the assetstore handlers in **`dive_server`** for that path.

## File map (quick reference)

| Piece | Role |
|-------|------|
| `dive_tasks/worker_girder_events.py` | Register Girder **`events.bind`** in Celery worker processes |
| `dive_tasks/local_tasks.py` | **`local`** queue: batch postprocess job runner, bucket **`importData`** with **`force_recursive`** |
| `dive_server/event.py` | **`process_s3_import`**, **`process_fs_import`**, **`run_post_assetstore_import`** (handlers; bound from worker) |
| `bucket_notifications/views.py` | Enqueues **`import_assetstore_path_async.delay(...)`** |
| `dive_server/__init__.py` | No assetstore import binds (by design); plugins + routes only |

## Summary

- **Girder 5** moved heavy **`importData`** work off the web server onto **`importDataTask`** (**`local`** queue).
- **Server plugins** do not load in that worker, so **worker-side event registration** is required for DIVE metadata and post-import behavior.
- **REST import `.after`** is not a substitute for **`assetstore_import.after`** when import is async.
- **Batch postprocess** should run as a **dedicated `local` Celery task**, not only as a daemon thread off **`importDataTask`**.
- **Bucket notifications** should enqueue import on **`local`** and preserve **`force_recursive=False`**, not use raw **`importDataTask`**.

This set of changes aligns DIVE with Girder 5’s process model while preserving the same product behavior as on Girder 3.
13 changes: 7 additions & 6 deletions server/bucket_notifications/views.py
@@ -11,6 +11,7 @@
from girder.models.folder import Folder
from girder.models.user import User

from dive_tasks.local_tasks import import_assetstore_path_async
from dive_utils.types import AssetstoreModel, GirderModel

from .constants import AssetstoreRuleMarker
@@ -73,14 +74,14 @@ def processNotification(store: AssetstoreModel, rootFolder: GirderModel, importP
# All the chain of parent directories exist
realImportPath = importPath

Assetstore().importData(
store,
target,
import_assetstore_path_async.delay(
str(store['_id']),
str(target['_id']),
'folder',
{'importPath': realImportPath},
None,
owner,
realImportPath,
str(owner['_id']),
force_recursive=False,
leaf_folders_as_items=False,
)

@access.admin
25 changes: 1 addition & 24 deletions server/dive_server/__init__.py
@@ -14,7 +14,7 @@
from dive_utils import constants

from .crud_annotation import GroupItem, RevisionLogItem, TrackItem
from .event import DIVES3Imports, process_fs_import, process_s3_import, send_new_user_email
from .event import send_new_user_email
from .views_annotation import AnnotationResource
from .views_configuration import ConfigurationResource
from .views_dataset import DatasetResource
@@ -69,29 +69,6 @@ def load(self, info):
},
}
info['serverRoot'].mount(None, '', conf)

diveS3Import = DIVES3Imports()
events.bind(
"rest.post.assetstore/:id/import.before",
"process_s3_import_before",
diveS3Import.process_s3_import_before,
)

events.bind(
"rest.post.assetstore/:id/import.after",
"process_s3_import_after",
diveS3Import.process_s3_import_after,
)
events.bind(
"filesystem_assetstore_imported",
"process_fs_import",
process_fs_import,
)
events.bind(
"s3_assetstore_imported",
"process_s3_import",
process_s3_import,
)
events.bind(
'model.user.save.created',
'send_new_user_email',
83 changes: 48 additions & 35 deletions server/dive_server/event.py
@@ -1,11 +1,10 @@
from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import logging
import os

from bson.objectid import ObjectId
import cherrypy
from girder.api.rest import getApiUrl
from girder.models.collection import Collection
from girder.models.folder import Folder
from girder.models.item import Item
from girder.models.setting import Setting
@@ -82,8 +81,13 @@ def process_assetstore_import(event, meta: dict):
foldername = base_name
# resuse existing folder if it already exists with same name
dest = Folder().createFolder(parentFolder, foldername, creator=user, reuseExisting=True)
now = datetime.now()
if now - dest['created'] > timedelta(hours=1):
created = dest['created']
if getattr(created, 'tzinfo', None) is None:
created = created.replace(tzinfo=timezone.utc)
else:
created = created.astimezone(timezone.utc)
now = datetime.now(timezone.utc)
if now - created > timedelta(hours=1):
# Remove the old referenced item, replace it with the new one.
oldItem = Item().findOne({'folderId': dest['_id'], 'name': item['name']})
if oldItem is not None:
@@ -180,6 +184,14 @@ def process_dangling_annotation_files(folder, user):
process_dangling_annotation_files(child, user)


def _job_cherrypy_callback_url() -> str:
"""REST handlers have a CherryPy request; Celery workers do not."""
try:
return cherrypy.url()
except Exception:
return getWorkerApiUrl()


def convert_video_recursive(folder, user):
token = Token().createToken(user=user, days=2)

@@ -198,43 +210,44 @@ def convert_video_recursive(folder, user):
job = Job().createLocalJob(
module='dive_tasks.dive_batch_postprocess',
function='batchPostProcessingTaskLauncher',
kwargs={'params': dive_batch_postprocess_task_params, 'url': cherrypy.url()},
kwargs={'params': dive_batch_postprocess_task_params, 'url': _job_cherrypy_callback_url()},
title='Batch process Dive Batch Postprocess',
type='DIVE Batch Postprocess',
user=user,
public=True,
asynchronous=True,
)
job = Job().save(job)
Job().scheduleJob(job)


class DIVES3Imports:
destinationId = None
destinationType = None

def process_s3_import_before(self, event):
self.destinationId = event.info.get('params', {}).get('destinationId')
self.destinationType = event.info.get('params', {}).get('destinationType')

def process_s3_import_after(self, event):
if self.destinationType == 'folder' and self.destinationId is not None:
# go through all sub folders and add a new script to convert
destinationFolder = Folder().findOne({"_id": ObjectId(self.destinationId)})
userId = destinationFolder['creatorId'] or destinationFolder['baseParentId']
user = User().findOne({'_id': ObjectId(userId)})
process_dangling_annotation_files(destinationFolder, user)
convert_video_recursive(destinationFolder, user)
if self.destinationType == 'collection' and self.destinationId is not None:
destinationCollection = Collection().findOne({"_id": ObjectId(self.destinationId)})
userId = destinationCollection['creatorId'] or destinationCollection['baseParentId']
user = User().findOne({'_id': ObjectId(userId)})
child_folders = Folder().find({'parentId': ObjectId(self.destinationId)})
for child in child_folders:
process_dangling_annotation_files(child, user)
convert_video_recursive(child, user)
self.destinationId = None
self.destinationType = None
# Run on the ``local`` Celery queue instead of scheduleLocal + daemon thread.
# importDataTask completes right after this; a daemon thread is often killed or
# never updates the job, so the document stays INACTIVE.
from dive_tasks.local_tasks import run_batch_postprocess_job

run_batch_postprocess_job.delay(str(job['_id']))


def run_post_assetstore_import(event):
"""
Run after Assetstore.importData completes.

Bound on Celery ``local`` workers via ``dive_tasks.worker_girder_events``.
Admin and bucket-notification imports both run import in that process, not on Girder web.
"""
info = event.info
parent = info.get('parent')
parentType = info.get('parentType')
user = info.get('user')
if not parent or not parentType or not user:
return
userId = parent['creatorId'] or parent['baseParentId']
owner = User().findOne({'_id': ObjectId(userId)})
if parentType == 'folder':
process_dangling_annotation_files(parent, owner)
convert_video_recursive(parent, owner)
elif parentType in ('collection', 'user'):
child_folders = Folder().find({'parentId': parent['_id']})
for child in child_folders:
process_dangling_annotation_files(child, owner)
convert_video_recursive(child, owner)


def process_fs_import(event):
7 changes: 6 additions & 1 deletion server/dive_tasks/__init__.py
@@ -12,4 +12,9 @@ def __init__(self, app, *args, **kwargs):
def task_imports(self):
# Return a list of python importable paths to the
# plugin's path directory
return ["dive_tasks.tasks"]
# worker_girder_events first: bind Girder handlers before any task module loads.
return [
'dive_tasks.worker_girder_events',
'dive_tasks.tasks',
'dive_tasks.local_tasks',
]
62 changes: 62 additions & 0 deletions server/dive_tasks/local_tasks.py
@@ -0,0 +1,62 @@
"""
Celery tasks bound to the ``local`` queue (same queue as Girder's ``importDataTask``).

Kept separate from ``tasks.py`` so the main worker task module stays focused on
GPU/default-queue work.
"""

from girder_worker.app import app


@app.task(queue='local', acks_late=True, ignore_result=True)
def run_batch_postprocess_job(job_id: str):
"""
Run DIVE batch postprocess for an existing Girder job document.

Scheduled on ``local`` so this runs in a normal Celery task instead of a
daemon thread spawned from ``scheduleLocal``. A thread started when the
import task is finishing is unreliable and often leaves the parent job
stuck in INACTIVE.
"""
from girder_jobs.models.job import Job

from dive_tasks.dive_batch_postprocess import batch_postprocess_task

job = Job().load(job_id, force=True)
batch_postprocess_task(job)


@app.task(queue='local', acks_late=True, ignore_result=True)
def import_assetstore_path_async(
assetstore_id: str,
parent_id: str,
parent_type: str,
import_path: str,
user_id: str,
*,
force_recursive: bool = False,
leaf_folders_as_items: bool = False,
):
"""
Run ``Assetstore.importData`` on the ``local`` queue (e.g. GCS bucket notifications).

Keeps ``force_recursive=False`` for incremental object notifications; Girder's
``importDataTask`` does not forward that flag and defaults to full recursion.
"""
from girder.models.assetstore import Assetstore
from girder.models.user import User
from girder.utility.model_importer import ModelImporter

user = User().load(user_id, force=True)
assetstore = Assetstore().load(assetstore_id)
parent = ModelImporter.model(parent_type).load(parent_id, force=True)
Assetstore().importData(
assetstore,
parent,
parent_type,
{'importPath': import_path},
None,
user,
force_recursive=force_recursive,
leafFoldersAsItems=leaf_folders_as_items,
)