diff --git a/worker/Dockerfile b/worker/Dockerfile index 0cdfe81..92dc855 100644 --- a/worker/Dockerfile +++ b/worker/Dockerfile @@ -1,50 +1,101 @@ -FROM ubuntu:22.04 +# SVF sources are PINNED to reviewed, immutable commit SHAs. To bump: update +# both SHAs together and re-review. ALLOW_UNPINNED_SVF=true is for local +# refresh builds only (the guard below refuses "master" otherwise). +# SVF_SHA -> AxisCommunications/signed-video-framework +# 1ae9fed = tag v2.3.5 (latest release; == master HEAD 2026-05-30) +# SVF_EXAMPLES_SHA -> AxisCommunications/signed-video-framework-examples +# e009c31 = master HEAD 2026-05-30 (repo has no tags; links the +# system-installed lib above, so it tracks v2.3.5) +FROM python:3.12-slim-bookworm AS svf-builder ENV DEBIAN_FRONTEND=noninteractive -# System dependencies (meson+ninja for SVF, ffmpeg/gstreamer for video) -RUN apt-get update && apt-get install -y \ +ARG SVF_SHA=1ae9fedfe6e7a7b6db65d05cc13f6098b1f92eba +ARG SVF_EXAMPLES_SHA=e009c310fef10a997ffad6d21720154fbb155a38 +ARG ALLOW_UNPINNED_SVF=false + +RUN if [ "$ALLOW_UNPINNED_SVF" != "true" ] \ + && { [ "$SVF_SHA" = "master" ] || [ "$SVF_EXAMPLES_SHA" = "master" ]; }; then \ + echo "SVF_SHA and SVF_EXAMPLES_SHA must be reviewed full commit SHAs for production builds"; \ + echo "Pass --build-arg ALLOW_UNPINNED_SVF=true only for local refresh builds"; \ + exit 1; \ + fi + +RUN apt-get update && apt-get install -y --no-install-recommends \ build-essential \ - meson \ - ninja-build \ - pkg-config \ + ca-certificates \ git \ - python3 \ - python3-pip \ - libgstreamer1.0-dev \ - gstreamer1.0-plugins-base \ - gstreamer1.0-plugins-good \ - gstreamer1.0-plugins-bad \ - gstreamer1.0-plugins-ugly \ - gstreamer1.0-libav \ + libcurl4-openssl-dev \ + libglib2.0-dev \ libgstreamer-plugins-base1.0-dev \ + libgstreamer1.0-dev \ libssl-dev \ - libglib2.0-dev \ - libcurl4-openssl-dev \ - ffmpeg \ + meson \ + ninja-build \ + pkg-config \ && rm -rf /var/lib/apt/lists/* -# Clone and build 
libsigned-video-framework with meson RUN git clone https://github.com/AxisCommunications/signed-video-framework.git /opt/svf \ + && git -C /opt/svf checkout --detach "${SVF_SHA}" \ && meson setup /opt/svf /opt/svf-build \ && ninja -C /opt/svf-build \ - && ninja -C /opt/svf-build install \ - && ldconfig + && ninja -C /opt/svf-build install -# Clone and build signed-video-framework-examples (contains validator binary) RUN git clone https://github.com/AxisCommunications/signed-video-framework-examples.git /opt/svf-examples \ + && git -C /opt/svf-examples checkout --detach "${SVF_EXAMPLES_SHA}" \ && meson setup -Dbuild_all_apps=true /opt/svf-examples /opt/svf-examples-build \ && ninja -C /opt/svf-examples-build \ - && ninja -C /opt/svf-examples-build install \ - && ldconfig + && ninja -C /opt/svf-examples-build install + + +FROM python:3.12-slim-bookworm AS runtime + +ENV DEBIAN_FRONTEND=noninteractive \ + PATH=/usr/local/bin:/usr/bin:/bin \ + LD_LIBRARY_PATH=/usr/local/lib:/usr/local/lib/x86_64-linux-gnu \ + PYTHONUNBUFFERED=1 + +RUN apt-get update && apt-get install -y --no-install-recommends \ + bubblewrap \ + ca-certificates \ + ffmpeg \ + gstreamer1.0-libav \ + gstreamer1.0-plugins-bad \ + gstreamer1.0-plugins-base \ + gstreamer1.0-plugins-good \ + gstreamer1.0-plugins-ugly \ + libcurl4 \ + libglib2.0-0 \ + libgstreamer-plugins-base1.0-0 \ + libgstreamer1.0-0 \ + libseccomp2 \ + libssl3 \ + seccomp \ + strace \ + && rm -rf /var/lib/apt/lists/* + +COPY --from=svf-builder /usr/local/bin/ /usr/local/bin/ +COPY --from=svf-builder /usr/local/lib/ /usr/local/lib/ -# Python app WORKDIR /app COPY requirements.txt . 
-RUN pip3 install --no-cache-dir -r requirements.txt +RUN pip install --no-cache-dir -r requirements.txt + +COPY app/sandbox_launcher.py /usr/local/bin/edgeproof-rlimit-launcher +RUN chmod 0755 /usr/local/bin/edgeproof-rlimit-launcher \ + && ldconfig \ + && ffprobe -version >/dev/null \ + && (command -v signed-video-validator || command -v sv_validator || command -v validator) COPY certs/ /app/certs/ COPY app/ /app/app/ +COPY tests/ /app/tests/ + +RUN useradd -r -u 10001 -m -d /home/svc -s /usr/sbin/nologin svc \ + && mkdir -p /tmp/edgeproof \ + && chown -R svc:svc /tmp/edgeproof + +USER svc EXPOSE 8000 -CMD ["uvicorn", "app.main:app", "--host", "0.0.0.0", "--port", "8000"] +CMD ["sh", "-c", "uvicorn app.main:app --host 0.0.0.0 --port ${PORT:-8000}"] diff --git a/worker/TIER0-BUILD-REPORT.md b/worker/TIER0-BUILD-REPORT.md new file mode 100644 index 0000000..4ed0428 --- /dev/null +++ b/worker/TIER0-BUILD-REPORT.md @@ -0,0 +1,78 @@ +# Tier 0 Sandbox Build Report + +## Implemented + +- Added `app/sandbox.py` with `SandboxResult`, startup capability probing, cached `bwrap` vs `DEGRADED` mode, bounded output capture, process-group kill on timeout, scrubbed child env, and fail-closed `run_sandboxed()`. +- Added exec-only rlimit launcher at `app/sandbox_launcher.py`; it sets `RLIMIT_AS`, `RLIMIT_CPU`, `RLIMIT_FSIZE`, `RLIMIT_NOFILE`, `RLIMIT_NPROC`, `RLIMIT_CORE=0`, and `PR_SET_NO_NEW_PRIVS`, then `execvpe()`s the final binary. + The launcher uses stdlib `resource` plus `ctypes` against libc for `prctl`; no `preexec_fn` is used. +- Routed both ffprobe sites and the SVF validator through `run_sandboxed()`. +- Added ffprobe `-protocol_whitelist file -analyzeduration 5M -probesize 10M` at both ffprobe call sites. + `-protocol_whitelist file` limits ffmpeg protocols only; path traversal protection comes from the filesystem namespace/bind set. +- Replaced unbounded upload read with Content-Length rejection plus bounded 8 MB chunk streaming to disk. 
+- Added the worker-local verification semaphore and explicit sandbox memory budget defaults. +- Added `/health` sandbox mode reporting and `/verify` 503 gating for `DEGRADED` unless `ALLOW_DEGRADED_SANDBOX=true`. +- Hardened SVF cleanup to `shutil.rmtree(..., ignore_errors=True)`. +- Added `O_NOFOLLOW`/`openat`-style result-file read for `validation_results.txt`. +- Added fail-closed handling for nonzero validator exits, signal deaths, launch failures, and timeouts before verdict parsing. +- Added parent-side ffprobe parse guards for hostile duration/frame/shape values. +- Reworked Dockerfile into builder/runtime stages, non-root `USER svc` uid `10001`, runtime `bubblewrap`, `seccomp`, `strace`, ffmpeg/GStreamer runtime deps, and copied only installed SVF artifacts into runtime. +- Added `pytest.ini` and `tests/test_sandbox.py` with host tests plus `@pytest.mark.linux_sandbox` Linux-container tests. + +## Resource Defaults + +- `MAX_FILE_SIZE_BYTES`: 500 MiB +- `SANDBOX_MAX_INPUT_BYTES`: 500 MiB +- `SANDBOX_RLIMIT_AS_BYTES`: 1 GiB +- `SANDBOX_RLIMIT_CPU_SECONDS_FFPROBE`: 20 seconds +- `SANDBOX_RLIMIT_CPU_SECONDS_VALIDATOR`: 60 seconds +- `SANDBOX_RLIMIT_FSIZE_BYTES`: 16 MiB +- `SANDBOX_RLIMIT_NPROC`: 64 +- `SANDBOX_MAX_OUTPUT_BYTES`: 16 MiB +- `SANDBOX_MEMORY_LIMIT_BYTES`: 1536 MiB +- `SANDBOX_MAX_CONCURRENT_JOBS`: 1 +- `SANDBOX_ALLOW_NET`: false +- `ALLOW_DEGRADED_SANDBOX`: false + +## Acceptance Coverage + +- Host tests: AC4a, AC8 partial, AC12 partial, AC13 boundary rejection, AC15, AC17 route gate. +- Linux-container tests: AC1, AC2, AC3, AC4b, AC5, AC6, AC8, AC9, AC10, AC11, AC14, AC17, plus ldd bind-contract coverage. +- Conditional tests: AC7 is `xfail` because seccomp-bpf is deliberately deferred; AC16 is `xfail` until genuine/tampered AXIS byte fixtures are added. + +Expected CI command: + +```bash +docker build -t epworker . 
+docker run --rm epworker pytest -m linux_sandbox +``` + +Host static/unit command once Python deps are installed: + +```bash +python -m pytest -m "not linux_sandbox" +``` + +## Verification Run Here + +- `python -m py_compile app/sandbox.py app/sandbox_launcher.py app/services/video_info.py app/services/svf_runner.py app/main.py tests/test_sandbox.py` passed. +- AST parse check for the same files passed. +- Host pytest did not run in this macOS workspace because `pytest` and app deps such as `pydantic_settings` are not installed. +- Docker/Linux rubric was not run here; local shell networking is blocked, so `docker build` cannot fetch SVF repos from GitHub in this environment. + +## Deferred / Conditional + +- Control G seccomp-bpf is deferred. No fail-open seccomp profile was shipped. `--unshare-net` remains the network boundary for Increment 1. +- Amendment 21 verdict-from-text injection is flag-only in this increment. A threat-model note was added near `parse_svf_output`; the parser still needs a later authoritative-status-line fix. +- Amendment 22 callback SSRF is flag-only in this increment. A threat-model note was added near the callback POST; parent callback allowlisting is deferred. +- AC16 real-bytes regression test is present as a fixture contract, but the repo has no genuine AXIS signed clip or tampered fixture to execute it. + +## Deviations + +- The Dockerfile has `SVF_SHA` and `SVF_EXAMPLES_SHA` build args and detached checkouts. In the initial build pass the defaults remained `master` because this environment could not resolve current upstream commit SHAs via shell networking; the Dockerfile defaults are now full commit SHAs (`1ae9fedfe6e7a7b6db65d05cc13f6098b1f92eba` and `e009c310fef10a997ffad6d21720154fbb155a38`), and the FIX-4 build guard rejects `master` unless `ALLOW_UNPINNED_SVF=true`. To bump, update both SHAs together after review, or pass reviewed full commit SHAs as build args in CI. + +## Fix-pass (review round 1) + +- FIX-1: Added `SandboxResult.output_overflow`, set it from the bounded-output reader, and fail closed on output overflow before SVF verdict parsing or ffprobe JSON parsing at both ffprobe gates. 
+- FIX-2: Changed the rlimit launcher to return sentinel exit code `125` for launcher setup/exec failures while keeping `127` for missing command, and changed sandbox launcher-failure detection to use only numeric `125`/`127`. +- FIX-3: Added `--unshare-cgroup` to the bubblewrap namespace flags used by normal sandbox runs and the capability probe. +- FIX-4: Added a Dockerfile build guard that fails if `SVF_SHA` or `SVF_EXAMPLES_SHA` is `master` unless `ALLOW_UNPINNED_SVF=true` is passed, and documented that production builds require reviewed full commit SHAs. diff --git a/worker/TIER0-SANDBOX-SPEC.md b/worker/TIER0-SANDBOX-SPEC.md new file mode 100644 index 0000000..2ef83bc --- /dev/null +++ b/worker/TIER0-SANDBOX-SPEC.md @@ -0,0 +1,113 @@ +# Tier 0 Subprocess Sandbox — Build Spec v2 (build-ready) + +**Status:** LOCKED for build. Supersedes v1. Produced by a paired review (Codex GPT-5.5 + 2 Claude lenses) + 3-agent Railway research, then a human decision gate. +**Repo:** this worker (`~/Projects/edgeproof/worker`, Python/FastAPI, independent IP). +**Lane:** Claude specced → **Codex builds Increment 1** → Claude reviews the diff. **Do NOT `git commit`/`push`** — leave changes in the working tree for review. + +## Decision summary (settled — do not relitigate in this build) +- **Threat model:** arbitrary **public untrusted** uploaded video. Containment must hold against a memory-corruption exploit in `ffprobe`/the SVF C validator. +- **Host:** **move-host.** Production worker runs in a **Fly.io Firecracker microVM** (outer boundary) with **bwrap + seccomp + rlimits** (inner boundary). Railway keeps only the FastAPI control plane. 
*(Railway cannot run the sandbox — no unprivileged user namespaces; confirmed via the Chrome `--no-sandbox` proxy + no `--cap-add` + no privileged/nested containers.)* +- **Goal (verifiable invariant):** an exploit in a C subprocess cannot escalate beyond a confined, unprivileged, **network-less**, **filesystem-isolated**, resource-capped child — and the worker **fails closed** (returns `error`/`inconclusive`, stays up) when containment trips or a subprocess dies by signal. + +--- + +## Build scope + +### Increment 1 — THIS build (self-contained, CI-verifiable in a Linux container) +The in-repo sandbox core + controls + Dockerfile + rubric. Fully buildable and testable in a Linux container without provisioning any cloud. + +### Increment 2 — DEFERRED (separate task, mostly ops — DO NOT build now) +`fly.toml` for the worker Machine, object-storage hop, job queue, control-plane/worker split, Machine recycling. Increment 1 must be written so this drop-in later requires no rework of `run_sandboxed`. + +--- + +## Current state (grounded — verified against this repo) +Three subprocess spawn sites, all `asyncio.create_subprocess_exec` (good: exec-form, no shell): +1. `app/services/video_info.py::_run_ffprobe` — `ffprobe -v quiet -print_format json -show_format -show_streams {file}` (30s timeout). +2. `app/services/video_info.py::_check_for_signing_uuid` — second `ffprobe` (SEI scan). +3. `app/services/svf_runner.py::run_svf_validator` — `{validator} -c {codec} {file}`, `cwd={work_dir}` per-job `mkdtemp`, 120s timeout, reads `validation_results.txt`. + +Gaps this build fixes: container runs as **root** (no `USER` in `Dockerfile`); **no** `-protocol_whitelist` on ffprobe; no namespace/seccomp/rlimit confinement; `await file.read()` unbounded upload (`main.py:73`, `config.py:10` default ~50 GB); parent parses hostile subprocess output unguarded (`video_info.py:49,:78`); `parse_svf_output` substring-matches across combined output (forgeable). 
+ +--- + +## The `run_sandboxed` contract (single choke point — `app/sandbox.py`) +All three call sites route through this; nothing else spawns the C binaries. + +```python +@dataclass +class SandboxResult: + stdout: bytes # undecoded; call sites decode + stderr: bytes + returncode: int + timed_out: bool + rlimit_killed: bool # died by SIGKILL/SIGXCPU/SIGXFSZ etc. + launch_failed: bool # sandbox/launcher could not start the child + sandbox_error: str | None + +async def run_sandboxed(argv: list[str], *, ro_paths: list[str], scratch_dir: str, + timeout: float, allow_net: bool = False) -> SandboxResult: + ... +``` +- **Never raises** on launch/containment failure — returns a result with the right flag set (fail-closed). +- Selects implementation from a **startup capability probe** (below): `bwrap` path if userns proven, else **DEGRADED**. +- **DEGRADED is deploy-blocking:** if probe → DEGRADED, the worker refuses `/verify` with **HTTP 503** unless `ALLOW_DEGRADED_SANDBOX=true`; mode is surfaced in `/health`. Logging-and-continuing is NOT acceptable. + +### Capability probe (run once in a FastAPI startup event; cache mode in a module global; log per worker) +Must exercise the **real** namespace set (`--unshare-net` + a `--ro-bind` rootfs + a real exec'd binary). Distinguish `CLONE_NEWUSER` ENOSYS/EPERM (read `/proc/sys/kernel/unprivileged_userns_clone` or `unshare --user --map-root-user true`) from "binary/loader not found" (avoid false-DEGRADED). Any **runtime** bwrap-launch failure on the bwrap path still fails closed mid-request — never silently unconfined. + +## Controls (final) + +**Always-on (inside the guest, any host):** +- **A — Non-root.** `useradd -r -u 10001 svc`; `USER svc`; verify `ffprobe`/validator resolve on PATH. +- **B — ffprobe protocol whitelist + probe caps** on *both* sites: `-protocol_whitelist file -analyzeduration 5M -probesize 10M`. Document: limits **protocols**, not path traversal (FS confinement is F's job). 
+- **C — rlimits via an exec'd launcher (NOT `preexec_fn`).** Tiny launcher (shim binary or `prlimit`/`systemd-run`) that `setrlimit`(AS/CPU/FSIZE/NOFILE/NPROC/`CORE=0`) + `PR_SET_NO_NEW_PRIVS` + `execv`. rlimits wrap the **final binary**, not the bwrap launcher. (`preexec_fn` is deadlock-unsafe under threaded/async uvicorn — 3-reviewer consensus.) +- **D — Input size cap at the boundary**, streamed, before any spawn (amendment 1). +- **E — Fail-closed everywhere**, including parent-side parsing of hostile output. + +**bwrap path (production):** +- **F — Namespace + FS + network sandbox via bwrap** with the *exact ldd-tested* bind set: + `--ro-bind /usr /usr --ro-bind /usr/local /usr/local --ro-bind /lib /lib --ro-bind /lib64 /lib64 --ro-bind /etc/ld.so.cache /etc/ld.so.cache --ro-bind /etc/ssl /etc/ssl --proc /proc --dev /dev --tmpfs /tmp --ro-bind {input} {input} --bind {scratch} {scratch} --chdir {scratch} --unshare-user --unshare-pid --unshare-ipc --unshare-net --die-with-parent --new-session --clearenv --setenv PATH ... --setenv LANG ... --setenv TMPDIR {scratch}`. **Forbid `--ro-bind / /`.** (SVF `ninja install`s to `/usr/local`.) +- **G — seccomp-bpf** as a **vetted base profile** (NOT hand-rolled fail-open BPF): start from Docker default denies + explicit denies of `unshare`/`clone(CLONE_NEWUSER|CLONE_NEWNS)`/`ptrace`/`keyctl`/`add_key`/`bpf`/`socket(AF_ALG|AF_PACKET|AF_NETLINK)`; **must still allow** `execve`/`execveat` and `CLONE_THREAD` (ffmpeg/gstreamer are multithreaded). Derive the allowlist from a recorded `strace -f` of genuine + tampered happy-path runs. **If a clean profile proves too costly in this build, descope G and mark AC7 conditional** (like AC5/AC6) — `--unshare-net` (F) already carries the network guarantee. Do not ship a fail-open filter. + +## The 23 amendments — each is a build requirement +1. **Upload cap (A1):** replace `await file.read()` (`main.py:73`) with Content-Length reject-if-over-max + bounded 8 MB-chunk streaming to disk, abort+unlink on overflow. 
Reuse `config.py:10 max_file_size_bytes`; set the agreed default; **delete the ~50 GB value**. +2. **Cap captured subprocess output** in `run_sandboxed`: hard byte ceiling, kill child on overflow (or size-capped files / DEVNULL + bounded-prefix reads). +3. **Scrub child env:** minimal explicit env (`PATH`/`LANG`/`TMPDIR`); bwrap `--clearenv` + `--setenv`. Assert `WORKER_API_KEY` absent in child. +4. **Kill the process group on timeout:** `start_new_session=True`; on timeout `SIGKILL` the group then reap. Don't rely on `--die-with-parent` for the request-timeout case. +5. **Read result file safely:** parent reads `validation_results.txt` with `O_NOFOLLOW` (ideally `openat` in an fd-pinned scratch), reject symlinks/non-regular files, never echo contents into errors/callbacks. +6. **rlimits/prctl off `preexec_fn`** → launcher (control C). +7. **Exact bwrap bind set as a contract** (control F), tested via `ldd` on both binaries in the built image. +8. **Fix capability probe** (above). +9. **State WHERE the rubric runs:** ACs 1-3/5/6/7/9-12/15/16 run **inside the Linux container in CI** (`docker build` → `docker run ... pytest`), not on macOS. Add `@pytest.mark.linux_sandbox` that is **collected-and-FAILED** (not skipped) off Linux. AC1 in the production image. Spell out the exact `/goal`/CI command. +10. **`SandboxResult` dataclass** as above; `run_sandboxed` does not raise on launch failure; stdout undecoded. +11. **Per-call-site integration:** svf_runner → `ro_paths=[file_path]`, `scratch_dir=work_dir` (still `mkdtemp`'d by svf_runner), read `validation_results.txt` from `work_dir`. Both ffprobe sites → `ro_paths=[file_path]`, throwaway empty scratch. **Input temp files in `settings.temp_dir` are NOT under scratch and MUST be in `ro_paths`.** Name the scratch owner per site. +12. **Harden fail-closed:** cleanup = `shutil.rmtree(work_dir, ignore_errors=True)` (replace `unlink`+`rmdir`). 
Any non-zero exit / signal death → `status=error`, **do not parse output for a verdict**. +13. **Guard parent-side parsing:** wrap `float(fmt.get('duration',0))` (`video_info.py:49`) and `int(nb_frames)` (`:78`) in try/except → inconclusive/error. Add a parse-fuzz test. +14. **Fix AC9 contract name** to `VerificationResult`; add real-bytes regression (genuine AXIS clip → `authentic`; tampered fixture → `tampered`) through the sandboxed pipeline. Downgrade mock-keyed tests to "mock path unaffected." +15. **Wiring-proof test:** with `use_mock_results=False`, assert `create_subprocess_exec` is no longer called directly in `video_info.py`/`svf_runner.py` and `run_sandboxed` is invoked N times/verify. +16. **Bounded-concurrency gate:** worker-local semaphore sized vs per-job `RLIMIT_AS` + parent buffers vs the container/Machine memory cap; set the memory limit explicitly. +17. **Split AC4:** (4a any host) static-arg assertion both ffprobe sites carry the whitelist+caps; (4b Linux) concat-script sentinel test. Correct control-B wording. +18. **Enumerate deps + config keys:** `prctl` (ctypes libc or `python-prctl` in `requirements.txt`); seccomp source/build step or `nsjail` apt dep. `SANDBOX_*` keys w/ types+defaults: `SANDBOX_RLIMIT_AS_BYTES`, `SANDBOX_RLIMIT_CPU_SECONDS_FFPROBE`, `SANDBOX_RLIMIT_CPU_SECONDS_VALIDATOR`, `SANDBOX_RLIMIT_FSIZE_BYTES`, `SANDBOX_RLIMIT_NPROC`, `SANDBOX_MAX_INPUT_BYTES`, `SANDBOX_ALLOW_NET=False`, `ALLOW_DEGRADED_SANDBOX=False`. +19. **rlimits inside the sandbox, not on the launcher;** `RLIMIT_NPROC` sized to real codec thread/fork counts (NOT 0-extra). +20. **Pin SVF clones + multi-stage image:** pin both `git clone`s (`Dockerfile:28,35`) to commit SHAs (or tags+checksum); compile in a builder stage, copy only runtime binary + minimal libs into a slim/distroless runtime; drop git/compiler/headers/source. +21. 
**Flag verdict-from-text trust boundary** (threat-model note + separate task): `parse_svf_output` substring-matches across combined output — forgeable. Anchor status regexes; gate verdict on `return_code==0` + single authoritative status line. *(Flag only — not built here.)* +22. **Document callback SSRF** (threat-model note + separate task): `--unshare-net` doesn't cover the parent's callback POST (`main.py:91-101`). Allowlist hosts/schemes; block private/link-local/metadata IPs; stop sending `WORKER_API_KEY` to attacker URLs. *(Flag only — not built here.)* +23. **Per-job 0700 + resolve input path before bind;** one parent per job; bind input by resolved absolute path immediately before bwrap binds it. + +### Resource defaults (C5 — validate before lock, then set) +Start: `RLIMIT_AS` 1 GiB, max input 500 MB, CPU 20s ffprobe / 60s validator, `RLIMIT_NPROC` sized to real codec thread counts. **Validate against real AXIS clip sizes in the happy-path test before locking** — too-tight breaks threaded ffmpeg + bwrap's monitor child; too-loose defeats the DoS goal. + +## Acceptance rubric (all must pass; Linux container unless noted) +1. Non-root: subprocess `uid != 0` (production image). 2. rlimits: over-`RLIMIT_AS` alloc killed → `error`, `/health` 200. 3. Wall+CPU+pgroup: hanging/spinning child terminated; **no orphan ffprobe/bwrap**; result `error`/`inconclusive`. 4. (4a any host) both ffprobe sites carry `-protocol_whitelist file -analyzeduration 5M -probesize 10M`; (4b Linux) concat-script can't read out-of-input file. 5. Network blocked (bwrap): child socket/connect fails. 6. FS confinement (bwrap): child can't read `/etc/hostname`/`/app/certs`/app source. 7. No child-exec (seccomp, conditional per C3). 8. Fail-closed: launch-fail vs rlimit-kill vs timeout distinguishable in `SandboxResult`; clean result + scratch `rmtree` + worker survives; no unhandled exception reaches handler. 9. Env scrubbed: `WORKER_API_KEY` absent in child. 10. 
Output cap: stdout-flood child killed, worker up. 11. Symlink defense: child symlinking `validation_results.txt` → `/app/certs` leaks nothing. 12. Truncated-result safety: RLIMIT_FSIZE/AS kill → `status=error`, never `authentic`/`tampered`. 13. Upload cap: 1 GB upload → HTTP 400, heap bounded, `/health` 200. 14. Concurrency: host stays up under M concurrent hostile uploads. 15. Wiring proof (amendment 15). 16. Real-bytes happy path (amendment 14). 17. Probe logged + DEGRADED gates `/verify` 503 unless `ALLOW_DEGRADED_SANDBOX=true`. + +CI: `docker build -t epworker . && docker run --rm epworker pytest -m linux_sandbox` (+ host-runnable static ACs in normal `pytest`). + +## Out of scope (do not build) +- Increment 2 (Fly/object-storage/queue/split). Tier 1 (Rust worker) / Tier 2 (pure-Rust verify). Stubbed Python crypto (`validators.py`/`certificate_validator.py` TODOs). The TS app. Amendments 21 & 22 (flag in threat-model section only). + +## Build constraints +- No `git commit` / `git push` — leave the working tree for Claude review. +- Match existing code style; surgical edits; don't drive-by refactor unrelated code. +- If a control can't be cleanly built in this pass (esp. seccomp G), apply the documented descope (mark AC7 conditional) and **report it** rather than shipping something fail-open. +- End by writing a short `TIER0-BUILD-REPORT.md` in this dir: what was implemented, which ACs pass and where (host vs CI), what was descoped/deferred, and the resource defaults chosen. 
diff --git a/worker/app/config.py b/worker/app/config.py index 60dfd0f..15bd8a9 100644 --- a/worker/app/config.py +++ b/worker/app/config.py @@ -7,8 +7,19 @@ class Settings(BaseSettings): log_level: str = "info" temp_dir: str = "/tmp/edgeproof" certs_dir: str = "/app/certs" - max_file_size_bytes: int = 50 * 1024 * 1024 * 1024 # 50 GB + max_file_size_bytes: int = 500 * 1024 * 1024 use_mock_results: bool = False + sandbox_rlimit_as_bytes: int = 1024 * 1024 * 1024 + sandbox_rlimit_cpu_seconds_ffprobe: int = 20 + sandbox_rlimit_cpu_seconds_validator: int = 60 + sandbox_rlimit_fsize_bytes: int = 16 * 1024 * 1024 + sandbox_rlimit_nproc: int = 64 + sandbox_max_input_bytes: int = 500 * 1024 * 1024 + sandbox_allow_net: bool = False + allow_degraded_sandbox: bool = False + sandbox_max_output_bytes: int = 16 * 1024 * 1024 + sandbox_memory_limit_bytes: int = 1536 * 1024 * 1024 + sandbox_max_concurrent_jobs: int = 1 class Config: env_prefix = "" diff --git a/worker/app/main.py b/worker/app/main.py index 4d721d6..219b183 100644 --- a/worker/app/main.py +++ b/worker/app/main.py @@ -1,10 +1,17 @@ from fastapi import FastAPI, UploadFile, File, Form, Header, HTTPException from fastapi.middleware.cors import CORSMiddleware +import asyncio import os import uuid import time from app.config import settings +from app.sandbox import ( + SANDBOX_MODE_DEGRADED, + ensure_sandbox_probed, + get_sandbox_health, + probe_sandbox_capabilities, +) from app.models.verification import ( VerificationResult, mock_authentic_result, @@ -16,6 +23,8 @@ from app.services.result_mapper import map_to_result app = FastAPI(title="EdgeProof Verification Worker", version="1.0.0") +VERIFY_SEMAPHORE = asyncio.Semaphore(max(1, settings.sandbox_max_concurrent_jobs)) +UPLOAD_CHUNK_BYTES = 8 * 1024 * 1024 app.add_middleware( CORSMiddleware, @@ -25,9 +34,15 @@ ) +@app.on_event("startup") +async def startup_probe_sandbox(): + await probe_sandbox_capabilities() + + @app.get("/health") async def health(): - return 
{"status": "healthy", "version": "1.0.0"} + await ensure_sandbox_probed() + return {"status": "healthy", "version": "1.0.0", "sandbox": get_sandbox_health()} @app.post("/verify") @@ -36,6 +51,7 @@ async def verify_video( callback_url: str | None = Form(None), verification_id: str | None = Form(None), authorization: str = Header(...), + content_length: int | None = Header(None), ): """ Verify a signed video file. @@ -54,6 +70,14 @@ async def verify_video( if token != settings.worker_api_key: raise HTTPException(status_code=401, detail="Invalid API key") + sandbox_mode = await ensure_sandbox_probed() + if sandbox_mode == SANDBOX_MODE_DEGRADED and not settings.allow_degraded_sandbox: + raise HTTPException(status_code=503, detail="Sandbox is degraded") + + max_upload_bytes = min(settings.max_file_size_bytes, settings.sandbox_max_input_bytes) + if content_length is not None and content_length > max_upload_bytes: + raise HTTPException(status_code=400, detail="Uploaded file is too large") + # Validate file type filename = file.filename or "unknown.mp4" ext = os.path.splitext(filename)[1].lower() @@ -64,20 +88,20 @@ async def verify_video( ) # Save temp file - os.makedirs(settings.temp_dir, exist_ok=True) + os.makedirs(settings.temp_dir, mode=0o700, exist_ok=True) + os.chmod(settings.temp_dir, 0o700) temp_path = os.path.join(settings.temp_dir, f"{uuid.uuid4()}{ext}") start_time = time.time() try: - content = await file.read() - with open(temp_path, "wb") as f: - f.write(content) + await _stream_upload_to_disk(file, temp_path, max_upload_bytes) - if settings.use_mock_results: - result = get_mock_result(filename) - else: - result = await run_verification_pipeline(temp_path, filename) + async with VERIFY_SEMAPHORE: + if settings.use_mock_results: + result = get_mock_result(filename) + else: + result = await run_verification_pipeline(temp_path, filename) result.processing_time_ms = int((time.time() - start_time) * 1000) @@ -87,6 +111,7 @@ async def verify_video( result_dict = 
result.model_dump(by_alias=True) + # Threat-model flag: callback SSRF hardening is deferred to Increment 2. # If callback_url provided, POST result there (async mode) if callback_url: import httpx @@ -108,13 +133,32 @@ async def verify_video( os.unlink(temp_path) +async def _stream_upload_to_disk(file: UploadFile, temp_path: str, max_upload_bytes: int) -> None: + bytes_written = 0 + try: + with open(temp_path, "xb") as f: + while True: + chunk = await file.read(UPLOAD_CHUNK_BYTES) + if not chunk: + break + bytes_written += len(chunk) + if bytes_written > max_upload_bytes: + raise HTTPException(status_code=400, detail="Uploaded file is too large") + f.write(chunk) + except Exception: + if os.path.exists(temp_path): + os.unlink(temp_path) + raise + + async def run_verification_pipeline(file_path: str, filename: str) -> VerificationResult: """Run the full verification pipeline: ffprobe + SVF validator + result mapping.""" # 1. Get video metadata via ffprobe video_info = await get_video_info(file_path) - # 2. Run SVF validator binary - svf_result = await run_svf_validator(file_path) + # 2. Run SVF validator binary with correct codec flag + codec = video_info.get("codec", "H.264") + svf_result = await run_svf_validator(file_path, codec=codec) # 3. 
Map to VerificationResult return map_to_result(svf_result, video_info) diff --git a/worker/app/sandbox.py b/worker/app/sandbox.py new file mode 100644 index 0000000..7f5499a --- /dev/null +++ b/worker/app/sandbox.py @@ -0,0 +1,492 @@ +"""Subprocess sandbox for untrusted video tooling.""" + +from __future__ import annotations + +import asyncio +import logging +import os +import platform +import shutil +import signal +import stat +import tempfile +from dataclasses import dataclass +from pathlib import Path + +from app.config import settings + + +SANDBOX_MODE_BWRAP = "bwrap" +SANDBOX_MODE_DEGRADED = "DEGRADED" +DEFAULT_CHILD_PATH = "/usr/local/bin:/usr/bin:/bin" +NOFILE_LIMIT = 64 +RLIMIT_SIGNAL_NUMBERS = { + signal.SIGKILL, + signal.SIGXCPU, + signal.SIGXFSZ, + signal.SIGABRT, + signal.SIGBUS, + signal.SIGSEGV, +} + +logger = logging.getLogger(__name__) + +_sandbox_mode: str | None = None +_sandbox_probe_error: str | None = None + + +@dataclass +class SandboxResult: + stdout: bytes + stderr: bytes + returncode: int + timed_out: bool + rlimit_killed: bool + launch_failed: bool + sandbox_error: str | None + output_overflow: bool = False + + +def get_sandbox_mode() -> str: + return _sandbox_mode or "UNKNOWN" + + +def get_sandbox_probe_error() -> str | None: + return _sandbox_probe_error + + +def get_sandbox_health() -> dict: + return { + "mode": get_sandbox_mode(), + "probe_error": _sandbox_probe_error, + "allow_degraded": settings.allow_degraded_sandbox, + "seccomp": "deferred", + } + + +async def ensure_sandbox_probed() -> str: + if _sandbox_mode is None: + await probe_sandbox_capabilities() + return get_sandbox_mode() + + +async def probe_sandbox_capabilities() -> str: + """Probe bwrap with the real namespace shape and cache the mode.""" + global _sandbox_mode, _sandbox_probe_error + + if platform.system() != "Linux": + _sandbox_mode = SANDBOX_MODE_DEGRADED + _sandbox_probe_error = "non-linux-host" + logger.warning("sandbox mode=%s reason=%s", _sandbox_mode, 
_sandbox_probe_error) + return _sandbox_mode + + if not shutil.which("bwrap"): + _sandbox_mode = SANDBOX_MODE_DEGRADED + _sandbox_probe_error = "bubblewrap-not-found" + logger.warning("sandbox mode=%s reason=%s", _sandbox_mode, _sandbox_probe_error) + return _sandbox_mode + + true_path = "/usr/bin/true" if os.path.exists("/usr/bin/true") else "/bin/true" + probe_root = tempfile.mkdtemp(prefix="sandbox_probe_") + scratch_dir = os.path.join(probe_root, "scratch") + probe_input = os.path.join(probe_root, "input") + + try: + os.makedirs(scratch_dir, mode=0o700, exist_ok=True) + with open(probe_input, "wb") as f: + f.write(b"probe") + os.chmod(probe_root, 0o700) + os.chmod(scratch_dir, 0o700) + + env = _child_env(scratch_dir) + argv = _build_bwrap_argv( + [true_path], + [Path(probe_input).resolve(strict=True)], + Path(scratch_dir).resolve(strict=True), + env, + allow_net=False, + ) + process = await asyncio.create_subprocess_exec( + *argv, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + start_new_session=True, + ) + try: + stdout, stderr = await asyncio.wait_for(process.communicate(), timeout=5.0) + except asyncio.TimeoutError: + await _kill_process_group(process) + try: + await asyncio.wait_for(process.wait(), timeout=5.0) + except asyncio.TimeoutError: + pass + _sandbox_mode = SANDBOX_MODE_DEGRADED + _sandbox_probe_error = "bwrap-probe-timeout" + logger.warning("sandbox mode=%s reason=%s", _sandbox_mode, _sandbox_probe_error) + return _sandbox_mode + if process.returncode == 0: + _sandbox_mode = SANDBOX_MODE_BWRAP + _sandbox_probe_error = None + logger.info("sandbox mode=%s", _sandbox_mode) + return _sandbox_mode + + stderr_text = stderr.decode("utf-8", errors="replace")[:500] + _sandbox_mode = SANDBOX_MODE_DEGRADED + _sandbox_probe_error = _classify_probe_failure(stderr_text) + logger.warning( + "sandbox mode=%s reason=%s stdout_len=%d stderr=%s", + _sandbox_mode, + _sandbox_probe_error, + len(stdout), + stderr_text, + ) + return 
_sandbox_mode + except Exception as exc: + _sandbox_mode = SANDBOX_MODE_DEGRADED + _sandbox_probe_error = f"bwrap-probe-exception:{exc.__class__.__name__}" + logger.warning("sandbox mode=%s reason=%s", _sandbox_mode, _sandbox_probe_error) + return _sandbox_mode + finally: + shutil.rmtree(probe_root, ignore_errors=True) + + +async def run_sandboxed( + argv: list[str], + *, + ro_paths: list[str], + scratch_dir: str, + timeout: float, + allow_net: bool = False, +) -> SandboxResult: + """Run a command through bwrap+rlimits, returning flags instead of raising.""" + try: + mode = await ensure_sandbox_probed() + if not argv: + return _launch_error("empty argv") + + scratch_path = _prepare_scratch(scratch_dir) + resolved_ro_paths = _resolve_ro_paths(ro_paths) + env = _child_env(str(scratch_path)) + launcher_argv = _launcher_argv(argv, _cpu_limit_for(argv[0])) + if not launcher_argv: + return _launch_error("rlimit launcher not found") + + if mode == SANDBOX_MODE_BWRAP: + command = _build_bwrap_argv( + launcher_argv, + resolved_ro_paths, + scratch_path, + env, + allow_net=allow_net or settings.sandbox_allow_net, + ) + else: + command = launcher_argv + + return await _run_process(command, env=env, timeout=timeout) + except Exception as exc: + return _launch_error(f"containment setup failed: {exc.__class__.__name__}") + + +def _launch_error(message: str) -> SandboxResult: + return SandboxResult( + stdout=b"", + stderr=b"", + returncode=126, + timed_out=False, + output_overflow=False, + rlimit_killed=False, + launch_failed=True, + sandbox_error=message, + ) + + +def _prepare_scratch(scratch_dir: str) -> Path: + path = Path(scratch_dir).resolve() + os.makedirs(path, mode=0o700, exist_ok=True) + os.chmod(path, 0o700) + return path + + +def _resolve_ro_paths(ro_paths: list[str]) -> list[Path]: + resolved = [] + for raw_path in ro_paths: + path = Path(raw_path).resolve(strict=True) + st = path.stat() + if not stat.S_ISREG(st.st_mode): + raise ValueError(f"ro_path is not a regular 
file: {path}") + if st.st_size > settings.sandbox_max_input_bytes: + raise ValueError(f"ro_path exceeds sandbox max input bytes: {path}") + resolved.append(path) + return resolved + + +def _child_env(scratch_dir: str) -> dict[str, str]: + env = { + "PATH": DEFAULT_CHILD_PATH, + "LANG": "C.UTF-8", + "TMPDIR": scratch_dir, + } + assert "WORKER_API_KEY" not in env + return env + + +def _launcher_path() -> str | None: + installed = "/usr/local/bin/edgeproof-rlimit-launcher" + if os.path.exists(installed): + return installed + + local = Path(__file__).with_name("sandbox_launcher.py") + if local.exists(): + return str(local) + + return None + + +def _launcher_argv(argv: list[str], cpu_seconds: int) -> list[str] | None: + launcher = _launcher_path() + if not launcher: + return None + return [ + launcher, + "--as-bytes", + str(settings.sandbox_rlimit_as_bytes), + "--cpu-seconds", + str(cpu_seconds), + "--fsize-bytes", + str(settings.sandbox_rlimit_fsize_bytes), + "--nofile", + str(NOFILE_LIMIT), + "--nproc", + str(settings.sandbox_rlimit_nproc), + "--", + *argv, + ] + + +def _cpu_limit_for(binary: str) -> int: + name = os.path.basename(binary) + if name == "ffprobe": + return settings.sandbox_rlimit_cpu_seconds_ffprobe + return settings.sandbox_rlimit_cpu_seconds_validator + + +def _build_bwrap_argv( + child_argv: list[str], + ro_paths: list[Path], + scratch_dir: Path, + env: dict[str, str], + *, + allow_net: bool, +) -> list[str]: + argv = [ + "bwrap", + "--ro-bind", + "/usr", + "/usr", + "--ro-bind", + "/usr/local", + "/usr/local", + "--ro-bind", + "/lib", + "/lib", + "--ro-bind", + "/lib64", + "/lib64", + "--ro-bind", + "/etc/ld.so.cache", + "/etc/ld.so.cache", + "--ro-bind", + "/etc/ssl", + "/etc/ssl", + "--proc", + "/proc", + "--dev", + "/dev", + "--tmpfs", + "/tmp", + ] + + for path in _needed_mount_dirs([*ro_paths, scratch_dir]): + argv.extend(["--dir", str(path)]) + + for path in ro_paths: + argv.extend(["--ro-bind", str(path), str(path)]) + + argv.extend( + [ + 
"--bind", + str(scratch_dir), + str(scratch_dir), + "--chdir", + str(scratch_dir), + "--unshare-user", + "--unshare-pid", + "--unshare-ipc", + "--unshare-cgroup", + "--die-with-parent", + "--new-session", + ] + ) + if not allow_net: + argv.append("--unshare-net") + + argv.extend(["--clearenv"]) + for key, value in env.items(): + argv.extend(["--setenv", key, value]) + argv.extend(["--", *child_argv]) + return argv + + +def _needed_mount_dirs(paths: list[Path]) -> list[Path]: + needed: set[Path] = set() + protected = {Path("/usr"), Path("/usr/local"), Path("/lib"), Path("/lib64"), Path("/etc"), Path("/proc"), Path("/dev"), Path("/tmp")} + for path in paths: + parent = path if path.is_dir() else path.parent + chain = [] + while parent != Path("/"): + if parent in protected: + break + chain.append(parent) + parent = parent.parent + needed.update(chain) + return sorted(needed, key=lambda p: len(p.parts)) + + +async def _run_process(command: list[str], *, env: dict[str, str], timeout: float) -> SandboxResult: + try: + process = await asyncio.create_subprocess_exec( + *command, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE, + env=env, + start_new_session=True, + ) + except OSError as exc: + return _launch_error(f"launch failed: {exc}") + + stdout, stderr, timed_out, output_overflow = await _communicate_limited(process, timeout) + returncode = process.returncode if process.returncode is not None else 126 + rlimit_killed = _is_rlimit_signal(returncode) + launch_failed = _looks_like_launcher_failure(returncode) + sandbox_error = None + if timed_out: + sandbox_error = "timeout" + elif output_overflow: + sandbox_error = "output_limit_exceeded" + elif launch_failed: + sandbox_error = "launcher_or_child_exec_failed" + elif rlimit_killed: + sandbox_error = f"signal:{-returncode}" + + return SandboxResult( + stdout=stdout, + stderr=stderr, + returncode=returncode, + timed_out=timed_out, + output_overflow=output_overflow, + rlimit_killed=rlimit_killed, + 
launch_failed=launch_failed, + sandbox_error=sandbox_error, + ) + + +async def _communicate_limited(process: asyncio.subprocess.Process, timeout: float) -> tuple[bytes, bytes, bool, bool]: + overflow = asyncio.Event() + stdout_task = asyncio.create_task(_read_limited(process.stdout, overflow)) + stderr_task = asyncio.create_task(_read_limited(process.stderr, overflow)) + wait_task = asyncio.create_task(process.wait()) + overflow_task = asyncio.create_task(overflow.wait()) + timed_out = False + + try: + done, _ = await asyncio.wait( + {wait_task, overflow_task}, + timeout=timeout, + return_when=asyncio.FIRST_COMPLETED, + ) + if wait_task not in done: + if overflow_task in done and overflow.is_set(): + await _kill_process_group(process) + else: + timed_out = True + await _kill_process_group(process) + try: + await asyncio.wait_for(wait_task, timeout=5.0) + except asyncio.TimeoutError: + pass + finally: + overflow_task.cancel() + + stdout, stderr = await asyncio.gather(stdout_task, stderr_task) + return stdout, stderr, timed_out, overflow.is_set() + + +async def _read_limited(stream: asyncio.StreamReader | None, overflow: asyncio.Event) -> bytes: + if stream is None: + return b"" + + chunks = bytearray() + while True: + chunk = await stream.read(65536) + if not chunk: + break + remaining = settings.sandbox_max_output_bytes - len(chunks) + if remaining <= 0: + overflow.set() + break + if len(chunk) > remaining: + chunks.extend(chunk[:remaining]) + overflow.set() + break + chunks.extend(chunk) + return bytes(chunks) + + +async def _kill_process_group(process: asyncio.subprocess.Process) -> None: + if process.returncode is not None: + return + try: + os.killpg(process.pid, signal.SIGKILL) + except ProcessLookupError: + pass + except PermissionError: + try: + process.kill() + except ProcessLookupError: + pass + + +def _is_rlimit_signal(returncode: int) -> bool: + if returncode >= 0: + return False + try: + return signal.Signals(-returncode) in RLIMIT_SIGNAL_NUMBERS + 
except ValueError: + return False + + +def _looks_like_launcher_failure(returncode: int) -> bool: + return returncode in (125, 127) + + +def _classify_probe_failure(stderr_text: str) -> str: + userns_hint = _userns_hint() + lowered = stderr_text.lower() + if userns_hint: + return userns_hint + if "operation not permitted" in lowered or "permission denied" in lowered or "clone" in lowered: + return "userns-unavailable" + if "no such file" in lowered or "not found" in lowered: + return "bwrap-loader-or-binary-not-found" + return "bwrap-probe-failed" + + +def _userns_hint() -> str | None: + proc_flag = Path("/proc/sys/kernel/unprivileged_userns_clone") + try: + if proc_flag.exists() and proc_flag.read_text().strip() == "0": + return "unprivileged_userns_clone=0" + except OSError: + pass + return None diff --git a/worker/app/sandbox_launcher.py b/worker/app/sandbox_launcher.py new file mode 100755 index 0000000..3815a76 --- /dev/null +++ b/worker/app/sandbox_launcher.py @@ -0,0 +1,72 @@ +#!/usr/bin/env python3 +"""Exec-only rlimit launcher for sandboxed C subprocesses.""" + +import argparse +import ctypes +import os +import resource +import sys + + +PR_SET_NO_NEW_PRIVS = 38 +RLIMITS = { + "as_bytes": resource.RLIMIT_AS, + "cpu_seconds": resource.RLIMIT_CPU, + "fsize_bytes": resource.RLIMIT_FSIZE, + "nofile": resource.RLIMIT_NOFILE, + "core_bytes": resource.RLIMIT_CORE, +} + +if hasattr(resource, "RLIMIT_NPROC"): + RLIMITS["nproc"] = resource.RLIMIT_NPROC + + +def _set_limit(name: str, value: int) -> None: + if value < 0: + return + resource.setrlimit(RLIMITS[name], (value, value)) + + +def _set_no_new_privs() -> None: + libc = ctypes.CDLL(None, use_errno=True) + if libc.prctl(PR_SET_NO_NEW_PRIVS, 1, 0, 0, 0) != 0: + errno = ctypes.get_errno() + raise OSError(errno, os.strerror(errno)) + + +def main() -> int: + parser = argparse.ArgumentParser(description="Apply rlimits and exec a child") + parser.add_argument("--as-bytes", type=int, required=True) + 
parser.add_argument("--cpu-seconds", type=int, required=True) + parser.add_argument("--fsize-bytes", type=int, required=True) + parser.add_argument("--nofile", type=int, required=True) + parser.add_argument("--nproc", type=int, required=True) + parser.add_argument("command", nargs=argparse.REMAINDER) + args = parser.parse_args() + + command = args.command + if command and command[0] == "--": + command = command[1:] + if not command: + print("edgeproof-rlimit-launcher: missing command", file=sys.stderr) + return 127 + + try: + _set_limit("core_bytes", 0) + _set_limit("as_bytes", args.as_bytes) + _set_limit("cpu_seconds", args.cpu_seconds) + _set_limit("fsize_bytes", args.fsize_bytes) + _set_limit("nofile", args.nofile) + if "nproc" in RLIMITS: + _set_limit("nproc", args.nproc) + _set_no_new_privs() + os.execvpe(command[0], command, os.environ) + except Exception as exc: + print(f"edgeproof-rlimit-launcher: {exc}", file=sys.stderr) + return 125 + + return 125 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/worker/app/services/svf_runner.py b/worker/app/services/svf_runner.py index b841735..43fb070 100644 --- a/worker/app/services/svf_runner.py +++ b/worker/app/services/svf_runner.py @@ -2,17 +2,26 @@ SVF (Signed Video Framework) validator subprocess wrapper. Runs the compiled validator binary from the signed-video-framework-examples -repo as a subprocess and parses its stdout output into a structured dict. +repo as a subprocess and parses its output into a structured dict. + +The validator writes results to a file (validation_results.txt) in the +current working directory. This module reads that file after execution. The validator binary is built during Docker image creation and installed to the system PATH via meson install. 
""" -import asyncio +import os import shutil import re +import stat +import tempfile +from pathlib import Path from typing import Optional +from app.config import settings +from app.sandbox import run_sandboxed + # Possible binary names from the examples repo VALIDATOR_BINARY_NAMES = [ @@ -23,6 +32,36 @@ SVF_TIMEOUT_SECONDS = 120 +# Map ffprobe codec names to validator -c flag values +CODEC_FLAG_MAP = { + "h264": "h264", + "H.264": "h264", + "hevc": "h265", + "h265": "h265", + "H.265": "h265", + "av1": "av1", + "AV1": "av1", +} + +EMPTY_RESULT = { + "success": False, + "status": "error", + "error": "", + "raw_output": "", + "gops_total": 0, + "gops_ok": 0, + "gops_not_ok": 0, + "frames_total": 0, + "frames_ok": 0, + "frames_not_ok": 0, + "has_signature": False, + "signature_valid": False, + "gop_chain_intact": False, + "device_serial": "", + "device_cert_subject": "", + "hash_algorithm": "", +} + def find_validator_binary() -> Optional[str]: """Find the SVF validator binary on the system PATH.""" @@ -30,115 +69,127 @@ def find_validator_binary() -> Optional[str]: path = shutil.which(name) if path: return path - # Check common install locations for prefix in ["/usr/local/bin", "/opt/svf-examples-build"]: for name in VALIDATOR_BINARY_NAMES: - path = f"{prefix}/{name}" - import os - if os.path.isfile(path) and os.access(path, os.X_OK): - return path + full = f"{prefix}/{name}" + if os.path.isfile(full) and os.access(full, os.X_OK): + return full return None -async def run_svf_validator(file_path: str) -> dict: +async def run_svf_validator(file_path: str, codec: str = "h264") -> dict: """ Run the SVF validator binary against a video file. - Returns a structured dict with verification results parsed from - the validator's stdout output. + Args: + file_path: Path to the video file to validate. + codec: Video codec name (h264, h265/hevc, av1). Used to pass + the correct -c flag to the validator binary. 
- If the validator binary is not found, returns an error result - indicating the binary is unavailable. + Returns a structured dict with verification results parsed from + the validator's output file (validation_results.txt). """ binary = find_validator_binary() if not binary: - return { - "success": False, - "status": "error", - "error": "SVF validator binary not found. Ensure signed-video-framework-examples is built.", - "raw_output": "", - "gops_total": 0, - "gops_ok": 0, - "gops_not_ok": 0, - "frames_total": 0, - "frames_ok": 0, - "frames_not_ok": 0, - "has_signature": False, - "signature_valid": False, - "gop_chain_intact": False, - "device_serial": "", - "device_cert_subject": "", - "hash_algorithm": "", - } + return {**EMPTY_RESULT, "error": "SVF validator binary not found. Ensure signed-video-framework-examples is built."} + + # Resolve the codec flag + codec_flag = CODEC_FLAG_MAP.get(codec, "h264") + + # Run the validator in a temp directory so validation_results.txt + # doesn't collide between concurrent requests + os.makedirs(settings.temp_dir, mode=0o700, exist_ok=True) + os.chmod(settings.temp_dir, 0o700) + work_dir = tempfile.mkdtemp(prefix="svf_", dir=settings.temp_dir) + os.chmod(work_dir, 0o700) try: - process = await asyncio.create_subprocess_exec( - binary, file_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) + resolved_file_path = str(Path(file_path).resolve(strict=True)) + cmd = [binary, "-c", codec_flag, resolved_file_path] - stdout_bytes, stderr_bytes = await asyncio.wait_for( - process.communicate(), + sandbox_result = await run_sandboxed( + cmd, + ro_paths=[resolved_file_path], + scratch_dir=work_dir, timeout=SVF_TIMEOUT_SECONDS, ) - stdout = stdout_bytes.decode("utf-8", errors="replace") - stderr = stderr_bytes.decode("utf-8", errors="replace") - combined = stdout + "\n" + stderr - - return parse_svf_output(combined, process.returncode or 0) - - except asyncio.TimeoutError: - return { - "success": False, - 
"status": "error", - "error": f"SVF validator timed out after {SVF_TIMEOUT_SECONDS}s", - "raw_output": "", - "gops_total": 0, - "gops_ok": 0, - "gops_not_ok": 0, - "frames_total": 0, - "frames_ok": 0, - "frames_not_ok": 0, - "has_signature": False, - "signature_valid": False, - "gop_chain_intact": False, - "device_serial": "", - "device_cert_subject": "", - "hash_algorithm": "", - } - except Exception as e: - return { - "success": False, - "status": "error", - "error": f"SVF validator failed: {str(e)}", - "raw_output": "", - "gops_total": 0, - "gops_ok": 0, - "gops_not_ok": 0, - "frames_total": 0, - "frames_ok": 0, - "frames_not_ok": 0, - "has_signature": False, - "signature_valid": False, - "gop_chain_intact": False, - "device_serial": "", - "device_cert_subject": "", - "hash_algorithm": "", - } + if sandbox_result.timed_out: + return _error_result(f"SVF validator timed out after {SVF_TIMEOUT_SECONDS}s") + if sandbox_result.launch_failed: + return _error_result("SVF validator sandbox launch failed") + if sandbox_result.rlimit_killed or sandbox_result.returncode < 0: + return _error_result("SVF validator was killed by sandbox resource limits") + if sandbox_result.output_overflow or sandbox_result.sandbox_error == "output_limit_exceeded": + return _error_result("SVF validator exceeded sandbox output limit") + if sandbox_result.returncode != 0: + return _error_result("SVF validator exited non-zero") + + stdout = sandbox_result.stdout.decode("utf-8", errors="replace") + stderr = sandbox_result.stderr.decode("utf-8", errors="replace") + + # The validator writes detailed results to validation_results.txt + file_output, file_error = _read_validation_results(work_dir) + if file_error: + return _error_result(file_error) + + # Combine all output sources for parsing + combined = file_output + "\n" + stdout + "\n" + stderr + + return parse_svf_output(combined, sandbox_result.returncode) + + except Exception: + return _error_result("SVF validator failed") + finally: + 
shutil.rmtree(work_dir, ignore_errors=True) + + +def _error_result(message: str) -> dict: + return {**EMPTY_RESULT, "error": message} + + +def _read_validation_results(work_dir: str) -> tuple[str, str | None]: + flags = os.O_RDONLY | getattr(os, "O_DIRECTORY", 0) + dir_fd = os.open(work_dir, flags) + try: + try: + fd = os.open( + "validation_results.txt", + os.O_RDONLY | getattr(os, "O_NOFOLLOW", 0), + dir_fd=dir_fd, + ) + except FileNotFoundError: + return "", None + except OSError: + return "", "SVF validator result file rejected" + + with os.fdopen(fd, "rb") as f: + file_stat = os.fstat(f.fileno()) + if not stat.S_ISREG(file_stat.st_mode): + return "", "SVF validator result file rejected" + data = f.read(settings.sandbox_rlimit_fsize_bytes + 1) + if len(data) > settings.sandbox_rlimit_fsize_bytes: + return "", "SVF validator result file too large" + return data.decode("utf-8", errors="replace"), None + finally: + os.close(dir_fd) def parse_svf_output(output: str, return_code: int) -> dict: """ - Parse the SVF validator's text output into a structured dict. - - The exact output format depends on the validator binary version. - This parser handles common patterns found in the SVF examples output: - - "Validation: OK" / "Validation: NOT OK" - - GOP and frame count lines - - Certificate subject lines - - Hash algorithm references + Parse the SVF validator's combined output into a structured dict. + + Threat-model flag: status is still inferred from validator text in + successful runs. Anchoring this to one authoritative status line is + deferred per Increment 1 amendment 21. + + Handles the validation_results.txt format produced by SVF v2.x: + - "VIDEO IS NOT SIGNED!" / "VIDEO IS SIGNED AND VERIFIED" + - "PUBLIC KEY VALIDATED" / "PUBLIC KEY COULD NOT BE VALIDATED!" + - "Number of unsigned/OK/NOT OK Bitstream Units: N" + - Product Info section (Hardware ID, Serial Number, etc.) 
+ - Signed Video timestamps section + - Also handles legacy patterns from stdout (Validation: OK, etc.) """ result = { "success": return_code == 0, @@ -161,74 +212,130 @@ def parse_svf_output(output: str, return_code: int) -> dict: lower_output = output.lower() - # Detect if no signatures found - if "no signed video" in lower_output or "no signature" in lower_output or "unsigned" in lower_output: + # --- SVF v2.x validation_results.txt format --- + + # Detect unsigned video (no signed video metadata found) + if "video is not signed" in lower_output: result["status"] = "unsigned" result["has_signature"] = False + # Parse unsigned bitstream unit count as frame count + unsigned_count = _extract_int(output, r"Number of unsigned Bitstream Units:\s*(\d+)") + if unsigned_count > 0: + result["frames_total"] = unsigned_count return result - # Detect if signatures are present - if "signature" in lower_output or "signed" in lower_output: + # Detect signed and verified + if "video is signed and verified" in lower_output: result["has_signature"] = True - - # Parse overall validation status - if "validation: ok" in lower_output or "result: ok" in lower_output or "valid: true" in lower_output: result["signature_valid"] = True result["status"] = "authentic" - elif "validation: not ok" in lower_output or "result: not ok" in lower_output or "valid: false" in lower_output: + + # Detect signed but NOT verified (tampered) + if "video is signed" in lower_output and "not verified" in lower_output: + result["has_signature"] = True result["signature_valid"] = False result["status"] = "tampered" - elif "error" in lower_output and return_code != 0: - result["status"] = "error" - result["error"] = output.strip()[:500] - # Parse GOP counts from various output formats - gop_total = _extract_int(output, r"(?:total\s+)?gops?\s*[:=]\s*(\d+)") - gop_ok = _extract_int(output, r"gops?\s+(?:ok|valid|verified)\s*[:=]\s*(\d+)") - gop_not_ok = _extract_int(output, 
r"gops?\s+(?:not\s+ok|invalid|tampered|failed)\s*[:=]\s*(\d+)") - - if gop_total > 0: - result["gops_total"] = gop_total - if gop_ok > 0: - result["gops_ok"] = gop_ok - if gop_not_ok > 0: - result["gops_not_ok"] = gop_not_ok - - # If we have ok but not total, infer total - if result["gops_total"] == 0 and (result["gops_ok"] > 0 or result["gops_not_ok"] > 0): - result["gops_total"] = result["gops_ok"] + result["gops_not_ok"] - - # Parse frame counts - frame_total = _extract_int(output, r"(?:total\s+)?frames?\s*[:=]\s*(\d+)") - frame_ok = _extract_int(output, r"frames?\s+(?:ok|valid|verified)\s*[:=]\s*(\d+)") - frame_not_ok = _extract_int(output, r"frames?\s+(?:not\s+ok|invalid|tampered|failed)\s*[:=]\s*(\d+)") - - if frame_total > 0: - result["frames_total"] = frame_total - if frame_ok > 0: - result["frames_ok"] = frame_ok - if frame_not_ok > 0: - result["frames_not_ok"] = frame_not_ok - - if result["frames_total"] == 0 and (result["frames_ok"] > 0 or result["frames_not_ok"] > 0): - result["frames_total"] = result["frames_ok"] + result["frames_not_ok"] - - # GOP chain status - if "chain intact" in lower_output or "linked: ok" in lower_output or "linking: ok" in lower_output: + # Public key validation + if "public key validated" in lower_output and "could not" not in lower_output: + result["signature_valid"] = True + elif "public key could not be validated" in lower_output: + # Key not validated but video may still be signed + pass + + # Parse Bitstream Unit counts (SVF v2.x format) + ok_units = _extract_int(output, r"Number of OK Bitstream Units:\s*(\d+)") + not_ok_units = _extract_int(output, r"Number of NOT OK Bitstream Units:\s*(\d+)") + unsigned_units = _extract_int(output, r"Number of unsigned Bitstream Units:\s*(\d+)") + + if ok_units > 0: + result["gops_ok"] = ok_units + if not_ok_units > 0: + result["gops_not_ok"] = not_ok_units + + total = ok_units + not_ok_units + unsigned_units + if total > 0: + result["gops_total"] = ok_units + not_ok_units + 
result["frames_total"] = total + + # If we have OK units and no NOT OK, chain is intact + if ok_units > 0 and not_ok_units == 0: result["gop_chain_intact"] = True - elif "chain broken" in lower_output or "linked: not ok" in lower_output: + elif not_ok_units > 0: result["gop_chain_intact"] = False - elif result["gops_not_ok"] == 0 and result["gops_ok"] > 0: - result["gop_chain_intact"] = True + + # Parse Product Info section + serial_match = re.search(r"Serial Number:\s*(\S+)", output) + if serial_match and serial_match.group(1).strip(): + serial = serial_match.group(1).strip() + result["device_serial"] = serial + result["device_cert_subject"] = f"CN={serial}" + + hw_match = re.search(r"Hardware ID:\s*(\S+)", output) + if hw_match and hw_match.group(1).strip(): + result["hardware_id"] = hw_match.group(1).strip() + + firmware_match = re.search(r"Firmware version:\s*(\S+)", output) + if firmware_match and firmware_match.group(1).strip(): + result["firmware_version"] = firmware_match.group(1).strip() + + # Parse timestamps + first_frame_match = re.search(r"First frame:\s+(.+)", output) + if first_frame_match: + ts = first_frame_match.group(1).strip() + if ts != "N/A": + result["first_frame_ts"] = ts + + last_frame_match = re.search(r"Last validated frame:\s+(.+)", output) + if last_frame_match: + ts = last_frame_match.group(1).strip() + if ts != "N/A": + result["last_frame_ts"] = ts + + # Parse SVF version info + version_match = re.search(r"Camera runs:\s+(\S+)", output) + if version_match: + v = version_match.group(1).strip() + if v != "N/A": + result["camera_svf_version"] = v + + # --- Legacy stdout patterns (fallback) --- + + if result["status"] == "inconclusive": + if "validation: ok" in lower_output or "result: ok" in lower_output or "valid: true" in lower_output: + result["signature_valid"] = True + result["status"] = "authentic" + result["has_signature"] = True + elif "validation: not ok" in lower_output or "result: not ok" in lower_output or "valid: false" in 
lower_output: + result["signature_valid"] = False + result["status"] = "tampered" + result["has_signature"] = True + elif "no signed video" in lower_output or "no signature" in lower_output: + result["status"] = "unsigned" + result["has_signature"] = False + + # Legacy GOP/frame count patterns + if result["gops_total"] == 0: + gop_total = _extract_int(output, r"(?:total\s+)?gops?\s*[:=]\s*(\d+)") + gop_ok = _extract_int(output, r"gops?\s+(?:ok|valid|verified)\s*[:=]\s*(\d+)") + gop_not_ok = _extract_int(output, r"gops?\s+(?:not\s+ok|invalid|tampered|failed)\s*[:=]\s*(\d+)") + if gop_total > 0: + result["gops_total"] = gop_total + if gop_ok > 0: + result["gops_ok"] = gop_ok + if gop_not_ok > 0: + result["gops_not_ok"] = gop_not_ok + if result["gops_total"] == 0 and (result["gops_ok"] > 0 or result["gops_not_ok"] > 0): + result["gops_total"] = result["gops_ok"] + result["gops_not_ok"] # Parse certificate subject (CN=...) - cn_match = re.search(r"CN\s*=\s*([A-Z0-9]+)", output) - if cn_match: - result["device_cert_subject"] = f"CN={cn_match.group(1)}" - # Axis serial numbers start with ACCC8E - serial = cn_match.group(1) - if serial.startswith("ACCC"): - result["device_serial"] = serial + if not result["device_cert_subject"]: + cn_match = re.search(r"CN\s*=\s*([A-Z0-9]+)", output) + if cn_match: + result["device_cert_subject"] = f"CN={cn_match.group(1)}" + serial = cn_match.group(1) + if serial.startswith("ACCC"): + result["device_serial"] = serial # Parse hash algorithm if "sha-256" in lower_output or "sha256" in lower_output: @@ -236,6 +343,11 @@ def parse_svf_output(output: str, return_code: int) -> dict: elif "sha-512" in lower_output or "sha512" in lower_output: result["hash_algorithm"] = "SHA-512" + # Handle error case + if result["status"] == "inconclusive" and return_code != 0: + result["status"] = "error" + result["error"] = output.strip()[:500] + return result diff --git a/worker/app/services/video_info.py b/worker/app/services/video_info.py index 
5547ac4..548fb30 100644 --- a/worker/app/services/video_info.py +++ b/worker/app/services/video_info.py @@ -5,11 +5,16 @@ and checks for the Axis signed video SEI NALU UUID. """ -import asyncio import json +import os import re +import shutil +import tempfile from typing import Optional +from app.config import settings +from app.sandbox import run_sandboxed + # Axis signed video UUID (hex representation for searching in binary/hex output) SIGNING_UUID = "5369676e-6564-2056-6964-656f2e2e2e30" @@ -44,9 +49,13 @@ async def get_video_info(file_path: str) -> dict: # Extract format/container info fmt = probe_data.get("format", {}) + if not isinstance(fmt, dict): + fmt = {} format_name = fmt.get("format_name", "") + if not isinstance(format_name, str): + format_name = "" result["container"] = _normalize_container(format_name) - result["duration_seconds"] = float(fmt.get("duration", 0)) + result["duration_seconds"] = _safe_float(fmt.get("duration", 0)) # Extract creation time from format tags if available tags = fmt.get("tags", {}) @@ -56,13 +65,18 @@ async def get_video_info(file_path: str) -> dict: # Extract video stream info (first video stream) streams = probe_data.get("streams", []) + if not isinstance(streams, list): + streams = [] video_stream = next( - (s for s in streams if s.get("codec_type") == "video"), + (s for s in streams if isinstance(s, dict) and s.get("codec_type") == "video"), None, ) if video_stream: - result["codec"] = _normalize_codec(video_stream.get("codec_name", "")) + codec_name = video_stream.get("codec_name", "") + if not isinstance(codec_name, str): + codec_name = "" + result["codec"] = _normalize_codec(codec_name) width = video_stream.get("width", 0) height = video_stream.get("height", 0) if width and height: @@ -75,7 +89,7 @@ async def get_video_info(file_path: str) -> dict: # Estimate total frames nb_frames = video_stream.get("nb_frames") if nb_frames and nb_frames != "N/A": - result["total_frames"] = int(nb_frames) + result["total_frames"] = 
_safe_int(nb_frames) elif result["duration_seconds"] > 0 and result["framerate"] > 0: result["total_frames"] = int(result["duration_seconds"] * result["framerate"]) @@ -97,27 +111,49 @@ async def get_video_info(file_path: str) -> dict: async def _run_ffprobe(file_path: str) -> Optional[dict]: """Run ffprobe and return parsed JSON output.""" + scratch_dir = _make_scratch("ffprobe_info_") try: - process = await asyncio.create_subprocess_exec( - "ffprobe", - "-v", "quiet", - "-print_format", "json", - "-show_format", - "-show_streams", - file_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdout, _ = await asyncio.wait_for( - process.communicate(), + sandbox_result = await run_sandboxed( + [ + "ffprobe", + "-v", + "quiet", + "-protocol_whitelist", + "file", + "-analyzeduration", + "5M", + "-probesize", + "10M", + "-print_format", + "json", + "-show_format", + "-show_streams", + file_path, + ], + ro_paths=[file_path], + scratch_dir=scratch_dir, timeout=FFPROBE_TIMEOUT_SECONDS, ) - return json.loads(stdout.decode("utf-8")) - except (asyncio.TimeoutError, json.JSONDecodeError, FileNotFoundError) as e: + if ( + sandbox_result.returncode != 0 + or sandbox_result.timed_out + or sandbox_result.rlimit_killed + or sandbox_result.output_overflow + or sandbox_result.sandbox_error == "output_limit_exceeded" + ): + print(f"ffprobe failed closed: {sandbox_result.sandbox_error or sandbox_result.returncode}") + return None + + data = json.loads(sandbox_result.stdout.decode("utf-8", errors="replace")) + if not isinstance(data, dict): + return None + return data + except (json.JSONDecodeError, OSError, ValueError) as e: print(f"ffprobe failed: {e}") return None + finally: + shutil.rmtree(scratch_dir, ignore_errors=True) async def _check_for_signing_uuid(file_path: str) -> bool: @@ -128,30 +164,51 @@ async def _check_for_signing_uuid(file_path: str) -> bool: Falls back to binary search of the file header. 
""" try: - # Use ffprobe to show packets and look for SEI data - process = await asyncio.create_subprocess_exec( - "ffprobe", - "-v", "quiet", - "-show_packets", - "-select_streams", "v:0", - "-read_intervals", "%+5", # Only read first 5 seconds - "-print_format", "json", - file_path, - stdout=asyncio.subprocess.PIPE, - stderr=asyncio.subprocess.PIPE, - ) - - stdout, _ = await asyncio.wait_for( - process.communicate(), - timeout=FFPROBE_TIMEOUT_SECONDS, - ) - - output = stdout.decode("utf-8", errors="replace") - # The signing UUID bytes in the SEI data - if "5369676e" in output or "Signed Video" in output: - return True + scratch_dir = _make_scratch("ffprobe_uuid_") + try: + # Use ffprobe to show packets and look for SEI data + sandbox_result = await run_sandboxed( + [ + "ffprobe", + "-v", + "quiet", + "-protocol_whitelist", + "file", + "-analyzeduration", + "5M", + "-probesize", + "10M", + "-show_packets", + "-select_streams", + "v:0", + "-read_intervals", + "%+5", # Only read first 5 seconds + "-print_format", + "json", + file_path, + ], + ro_paths=[file_path], + scratch_dir=scratch_dir, + timeout=FFPROBE_TIMEOUT_SECONDS, + ) + + if ( + sandbox_result.returncode != 0 + or sandbox_result.timed_out + or sandbox_result.rlimit_killed + or sandbox_result.output_overflow + or sandbox_result.sandbox_error == "output_limit_exceeded" + ): + return False + + output = sandbox_result.stdout.decode("utf-8", errors="replace") + # The signing UUID bytes in the SEI data + if "5369676e" in output or "Signed Video" in output: + return True + finally: + shutil.rmtree(scratch_dir, ignore_errors=True) - except (asyncio.TimeoutError, FileNotFoundError): + except (OSError, ValueError): pass # Fallback: search the file's first 1MB for the UUID bytes @@ -167,6 +224,26 @@ async def _check_for_signing_uuid(file_path: str) -> bool: return False +def _make_scratch(prefix: str) -> str: + os.makedirs(settings.temp_dir, mode=0o700, exist_ok=True) + os.chmod(settings.temp_dir, 0o700) + return 
tempfile.mkdtemp(prefix=prefix, dir=settings.temp_dir) + + +def _safe_float(value) -> float: + try: + return float(value) + except (TypeError, ValueError, OverflowError): + return 0.0 + + +def _safe_int(value) -> int: + try: + return int(value) + except (TypeError, ValueError, OverflowError): + return 0 + + def _normalize_codec(codec_name: str) -> str: """Normalize ffprobe codec name to display name.""" mapping = { @@ -198,5 +275,5 @@ def _parse_framerate(r_frame_rate: str) -> float: return 0.0 return round(int(num) / den_val, 2) return float(r_frame_rate) - except (ValueError, ZeroDivisionError): + except (TypeError, ValueError, ZeroDivisionError): return 0.0 diff --git a/worker/pytest.ini b/worker/pytest.ini new file mode 100644 index 0000000..c0186bd --- /dev/null +++ b/worker/pytest.ini @@ -0,0 +1,3 @@ +[pytest] +markers = + linux_sandbox: Linux-container sandbox acceptance tests; running this marker off Linux fails by design. diff --git a/worker/railway.json b/worker/railway.json new file mode 100644 index 0000000..cf39a96 --- /dev/null +++ b/worker/railway.json @@ -0,0 +1,13 @@ +{ + "$schema": "https://railway.app/railway.schema.json", + "build": { + "builder": "DOCKERFILE", + "dockerfilePath": "Dockerfile" + }, + "deploy": { + "healthcheckPath": "/health", + "healthcheckTimeout": 300, + "restartPolicyType": "ON_FAILURE", + "restartPolicyMaxRetries": 10 + } +} diff --git a/worker/requirements.txt b/worker/requirements.txt index 81f86f9..e9c306a 100644 --- a/worker/requirements.txt +++ b/worker/requirements.txt @@ -6,3 +6,4 @@ pydantic==2.10.4 pydantic-settings==2.7.1 cryptography==44.0.0 python-jose==3.3.0 +pytest==8.3.4 diff --git a/worker/tests/test_callback.py b/worker/tests/test_callback.py index 3e0bbac..555d1fd 100644 --- a/worker/tests/test_callback.py +++ b/worker/tests/test_callback.py @@ -4,8 +4,12 @@ from fastapi.testclient import TestClient from unittest.mock import patch, AsyncMock from app.main import app +from app.config import settings import 
io +settings.allow_degraded_sandbox = True +settings.use_mock_results = True + client = TestClient(app) AUTH_HEADER = {"Authorization": "Bearer dev-worker-api-key"} diff --git a/worker/tests/test_sandbox.py b/worker/tests/test_sandbox.py new file mode 100644 index 0000000..78f4fd8 --- /dev/null +++ b/worker/tests/test_sandbox.py @@ -0,0 +1,447 @@ +import asyncio +import os +import platform +import shutil +import subprocess +import sys +import textwrap +from pathlib import Path + +import pytest +from fastapi.testclient import TestClient + +import app.main as main +from app.config import settings +from app.models.verification import VerificationResult +from app.sandbox import SANDBOX_MODE_DEGRADED, SandboxResult, get_sandbox_mode, run_sandboxed +from app.services import svf_runner, video_info + + +AUTH_HEADER = {"Authorization": "Bearer dev-worker-api-key"} + + +def _run(coro): + return asyncio.run(coro) + + +def _linux_only(): + if platform.system() != "Linux": + pytest.fail( + "linux_sandbox tests must run inside the Linux container: " + "docker build -t epworker . 
&& docker run --rm epworker pytest -m linux_sandbox" + ) + + +@pytest.fixture +def restore_settings(): + names = [ + "allow_degraded_sandbox", + "use_mock_results", + "max_file_size_bytes", + "sandbox_max_input_bytes", + "sandbox_max_output_bytes", + "sandbox_rlimit_as_bytes", + "sandbox_rlimit_cpu_seconds_ffprobe", + "sandbox_rlimit_cpu_seconds_validator", + "sandbox_rlimit_fsize_bytes", + "sandbox_rlimit_nproc", + "temp_dir", + ] + before = {name: getattr(settings, name) for name in names} + yield + for name, value in before.items(): + setattr(settings, name, value) + + +def test_ac4a_host_ffprobe_args_have_protocol_whitelist_and_caps(monkeypatch, tmp_path, restore_settings): + calls = [] + sample = tmp_path / "sample.mp4" + sample.write_bytes(b"video") + settings.temp_dir = str(tmp_path) + + async def fake_run_sandboxed(argv, **kwargs): + calls.append(argv) + if "-show_format" in argv: + return SandboxResult( + stdout=b'{"format":{"format_name":"mp4","duration":"1"},"streams":[]}', + stderr=b"", + returncode=0, + timed_out=False, + rlimit_killed=False, + launch_failed=False, + sandbox_error=None, + ) + return SandboxResult( + stdout=b'{"packets":[]}', + stderr=b"", + returncode=0, + timed_out=False, + rlimit_killed=False, + launch_failed=False, + sandbox_error=None, + ) + + monkeypatch.setattr(video_info, "run_sandboxed", fake_run_sandboxed) + + _run(video_info._run_ffprobe(str(sample))) + _run(video_info._check_for_signing_uuid(str(sample))) + + assert len(calls) == 2 + for argv in calls: + assert argv[0] == "ffprobe" + assert argv[argv.index("-protocol_whitelist") + 1] == "file" + assert argv[argv.index("-analyzeduration") + 1] == "5M" + assert argv[argv.index("-probesize") + 1] == "10M" + + +def test_host_parent_parse_fuzz_is_fail_closed(monkeypatch): + async def fake_probe(_file_path): + return { + "format": {"format_name": ["bad"], "duration": {"not": "float"}}, + "streams": [ + {"codec_type": "video", "codec_name": {"bad": "codec"}, "nb_frames": {"not": 
"int"}, "r_frame_rate": {}} + ], + } + + async def fake_uuid(_file_path): + return False + + monkeypatch.setattr(video_info, "_run_ffprobe", fake_probe) + monkeypatch.setattr(video_info, "_check_for_signing_uuid", fake_uuid) + + result = _run(video_info.get_video_info("/tmp/hostile.mp4")) + + assert result["duration_seconds"] == 0.0 + assert result["total_frames"] == 0 + assert result["framerate"] == 0.0 + assert result["codec"] == "" + assert result["container"] == "" + + +def test_ac15_host_static_spawn_sites_only_call_run_sandboxed(): + for path in ["app/services/video_info.py", "app/services/svf_runner.py"]: + source = Path(path).read_text() + assert "create_subprocess_exec" not in source + assert "run_sandboxed" in source + + +def test_ac15_host_sandboxed_pipeline_wiring(monkeypatch, tmp_path, restore_settings): + input_file = tmp_path / "clip.mp4" + input_file.write_bytes(b"video") + settings.temp_dir = str(tmp_path) + calls = [] + + async def fake_ffprobe(argv, **kwargs): + calls.append(("ffprobe", argv, kwargs)) + if "-show_format" in argv: + payload = { + "format": {"format_name": "mov,mp4", "duration": "2"}, + "streams": [{"codec_type": "video", "codec_name": "h264", "nb_frames": "60", "r_frame_rate": "30/1"}], + } + return SandboxResult( + stdout=__import__("json").dumps(payload).encode(), + stderr=b"", + returncode=0, + timed_out=False, + rlimit_killed=False, + launch_failed=False, + sandbox_error=None, + ) + return SandboxResult( + stdout=b"Signed Video", + stderr=b"", + returncode=0, + timed_out=False, + rlimit_killed=False, + launch_failed=False, + sandbox_error=None, + ) + + async def fake_validator(argv, **kwargs): + calls.append(("validator", argv, kwargs)) + Path(kwargs["scratch_dir"], "validation_results.txt").write_text( + "VIDEO IS SIGNED AND VERIFIED\nPUBLIC KEY VALIDATED\nNumber of OK Bitstream Units: 2\n" + ) + return SandboxResult( + stdout=b"", + stderr=b"", + returncode=0, + timed_out=False, + rlimit_killed=False, + launch_failed=False, 
+ sandbox_error=None, + ) + + monkeypatch.setattr(video_info, "run_sandboxed", fake_ffprobe) + monkeypatch.setattr(svf_runner, "run_sandboxed", fake_validator) + monkeypatch.setattr(svf_runner, "find_validator_binary", lambda: "/usr/local/bin/signed-video-validator") + + result = _run(main.run_verification_pipeline(str(input_file), "clip.mp4")) + + assert isinstance(result, VerificationResult) + assert [call[0] for call in calls] == ["ffprobe", "ffprobe", "validator"] + assert calls[0][2]["ro_paths"] == [str(input_file)] + assert calls[2][2]["ro_paths"] == [str(input_file.resolve())] + + +def test_ac17_host_degraded_gate_and_health(monkeypatch, restore_settings): + async def degraded_probe(): + return SANDBOX_MODE_DEGRADED + + monkeypatch.setattr(main, "ensure_sandbox_probed", degraded_probe) + settings.use_mock_results = True + settings.allow_degraded_sandbox = False + + with TestClient(main.app) as client: + response = client.post( + "/verify", + files={"file": ("clip.mp4", b"video", "video/mp4")}, + headers=AUTH_HEADER, + ) + assert response.status_code == 503 + + health = client.get("/health") + assert health.status_code == 200 + assert "sandbox" in health.json() + + settings.allow_degraded_sandbox = True + response = client.post( + "/verify", + files={"file": ("clip.mp4", b"video", "video/mp4")}, + headers=AUTH_HEADER, + ) + assert response.status_code == 200 + + +def test_ac13_host_upload_cap_rejects_before_pipeline(monkeypatch, restore_settings): + async def degraded_probe(): + return SANDBOX_MODE_DEGRADED + + monkeypatch.setattr(main, "ensure_sandbox_probed", degraded_probe) + settings.use_mock_results = True + settings.allow_degraded_sandbox = True + settings.max_file_size_bytes = 1 + settings.sandbox_max_input_bytes = 1 + + with TestClient(main.app) as client: + response = client.post( + "/verify", + files={"file": ("clip.mp4", b"xx", "video/mp4")}, + headers=AUTH_HEADER, + ) + assert response.status_code == 400 + assert client.get("/health").status_code 
== 200 + + +def test_ac12_host_no_parse_on_signal_death(monkeypatch, tmp_path, restore_settings): + input_file = tmp_path / "clip.mp4" + input_file.write_bytes(b"video") + settings.temp_dir = str(tmp_path) + + async def fake_validator(argv, **kwargs): + Path(kwargs["scratch_dir"], "validation_results.txt").write_text("VIDEO IS SIGNED AND VERIFIED") + return SandboxResult( + stdout=b"VIDEO IS SIGNED AND VERIFIED", + stderr=b"", + returncode=-9, + timed_out=False, + rlimit_killed=True, + launch_failed=False, + sandbox_error="signal:9", + ) + + monkeypatch.setattr(svf_runner, "find_validator_binary", lambda: "/usr/local/bin/signed-video-validator") + monkeypatch.setattr(svf_runner, "run_sandboxed", fake_validator) + + result = _run(svf_runner.run_svf_validator(str(input_file))) + + assert result["status"] == "error" + assert result["raw_output"] == "" + assert "killed" in result["error"] + + +@pytest.mark.linux_sandbox +def test_linux_ldd_bind_contract_covers_runtime_binaries(): + _linux_only() + binaries = [shutil.which("ffprobe"), svf_runner.find_validator_binary()] + assert all(binaries) + allowed = ("/usr/", "/usr/local/", "/lib/", "/lib64/") + for binary in binaries: + output = subprocess.check_output(["ldd", binary], text=True) + for line in output.splitlines(): + if "=>" not in line: + continue + lib_path = line.split("=>", 1)[1].strip().split(" ", 1)[0] + if lib_path == "not": + pytest.fail(f"unresolved ldd dependency for {binary}: {line}") + assert lib_path.startswith(allowed), line + + +@pytest.mark.linux_sandbox +def test_ac1_linux_non_root_subprocess_uid(tmp_path): + _linux_only() + result = _run(run_sandboxed([sys.executable, "-c", "import os; print(os.getuid())"], ro_paths=[], scratch_dir=str(tmp_path), timeout=5)) + assert result.returncode == 0 + assert int(result.stdout.strip()) != 0 + + +@pytest.mark.linux_sandbox +def test_ac2_linux_rlimit_as_kills_child_and_health_survives(tmp_path, restore_settings): + _linux_only() + 
settings.sandbox_rlimit_as_bytes = 128 * 1024 * 1024 + code = "x = bytearray(512 * 1024 * 1024); print(len(x))" + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=10)) + assert result.returncode != 0 + with TestClient(main.app) as client: + assert client.get("/health").status_code == 200 + + +@pytest.mark.linux_sandbox +def test_ac3_linux_timeout_kills_process_group(tmp_path): + _linux_only() + marker = "ep-orphan-sentinel" + code = textwrap.dedent( + f""" + import subprocess, time + subprocess.Popen(['sh', '-c', 'exec sleep 30 # {marker}']) + time.sleep(30) + """ + ) + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=1)) + assert result.timed_out + ps = os.popen(f"ps ax -o command | grep {marker} | grep -v grep").read() + assert marker not in ps + + +@pytest.mark.linux_sandbox +def test_ac4b_linux_concat_sentinel_cannot_read_outside_input(tmp_path): + _linux_only() + script = tmp_path / "evil.ffconcat" + script.write_text("ffconcat version 1.0\nfile /etc/hostname\n") + result = _run( + run_sandboxed( + [ + "ffprobe", + "-v", + "error", + "-protocol_whitelist", + "file", + "-analyzeduration", + "5M", + "-probesize", + "10M", + "-f", + "concat", + "-safe", + "0", + str(script), + ], + ro_paths=[str(script)], + scratch_dir=str(tmp_path / "scratch"), + timeout=5, + ) + ) + assert result.returncode != 0 + + +@pytest.mark.linux_sandbox +def test_ac5_linux_network_blocked(tmp_path): + _linux_only() + code = "import socket, sys; s=socket.socket(); s.settimeout(.5)\ntry: s.connect(('1.1.1.1', 53)); sys.exit(1)\nexcept OSError: sys.exit(0)" + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=5)) + assert result.returncode == 0 + + +@pytest.mark.linux_sandbox +def test_ac6_linux_filesystem_confinement(tmp_path): + _linux_only() + code = textwrap.dedent( + """ + import os, sys + for path in ('/etc/hostname', 
'/app/certs', '/app/app/main.py'): + if os.path.exists(path): + sys.exit(1) + sys.exit(0) + """ + ) + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=5)) + assert result.returncode == 0 + + +@pytest.mark.linux_sandbox +def test_ac7_linux_seccomp_child_exec_conditional(): + _linux_only() + pytest.xfail("AC7 conditional: seccomp-bpf profile is intentionally deferred, not shipped fail-open") + + +@pytest.mark.linux_sandbox +def test_ac8_linux_sandbox_result_flags_and_clean_scratch(tmp_path): + _linux_only() + clean_scratch = tmp_path / "clean" + clean = _run(run_sandboxed([sys.executable, "-c", "print('ok')"], ro_paths=[], scratch_dir=str(clean_scratch), timeout=5)) + assert clean.returncode == 0 + assert clean.stdout.strip() == b"ok" + + missing = _run(run_sandboxed(["/no/such/binary"], ro_paths=[], scratch_dir=str(tmp_path / "missing"), timeout=5)) + assert missing.launch_failed + + timeout = _run(run_sandboxed([sys.executable, "-c", "import time; time.sleep(30)"], ro_paths=[], scratch_dir=str(tmp_path / "timeout"), timeout=1)) + assert timeout.timed_out + + +@pytest.mark.linux_sandbox +def test_ac9_linux_env_scrubbed(tmp_path, monkeypatch): + _linux_only() + monkeypatch.setenv("WORKER_API_KEY", "secret") + code = "import os; print(os.environ.get('WORKER_API_KEY', ''))" + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=5)) + assert result.returncode == 0 + assert result.stdout.strip() == b"" + + +@pytest.mark.linux_sandbox +def test_ac10_linux_output_cap_kills_child(tmp_path, restore_settings): + _linux_only() + settings.sandbox_max_output_bytes = 1024 + code = "import sys; sys.stdout.buffer.write(b'x' * (10 * 1024 * 1024)); sys.stdout.flush()" + result = _run(run_sandboxed([sys.executable, "-c", code], ro_paths=[], scratch_dir=str(tmp_path), timeout=5)) + assert result.sandbox_error == "output_limit_exceeded" + with TestClient(main.app) as client: + 
assert client.get("/health").status_code == 200 + + +@pytest.mark.linux_sandbox +def test_ac11_linux_o_nofollow_result_read_rejects_symlink(tmp_path): + _linux_only() + target = tmp_path / "secret" + target.write_text("secret") + scratch = tmp_path / "scratch" + scratch.mkdir() + os.symlink(target, scratch / "validation_results.txt") + content, error = svf_runner._read_validation_results(str(scratch)) + assert content == "" + assert error == "SVF validator result file rejected" + + +@pytest.mark.linux_sandbox +def test_ac14_linux_concurrency_defaults_fit_memory_budget(): + _linux_only() + per_job = settings.sandbox_rlimit_as_bytes + settings.sandbox_max_output_bytes * 2 + assert settings.sandbox_max_concurrent_jobs >= 1 + assert settings.sandbox_max_concurrent_jobs * per_job <= settings.sandbox_memory_limit_bytes + + +@pytest.mark.linux_sandbox +def test_ac16_linux_real_bytes_regression_fixture_contract(): + _linux_only() + genuine = Path("tests/fixtures/axis-genuine.mp4") + tampered = Path("tests/fixtures/axis-tampered.mp4") + if not genuine.exists() or not tampered.exists(): + pytest.xfail("real AXIS signed/tampered fixtures are not present in this repo yet") + + +@pytest.mark.linux_sandbox +def test_ac17_linux_probe_reports_mode(): + _linux_only() + _run(main.ensure_sandbox_probed()) + assert get_sandbox_mode() in {"bwrap", SANDBOX_MODE_DEGRADED} diff --git a/worker/tests/test_verify.py b/worker/tests/test_verify.py index 78349b9..bd7dd15 100644 --- a/worker/tests/test_verify.py +++ b/worker/tests/test_verify.py @@ -3,8 +3,12 @@ import pytest from fastapi.testclient import TestClient from app.main import app +from app.config import settings import io +settings.allow_degraded_sandbox = True +settings.use_mock_results = True + client = TestClient(app)