refactor: unify Redis access, wire ingest pipeline, consolidate auth#61
refactor: unify Redis access, wire ingest pipeline, consolidate auth#61williaby wants to merge 2 commits into
Conversation
Implements the top-3 refactors from the architecture review.
1. Unify Redis access + non-blocking reads
- Add core/redis.py: single pooled, synchronous client factory
(decoded + raw pools) replacing three hand-rolled redis.Redis(...)
constructions in queue/client, queue/redis_store and websocket/events.
- Close pools on FastAPI shutdown lifespan.
- Add async, thread-offloaded wrappers (get_batch_status_async /
get_job_status_async) and switch async API/WebSocket handlers to them
so synchronous Redis calls no longer block the event loop.
- Dedupe _parse_iso_datetime in models to utils.time_utils.parse_iso_datetime.
2. Complete ingest -> persist -> enqueue flow
- ingest_files now persists the batch/jobs and enqueues them via
enqueue_batch_jobs, gated behind the new (default-off) enqueue_enabled
setting, with best-effort rollback + 503 on failure.
- Harden process_job_task: pipeline failures mark the job FAILED and
refresh batch progress instead of crashing the worker.
3. Consolidate authentication + wire exception hierarchy
- Extract shared JWKS load (with asyncio.Lock to prevent fetch stampede),
key resolution and JWT decode helpers; the WebSocket path now shares the
middleware's clock-skew leeway (previously divergent).
- Register a FastAPI handler mapping ProjectBaseError subclasses to HTTP
responses via http_status_for, making the centralized exception
hierarchy live.
Adds tests for the Redis factory, async wrappers, worker failure handling,
ingest enqueue wiring + rollback, leeway consistency, and exception mapping.
Full suite: 446 passed, coverage 91.59%.
https://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
|
Warning Review limit reached
More reviews will be available in 34 minutes and 15 seconds. Learn how PR review limits work. Your organization has run out of usage credits. Purchase more in the billing tab. ⌛ How to resolve this issue?After more reviews become available, a review can be triggered using the We recommend that you space out your commits to avoid hitting the rate limit. 🚦 How do rate limits work?CodeRabbit enforces hourly rate limits for each developer per organization. Our paid plans include higher PR review limits than trial, open-source, and free plans. In all cases, reviews become available again over time. During sustained high-volume PR review activity, CodeRabbit may temporarily slow when the next review becomes available. Please see our Fair Usage Limits Policy for further information. ℹ️ Review info⚙️ Run configurationConfiguration used: Path: .coderabbit.yaml Review profile: ASSERTIVE Plan: Pro Run ID: 📒 Files selected for processing (21)
✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
FIPS Compatibility Check -- PASSED
Status: PASSED |
Dependency Review✅ No vulnerabilities or license issues or OpenSSF Scorecard issues found.Scanned FilesNone |
✅ Performance Regression CheckStatus: PERFORMANCE OK
Threshold: +/-10% allowed regression ✅ Performance is within acceptable range. Additional Metrics
About Performance Regression TestingThis automated check compares
To reproduce locally: uv run --frozen python scripts/benchmark.py --iterations 1000 |
There was a problem hiding this comment.
Pull request overview
This PR implements a set of backend refactors to reduce duplication and improve end-to-end ingestion reliability: centralizing Redis client creation/pooling, completing the ingest→persist→enqueue pipeline (behind a default-off setting), consolidating Cloudflare Access JWT verification behavior across HTTP and WebSocket, and wiring the domain exception hierarchy into FastAPI.
Changes:
- Centralized synchronous Redis client creation into
core/redis.py(separate decoded vs raw pools) and updated queue/store components to use it. - Completed ingest pipeline wiring to persist/enqueue jobs when
enqueue_enabledis set, and hardened worker task failure handling to mark jobs/batches failed instead of crashing/stalling. - Consolidated Cloudflare auth helpers (JWKS load locking + shared decode behavior) and registered a FastAPI exception handler for
ProjectBaseError.
Reviewed changes
Copilot reviewed 20 out of 20 changed files in this pull request and generated 2 comments.
Show a summary per file
| File | Description |
|---|---|
| src/rag_processor/core/redis.py | Adds shared decoded/raw Redis connection pools with shutdown cleanup. |
| src/rag_processor/queue/client.py | Switches RQ client creation to use the shared raw Redis pool. |
| src/rag_processor/queue/redis_store.py | Switches persistence layer to use the shared decoded Redis pool and adds batch deletion. |
| src/rag_processor/queue/jobs.py | Adds async wrappers for Redis reads and hardens process_job_task failure handling. |
| src/rag_processor/api/ingest.py | Wires persist+enqueue behind enqueue_enabled, adds rollback behavior on infra failures. |
| src/rag_processor/api/batch.py | Offloads Redis reads from the event loop for batch/job status endpoints. |
| src/rag_processor/websocket/router.py | Offloads Redis reads for WS subscription checks and event replay. |
| src/rag_processor/auth/cloudflare.py | Consolidates JWKS loading/key resolution/decode helpers and shares leeway across auth paths. |
| src/rag_processor/core/exceptions.py | Defines centralized domain exception hierarchy and HTTP status mapping. |
| src/rag_processor/main.py | Wires app lifespan shutdown cleanup and registers ProjectBaseError exception handler. |
| tests/unit/test_core_redis.py | Adds unit coverage for shared Redis pool behavior and pool shutdown reset. |
| tests/unit/test_jobs_async.py | Adds tests for async wrappers and worker pipeline failure handling. |
| tests/unit/test_ingest.py | Adds enqueue-gating tests and rollback behavior verification. |
| tests/unit/test_exception_handler.py | Adds tests for domain-exception→HTTP mapping and FastAPI handler wiring. |
| try: | ||
| await asyncio.to_thread(get_redis_store().delete_batch, batch.batch_id) | ||
| except Exception: # noqa: BLE001 - rollback is best-effort |
| # Verify batch exists and the caller owns it. Treat "not found" and | ||
| # "not authorized" identically to avoid leaking batch IDs. | ||
| batch, _ = get_batch_status(batch_id) | ||
| batch, _ = await asyncio.to_thread(get_batch_status, batch_id) |
Responds to Copilot review on PR #61. - enqueue_batch_jobs is now all-or-nothing: it tracks enqueued RQ jobs and, on any partial failure, cancels/deletes them and removes the batch's Redis state before re-raising. Prevents orphaned RQ jobs whose metadata would otherwise be deleted by the ingest rollback ("Job not found"). Extracted _rollback_enqueued_batch; added JOB_TIMEOUT_SECONDS constant. - ingest _persist_and_enqueue rollback simplified to filesystem cleanup only, since enqueue_batch_jobs now owns its Redis/RQ rollback. - websocket router uses the centralized get_batch_status_async wrapper instead of duplicating the asyncio.to_thread offload. Tests: added transactional rollback + success coverage for enqueue_batch_jobs; updated websocket/ingest tests accordingly. 448 passed. https://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
✅ Performance Regression CheckStatus: PERFORMANCE OK
Threshold: +/-10% allowed regression ✅ Performance is within acceptable range. Additional Metrics
About Performance Regression TestingThis automated check compares
To reproduce locally: uv run --frozen python scripts/benchmark.py --iterations 1000 |
|



Summary
Implements the top-3 refactors from the repository architecture & maintainability review. Each is incremental and test-backed; the full suite stays green throughout.
🥇 #1 — Unified Redis access + non-blocking reads
core/redis.py: a single pooled, synchronous client factory (separate decoded and raw pools — RQ requires raw bytes) replacing three hand-rolledredis.Redis(...)constructions inqueue/client,queue/redis_store, andwebsocket/events. Pools are disconnected on FastAPI shutdown.get_batch_status_async/get_job_status_asyncthat offload synchronous Redis calls viaasyncio.to_thread; the async API and WebSocket handlers nowawaitthem instead of calling sync Redis directly on the loop._parse_iso_datetimeacross both models → the existingutils.time_utils.parse_iso_datetime.🥈 #2 — Completed the ingest → persist → enqueue flow
ingest_filesnow persists the batch/jobs and enqueues them (enqueue_batch_jobs), removing the long-standing TODO that left the pipeline non-functional end-to-end. Gated behind a new default-offenqueue_enabledsetting for safe rollout, with best-effort rollback +503on infra failure.process_job_task: pipeline failures now mark the jobFAILEDand refresh batch progress instead of crashing the worker / leaving the batch stuck inPROCESSING.🥉 #3 — Consolidated authentication + wired exception hierarchy
asyncio.Lockto prevent fetch stampedes on cache miss), key-resolution, and JWT-decode helpers. The WebSocket path now shares the middleware's clock-skew leeway — fixing a divergence where HTTP and WS accepted different tokens.ProjectBaseErrorsubclasses to HTTP responses viahttp_status_for, making the previously-unused centralized exception hierarchy live.Verification
Scope notes (transparent deferrals)
HTTPExceptioncall sites in handlers were kept rather than migrated to domain exceptions — that would change response-body contracts and break current tests. The hierarchy is now wired and tested; migrating call sites is a clean follow-up.to_threadoffload rather than a full async-Redis rewrite, because RQ requires a synchronous client shared with the worker — a parallel async implementation would reintroduce the duplication this PR removes.Test plan
uv run pytest— 446 passed, 1 skippeduv run ruff check ./ruff format --checkuv run basedpyright src/— 0 errorsuv run bandit -r src— 0 high/criticalhttps://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
Generated by Claude Code