
Add TaskMetrics and emit_metrics for task performance tracking#181

Open
rcabrera-py wants to merge 2 commits into main from feat/improve-sqs-listener

Conversation

@rcabrera-py
Contributor

@rcabrera-py rcabrera-py commented Apr 8, 2026

  • Introduced TaskMetrics class to encapsulate task-related metrics.
  • Implemented emit_metrics function to asynchronously send metrics data.
  • Updated run_task to include metrics tracking and logging.
  • Enhanced message consumer and task decorator to support graceful shutdown and metrics emission.
  • Added tests for metrics functionality and graceful shutdown behavior.

Summary by CodeRabbit

  • New Features

    • Emit task execution metrics with an optional callback (includes concurrent-task counts and duration).
    • Graceful shutdown via signal handlers to allow clean task termination and bounded wait for in-flight work.
    • Configurable message deletion behavior on failures; enriched logging to include execution duration and task identifiers.
  • Bug Fixes

    • Invalid message bodies now log a warning and are removed immediately.
  • Tests

    • Added extensive tests covering metrics, message processing, shutdown, retry/deletion behavior, and logging.

@coderabbitai

coderabbitai Bot commented Apr 8, 2026

Walkthrough

Adds metrics support and graceful shutdown to SQS task processing. agave/tasks/__init__.py now re-exports TaskMetrics and emit_metrics. New agave/tasks/metrics.py defines a TaskMetrics dataclass and an async emit_metrics function. In agave/tasks/sqs_tasks.py, the task and run_task signatures gained delete_on_failure and metrics_callback, message deletion and retry behavior were adjusted, duration_ms is logged, invalid JSON messages are logged and deleted, SIGTERM/SIGINT handlers and a shutdown event were added, background-task typing was tightened, and metrics-callback errors are caught. Tests and fixtures were added/updated, and the version was bumped to 1.5.3.dev2.
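From the signatures visible in this review thread, the new module can be sketched roughly as follows. This is an inference from the walkthrough, not the PR's actual code; field and parameter details may differ.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional


@dataclass
class TaskMetrics:
    task_name: str
    queue_url: str
    # Callable so the concurrent-task count is sampled at emission time.
    concurrent_tasks_counter: Callable[[], int]


async def emit_metrics(
    metrics: TaskMetrics,
    status: str,
    duration_ms: float,
    metrics_callback: Optional[Callable[..., Awaitable[None]]] = None,
) -> None:
    # No-op when no callback is configured.
    if metrics_callback is None:
        return
    await metrics_callback(
        task_name=metrics.task_name,
        queue_url=metrics.queue_url,
        status=status,
        duration_ms=duration_ms,
        concurrent_tasks=metrics.concurrent_tasks_counter(),
    )
```

The callback receives flat keyword arguments (task_name, queue_url, status, duration_ms, concurrent_tasks), matching the assertions in the new tests.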

Estimated code review effort

🎯 4 (Complex) | ⏱️ ~50 minutes

🚥 Pre-merge checks | ✅ 2 | ❌ 1

❌ Failed checks (1 warning)

Docstring Coverage — ⚠️ Warning: docstring coverage is 37.50%, which is below the required threshold of 80.00%. Resolution: write docstrings for the functions that are missing them.

✅ Passed checks (2 passed)

Description Check — ✅ Passed: check skipped because CodeRabbit’s high-level summary is enabled.
Title Check — ✅ Passed: the title 'Add TaskMetrics and emit_metrics for task performance tracking' clearly and accurately summarizes the main changes introduced in the pull request.


@codecov

codecov Bot commented Apr 8, 2026

Codecov Report

✅ All modified and coverable lines are covered by tests.
✅ Project coverage is 100.00%. Comparing base (e7aa7f5) to head (17a3655).

Additional details and impacted files
@@            Coverage Diff            @@
##              main      #181   +/-   ##
=========================================
  Coverage   100.00%   100.00%           
=========================================
  Files           19        21    +2     
  Lines          711       775   +64     
  Branches        74        85   +11     
=========================================
+ Hits           711       775   +64     
Flag Coverage Δ
unittests 100.00% <100.00%> (ø)


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 2

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
agave/tasks/sqs_tasks.py (2)

75-84: ⚠️ Potential issue | 🟠 Major

Don't report exhausted retries as retrying.

Once message_receive_count >= max_retries + 1, this attempt is terminal, but the code still logs and emits status='retrying'. That will undercount real failures and make downstream metrics think another retry is pending when the message is actually being deleted or handed off to DLQ redrive.

Example adjustment
     except RetryTask as retry:
         retries_exhausted = message_receive_count >= max_retries + 1
         delete_message = retries_exhausted and delete_on_failure
@@
-        log_data['response']['status'] = 'retrying'
+        log_data['response']['status'] = (
+            'max_retries_exhausted' if retries_exhausted else 'retrying'
+        )

Also applies to: 114-119

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 75 - 84, The code sets
log_data['response']['status']='retrying' even when retries_exhausted is true;
change the logic in the block that computes retries_exhausted/delete_message
(using message_receive_count, max_retries, delete_on_failure, retry.countdown,
ReceiptHandle, sqs.change_message_visibility) so that when retries_exhausted is
true you set log_data['response']['status'] to a terminal value (e.g.,
'exhausted' or 'failed') instead of 'retrying' and only set 'retrying' when you
actually change visibility and expect another attempt; apply the same fix to the
analogous section referenced around the other block (the 114-119 area) to keep
metrics accurate.

137-154: ⚠️ Potential issue | 🟠 Major

Re-check shutdown_event after the long poll returns.

A signal can arrive while receive_message() is awaiting. In that case the success path still yields whatever was just fetched and starts new work during shutdown.

Minimal fix
         try:
             response = await sqs.receive_message(
                 QueueUrl=queue_url,
                 WaitTimeSeconds=wait_time_seconds,
                 VisibilityTimeout=visibility_timeout,
                 AttributeNames=['ApproximateReceiveCount'],
             )
+            if shutdown_event.is_set():
+                return
             messages = response['Messages']
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 137 - 154, After the long poll
returns, re-check shutdown_event before processing results: in the async
function that calls receive_message (the block around receive_message, messages
and the for loop yielding each message), add a guard after the try/except that
checks if shutdown_event.is_set() and if so stop the generator (return/break)
instead of proceeding to iterate over messages and yield them; reference
receive_message, shutdown_event, messages and the for message in messages yield
path when making the change.
🧹 Nitpick comments (2)
tests/tasks/test_metrics.py (1)

24-30: Assert that the callback was awaited, not just called.

emit_metrics is an async boundary. assert_called_once_with(...) would still pass if someone accidentally dropped the await, so this test does not fully protect the behavior being added here.

Suggested test change
-    callback.assert_called_once_with(
+    callback.assert_awaited_once_with(
         task_name=TASK_NAME,
         queue_url=QUEUE_URL,
         status='success',
         duration_ms=150.5,
         concurrent_tasks=3,
     )
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/tasks/test_metrics.py` around lines 24 - 30, The test currently uses a
regular mock for callback and asserts callback.assert_called_once_with(...),
which doesn't verify the coroutine was awaited; make the callback an AsyncMock
(or otherwise ensure it's awaitable) and replace
callback.assert_called_once_with(...) with
callback.assert_awaited_once_with(...) after invoking emit_metrics (the async
boundary) so the test verifies emit_metrics awaited the callback; reference
symbols: emit_metrics, callback, TASK_NAME, QUEUE_URL.
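The distinction matters because a mock records the call even when the returned coroutine is never awaited. A minimal, self-contained illustration (`notify` is a hypothetical stand-in for the async boundary):

```python
import asyncio
from unittest.mock import AsyncMock


async def notify(callback) -> None:
    # If this `await` were accidentally dropped, the call would still be
    # recorded on the mock, but the coroutine would never actually run.
    await callback(status='success')


callback = AsyncMock()
asyncio.run(notify(callback))

callback.assert_called_once_with(status='success')   # passes either way
callback.assert_awaited_once_with(status='success')  # fails if the await is dropped
```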
tests/tasks/test_sqs_tasks.py (1)

656-680: Assert that the timed-out task's message is retained.

This only checks the warning. The important behavior in this path is that an unfinished task is not deleted, so the message can be retried after shutdown.

Suggested assertion to add
     assert any(
         'Graceful shutdown timeout' in r.message for r in caplog.records
     )
+    resp = await sqs_client.receive_message(WaitTimeSeconds=2)
+    assert 'Messages' in resp
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/tasks/test_sqs_tasks.py` around lines 656 - 680, The test
test_graceful_shutdown_timeout currently only asserts the shutdown warning; add
an assertion that the in-flight message was not deleted so it can be retried:
after the await task(...)(my_task)() call, query the test SQS fixture
(sqs_client) to receive or inspect messages on the queue and assert the original
enqueued_message (e.g., by message body or message_id) is still present; use
sqs_client.receive_messages or the equivalent helper in your test harness and
assert the message count or content matches enqueued_message to confirm it was
not deleted.
🤖 Prompt for all review comments with AI agents
Verify each finding against the current code and only fix it if needed.

Inline comments:
In `@agave/tasks/metrics.py`:
- Around line 12-26: emit_metrics currently types metrics_callback as
Optional[Callable] but awaits it unconditionally; update the type signature on
emit_metrics to require an async callable (e.g., metrics_callback:
Optional[Callable[..., Awaitable[None] | None]] | None) so callers know it must
be awaitable, and/or add a runtime check inside emit_metrics that raises a clear
TypeError if metrics_callback is provided but not inspect.isawaitable when
called; refer to the emit_metrics function and the metrics_callback parameter
and use TaskMetrics.concurrent_tasks_counter() when validating/awaiting the
callback.
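A sketch of the suggested signature plus runtime guard. The helper name emit_metrics_checked and the MetricsCallback alias are illustrative, not the PR's code:

```python
import inspect
from typing import Any, Awaitable, Callable, Optional

# Advertise in the type that the callback must be async.
MetricsCallback = Callable[..., Awaitable[None]]


async def emit_metrics_checked(
    metrics_callback: Optional[MetricsCallback] = None,
    **metric_fields: Any,
) -> None:
    if metrics_callback is None:
        return
    result = metrics_callback(**metric_fields)
    # Fail loudly if a sync callable was passed where an async one is required,
    # instead of raising a confusing TypeError at the bare `await`.
    if not inspect.isawaitable(result):
        raise TypeError('metrics_callback must be an async callable')
    await result
```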

In `@agave/tasks/sqs_tasks.py`:
- Around line 261-271: The run_task coroutine currently treats cancellations
like normal exceptions and the finally block leaves delete_message=True and
status='success', causing deleted messages on shutdown; modify run_task to
explicitly catch asyncio.CancelledError (from asyncio) around the task body or
around top-level await so you can set delete_message=False (or an explicit
status like 'cancelled') before the finally executes, then re-raise the
CancelledError so normal cancellation semantics remain; ensure this explicit
except asyncio.CancelledError handler is placed before any broad except
Exception handlers in run_task and that callers like the graceful shutdown logic
using get_running_fast_agave_tasks will therefore return the message to the
queue for retry instead of deleting it.
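A minimal sketch of the cancellation-aware pattern; run_task_sketch and the outcome dict are illustrative stand-ins for run_task's real state and delete/ack logic:

```python
import asyncio


async def run_task_sketch(task_coro, outcome: dict) -> None:
    outcome.update(delete_message=True, status='success')
    try:
        await task_coro
    except asyncio.CancelledError:
        # On shutdown the task is cancelled mid-flight: keep the message on
        # the queue so it can be retried, then re-raise so normal
        # cancellation semantics are preserved.
        outcome.update(delete_message=False, status='cancelled')
        raise
    except Exception:
        # Broad handlers must come after the CancelledError branch.
        outcome.update(delete_message=False, status='failed')
    finally:
        # Message deletion would consult outcome['delete_message'] here.
        pass
```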

ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 0d09a8b3-b067-4aed-8d92-b2654be229bf

📥 Commits

Reviewing files that changed from the base of the PR and between e7aa7f5 and 1a85375.

📒 Files selected for processing (6)
  • agave/tasks/__init__.py
  • agave/tasks/metrics.py
  • agave/tasks/sqs_tasks.py
  • tests/conftest.py
  • tests/tasks/test_metrics.py
  • tests/tasks/test_sqs_tasks.py


@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

🧹 Nitpick comments (1)
agave/tasks/sqs_tasks.py (1)

211-215: Add error handling for unsupported signal handler operations.

loop.add_signal_handler() and loop.remove_signal_handler() are not supported on Windows and cannot be called from non-main threads (raising NotImplementedError and ValueError respectively). Wrap signal handler registration at line 214 and restoration at line 284 in try/except blocks so the consumer gracefully handles platforms/contexts where signal hooks are unavailable.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 211 - 215, Wrap the calls to
loop.add_signal_handler(...) where previous_handlers is populated and the
corresponding loop.remove_signal_handler(...) restoration in try/except blocks
to catch NotImplementedError and ValueError so non-main threads and Windows
platforms don't crash; specifically surround the
loop.add_signal_handler(signal.SIGTERM/SIGINT, _handle_signal, sig) calls and
the loop.remove_signal_handler(...) cleanup in the same function with try/except
and log or ignore those exceptions, preserving previous_handlers behavior when
handlers cannot be installed or removed.
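A hedged sketch of the wrapped registration; helper names are illustrative, not the PR's code:

```python
import asyncio
import signal


def install_signal_handlers(loop, shutdown_event: asyncio.Event) -> list:
    """Best-effort registration: skip platforms/threads without signal hooks."""
    installed = []
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, shutdown_event.set)
            installed.append(sig)
        except (NotImplementedError, ValueError):
            # Windows event loops raise NotImplementedError; non-main threads
            # raise ValueError. Run without graceful-shutdown hooks instead
            # of crashing the consumer.
            pass
    return installed


def remove_signal_handlers(loop, installed) -> None:
    for sig in installed:
        try:
            loop.remove_signal_handler(sig)
        except (NotImplementedError, ValueError):
            pass
```

Only the signals that were actually installed are restored, which keeps the cleanup symmetric on platforms where registration partially fails.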
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: 434c9694-6a98-4e20-9d45-60c0b85d6bd3

📥 Commits

Reviewing files that changed from the base of the PR and between 1a85375 and 44c8d59.

📒 Files selected for processing (5)
  • agave/tasks/metrics.py
  • agave/tasks/sqs_tasks.py
  • agave/version.py
  • tests/tasks/test_loggin_tasks.py
  • tests/tasks/test_sqs_tasks.py
✅ Files skipped from review due to trivial changes (1)
  • agave/version.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • agave/tasks/metrics.py
  • tests/tasks/test_sqs_tasks.py

Comment thread agave/tasks/sqs_tasks.py
Comment on lines +113 to +127
        if metrics_callback:
            try:
                task_metrics = TaskMetrics(
                    task_name=task_name,
                    queue_url=queue_url,
                    concurrent_tasks_counter=lambda: len(BACKGROUND_TASKS),
                )
                await emit_metrics(
                    metrics=task_metrics,
                    status=log_data['response']['status'],
                    duration_ms=duration_ms,
                    metrics_callback=metrics_callback,
                )
            except Exception as exc:
                logger.warning(f'metrics_callback failed: {exc}')

⚠️ Potential issue | 🟠 Major

Bound metrics emission with a timeout to avoid stalling workers.

Line 120 awaits external callback flow without a timeout. If it hangs, the worker slot never frees, throttling or stalling consumption under load.

Suggested hardening
         if metrics_callback:
             try:
                 task_metrics = TaskMetrics(
                     task_name=task_name,
                     queue_url=queue_url,
                     concurrent_tasks_counter=lambda: len(BACKGROUND_TASKS),
                 )
-                await emit_metrics(
-                    metrics=task_metrics,
-                    status=log_data['response']['status'],
-                    duration_ms=duration_ms,
-                    metrics_callback=metrics_callback,
-                )
+                await asyncio.wait_for(
+                    emit_metrics(
+                        metrics=task_metrics,
+                        status=log_data['response']['status'],
+                        duration_ms=duration_ms,
+                        metrics_callback=metrics_callback,
+                    ),
+                    timeout=5,
+                )
+            except asyncio.TimeoutError:
+                logger.warning('metrics_callback timed out')
             except Exception as exc:
                 logger.warning(f'metrics_callback failed: {exc}')
🧰 Tools
🪛 Ruff (0.15.9)

[warning] 126-126: Do not catch blind exception: Exception

(BLE001)

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 113 - 127, The metrics emission awaits
an external callback (emit_metrics) without a timeout which can stall workers;
wrap the await emit_metrics(...) call in an asyncio timeout (e.g.,
asyncio.wait_for) with a short configurable constant (or env var) and catch
asyncio.TimeoutError (and other exceptions) to log a warning and move on so
BACKGROUND_TASKS slots are freed; ensure you reference TaskMetrics,
emit_metrics, metrics_callback and keep the existing exception logging for
non-timeout errors.

… callbacks

- Bump version from 1.5.2 to 1.5.3.dev0.
- Refine type annotations for metrics_callback in emit_metrics and run_task functions to specify Awaitable return type.
- Update test assertions to reflect changes in task retry status handling.
- Add a new test for immediate shutdown behavior in message consumer.
@rcabrera-py rcabrera-py force-pushed the feat/improve-sqs-listener branch from 44c8d59 to 17a3655 on April 8, 2026 at 19:18

@coderabbitai coderabbitai Bot left a comment


Actionable comments posted: 1

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (1)
agave/tasks/sqs_tasks.py (1)

187-215: ⚠️ Potential issue | 🟠 Major

Use shared signal handlers and per-listener task tracking instead of per-listener shutdown state.

When multiple @task decorated functions run concurrently in the same event loop, calling loop.add_signal_handler() for each listener overwrites the previous handler. This means only the last listener receives the SIGTERM/SIGINT signal, leaving earlier listeners unaware of shutdown. Additionally, get_running_fast_agave_tasks() returns all 'fast-agave-task' tasks in the loop, causing the final wait in one listener to include tasks from other listeners, creating improper cross-listener dependencies.

Implement shared signal/shutdown state (a single set of signal handlers for the entire loop) that notifies all listeners, and use per-listener task sets for the final graceful wait.

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 187 - 215, The current listener
installs its own loop.add_signal_handler and shutdown_event which overwrites
handlers and shares shutdown/task state across listeners; instead create a
single shared shutdown state and signal handlers per event loop (e.g., attach a
loop-scoped marker/attribute and shared shutdown_event/previous_handlers on the
loop) and only install handlers once using loop.add_signal_handler guarded by
that marker (referencing loop.add_signal_handler and previous_handlers); for
each listener keep a per-listener task set (rather than relying on
get_running_fast_agave_tasks()) and have concurrency_controller register tasks
it starts into that per-listener set so the final graceful wait only awaits that
listener’s tasks, while the shared loop.shutdown_event/can_read state is used to
broadcast shutdown to all listeners.
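One way to sketch the loop-scoped shared state; all names are illustrative, since the PR does not implement this yet:

```python
import asyncio
import weakref

# One shutdown event per event loop, shared by every listener running in that
# loop. A WeakKeyDictionary avoids keeping dead loops alive.
_shutdown_events: 'weakref.WeakKeyDictionary[asyncio.AbstractEventLoop, asyncio.Event]' = (
    weakref.WeakKeyDictionary()
)


def get_shutdown_event() -> asyncio.Event:
    loop = asyncio.get_running_loop()
    event = _shutdown_events.get(loop)
    if event is None:
        event = asyncio.Event()
        _shutdown_events[loop] = event
        # Signal handlers would be installed exactly once here, guarded by
        # this branch, so later listeners reuse the same event instead of
        # overwriting the handlers.
    return event
```

Each listener would additionally keep its own set of started tasks and perform its final graceful wait on that set only, rather than on every 'fast-agave-task' in the loop.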
♻️ Duplicate comments (1)
agave/tasks/sqs_tasks.py (1)

113-127: ⚠️ Potential issue | 🟠 Major

Bound metrics emission so a hung callback can’t hold a worker slot.

This still awaits the external metrics path inline while the semaphore is held. If the callback hangs, that slot never frees and graceful shutdown waits on it too. Please put a short timeout around the metrics path and warn/continue on timeout.

Suggested hardening
+METRICS_TIMEOUT_SECONDS = 5
+
 ...
         if metrics_callback:
             try:
                 task_metrics = TaskMetrics(
                     task_name=task_name,
                     queue_url=queue_url,
                     concurrent_tasks_counter=lambda: len(BACKGROUND_TASKS),
                 )
-                await emit_metrics(
-                    metrics=task_metrics,
-                    status=log_data['response']['status'],
-                    duration_ms=duration_ms,
-                    metrics_callback=metrics_callback,
-                )
+                await asyncio.wait_for(
+                    emit_metrics(
+                        metrics=task_metrics,
+                        status=log_data['response']['status'],
+                        duration_ms=duration_ms,
+                        metrics_callback=metrics_callback,
+                    ),
+                    timeout=METRICS_TIMEOUT_SECONDS,
+                )
+            except asyncio.TimeoutError:
+                logger.warning('metrics_callback timed out')
             except Exception as exc:
                 logger.warning(f'metrics_callback failed: {exc}')
🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@agave/tasks/sqs_tasks.py` around lines 113 - 127, Wrap the await
emit_metrics(...) call in an asyncio.wait_for with a short timeout (e.g., 1-2s)
so a hung metrics_callback can’t hold a worker slot; catch asyncio.TimeoutError
to logger.warning that metrics timed out and continue, and still catch other
Exceptions as before; reference TaskMetrics, emit_metrics, BACKGROUND_TASKS,
metrics_callback and logger.warning and ensure asyncio is imported.
ℹ️ Review info
⚙️ Run configuration

Configuration used: Organization UI

Review profile: CHILL

Plan: Pro

Run ID: c2f5e8eb-cff8-428c-a0cc-2b18106ee9af

📥 Commits

Reviewing files that changed from the base of the PR and between 44c8d59 and 17a3655.

📒 Files selected for processing (5)
  • agave/tasks/metrics.py
  • agave/tasks/sqs_tasks.py
  • agave/version.py
  • tests/tasks/test_loggin_tasks.py
  • tests/tasks/test_sqs_tasks.py
✅ Files skipped from review due to trivial changes (3)
  • tests/tasks/test_loggin_tasks.py
  • agave/tasks/metrics.py
  • agave/version.py

Comment on lines +570 to +593
    for i in range(3):
        await sqs_client.send_message(
            MessageBody=json.dumps(dict(id=f'msg{i}')),
            MessageGroupId=str(i),
        )

    processed = []

    async def my_task(data: dict) -> None:
        processed.append(data['id'])
        if data['id'] == 'msg0':
            trigger_shutdown()
        await asyncio.sleep(0.5)

    await task(
        queue_url=sqs_client.queue_url,
        region_name=CORE_QUEUE_REGION,
        wait_time_seconds=1,
        visibility_timeout=10,
        max_concurrent_tasks=1,
    )(my_task)()

    assert processed[0] == 'msg0'
    assert 'msg2' not in processed

⚠️ Potential issue | 🟡 Minor

🧩 Analysis chain

🏁 Scripts executed to verify this finding against the repository (cuenca-mx/agave):

    cat -n tests/tasks/test_sqs_tasks.py | sed -n '570,593p'
    cat -n tests/tasks/test_sqs_tasks.py | sed -n '560,600p'
    rg -n "test_no_new_messages_after_shutdown" tests/tasks/test_sqs_tasks.py -A 5 -B 2
    rg -n "sqs_client" tests/tasks/test_sqs_tasks.py -B 5 | head -30
    grep -n "import.*sqs\|from.*sqs\|moto\|pytest" tests/tasks/test_sqs_tasks.py | head -20
    find tests -name "conftest.py" -type f
    cat -n tests/conftest.py

Remove the flaky ordering assumption from this test.

Each message uses a different MessageGroupId ("0", "1", "2"), but the test asserts processed[0] == 'msg0'. In SQS FIFO, ordering is only guaranteed within a message group, not across different groups. This test will be flaky depending on which message the consumer retrieves first.

To fix: Use the same MessageGroupId for all three messages, or replace the specific check with a verification that the first processed message triggers shutdown (regardless of which one it is).

🤖 Prompt for AI Agents
Verify each finding against the current code and only fix it if needed.

In `@tests/tasks/test_sqs_tasks.py` around lines 570 - 593, The test assumes FIFO
ordering across different MessageGroupId values which is flaky; update the test
that sends three messages (send_message calls with MessageGroupId) so they all
use the same MessageGroupId (e.g., reuse the same string) OR change the
assertion logic: keep the different MessageGroupId values but instead assert
that the first item appended to the processed list (in my_task) caused
trigger_shutdown and that subsequent messages (like 'msg2') were not processed
after shutdown; modify the send_message calls or the assertions around
processed, my_task and trigger_shutdown accordingly.

@rcabrera-py rcabrera-py added the enhancement New feature or request label Apr 14, 2026

Labels

enhancement New feature or request
