From 6294d17ae046493ff16eb025b2cd409d1873904a Mon Sep 17 00:00:00 2001
From: toddn
Date: Tue, 16 Sep 2025 16:34:55 -0500
Subject: [PATCH 01/18] for now - this sends message to queue, but didn't
 actually check listeners, needs to be added

---
 backend/app/routers/files.py | 37 +++++++++++++++++++++++++++++++++++-
 backend/message_listener.py  |  4 ++++
 docker-compose.dev.yml       |  2 +-
 3 files changed, 41 insertions(+), 2 deletions(-)

diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index d4276e0ce..76f8af61a 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -2,7 +2,9 @@
 import time
 from datetime import datetime, timedelta
 from typing import List, Optional, Union
-
+import json
+from json import JSONEncoder
+from aio_pika import Message
 from app import dependencies
 from app.config import settings
 from app.db.file.download import _increment_file_downloads
@@ -37,6 +39,13 @@
 router = APIRouter()
 security = HTTPBearer()
 
+class CustomJSONEncoder(JSONEncoder):
+    def default(self, obj):
+        if isinstance(obj, PydanticObjectId):
+            return str(obj)
+        # Handle other non-serializable types if needed
+        return super().default(obj)
+
 
 async def _resubmit_file_extractors(
     file: FileOut,
@@ -135,6 +144,20 @@ async def add_file_entry(
 
     # Add entry to the file index
     await index_file(es, FileOut(**new_file.dict()))
+    # Publish a message when indexing is complete
+
+    message_body = {
+        "event_type": "file_indexed",
+        "file_data": json.loads(new_file.json()),  # This handles ObjectID serialization
+        "timestamp": datetime.now().isoformat()
+    }
+
+    rabbitmq_client.basic_publish(
+        exchange='clowder',
+        routing_key='file_indexed_events',
+        body=json.dumps(message_body).encode('utf-8')
+    )
+
     # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
     time.sleep(1)
 
@@ -163,6 +186,18 @@ async def add_local_file_entry(
 
     # Add entry to the file index
     await index_file(es, FileOut(**new_file.dict()))
+    # Publish a message when indexing is complete
+    message_body = {
+        "event_type": "file_indexed",
+        "file_data": json.loads(new_file.json()),  # This handles ObjectID serialization
+        "timestamp": datetime.now().isoformat()
+    }
+
+    rabbitmq_client.basic_publish(
+        exchange='clowder',
+        routing_key='file_indexed_events',
+        body=json.dumps(message_body).encode('utf-8')
+    )
 
     # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
     time.sleep(1)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index acc55a67d..97ec4d271 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -93,6 +93,9 @@ async def callback(message: AbstractIncomingMessage):
     async with message.process():
         msg = json.loads(message.body.decode("utf-8"))
 
+        if "event_type" in msg and msg["event_type"] == "file_indexed":
+            print(f"This is an event type file indexed!")
+
         job_id = msg["job_id"]
         message_str = msg["status"]
         timestamp = datetime.strptime(
@@ -222,6 +225,7 @@ async def listen_for_messages():
 
 
 if __name__ == "__main__":
+    logger.info(" Message listener starting...")
     start = datetime.now()
     while time_ran < timeout:
         try:
diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index e396d5f35..7d34f513d 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -157,7 +157,7 @@ services:
       - rabbitmq
 
   extractors-messages:
-    image: "clowder/clowder2-messages:latest"
+    image: "clowder2-messages:test"
     build:
       dockerfile: backend/messages.Dockerfile
     environment:

From c8a2b9af6d706fe114719c77caa455eeb0016835 Mon Sep 17 00:00:00 2001
From: toddn
Date: Tue, 16 Sep 2025 17:23:11 -0500
Subject: [PATCH 02/18] message queue received, make sure to handle this as a
 separate case

---
 backend/message_listener.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index 97ec4d271..02fd1a055 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -94,7 +94,7 @@ async def callback(message: AbstractIncomingMessage):
         msg = json.loads(message.body.decode("utf-8"))
 
         if "event_type" in msg and msg["event_type"] == "file_indexed":
-            print(f"This is an event type file indexed!")
+            logger.info(f"This is an event type file indexed!")
 
         job_id = msg["job_id"]
         message_str = msg["status"]
@@ -209,6 +209,8 @@ async def listen_for_messages():
             durable=True,
         )
         await queue.bind(exchange)
+        await queue.bind(exchange, routing_key="file_indexed_events")  # Add this line
+
         logger.info(f" [*] Listening to {exchange}")
 
         await queue.consume(

From e93ec9eb747b9e97f7f317e25c8433b2286aec20 Mon Sep 17 00:00:00 2001
From: toddn
Date: Tue, 16 Sep 2025 17:25:20 -0500
Subject: [PATCH 03/18] fix listener to handle this without errors

---
 backend/message_listener.py | 139 ++++++++++++++++++------------------
 1 file changed, 70 insertions(+), 69 deletions(-)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index 02fd1a055..c57b223a4 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -95,76 +95,77 @@ async def callback(message: AbstractIncomingMessage):
 
         if "event_type" in msg and msg["event_type"] == "file_indexed":
             logger.info(f"This is an event type file indexed!")
-
-        job_id = msg["job_id"]
-        message_str = msg["status"]
-        timestamp = datetime.strptime(
-            msg["start"], "%Y-%m-%dT%H:%M:%S%z"
-        )  # incoming format: '2023-01-20T08:30:27-05:00'
-        timestamp = timestamp.replace(tzinfo=datetime.utcnow().tzinfo)
-
-        # TODO: Updating an event message could go in rabbitmq/listeners
-
-        # Check if the job exists, and update if so
-        job = await EventListenerJobDB.find_one(
-            EventListenerJobDB.id == PydanticObjectId(job_id)
-        )
-        if job:
-            # Update existing job with new info
-            job.updated = timestamp
-            parsed = parse_message_status(message_str)
-            cleaned_msg = parsed["cleaned_msg"]
-            incoming_status = parsed["status"]
-
-            # Don't override a finished status if a message comes in late
-            if job.status in [
-                EventListenerJobStatus.SUCCEEDED,
-                EventListenerJobStatus.ERROR,
-                EventListenerJobStatus.SKIPPED,
-            ]:
-                cleaned_status = job.status
-            else:
-                cleaned_status = incoming_status
-
-            # Prepare fields to update based on status (don't overwrite whole object to avoid async issues)
-            field_updates = {
-                EventListenerJobDB.status: cleaned_status,
-                EventListenerJobDB.latest_message: cleaned_msg,
-                EventListenerJobDB.updated: timestamp,
-            }
-
-            if job.started is not None:
-                field_updates[EventListenerJobDB.duration] = (
-                    timestamp - job.started
-                ).total_seconds()
-            elif incoming_status == EventListenerJobStatus.STARTED:
-                field_updates[EventListenerJobDB.duration] = 0
-
-            logger.info(f"[{job_id}] {timestamp} {incoming_status.value} {cleaned_msg}")
-
-            # Update the job timestamps/duration depending on what status we received
-            if incoming_status == EventListenerJobStatus.STARTED:
-                field_updates[EventListenerJobDB.started] = timestamp
-            elif incoming_status in [
-                EventListenerJobStatus.SUCCEEDED,
-                EventListenerJobStatus.ERROR,
-                EventListenerJobStatus.SKIPPED,
-            ]:
-                # job.finished = timestamp
-                field_updates[EventListenerJobDB.finished] = timestamp
-
-            await job.set(field_updates)
-
-            # Add latest message to the job updates
-            event_msg = EventListenerJobUpdateDB(
-                job_id=job_id, status=cleaned_msg, timestamp=timestamp
-            )
-            await event_msg.insert()
-            return True
+            # TODO - process file indexed event here
         else:
-            # We don't know what this job is. Reject the message.
-            logger.error("Job ID %s not found in database, skipping message." % job_id)
-            return False
+            job_id = msg["job_id"]
+            message_str = msg["status"]
+            timestamp = datetime.strptime(
+                msg["start"], "%Y-%m-%dT%H:%M:%S%z"
+            )  # incoming format: '2023-01-20T08:30:27-05:00'
+            timestamp = timestamp.replace(tzinfo=datetime.utcnow().tzinfo)
+
+            # TODO: Updating an event message could go in rabbitmq/listeners
+
+            # Check if the job exists, and update if so
+            job = await EventListenerJobDB.find_one(
+                EventListenerJobDB.id == PydanticObjectId(job_id)
+            )
+            if job:
+                # Update existing job with new info
+                job.updated = timestamp
+                parsed = parse_message_status(message_str)
+                cleaned_msg = parsed["cleaned_msg"]
+                incoming_status = parsed["status"]
+
+                # Don't override a finished status if a message comes in late
+                if job.status in [
+                    EventListenerJobStatus.SUCCEEDED,
+                    EventListenerJobStatus.ERROR,
+                    EventListenerJobStatus.SKIPPED,
+                ]:
+                    cleaned_status = job.status
+                else:
+                    cleaned_status = incoming_status
+
+                # Prepare fields to update based on status (don't overwrite whole object to avoid async issues)
+                field_updates = {
+                    EventListenerJobDB.status: cleaned_status,
+                    EventListenerJobDB.latest_message: cleaned_msg,
+                    EventListenerJobDB.updated: timestamp,
+                }
+
+                if job.started is not None:
+                    field_updates[EventListenerJobDB.duration] = (
+                        timestamp - job.started
+                    ).total_seconds()
+                elif incoming_status == EventListenerJobStatus.STARTED:
+                    field_updates[EventListenerJobDB.duration] = 0
+
+                logger.info(f"[{job_id}] {timestamp} {incoming_status.value} {cleaned_msg}")
+
+                # Update the job timestamps/duration depending on what status we received
+                if incoming_status == EventListenerJobStatus.STARTED:
+                    field_updates[EventListenerJobDB.started] = timestamp
+                elif incoming_status in [
+                    EventListenerJobStatus.SUCCEEDED,
+                    EventListenerJobStatus.ERROR,
+                    EventListenerJobStatus.SKIPPED,
+                ]:
+                    # job.finished = timestamp
+                    field_updates[EventListenerJobDB.finished] = timestamp
+
+                await job.set(field_updates)
+
+                # Add latest message to the job updates
+                event_msg = EventListenerJobUpdateDB(
+                    job_id=job_id, status=cleaned_msg, timestamp=timestamp
+                )
+                await event_msg.insert()
+                return True
+            else:
+                # We don't know what this job is. Reject the message.
+                logger.error("Job ID %s not found in database, skipping message." % job_id)
+                return False
 
 
 async def listen_for_messages():

From 7bb697fec06a30bc5710e7e4ae0a6859e9f2ff79 Mon Sep 17 00:00:00 2001
From: toddn
Date: Tue, 16 Sep 2025 17:56:44 -0500
Subject: [PATCH 04/18] need to decode the message

---
 backend/message_listener.py | 9 +++++++++
 1 file changed, 9 insertions(+)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index c57b223a4..62e4e929d 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -95,6 +95,15 @@ async def callback(message: AbstractIncomingMessage):
 
         if "event_type" in msg and msg["event_type"] == "file_indexed":
             logger.info(f"This is an event type file indexed!")
+            msg = json.loads(message.body.decode("utf-8"))
+
+            # Convert string IDs back to PydanticObjectId if needed
+            file_data = msg.get("file_data", {})
+            if "id" in file_data and isinstance(file_data["id"], str):
+                file_data["id"] = PydanticObjectId(file_data["id"])
+
+            # Now you can create your FileOut object
+            # file_out = FileOut(**file_data)
             # TODO - process file indexed event here
         else:
             job_id = msg["job_id"]

From fb08a6148c36844f49ad27c9ed8e2019e0712179 Mon Sep 17 00:00:00 2001
From: toddn
Date: Wed, 17 Sep 2025 09:23:32 -0500
Subject: [PATCH 05/18] adding dependencies

---
 backend/message_listener.py | 37 +++++++++++++++++++++++++++++++++----
 1 file changed, 33 insertions(+), 4 deletions(-)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index 62e4e929d..b129fb5a6 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -6,7 +6,14 @@
 import string
 import time
 from datetime import datetime
-
+from app.models.files import (
+    FileDB,
+    FileOut,
+)
+from app.models.users import (
+    UserOut,
+)
+from app.routers.feeds import check_feed_listeners
 from aio_pika import connect_robust
 from aio_pika.abc import AbstractIncomingMessage
 from app.main import startup_beanie
@@ -16,6 +23,8 @@
     EventListenerJobStatus,
     EventListenerJobUpdateDB,
 )
+import os
+from app.config import settings
 from beanie import PydanticObjectId
 
 logging.basicConfig(level=logging.INFO)
@@ -24,6 +33,8 @@
 timeout = 5 * 60  # five minute timeout
 time_ran = 0
 
+from app.dependencies import get_elasticsearchclient, get_rabbitmq
+
 
 def parse_message_status(msg):
 
@@ -99,12 +110,27 @@ async def callback(message: AbstractIncomingMessage):
 
             # Convert string IDs back to PydanticObjectId if needed
             file_data = msg.get("file_data", {})
+            user = msg.get("user", {})
             if "id" in file_data and isinstance(file_data["id"], str):
                 file_data["id"] = PydanticObjectId(file_data["id"])
 
-            # Now you can create your FileOut object
-            # file_out = FileOut(**file_data)
-            # TODO - process file indexed event here
+            if "id" in file_data and isinstance(file_data["id"], str):
+                file_data["id"] = PydanticObjectId(file_data["id"])
+
+            # Create FileOut object
+            file_out = FileOut(**file_data)
+
+            # Create UserOut object from the user data in the message
+            user = UserOut(**user_data)
+
+            # Now call check_feed_listeners with the injected dependencies
+            await check_feed_listeners(
+                es,  # Elasticsearch client
+                file_out,
+                user,
+                rabbitmq_client,  # RabbitMQ client
+            )
+
         else:
             job_id = msg["job_id"]
             message_str = msg["status"]
@@ -180,6 +206,9 @@ async def callback(message: AbstractIncomingMessage):
 async def listen_for_messages():
     await startup_beanie()
 
+    # Initialize dependencies using your existing functions
+    es = await get_elasticsearchclient()
+
     # For some reason, Pydantic Settings environment variable overrides aren't being applied, so get them here.
     RABBITMQ_USER = os.getenv("RABBITMQ_USER", "guest")
     RABBITMQ_PASS = os.getenv("RABBITMQ_PASS", "guest")

From f84f225ae52f081119267ff5fce9ec1107a06d0e Mon Sep 17 00:00:00 2001
From: toddn
Date: Wed, 17 Sep 2025 10:16:23 -0500
Subject: [PATCH 06/18] need to test - adding dependencies needed

---
 backend/message_listener.py | 9 +++++++--
 1 file changed, 7 insertions(+), 2 deletions(-)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index b129fb5a6..77fac1c6b 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -250,10 +250,14 @@ async def listen_for_messages():
         await queue.bind(exchange)
         await queue.bind(exchange, routing_key="file_indexed_events")  # Add this line
-        logger.info(f" [*] Listening to {exchange}")
+
+        # Create a partial function that includes the dependencies
+        from functools import partial
+        callback_with_deps = partial(callback, es=es, rabbitmq_client=channel)
+
         await queue.consume(
-            callback=callback,
+            callback=callback_with_deps,
             no_ack=False,
         )
@@ -263,6 +267,7 @@ async def listen_for_messages():
             await asyncio.Future()
         finally:
             await connection.close()
+            await es.close()  # Close ES connection when done
 
 
 if __name__ == "__main__":

From ad01adcee9145f3f3d18c8257c8f449b3f3689f2 Mon Sep 17 00:00:00 2001
From: toddn
Date: Thu, 18 Sep 2025 15:31:01 -0500
Subject: [PATCH 07/18] message listener now gets files, need to test if it
 works with extractors

---
 backend/app/routers/files.py |  6 ++++--
 backend/message_listener.py  | 17 ++++++-----------
 2 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index 76f8af61a..eaef1cfce 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -148,7 +148,8 @@ async def add_file_entry(
 
     message_body = {
         "event_type": "file_indexed",
-        "file_data": json.loads(new_file.json()),  # This handles ObjectID serialization
+        "file_data": json.loads(new_file.json()),
+        "user": json.loads(user.json()),# This handles ObjectID serialization
         "timestamp": datetime.now().isoformat()
     }
 
@@ -189,7 +190,8 @@ async def add_local_file_entry(
     # Publish a message when indexing is complete
     message_body = {
         "event_type": "file_indexed",
-        "file_data": json.loads(new_file.json()),  # This handles ObjectID serialization
+        "file_data": json.loads(new_file.json()),
+        "user": json.loads(user.json()),# This handles ObjectID serialization
         "timestamp": datetime.now().isoformat()
     }
 
diff --git a/backend/message_listener.py b/backend/message_listener.py
index 77fac1c6b..85f161f36 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -96,32 +96,26 @@ def parse_message_status(msg):
     return {"status": EventListenerJobStatus.PROCESSING, "cleaned_msg": msg}
 
 
-async def callback(message: AbstractIncomingMessage):
-    """This method receives messages from RabbitMQ and processes them.
-    the extractor info is parsed from the message and if the extractor is new
-    or is a later version, the db is updated.
- """ +async def callback(message: AbstractIncomingMessage, es, rabbitmq_client): + """This method receives messages from RabbitMQ and processes them.""" async with message.process(): msg = json.loads(message.body.decode("utf-8")) if "event_type" in msg and msg["event_type"] == "file_indexed": logger.info(f"This is an event type file indexed!") - msg = json.loads(message.body.decode("utf-8")) # Convert string IDs back to PydanticObjectId if needed file_data = msg.get("file_data", {}) - user = msg.get("user", {}) - if "id" in file_data and isinstance(file_data["id"], str): - file_data["id"] = PydanticObjectId(file_data["id"]) + user_data = msg.get("user", {}) # Fixed variable name if "id" in file_data and isinstance(file_data["id"], str): file_data["id"] = PydanticObjectId(file_data["id"]) - # Create FileOut object + # Create FileOut object file_out = FileOut(**file_data) # Create UserOut object from the user data in the message - user = UserOut(**user_data) + user = UserOut(**user_data) # Use user_data, not user # Now call check_feed_listeners with the injected dependencies await check_feed_listeners( @@ -130,6 +124,7 @@ async def callback(message: AbstractIncomingMessage): user, rabbitmq_client, # RabbitMQ client ) + return True else: job_id = msg["job_id"] From 289047e11d869c555f37b89c51559cc459ca047a Mon Sep 17 00:00:00 2001 From: toddn Date: Thu, 18 Sep 2025 15:34:22 -0500 Subject: [PATCH 08/18] comment out calling check feed listeners, we need to check if this works with an extractor --- backend/app/routers/files.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py index eaef1cfce..da48258a5 100644 --- a/backend/app/routers/files.py +++ b/backend/app/routers/files.py @@ -163,12 +163,12 @@ async def add_file_entry( time.sleep(1) # Submit file job to any qualifying feeds - await check_feed_listeners( - es, - FileOut(**new_file.dict()), - user, - rabbitmq_client, - ) + # await check_feed_listeners( + # es, + # FileOut(**new_file.dict()), + # user, + # rabbitmq_client, + # ) async def add_local_file_entry( @@ -205,12 +205,12 @@ async def add_local_file_entry( time.sleep(1) # Submit file job to any qualifying feeds - await check_feed_listeners( - es, - FileOut(**new_file.dict()), - user, - rabbitmq_client, - ) + # await check_feed_listeners( + # es, + # FileOut(**new_file.dict()), + # user, + # rabbitmq_client, + # ) # TODO: Move this to MongoDB middle layer From e85b964cb956ac546a5e8463f61c473706025e55 Mon Sep 17 00:00:00 2001 From: toddn Date: Thu, 18 Sep 2025 16:29:38 -0500 Subject: [PATCH 09/18] this might work --- backend/app/dependencies.py | 2 ++ backend/app/rabbitmq/listeners.py | 52 +++++++++++++++++++++++++------ 2 files changed, 44 insertions(+), 10 deletions(-) diff --git a/backend/app/dependencies.py b/backend/app/dependencies.py index 0eb8cd1c5..2954273b5 100644 --- a/backend/app/dependencies.py +++ b/backend/app/dependencies.py @@ -83,6 +83,8 @@ def get_rabbitmq() -> BlockingChannel: logger.debug("Connecting to rabbitmq at %s", settings.RABBITMQ_HOST) connection = pika.BlockingConnection(parameters) channel = connection.channel() + print(f"DEBUG: get_rabbitmq() called. 
+    print(f"DEBUG: Channel object: {channel}")
     return channel
 
 
diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py
index c9defa7f1..40ad30e6f 100644
--- a/backend/app/rabbitmq/listeners.py
+++ b/backend/app/rabbitmq/listeners.py
@@ -17,10 +17,32 @@
 from app.routers.users import get_user_job_key
 from fastapi import Depends
 from pika.adapters.blocking_connection import BlockingChannel
+import aio_pika
+from aio_pika.abc import AbstractChannel
 
 
-async def create_reply_queue():
-    channel: BlockingChannel = dependencies.get_rabbitmq()
+async def create_reply_queue(channel: AbstractChannel):
+    if (config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})) is not None:
+        instance_id = config_entry.value
+    else:
+        instance_id = "".join(
+            random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
+            for _ in range(10)
+        )
+        config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
+        await config_entry.insert()
+
+    queue_name = f"clowder.{instance_id}"
+
+    # Use aio_pika methods instead of pika methods
+    exchange = await channel.declare_exchange("clowder", durable=True)
+    queue = await channel.declare_queue(queue_name, durable=True, exclusive=False, auto_delete=False)
+    await queue.bind(exchange)
+
+    return queue.name
+
+
+async def create_reply_queue(channel: BlockingChannel):
+    # channel: BlockingChannel = dependencies.get_rabbitmq()
 
     if (
         config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
     ) is not None:
@@ -52,8 +74,12 @@ async def submit_file_job(
     routing_key: str,
     parameters: dict,
     user: UserOut,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
 ):
+    # print(f"DEBUG submit_file_job: Got client of type: {type(rabbitmq_client)}")
+    # if not isinstance(rabbitmq_client, BlockingChannel):
+    #     raise TypeError(f"Expected BlockingChannel, got {type(rabbitmq_client)}. This confirms a mixing issue.")
+
     # Create an entry in job history with unique ID
     job = EventListenerJobDB(
         listener_id=routing_key,
@@ -65,6 +91,7 @@ async def submit_file_job(
     )
     await job.insert()
 
+
     current_secretKey = await get_user_job_key(user.email)
     msg_body = EventListenerJobMessage(
         filename=file_out.name,
@@ -75,15 +102,20 @@ async def submit_file_job(
         job_id=str(job.id),
         parameters=parameters,
     )
-    reply_to = await create_reply_queue()
+
+    # Use aio_pika publishing
+    # Get the existing clowder exchange
+    exchange = await rabbitmq_client.get_exchange("clowder")
+    reply_to = await create_reply_queue(rabbitmq_client)
     print("RABBITMQ_CLIENT: " + str(rabbitmq_client))
-    rabbitmq_client.basic_publish(
-        exchange="",
-        routing_key=routing_key,
-        body=json.dumps(msg_body.dict(), ensure_ascii=False),
-        properties=pika.BasicProperties(
-            content_type="application/json", delivery_mode=1, reply_to=reply_to
+    await exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+            reply_to=reply_to,
         ),
+        routing_key=routing_key,
     )
     return str(job.id)

From c48d8fdc923f07130274f632384aae8c1dcf6327 Mon Sep 17 00:00:00 2001
From: toddn
Date: Sun, 21 Sep 2025 14:12:39 -0500
Subject: [PATCH 10/18] works, need to clean up

---
 backend/app/rabbitmq/listeners.py | 59 +++++++++++++++----------------
 1 file changed, 28 insertions(+), 31 deletions(-)

diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py
index 40ad30e6f..2e00657de 100644
--- a/backend/app/rabbitmq/listeners.py
+++ b/backend/app/rabbitmq/listeners.py
@@ -41,32 +41,32 @@ async def create_reply_queue(channel: AbstractChannel):
     return queue.name
 
 
-async def create_reply_queue(channel: BlockingChannel):
-    # channel: BlockingChannel = dependencies.get_rabbitmq()
-
-    if (
-        config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
-    ) is not None:
-        instance_id = config_entry.value
-    else:
-        # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
-        instance_id = "".join(
-            random.choice(
-                string.ascii_uppercase + string.ascii_lowercase + string.digits
-            )
-            for _ in range(10)
-        )
-        config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
-        await config_entry.insert()
-
-    queue_name = "clowder.%s" % instance_id
-    channel.exchange_declare(exchange="clowder", durable=True)
-    result = channel.queue_declare(
-        queue=queue_name, durable=True, exclusive=False, auto_delete=False
-    )
-    queue_name = result.method.queue
-    channel.queue_bind(exchange="clowder", queue=queue_name)
-    return queue_name
+# async def create_reply_queue(channel: BlockingChannel):
+#     # channel: BlockingChannel = dependencies.get_rabbitmq()
+#
+#     if (
+#         config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
+#     ) is not None:
+#         instance_id = config_entry.value
+#     else:
+#         # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
+#         instance_id = "".join(
+#             random.choice(
+#                 string.ascii_uppercase + string.ascii_lowercase + string.digits
+#             )
+#             for _ in range(10)
+#         )
+#         config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
+#         await config_entry.insert()
+#
+#     queue_name = "clowder.%s" % instance_id
+#     channel.exchange_declare(exchange="clowder", durable=True)
+#     result = channel.queue_declare(
+#         queue=queue_name, durable=True, exclusive=False, auto_delete=False
+#     )
+#     queue_name = result.method.queue
+#     channel.queue_bind(exchange="clowder", queue=queue_name)
+#     return queue_name
 
 
 async def submit_file_job(
@@ -76,10 +76,7 @@ async def submit_file_job(
     user: UserOut,
     rabbitmq_client: AbstractChannel,
 ):
-    # print(f"DEBUG submit_file_job: Got client of type: {type(rabbitmq_client)}")
-    # if not isinstance(rabbitmq_client, BlockingChannel):
-    #     raise TypeError(f"Expected BlockingChannel, got {type(rabbitmq_client)}. This confirms a mixing issue.")
-
+    print(f"DEBUG submit_file_job: Got client of type: {type(rabbitmq_client)}")
     # Create an entry in job history with unique ID
     job = EventListenerJobDB(
         listener_id=routing_key,
@@ -108,7 +105,7 @@ async def submit_file_job(
     exchange = await rabbitmq_client.get_exchange("clowder")
     reply_to = await create_reply_queue(rabbitmq_client)
     print("RABBITMQ_CLIENT: " + str(rabbitmq_client))
-    await exchange.publish(
+    await rabbitmq_client.default_exchange.publish(
         aio_pika.Message(
             body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
             reply_to=reply_to,
         ),
         routing_key=routing_key,
     )
     return str(job.id)

From 0017661a108e6bf7008c11b06e442f51b839663d Mon Sep 17 00:00:00 2001
From: toddn
Date: Sun, 21 Sep 2025 14:39:59 -0500
Subject: [PATCH 11/18] rabbitmq is now abstract channel

---
 backend/app/dependencies.py       | 26 +++++++++-----
 backend/app/rabbitmq/listeners.py | 45 +++++------------------
 backend/app/routers/files.py      | 57 +++++++++++++++++++------------
 3 files changed, 62 insertions(+), 66 deletions(-)

diff --git a/backend/app/dependencies.py b/backend/app/dependencies.py
index 2954273b5..de338ab1b 100644
--- a/backend/app/dependencies.py
+++ b/backend/app/dependencies.py
@@ -3,6 +3,9 @@
 import boto3
 import pika
+import aio_pika
+from aio_pika.abc import AbstractChannel
+
 from app.config import settings
 from app.search.connect import connect_elasticsearch
 from minio import Minio
@@ -74,20 +77,27 @@ async def get_external_fs() -> AsyncGenerator[Minio, None]:
         yield file_system
 
 
-def get_rabbitmq() -> BlockingChannel:
+async def get_rabbitmq() -> AbstractChannel:
     """Client to connect to RabbitMQ for listeners/extractors interactions."""
-    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
-    parameters = pika.ConnectionParameters(
-        settings.RABBITMQ_HOST, credentials=credentials
-    )
+    RABBITMQ_URL = f"amqp://{settings.RABBITMQ_USER}:{settings.RABBITMQ_PASS}@{settings.RABBITMQ_HOST}/"
+
     logger.debug("Connecting to rabbitmq at %s", settings.RABBITMQ_HOST)
-    connection = pika.BlockingConnection(parameters)
-    channel = connection.channel()
+    connection = await aio_pika.connect_robust(RABBITMQ_URL)
+    channel = await connection.channel()
+    print(f"DEBUG: get_rabbitmq() called. Returning channel of type: {type(channel)}")
     return channel
 
 
+# Keep the old function for compatibility if needed
+def get_blocking_rabbitmq() -> BlockingChannel:
+    """Legacy blocking RabbitMQ client (for extractors that need it)"""
+    credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
+    parameters = pika.ConnectionParameters(settings.RABBITMQ_HOST, credentials=credentials)
+    connection = pika.BlockingConnection(parameters)
+    return connection.channel()
+
+
 async def get_elasticsearchclient():
     es = await connect_elasticsearch()
     return es
diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py
index 2e00657de..dd3dbf4af 100644
--- a/backend/app/rabbitmq/listeners.py
+++ b/backend/app/rabbitmq/listeners.py
@@ -41,33 +41,6 @@ async def create_reply_queue(channel: AbstractChannel):
     return queue.name
 
 
-# async def create_reply_queue(channel: BlockingChannel):
-#     # channel: BlockingChannel = dependencies.get_rabbitmq()
-#
-#     if (
-#         config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
-#     ) is not None:
-#         instance_id = config_entry.value
-#     else:
-#         # If no ID has been generated for this instance, generate a 10-digit alphanumeric identifier
-#         instance_id = "".join(
-#             random.choice(
-#                 string.ascii_uppercase + string.ascii_lowercase + string.digits
-#             )
-#             for _ in range(10)
-#         )
-#         config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
-#         await config_entry.insert()
-#
-#     queue_name = "clowder.%s" % instance_id
-#     channel.exchange_declare(exchange="clowder", durable=True)
-#     result = channel.queue_declare(
-#         queue=queue_name, durable=True, exclusive=False, auto_delete=False
-#     )
-#     queue_name = result.method.queue
-#     channel.queue_bind(exchange="clowder", queue=queue_name)
-#     return queue_name
 
 
 async def submit_file_job(
@@ -121,7 +94,7 @@ async def submit_dataset_job(
     routing_key: str,
     parameters: dict,
     user: UserOut,
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel,
 ):
     # Create an entry in job history with unique ID
     job = EventListenerJobDB(
@@ -142,13 +114,14 @@ async def submit_dataset_job(
         job_id=str(job.id),
         parameters=parameters,
     )
-    reply_to = await create_reply_queue()
-    rabbitmq_client.basic_publish(
-        exchange="",
-        routing_key=routing_key,
-        body=json.dumps(msg_body.dict(), ensure_ascii=False),
-        properties=pika.BasicProperties(
-            content_type="application/json", delivery_mode=1, reply_to=reply_to
+    reply_to = await create_reply_queue(rabbitmq_client)
+    await rabbitmq_client.default_exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+            reply_to=reply_to,
         ),
+        routing_key=routing_key,
     )
     return str(job.id)
diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index da48258a5..b34ec9bc5 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -35,6 +35,8 @@
 from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
 from minio import Minio
 from pika.adapters.blocking_connection import BlockingChannel
+import aio_pika
+from aio_pika.abc import AbstractChannel
 
 router = APIRouter()
 security = HTTPBearer()
@@ -49,7 +51,7 @@ def default(self, obj):
 
 async def _resubmit_file_extractors(
     file: FileOut,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     user: UserOut,
     credentials: HTTPAuthorizationCredentials = Security(security),
 ):
@@ -94,7 +96,7 @@ async def add_file_entry(
     user: UserOut,
     fs: Minio,
     es: Elasticsearch,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     file: Optional[io.BytesIO] = None,
     content_type: Optional[str] = None,
     public: bool = False,
@@ -146,22 +148,28 @@ async def add_file_entry(
     # Publish a message when indexing is complete
 
+    # FIXED: Use aio_pika publishing
     message_body = {
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
-        "user": json.loads(user.json()),# This handles ObjectID serialization
+        "user": json.loads(user.json()),
         "timestamp": datetime.now().isoformat()
     }
 
-    rabbitmq_client.basic_publish(
-        exchange='clowder',
-        routing_key='file_indexed_events',
-        body=json.dumps(message_body).encode('utf-8')
+    # Get the exchange first
+    exchange = await rabbitmq_client.get_exchange("clowder")
+
+    # Use aio_pika publish method
+    await exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(message_body).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+        ),
+        routing_key="file_indexed_events",
     )
 
-    # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
-    time.sleep(1)
-
     # Submit file job to any qualifying feeds
     # await check_feed_listeners(
     #     es,
@@ -175,7 +183,7 @@ async def add_local_file_entry(
     new_file: FileDB,
     user: UserOut,
     es: Elasticsearch,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
     content_type: Optional[str] = None,
 ):
     """Insert FileDB object into MongoDB (makes Clowder ID). Bytes are not stored in DB and versioning not supported
@@ -188,22 +196,27 @@ async def add_local_file_entry(
     # Add entry to the file index
     await index_file(es, FileOut(**new_file.dict()))
     # Publish a message when indexing is complete
+
     message_body = {
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
-        "user": json.loads(user.json()),# This handles ObjectID serialization
+        "user": json.loads(user.json()),
        "timestamp": datetime.now().isoformat()
     }
 
-    rabbitmq_client.basic_publish(
-        exchange='clowder',
-        routing_key='file_indexed_events',
-        body=json.dumps(message_body).encode('utf-8')
+    # Get the exchange first
+    exchange = await rabbitmq_client.get_exchange("clowder")
+
+    # Use aio_pika publish method
+    await exchange.publish(
+        aio_pika.Message(
+            body=json.dumps(message_body).encode('utf-8'),
+            content_type="application/json",
+            delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
+        ),
+        routing_key="file_indexed_events",
     )
 
-    # TODO - timing issue here, check_feed_listeners needs to happen asynchronously.
-    time.sleep(1)
-
     # Submit file job to any qualifying feeds
     # await check_feed_listeners(
     #     es,
@@ -255,7 +268,7 @@ async def update_file(
     file: UploadFile = File(...),
     es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("uploader")),
 ):
     # Check all connection and abort if any one of them is not available
@@ -593,7 +606,7 @@ async def post_file_extract(
     parameters: dict = None,
     user=Depends(get_current_user),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("uploader")),
 ):
     if extractorName is None:
@@ -620,7 +633,7 @@ async def resubmit_file_extractions(
     file_id: str,
     user=Depends(get_current_user),
     credentials: HTTPAuthorizationCredentials = Security(security),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(FileAuthorization("editor")),
 ):
     """This route will check metadata. We get the extractors run from metadata from extractors.

From f95221d509567a8812b655fffe9747f7f0a4af84 Mon Sep 17 00:00:00 2001
From: toddn
Date: Mon, 22 Sep 2025 14:22:32 -0500
Subject: [PATCH 12/18] using abstract channel not blocking channel

---
 backend/app/routers/datasets.py | 12 +++++++-----
 1 file changed, 7 insertions(+), 5 deletions(-)

diff --git a/backend/app/routers/datasets.py b/backend/app/routers/datasets.py
index ae608174b..b55f333bb 100644
--- a/backend/app/routers/datasets.py
+++ b/backend/app/routers/datasets.py
@@ -70,6 +70,8 @@
 from fastapi.security import HTTPBearer
 from minio import Minio
 from pika.adapters.blocking_connection import BlockingChannel
+import aio_pika
+from aio_pika.abc import AbstractChannel
 from pymongo import DESCENDING
 from rocrate.model.person import Person
 from rocrate.rocrate import ROCrate
@@ -944,7 +946,7 @@ async def save_file(
     fs: Minio = Depends(dependencies.get_fs),
     file: UploadFile = File(...),
     es=Depends(dependencies.get_elasticsearchclient),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(Authorization("uploader")),
 ):
     if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -996,7 +998,7 @@ async def save_files(
     user=Depends(get_current_user),
     fs: Minio = Depends(dependencies.get_fs),
     es=Depends(dependencies.get_elasticsearchclient),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(Authorization("uploader")),
 ):
     if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -1056,7 +1058,7 @@ async def save_local_file(
     folder_id: Optional[str] = None,
     user=Depends(get_current_user),
     es=Depends(dependencies.get_elasticsearchclient),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(Authorization("uploader")),
 ):
     if (dataset := await DatasetDB.get(PydanticObjectId(dataset_id))) is not None:
@@ -1110,7 +1112,7 @@ async def create_dataset_from_zip(
     fs: Minio = Depends(dependencies.get_fs),
     file: UploadFile = File(...),
     es: Elasticsearch = Depends(dependencies.get_elasticsearchclient),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     token: str = Depends(get_token),
 ):
     if file.filename.endswith(".zip") is False:
@@ -1427,7 +1429,7 @@ async def get_dataset_extract(
     # parameters don't have a fixed model shape
     parameters: dict = None,
     user=Depends(get_current_user),
-    rabbitmq_client: BlockingChannel = Depends(dependencies.get_rabbitmq),
+    rabbitmq_client: AbstractChannel = Depends(dependencies.get_rabbitmq),
     allow: bool = Depends(Authorization("uploader")),
 ):
     if extractorName is None:

From da4e6aba7813baa976d0f8c74db9c1495feb1bd7 Mon Sep 17 00:00:00 2001
From: toddn
Date: Mon, 22 Sep 2025 14:31:09 -0500
Subject: [PATCH 13/18] using abstract channel not blocking channel

---
 backend/app/routers/feeds.py | 5 +++--
 1 file changed, 3 insertions(+), 2 deletions(-)

diff --git a/backend/app/routers/feeds.py b/backend/app/routers/feeds.py
index 47e2b2d23..86c3b1740 100644
--- a/backend/app/routers/feeds.py
+++ b/backend/app/routers/feeds.py
@@ -4,7 +4,8 @@
 from beanie.operators import Or, RegEx
 from fastapi import APIRouter, Depends, HTTPException
 from pika.adapters.blocking_connection import BlockingChannel
-
+import aio_pika
+from aio_pika.abc import AbstractChannel
 from app.deps.authorization_deps import FeedAuthorization, ListenerAuthorization
 from app.keycloak_auth import get_current_user, get_current_username
 from app.models.feeds import FeedDB, FeedIn, FeedOut
@@ -41,7 +42,7 @@ async def check_feed_listeners(
     es_client,
     file_out: FileOut,
     user: UserOut,
-    rabbitmq_client: BlockingChannel,
+    rabbitmq_client: AbstractChannel,
 ):
     """Automatically submit new file to listeners on feeds that fit the search criteria."""
     listener_ids_found = []

From 43018ace56d836bee277b0f09d4e96909c9081b2 Mon Sep 17 00:00:00 2001
From: toddn
Date: Wed, 5 Nov 2025 18:19:47 -0600
Subject: [PATCH 14/18] fixing lint

---
 backend/app/dependencies.py       |  4 +++-
 backend/app/rabbitmq/listeners.py | 17 +++++++++++------
 backend/app/routers/files.py      | 10 +++++-----
 3 files changed, 19 insertions(+), 12 deletions(-)

diff --git a/backend/app/dependencies.py b/backend/app/dependencies.py
index de338ab1b..d7e091ca9 100644
--- a/backend/app/dependencies.py
+++ b/backend/app/dependencies.py
@@ -93,7 +93,9 @@ async def get_rabbitmq() -> AbstractChannel:
 def get_blocking_rabbitmq() -> BlockingChannel:
     """Legacy blocking RabbitMQ client (for extractors that need it)"""
     credentials = pika.PlainCredentials(settings.RABBITMQ_USER, settings.RABBITMQ_PASS)
-    parameters = pika.ConnectionParameters(settings.RABBITMQ_HOST, credentials=credentials)
+    parameters = pika.ConnectionParameters(
+        settings.RABBITMQ_HOST, credentials=credentials
+    )
     connection = pika.BlockingConnection(parameters)
     return connection.channel()
 
diff --git a/backend/app/rabbitmq/listeners.py b/backend/app/rabbitmq/listeners.py
index dd3dbf4af..405dd5329 100644
--- a/backend/app/rabbitmq/listeners.py
+++ b/backend/app/rabbitmq/listeners.py
@@ -22,11 +22,15 @@ async def create_reply_queue(channel: AbstractChannel):
-    if (config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})) is not None:
+    if (
+        config_entry := await ConfigEntryDB.find_one({"key": "instance_id"})
+    ) is not None:
         instance_id = config_entry.value
     else:
         instance_id = "".join(
-            random.choice(string.ascii_uppercase + string.ascii_lowercase + string.digits)
+            random.choice(
+                string.ascii_uppercase + string.ascii_lowercase + string.digits
+            )
             for _ in range(10)
         )
         config_entry = ConfigEntryDB(key="instance_id", value=instance_id)
@@ -36,7 +40,9 @@ async def create_reply_queue(channel: AbstractChannel):
 
     # Use aio_pika methods instead of pika methods
     exchange = await channel.declare_exchange("clowder", durable=True)
-    queue = await channel.declare_queue(queue_name, durable=True, exclusive=False, auto_delete=False)
+    queue = await channel.declare_queue(
+        queue_name, durable=True, exclusive=False, auto_delete=False
+    )
     await queue.bind(exchange)
 
     return queue.name
@@ -61,7 +67,6 @@ async def submit_file_job(
     )
     await job.insert()
 
-
     current_secretKey = await get_user_job_key(user.email)
     msg_body = EventListenerJobMessage(
         filename=file_out.name,
@@ -79,7 +84,7 @@ async def submit_file_job(
     print("RABBITMQ_CLIENT: " + str(rabbitmq_client))
     await rabbitmq_client.default_exchange.publish(
         aio_pika.Message(
-            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
+            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode("utf-8"),
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
             reply_to=reply_to,
@@ -117,7 +122,7 @@ async def submit_dataset_job(
     reply_to = await create_reply_queue(rabbitmq_client)
     await rabbitmq_client.default_exchange.publish(
         aio_pika.Message(
-            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode('utf-8'),
+            body=json.dumps(msg_body.dict(), ensure_ascii=False).encode("utf-8"),
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
             reply_to=reply_to,
diff --git a/backend/app/routers/files.py b/backend/app/routers/files.py
index b34ec9bc5..83f394498 100644
--- a/backend/app/routers/files.py
+++ b/backend/app/routers/files.py
@@ -41,6 +41,7 @@
 router = APIRouter()
 security = HTTPBearer()
 
+
 class CustomJSONEncoder(JSONEncoder):
     def default(self, obj):
         if isinstance(obj, PydanticObjectId):
@@ -148,13 +149,12 @@ async def add_file_entry(
     # Publish a message when indexing is complete
 
-    # FIXED: Use aio_pika publishing
     message_body = {
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
         "user": json.loads(user.json()),
-        "timestamp": datetime.now().isoformat()
+        "timestamp": datetime.now().isoformat(),
     }
 
     # Get the exchange first
@@ -163,7 +163,7 @@ async def add_file_entry(
     # Use aio_pika publish method
     await exchange.publish(
         aio_pika.Message(
-            body=json.dumps(message_body).encode('utf-8'),
+            body=json.dumps(message_body).encode("utf-8"),
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
         ),
@@ -201,7 +201,7 @@ async def add_local_file_entry(
         "event_type": "file_indexed",
         "file_data": json.loads(new_file.json()),
         "user": json.loads(user.json()),
-        "timestamp": datetime.now().isoformat()
+        "timestamp": datetime.now().isoformat(),
     }
 
     # Get the exchange first
@@ -210,7 +210,7 @@ async def add_local_file_entry(
     # Use aio_pika publish method
     await exchange.publish(
         aio_pika.Message(
-            body=json.dumps(message_body).encode('utf-8'),
+            body=json.dumps(message_body).encode("utf-8"),
             content_type="application/json",
             delivery_mode=aio_pika.DeliveryMode.PERSISTENT,
         ),

From cebe9a2df53b1f33f6f6d59c02937a4a7a1df14b Mon Sep 17 00:00:00 2001
From: toddn
Date: Wed, 5 Nov 2025 18:21:58 -0600
Subject: [PATCH 15/18] fix message listener

---
 backend/message_listener.py | 10 +++++++---
 1 file changed, 7 insertions(+), 3 deletions(-)

diff --git a/backend/message_listener.py b/backend/message_listener.py
index 85f161f36..ff6ea5b86 100644
--- a/backend/message_listener.py
+++ b/backend/message_listener.py
@@ -36,7 +36,6 @@
 from app.dependencies import get_elasticsearchclient, get_rabbitmq
 
 
-
 def parse_message_status(msg):
     """Determine if the message corresponds to start/middle/end of job if possible. See pyclowder.utils.StatusMessage."""
     if (
@@ -171,7 +170,9 @@ async def callback(message: AbstractIncomingMessage, es, rabbitmq_client):
                 elif incoming_status == EventListenerJobStatus.STARTED:
                     field_updates[EventListenerJobDB.duration] = 0
 
-                logger.info(f"[{job_id}] {timestamp} {incoming_status.value} {cleaned_msg}")
+                logger.info(
+                    f"[{job_id}] {timestamp} {incoming_status.value} {cleaned_msg}"
+                )
 
                 # Update the job timestamps/duration depending on what status we received
                 if incoming_status == EventListenerJobStatus.STARTED:
@@ -194,7 +195,9 @@ async def callback(message: AbstractIncomingMessage, es, rabbitmq_client):
                 return True
             else:
                 # We don't know what this job is. Reject the message.
-                logger.error("Job ID %s not found in database, skipping message." % job_id)
+                logger.error(
+                    "Job ID %s not found in database, skipping message." % job_id
+                )
                 return False
 
@@ -249,6 +252,7 @@ async def listen_for_messages():
 
         # Create a partial function that includes the dependencies
         from functools import partial
+
         callback_with_deps = partial(callback, es=es, rabbitmq_client=channel)
 
         await queue.consume(

From 983c5cbe238de5fc65f2291c6a02aaba115a06d9 Mon Sep 17 00:00:00 2001
From: toddn
Date: Mon, 10 Nov 2025 14:22:01 -0600
Subject: [PATCH 16/18] does this fix env variables?

---
 docker-compose.dev.yml | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/docker-compose.dev.yml b/docker-compose.dev.yml
index 7d34f513d..62842fb12 100644
--- a/docker-compose.dev.yml
+++ b/docker-compose.dev.yml
@@ -150,6 +150,8 @@ services:
       - RABBITMQ_HOST=rabbitmq
       - RABBITMQ_USER=${RABBITMQ_USER:-guest}
       - RABBITMQ_PASS=${RABBITMQ_PASS:-guest}
+      - ELASTICSEARCH_HOST=elasticsearch  # ← Might be missing
+      - ELASTICSEARCH_PORT=9200
     networks:
       - clowder2
     depends_on:

From aecbf81cd32a14dcd282fc314daef5261f3ad936 Mon Sep 17 00:00:00 2001
From: toddn
Date: Mon, 10 Nov 2025 14:33:54 -0600
Subject: [PATCH 17/18] new mock rabbitmq for tests

---
 backend/app/tests/conftest.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)

diff --git a/backend/app/tests/conftest.py b/backend/app/tests/conftest.py
index 5c9b2587c..7bf1d660c 100644
--- a/backend/app/tests/conftest.py
+++ b/backend/app/tests/conftest.py
@@ -5,6 +5,23 @@
 from app.main import app
 from app.tests.utils import delete_test_data, user_example
 from fastapi.testclient import TestClient
+from unittest.mock import AsyncMock, patch
+
+
+@pytest.fixture(autouse=True)
+def mock_rabbitmq():
+    """Mock RabbitMQ connections for all tests"""
+    with patch('aio_pika.connect_robust') as mock_connect:
+        mock_channel = AsyncMock()
+        mock_exchange = AsyncMock()
+        mock_connection = AsyncMock()
+
+        mock_connect.return_value = mock_connection
+        mock_connection.__aenter__.return_value = mock_connection
+        mock_connection.channel.return_value = mock_channel
+        mock_channel.declare_exchange.return_value = mock_exchange
+
+        yield mock_connect
 
 settings.MONGO_DATABASE = "clowder-tests"
 settings.elasticsearch_index = "clowder-tests"

From 3d27d52f8f0dd0afcee90ffb1eef52e943b4a352 Mon Sep 17 00:00:00 2001
From: toddn
Date: Mon, 10 Nov 2025 14:38:38 -0600
Subject: [PATCH 18/18] formatting

---
 backend/app/tests/conftest.py | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/backend/app/tests/conftest.py b/backend/app/tests/conftest.py
index 7bf1d660c..6ba124af4 100644
--- a/backend/app/tests/conftest.py
+++ b/backend/app/tests/conftest.py
@@ -11,7 +11,7 @@
 @pytest.fixture(autouse=True)
 def mock_rabbitmq():
     """Mock RabbitMQ connections for all tests"""
-    with patch('aio_pika.connect_robust') as mock_connect:
+    with patch("aio_pika.connect_robust") as mock_connect:
         mock_channel = AsyncMock()
         mock_exchange = AsyncMock()
         mock_connection = AsyncMock()
@@ -23,6 +23,7 @@ def mock_rabbitmq():
 
         yield mock_connect
 
+
 settings.MONGO_DATABASE = "clowder-tests"
 settings.elasticsearch_index = "clowder-tests"