fix(core): guard job-record decode and return in-memory copies #59
Walkthrough
The PR hardens the shared job store across InMemoryJobStore and RedisJobStore by adding deep-copy isolation to prevent state mutation, validating positive TTL values in the constructor, and gracefully handling corrupted Redis records via typed errors instead of raw exceptions.
Changes
Job Store Hardening
🎯 3 (Moderate) | ⏱️ ~20 minutes
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✅ FIPS Compatibility Check
Status: ✅ PASSED
What is FIPS? FIPS 140-2/140-3 is a US government standard for cryptographic modules.
Dependency Review
✅ No vulnerabilities, license issues, or OpenSSF Scorecard issues found.
Scanned Files: None
Pull request overview
This PR hardens the shared job-store abstraction used by the API and worker by improving corrupt Redis record handling and reducing in-memory store aliasing.
Changes:
- Wraps Redis job-record JSON decode failures in a typed `DatabaseError`.
- Changes in-memory `get`/`update` to return copied records.
- Adds unit tests for corrupt Redis records and in-memory fetch mutation behavior.
Reviewed changes
Copilot reviewed 2 out of 2 changed files in this pull request and generated 2 comments.
| File | Description |
|---|---|
| src/audio_processor/core/job_store.py | Updates job-store copy semantics, Redis decode error handling, logging, and TTL fallback behavior. |
| tests/unit/test_job_store.py | Adds regression tests for corrupt Redis hash values and in-memory record copy behavior. |
         # Return a copy so callers cannot mutate stored state out of band; this
         # matches RedisJobStore, which always returns a freshly decoded record.
         record = self._jobs.get(job_id)
         return dict(record) if record is not None else None

     async def update(self, job_id: str, **fields: object) -> JobRecord:
         """See :meth:`JobStore.update`."""
         record = self._jobs.setdefault(job_id, {})
-        return _merge_fields(record, fields)
+        _merge_fields(record, fields)
+        return dict(record)
Fixed in 93144e3: InMemoryJobStore.get/update/create now use copy.deepcopy instead of a shallow dict(), so nested progress/input/result dicts are no longer aliased to stored state. update also deep-copies incoming field values. New tests test_get_returns_a_deep_copy and test_update_isolates_incoming_nested_fields cover nested mutation. Thanks for catching this.
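For readers following the aliasing issue, here is a minimal sketch of why a shallow `dict()` copy is not enough. It uses plain dictionaries rather than the real `InMemoryJobStore`; the helper names `fetch_shallow` and `fetch_deep` are hypothetical and exist only for illustration.

```python
import copy


def fetch_shallow(stored: dict) -> dict:
    """Hypothetical helper: copy only the top-level mapping."""
    return dict(stored)


def fetch_deep(stored: dict) -> dict:
    """Hypothetical helper: copy the mapping and everything nested in it."""
    return copy.deepcopy(stored)


# Shallow copy: top-level keys are isolated, nested dicts are still shared.
stored = {"status": "queued", "progress": {"pct": 0}}
fetched = fetch_shallow(stored)
fetched["status"] = "tampered"
fetched["progress"]["pct"] = 99
top_level_intact = stored["status"] == "queued"
shallow_leaked = stored["progress"]["pct"] == 99

# Deep copy: the nested dict is a fresh object, so the mutation stays local.
stored = {"status": "queued", "progress": {"pct": 0}}
fetched = fetch_deep(stored)
fetched["progress"]["pct"] = 99
deep_leaked = stored["progress"]["pct"] == 99
```

With the shallow copy the nested `progress` dict is the same object as the stored one, which is the behavior the review flagged; the deep copy gives the caller a detached record.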
        # Use ``is not None`` so an explicit 0/negative is not silently coerced
        # to the default (a caller passing such a value likely has a bug).
        self._ttl = (
            ttl_seconds if ttl_seconds is not None else settings.job_result_ttl_seconds
        )
Fixed in 93144e3: RedisJobStore.__init__ now raises ConfigurationError when the resolved TTL is <= 0 instead of letting EXPIRE delete the key immediately. New test test_non_positive_ttl_rejected covers 0 and -1. Good call: the previous behavior silently dropped newly written jobs.
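The TTL handling described above can be sketched roughly as follows. This is an illustration under assumptions, not the code in `job_store.py`: the `ConfigurationError` class, the `resolve_ttl` helper, and the `DEFAULT_TTL_SECONDS` value are stand-ins defined here, since the real exception and the `settings.job_result_ttl_seconds` default live in the project.

```python
from typing import Optional


class ConfigurationError(ValueError):
    """Stand-in for the project's ConfigurationError (assumed definition)."""


# Hypothetical default; the real value comes from the project's settings.
DEFAULT_TTL_SECONDS = 3600


def resolve_ttl(ttl_seconds: Optional[int], default: int = DEFAULT_TTL_SECONDS) -> int:
    """Resolve the effective TTL and reject non-positive values."""
    # Fall back to the default only when the caller passed nothing at all,
    # so an explicit 0 or negative value is not silently replaced.
    ttl = ttl_seconds if ttl_seconds is not None else default
    # A non-positive timeout passed to Redis EXPIRE deletes the key right
    # away, so fail loudly at construction time instead.
    if ttl <= 0:
        raise ConfigurationError(f"ttl_seconds must be positive, got {ttl}")
    return ttl
```

Validating once in the constructor keeps the failure close to the misconfiguration rather than surfacing later as jobs that vanish immediately after being written.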
PR Review (Claude Code /pr-review)
Status: Copilot 2 comments (both confirmed real) · CodeRabbit rate-limited (no review) · SonarCloud gate passed, 1 MINOR issue · CI all green.
No Critical code defects. 4 Important findings:
1. Shallow copy leaves nested values aliased (job_store.py#L134)
2. Explicit 0/negative TTL deletes the job (job_store.py#L187)
3. CHANGELOG.md not updated for a
4. Suggested
Findings 1, 2, 4, 5 all live in
🤖 Generated with Claude Code
Address pr-review findings on PR #53:
- RedisJobStore._decode_hash converts a corrupt or legacy record (non-JSON field value) into a typed, logged DatabaseError instead of letting a raw JSONDecodeError propagate uncaught, which could 500 a GET route or wedge the worker's per-update decode loop.
- InMemoryJobStore.get/update return copies so callers cannot mutate stored state out of band, matching RedisJobStore's detached records.
- RedisJobStore TTL uses an explicit None check so an accidental 0 or negative is no longer silently coerced to the default.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>

Follow-up to the /pr-review of #59.
- InMemoryJobStore.get/update/create now deep-copy records so callers cannot mutate stored state out of band, including nested progress/input/result dicts. The prior shallow dict() copy left those aliased and diverged from RedisJobStore, which decodes fresh objects per field. (Copilot)
- RedisJobStore.__init__ rejects a non-positive ttl_seconds with ConfigurationError instead of letting Redis EXPIRE delete a newly written job immediately. (Copilot)
- _decode_hash catches ValueError alone; json.JSONDecodeError is a ValueError subclass, so the tuple was redundant. (SonarCloud S5713)
- Add tests for nested-copy isolation (get and update) and TTL rejection; document the change in CHANGELOG.

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
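The decode guard described in these commits might look roughly like the sketch below. It is an assumption-laden illustration, not the project's `_decode_hash`: the `DatabaseError` class and the `decode_hash` function are stand-ins defined here, and the Redis hash is assumed to arrive as a mapping of field names to JSON strings.

```python
import json
import logging

logger = logging.getLogger(__name__)


class DatabaseError(Exception):
    """Stand-in for the project's DatabaseError (assumed definition)."""


def decode_hash(raw: dict, job_id: str) -> dict:
    """Decode each JSON-encoded hash field, wrapping failures in a typed error."""
    record = {}
    for field, value in raw.items():
        try:
            record[field] = json.loads(value)
        except ValueError as exc:
            # json.JSONDecodeError subclasses ValueError, so catching
            # ValueError alone covers it without a redundant tuple.
            logger.error("Corrupt job record %s: field %r is not valid JSON", job_id, field)
            raise DatabaseError(f"corrupt job record {job_id!r}") from exc
    return record
```

Callers can then handle one project-level exception type instead of a raw `JSONDecodeError`, and `raise ... from exc` keeps the original decode error attached for debugging.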
1a78f67 to 93144e3
PR Fix Summary
Rebased onto
Review comments (Copilot)
SonarQube
Tests / docs
Local gates: ruff format + lint clean, basedpyright 0 errors, pydoclint no violations, bandit 0 issues, 475 passed / 93.33% coverage. CI re-run triggered by the push.
🤖 Generated with Claude Code
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@CHANGELOG.md`:
- Line 20: The changelog bullet exceeds the 120-character Markdown line length;
edit the single long bullet so it wraps to multiple lines under 120 chars while
preserving content and inline code/backticks — split after logical phrases
(e.g., after the mention of RedisJobStore._decode_hash, after
InMemoryJobStore.get/update/create, and after ttl_seconds/ConfigurationError) so
the line breaks keep the sentence readable and the bullet formatting intact.
In `@tests/unit/test_job_store.py`:
- Around line 76-124: Add a unit test that verifies InMemoryJobStore.create()
isolates/makes a deep copy of the input record: construct a mutable record
(include a nested dict like "progress"), call store.create("j1", record) on an
InMemoryJobStore instance, mutate the original top-level and nested fields of
record after create, then fetch the stored record with store.get("j1") and
assert the stored values (including nested dict) remain the original values;
reference the InMemoryJobStore class and its create and get methods when
locating where to add this test.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Path: .coderabbit.yaml
Review profile: ASSERTIVE
Plan: Pro
Run ID: 1a4ee3c2-0e41-4bb2-bc2a-fac04e6819a7
📒 Files selected for processing (3)
- CHANGELOG.md
- src/audio_processor/core/job_store.py
- tests/unit/test_job_store.py
- fix(jobs): correct field paths for `duration_ms` and `language` in `process_audio_job` result assembly; both now read from `TranscriptionResult.metadata` where they actually live, preventing `AttributeError` at runtime
- fix(api): guard `content-length` header parsing against malformed values; `int()` conversion is now wrapped in a `ValueError` handler so a non-numeric header no longer raises an unhandled exception
- fix(tests): restore `tmp_path` fixture in `test_custom_initialization` for `AudioConverter`, `AudioConditioner`, and `VADProcessor`; hardcoded `/custom/temp` caused `PermissionError` on systems without root access
- fix(core): harden the shared job store. `RedisJobStore._decode_hash` now converts a corrupt or legacy (non-JSON) field value into a typed, logged `DatabaseError` instead of letting a raw `JSONDecodeError` propagate and 500 a `GET` route or wedge the worker decode loop. `InMemoryJobStore.get`/`update`/`create` deep-copy records so callers cannot mutate stored state out of band (including nested `progress`/`input`/`result` dicts), matching `RedisJobStore`. `RedisJobStore` now rejects a non-positive `ttl_seconds` with `ConfigurationError` rather than letting Redis `EXPIRE` delete newly written jobs immediately
Wrap this changelog bullet to the Markdown line-length limit.
This line exceeds the 120-character Markdown limit and should be split across continuation lines for consistency and lint compliance.
✂️ Proposed formatting fix
-- fix(core): harden the shared job store. `RedisJobStore._decode_hash` now converts a corrupt or legacy (non-JSON) field value into a typed, logged `DatabaseError` instead of letting a raw `JSONDecodeError` propagate and 500 a `GET` route or wedge the worker decode loop. `InMemoryJobStore.get`/`update`/`create` deep-copy records so callers cannot mutate stored state out of band (including nested `progress`/`input`/`result` dicts), matching `RedisJobStore`. `RedisJobStore` now rejects a non-positive `ttl_seconds` with `ConfigurationError` rather than letting Redis `EXPIRE` delete newly written jobs immediately
+- fix(core): harden the shared job store. `RedisJobStore._decode_hash` now converts a corrupt or
+ legacy (non-JSON) field value into a typed, logged `DatabaseError` instead of letting a raw
+ `JSONDecodeError` propagate and 500 a `GET` route or wedge the worker decode loop.
+ `InMemoryJobStore.get`/`update`/`create` deep-copy records so callers cannot mutate stored state
+ out of band (including nested `progress`/`input`/`result` dicts), matching `RedisJobStore`.
+ `RedisJobStore` now rejects a non-positive `ttl_seconds` with `ConfigurationError` rather than
+  letting Redis `EXPIRE` delete newly written jobs immediately.

As per coding guidelines **/*.md: Code Quality: Use 120 character line length for Markdown files.
    @pytest.mark.asyncio
    async def test_get_returns_a_copy(self) -> None:
        """Mutating a fetched record must not alter stored state.

        Matches RedisJobStore, which always returns a freshly decoded record,
        so tests on the in-memory backend cannot mask aliasing bugs.
        """
        store = InMemoryJobStore()
        await store.create("j1", {"status": "queued"})
        fetched = await store.get("j1")
        assert fetched is not None
        fetched["status"] = "tampered"
        again = await store.get("j1")
        assert again is not None
        assert again["status"] == "queued"

    @pytest.mark.asyncio
    async def test_get_returns_a_deep_copy(self) -> None:
        """Mutating a *nested* value of a fetched record must not leak.

        A shallow ``dict`` copy would leave nested dicts (progress/input/result)
        aliased to stored state; only a deep copy gives the same isolation as
        RedisJobStore, which decodes fresh nested objects per field.
        """
        store = InMemoryJobStore()
        await store.create("j1", {"status": "queued", "progress": {"pct": 0}})
        fetched = await store.get("j1")
        assert fetched is not None
        progress = fetched["progress"]
        assert isinstance(progress, dict)
        progress["pct"] = 99
        again = await store.get("j1")
        assert again is not None
        assert again["progress"] == {"pct": 0}

    @pytest.mark.asyncio
    async def test_update_isolates_incoming_nested_fields(self) -> None:
        """Mutating a nested object passed to update must not alter stored state.

        Matches RedisJobStore, which JSON-encodes incoming values on write.
        """
        store = InMemoryJobStore()
        await store.create("j1", {"status": "queued"})
        progress = {"pct": 10}
        await store.update("j1", progress=progress)
        progress["pct"] = 99
        again = await store.get("j1")
        assert again is not None
        assert again["progress"] == {"pct": 10}
🧹 Nitpick | 🔵 Trivial | ⚡ Quick win
Add a direct test for create() input-mutation isolation.
You validate get() and update() isolation well, but there’s no explicit test that mutating the original record after create() does not affect stored state.
✅ Proposed test addition
+    @pytest.mark.asyncio
+    async def test_create_isolates_incoming_nested_fields(self) -> None:
+        """Mutating the input record after create must not alter stored state."""
+        store = InMemoryJobStore()
+        record = {"status": "queued", "progress": {"pct": 1}}
+        await store.create("j1", record)
+        record["progress"]["pct"] = 99
+        again = await store.get("j1")
+        assert again is not None
+        assert again["progress"] == {"pct": 1}

As per coding guidelines tests/**/*.py: Ensure tests verify behavior not just execution, edge cases and error conditions are tested, and tests would catch regressions in the feature being tested.



Summary
Follow-up hardening surfaced by the `/pr-review` of #53. Almost all review
findings were already addressed on `main` via concurrent pre-merge work
(SecretStr API keys + auth validator, constant-time key comparison, rate-limit
hard cap and hashed identifier, enqueue-failure handling, timestamp parse guard,
broadened artifact-failure handling). This PR adds the one remaining net-new
item that did not land: `JobStore` robustness.

Changes
- `RedisJobStore._decode_hash` converts a corrupt or legacy record (a
  non-JSON hash field value) into a typed, logged `DatabaseError` instead of
  letting a raw `JSONDecodeError` propagate uncaught. Previously one poisoned
  record could 500 a `GET` route or wedge the worker's per-update decode loop.
- `InMemoryJobStore.get`/`update` return copies so callers cannot mutate
  stored state out of band, matching `RedisJobStore` (which always returns a
  freshly decoded record). This prevents in-memory-only aliasing bugs from
  hiding behind passing tests.
- `RedisJobStore` TTL uses an explicit `is not None` check so an accidental
  `0`/negative is no longer silently coerced to the default.

Tests
- `test_get_raises_on_corrupt_record`: a non-JSON field value surfaces as a
  `DatabaseError`, not an uncaught crash.
- `test_get_returns_a_copy`: mutating a fetched in-memory record does not alter
  stored state.
- `ruff` clean, `basedpyright` 0 errors.

Why
These are defensive-robustness improvements to the shared job store that the
API and ARQ worker both depend on as the single source of truth; a corrupt or
aliased record there has outsized blast radius.
Generated with Claude Code
Summary by CodeRabbit