fix: WebSocket connection-manager concurrency + proxy-aware rate limiting #62
williaby wants to merge 2 commits into
…ting Tier 2 of the architecture review (stacked on PR #61).

WebSocket ConnectionManager (C3): eliminate broadcast races
- broadcast() now iterates a snapshot of the connection set, so a concurrent connect/disconnect during an awaited send can no longer raise "Set changed size during iteration".
- Replace defaultdict with a plain dict and use non-resurrecting cleanup (.get/.pop) in broadcast and disconnect, so a batch emptied/removed by a concurrent disconnect is not silently recreated as an empty set (key leak).

Rate limiting (H6): proxy-aware client IP
- RateLimitMiddleware can resolve the client IP from a configured header (default CF-Connecting-IP) via the new rate_limit_trust_proxy / rate_limit_client_ip_header settings. Default is OFF so the header is only trusted behind a proxy that overwrites it (otherwise clients could spoof it to evade per-IP limits). Behind Cloudflare this makes per-IP limiting effective again instead of keying every request on the proxy IP.

Adds concurrency regression tests for the manager and unit/integration tests for proxy IP resolution. 456 passed, coverage 92.16%.

https://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
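
The C3 change described above can be sketched roughly as follows. This is a minimal illustration, not the code from connection_manager.py: the method signatures, the `batch_id` parameter name, and the synchronous `connect` are assumptions made so the example stays self-contained.

```python
import asyncio
from typing import Any


class ConnectionManager:
    """Hypothetical sketch of a per-batch WebSocket registry."""

    def __init__(self) -> None:
        # Plain dict: looking up a missing batch never creates it.
        self._connections: dict[str, set[Any]] = {}

    def connect(self, batch_id: str, ws: Any) -> None:
        # Assumed to be synchronous here; the real method may also accept the socket.
        self._connections.setdefault(batch_id, set()).add(ws)

    def disconnect(self, batch_id: str, ws: Any) -> None:
        conns = self._connections.get(batch_id)  # .get does not resurrect the key
        if conns is None:
            return
        conns.discard(ws)
        if not conns:
            self._connections.pop(batch_id, None)

    async def broadcast(self, batch_id: str, message: dict) -> None:
        # Iterate a snapshot so a connect/disconnect that runs while a send
        # is awaited cannot change the collection being iterated.
        for ws in list(self._connections.get(batch_id, ())):
            try:
                await ws.send_json(message)
            except Exception:
                # A failed socket is dropped through the same non-resurrecting path.
                self.disconnect(batch_id, ws)
```

Because the snapshot is taken before the first `await`, a socket removed mid-broadcast may still receive the in-flight message; the sketch accepts that in exchange for not needing a lock.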
Pull request overview
This PR targets tier-2 reliability/security improvements: safer WebSocket batch connection bookkeeping during concurrent disconnects, and configurable proxy-aware IP selection for rate limiting.
Changes:
- Replaces ConnectionManager's defaultdict with a plain dict and snapshot-based broadcast iteration.
- Adds proxy header configuration for rate-limit client IP resolution.
- Adds regression/unit tests for WebSocket races and proxy-aware rate limiting.
Reviewed changes
Copilot reviewed 6 out of 6 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/rag_processor/websocket/connection_manager.py | Makes connection cleanup non-resurrecting and broadcast iteration mutation-safe. |
| src/rag_processor/middleware/security.py | Adds trusted proxy header support to rate-limit key selection and wires it through security config. |
| src/rag_processor/core/config.py | Adds settings for proxy-aware rate limiting. |
| src/rag_processor/main.py | Passes new rate-limit proxy settings into middleware configuration. |
| tests/unit/test_websocket.py | Adds regression coverage for broadcast/disconnect races and updates direct connection setup. |
| tests/unit/test_middleware_security.py | Adds tests for proxy header IP resolution and independent forwarded-IP rate limiting. |
    if forwarded:
        # X-Forwarded-For-style headers may list multiple hops; the
        # first is the originating client.
        return forwarded.split(",")[0].strip()
    logger.warning(
        "trust_proxy_headers enabled but %s header missing; "
        "falling back to direct peer address",
        self.client_ip_header,
    )

    trust_proxy_headers: bool = False
    client_ip_header: str = "CF-Connecting-IP"
- _get_client_ip now ignores a blank leading entry in the forwarded header (e.g. ", 10.0.0.1") and falls back to the peer address instead of keying every malformed request on "".
- Document the trust_proxy_headers and client_ip_header fields in the SecurityConfig docstring.
- Add a regression test for the blank-leading-entry fallback.

https://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
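
The resolution rules in this commit can be sketched as a standalone function. This is an assumption-laden illustration rather than the middleware's actual `_get_client_ip`: the function name `resolve_client_ip`, the plain-mapping `headers` argument, and the `"unknown"` placeholder for a request with no peer address are all hypothetical.

```python
from typing import Mapping, Optional


def resolve_client_ip(
    headers: Mapping[str, str],
    peer_host: Optional[str],
    *,
    trust_proxy_headers: bool = False,
    client_ip_header: str = "CF-Connecting-IP",
) -> str:
    """Pick the rate-limit key for a request (hypothetical helper)."""
    # "unknown" is an assumed placeholder for requests without a peer address.
    fallback = peer_host or "unknown"
    if not trust_proxy_headers:
        # Header is ignored unless explicitly trusted, so clients cannot spoof it.
        return fallback
    # HTTP header names are case-insensitive, so compare in lowercase.
    lowered = {name.lower(): value for name, value in headers.items()}
    forwarded = lowered.get(client_ip_header.lower(), "")
    # Multi-hop values list the originating client first.
    first_hop = forwarded.split(",")[0].strip()
    # A missing header or a blank leading entry falls back to the peer address.
    return first_hop or fallback
```

For example, with trust enabled, a header value of `", 10.0.0.1"` yields the peer address rather than an empty key, which keeps malformed requests from sharing one rate-limit bucket.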
Summary

Tier 2 of the architecture review. Stacked on #61: the base is the PR #61 branch, so this diff shows only the tier-2 changes. Rebase/retarget to `main` once #61 merges.

C3: WebSocket `ConnectionManager` concurrency safety

The previous manager had two real races when a broadcast overlapped a connect/disconnect on the same batch:

- `broadcast()` iterated the live connection set while `await`-ing each `send_json`; a concurrent `disconnect` mutating that set raised mid-iteration. Now it iterates a snapshot (`list(...)`).
- Cleanup used `defaultdict` access (`self._connections[key].discard(...)`), which recreated a batch key that a concurrent `disconnect` had just emptied and removed, leaking entries. Switched to a plain `dict` with non-resurrecting cleanup (`.get`/`.pop`) in both `broadcast` and `disconnect`.

Rationale for not adding an `asyncio.Lock`: under CPython's single-threaded event loop, `connect`/`disconnect` mutations are synchronous (atomic w.r.t. other coroutines); the only interleaving point is the awaited sends in `broadcast`, which the snapshot + non-resurrecting cleanup fully cover. A lock would force `disconnect` to become async (rippling into the router and tests) for no additional correctness here.

H6: proxy-aware rate limiting

Behind Cloudflare, `request.client.host` is the proxy IP, so per-IP rate limiting effectively keyed every request on one IP. `RateLimitMiddleware` can now resolve the client IP from a configured header:

- New settings: `rate_limit_trust_proxy` (default off) and `rate_limit_client_ip_header` (default `CF-Connecting-IP`).
- When proxy trust is off or the configured header is missing, the direct peer address is used (`request.client.host`).
- Multi-hop headers (e.g. `X-Forwarded-For`) use the first (originating) entry.

Verification

New tests: broadcast tolerates concurrent disconnect; broadcast doesn't resurrect a removed batch; proxy IP resolution (trusted/untrusted/missing-header/forwarded-list/no-client) + an integration test proving distinct `CF-Connecting-IP` values are limited independently.

Test plan

- `uv run pytest`: 456 passed, 1 skipped
- `uv run ruff check .` / `ruff format --check`
- `uv run basedpyright src/`: 0 errors

https://claude.ai/code/session_01PA6dtgMhfzSe22VVtqBfxE
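
For reviewers who want to see the two failure modes in isolation, the sketch below reproduces them synchronously with plain Python containers. The function names and the `"batch"`/`"ws"` values are hypothetical stand-ins; in the real manager the mutation would come from a `disconnect` running while a send is awaited.

```python
from collections import defaultdict


def mutation_during_iteration_raises() -> bool:
    """Iterating a live set while it shrinks raises RuntimeError."""
    conns = {"ws-a", "ws-b"}
    try:
        for _ in conns:
            conns.clear()  # stand-in for a concurrent disconnect
    except RuntimeError:
        return True
    return False


def defaultdict_cleanup_resurrects_key() -> bool:
    """Cleanup via defaultdict indexing re-creates a removed batch key."""
    connections = defaultdict(set)
    connections["batch"].add("ws")
    del connections["batch"]            # batch already emptied and removed
    connections["batch"].discard("ws")  # indexing silently re-creates it
    return "batch" in connections


def plain_dict_cleanup_resurrects_key() -> bool:
    """Cleanup via .get on a plain dict leaves a removed batch removed."""
    connections: dict = {}
    conns = connections.get("batch")
    if conns is not None:
        conns.discard("ws")
    return "batch" in connections
```

The first two functions return `True` (the race and the key leak), while the plain-dict variant returns `False`.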
Generated by Claude Code