From cd24b854019683eb97e1949f335055b3c44e1351 Mon Sep 17 00:00:00 2001 From: Nathan Flurry Date: Sun, 12 Apr 2026 19:11:36 -0700 Subject: [PATCH 1/3] fix(pegboard-runner): clear terminal tunnel routes --- .../pegboard-runner/src/ws_to_tunnel_task.rs | 46 +++++- .../tests/support/ws_to_tunnel_task.rs | 150 +++++++++++++++++- 2 files changed, 186 insertions(+), 10 deletions(-) diff --git a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs index 550e9ff216..1c4b1a7c80 100644 --- a/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/src/ws_to_tunnel_task.rs @@ -860,6 +860,9 @@ async fn handle_tunnel_message_mk2( authorized_tunnel_routes: &HashMap<(protocol::mk2::GatewayId, protocol::mk2::RequestId), ()>, msg: protocol::mk2::ToServerTunnelMessage, ) -> Result<()> { + let route = (msg.message_id.gateway_id, msg.message_id.request_id); + let clear_route = should_clear_tunnel_route_mk2(&msg.message_kind); + // Extract inner data length before consuming msg let inner_data_len = tunnel_message_inner_data_len_mk2(&msg.message_kind); @@ -868,10 +871,7 @@ async fn handle_tunnel_message_mk2( return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build()); } - if !authorized_tunnel_routes - .contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id)) - .await - { + if !authorized_tunnel_routes.contains_async(&route).await { return Err( errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(), ); @@ -899,6 +899,10 @@ async fn handle_tunnel_message_mk2( ) })?; + if clear_route { + authorized_tunnel_routes.remove_async(&route).await; + } + Ok(()) } @@ -909,6 +913,9 @@ async fn handle_tunnel_message_mk1( authorized_tunnel_routes: &HashMap<(protocol::mk2::GatewayId, protocol::mk2::RequestId), ()>, msg: protocol::ToServerTunnelMessage, ) -> Result<()> { + let route = (msg.message_id.gateway_id, msg.message_id.request_id); + let clear_route = should_clear_tunnel_route_mk1(&msg.message_kind); + // Ignore DeprecatedTunnelAck messages (used only for backwards compatibility) if matches!( msg.message_kind, @@ -925,10 +932,7 @@ async fn handle_tunnel_message_mk1( return Err(errors::WsError::InvalidPacket("payload too large".to_string()).build()); } - if !authorized_tunnel_routes - .contains_async(&(msg.message_id.gateway_id, msg.message_id.request_id)) - .await - { + if !authorized_tunnel_routes.contains_async(&route).await { return Err( errors::WsError::InvalidPacket("unauthorized tunnel message".to_string()).build(), ); @@ -950,9 +954,35 @@ async fn handle_tunnel_message_mk1( ) })?; + if clear_route { + authorized_tunnel_routes.remove_async(&route).await; + } + Ok(()) } +fn should_clear_tunnel_route_mk2(msg_kind: &protocol::mk2::ToServerTunnelMessageKind) -> bool { + match msg_kind { + protocol::mk2::ToServerTunnelMessageKind::ToServerResponseStart(response) => { + !response.stream + } + protocol::mk2::ToServerTunnelMessageKind::ToServerResponseChunk(chunk) => chunk.finish, + protocol::mk2::ToServerTunnelMessageKind::ToServerResponseAbort + | protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketClose(_) => true, + _ => false, + } +} + +fn should_clear_tunnel_route_mk1(msg_kind: &protocol::ToServerTunnelMessageKind) -> bool { + match msg_kind { + protocol::ToServerTunnelMessageKind::ToServerResponseStart(response) => !response.stream, + protocol::ToServerTunnelMessageKind::ToServerResponseChunk(chunk) => chunk.finish, + protocol::ToServerTunnelMessageKind::ToServerResponseAbort + | protocol::ToServerTunnelMessageKind::ToServerWebSocketClose(_) => true, + _ => false, + } +} + /// Returns the length of the inner data payload for a tunnel message kind. fn tunnel_message_inner_data_len_mk2(kind: &protocol::mk2::ToServerTunnelMessageKind) -> usize { use protocol::mk2::ToServerTunnelMessageKind; diff --git a/engine/packages/pegboard-runner/tests/support/ws_to_tunnel_task.rs b/engine/packages/pegboard-runner/tests/support/ws_to_tunnel_task.rs index 53b4278136..fbfcf2dc66 100644 --- a/engine/packages/pegboard-runner/tests/support/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-runner/tests/support/ws_to_tunnel_task.rs @@ -25,6 +25,74 @@ fn response_abort_message_mk2( } } +fn response_start_message_mk2( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, +) -> protocol::mk2::ToServerTunnelMessage { + response_start_message_mk2_with_stream(gateway_id, request_id, false) +} + +fn response_start_message_mk2_with_stream( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, + stream: bool, +) -> protocol::mk2::ToServerTunnelMessage { + protocol::mk2::ToServerTunnelMessage { + message_id: protocol::mk2::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::mk2::ToServerTunnelMessageKind::ToServerResponseStart( + protocol::mk2::ToServerResponseStart { + status: 200, + headers: Default::default(), + body: None, + stream, + }, + ), + } +} + +fn response_chunk_message_mk2( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, + finish: bool, +) -> protocol::mk2::ToServerTunnelMessage { + protocol::mk2::ToServerTunnelMessage { + message_id: protocol::mk2::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::mk2::ToServerTunnelMessageKind::ToServerResponseChunk( + protocol::mk2::ToServerResponseChunk { + body: b"chunk".to_vec(), + finish, + }, + ), + } +} + +fn websocket_message_mk2( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, +) -> protocol::mk2::ToServerTunnelMessage { + protocol::mk2::ToServerTunnelMessage { + message_id: protocol::mk2::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::mk2::ToServerTunnelMessageKind::ToServerWebSocketMessage( + protocol::mk2::ToServerWebSocketMessage { + data: b"ping".to_vec(), + binary: false, + }, + ), + } +} + fn response_abort_message_mk1( gateway_id: protocol::mk2::GatewayId, request_id: protocol::mk2::RequestId, @@ -39,6 +107,74 @@ fn response_abort_message_mk1( } } +fn websocket_message_mk1( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, +) -> protocol::ToServerTunnelMessage { + protocol::ToServerTunnelMessage { + message_id: protocol::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::ToServerTunnelMessageKind::ToServerWebSocketMessage( + protocol::ToServerWebSocketMessage { + data: b"ping".to_vec(), + binary: false, + }, + ), + } +} + +fn response_start_message_mk1( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, +) -> protocol::ToServerTunnelMessage { + response_start_message_mk1_with_stream(gateway_id, request_id, false) +} + +fn response_start_message_mk1_with_stream( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, + stream: bool, +) -> protocol::ToServerTunnelMessage { + protocol::ToServerTunnelMessage { + message_id: protocol::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::ToServerTunnelMessageKind::ToServerResponseStart( + protocol::ToServerResponseStart { + status: 200, + headers: Default::default(), + body: None, + stream, + }, + ), + } +} + +fn response_chunk_message_mk1( + gateway_id: protocol::mk2::GatewayId, + request_id: protocol::mk2::RequestId, + finish: bool, +) -> protocol::ToServerTunnelMessage { + protocol::ToServerTunnelMessage { + message_id: protocol::MessageId { + gateway_id, + request_id, + message_index: 0, + }, + message_kind: protocol::ToServerTunnelMessageKind::ToServerResponseChunk( + protocol::ToServerResponseChunk { + body: b"chunk".to_vec(), + finish, + }, + ), + } +} + #[tokio::test] async fn rejects_unissued_mk2_tunnel_message_pairs() { let pubsub = memory_pubsub("pegboard-runner-ws-to-tunnel-test-reject-mk2"); @@ -82,7 +218,7 @@ async fn republishes_issued_mk2_tunnel_message_pairs() { &pubsub, 1024, &authorized_tunnel_routes, - response_abort_message_mk2(gateway_id, request_id), + websocket_message_mk2(gateway_id, request_id), ) .await .unwrap(); @@ -92,6 +228,11 @@ async fn republishes_issued_mk2_tunnel_message_pairs() { .unwrap() .unwrap(); assert!(matches!(msg, NextOutput::Message(_))); + assert!( + authorized_tunnel_routes + .contains_async(&(gateway_id, request_id)) + .await + ); } #[tokio::test] @@ -137,7 +278,7 @@ async fn republishes_issued_mk1_tunnel_message_pairs() { &pubsub, 1024, &authorized_tunnel_routes, - response_abort_message_mk1(gateway_id, request_id), + websocket_message_mk1(gateway_id, request_id), ) .await .unwrap(); @@ -147,4 +288,9 @@ async fn republishes_issued_mk1_tunnel_message_pairs() { .unwrap() .unwrap(); assert!(matches!(msg, NextOutput::Message(_))); + assert!( + authorized_tunnel_routes + .contains_async(&(gateway_id, request_id)) + .await + ); } From 17014e148e761aedbd759d4d4de01e6a46bbbc94 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 06:11:31 +0000 Subject: [PATCH 2/3] chore: add temp review body file --- .agents/review_body.md | 74 ++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 74 insertions(+) create mode 100644 .agents/review_body.md diff --git a/.agents/review_body.md b/.agents/review_body.md new file mode 100644 index 0000000000..4995076903 --- /dev/null +++ b/.agents/review_body.md @@ -0,0 +1,74 @@ +## PR Review: `fix(pegboard-runner): clear terminal tunnel routes` + +### Summary + +This PR fixes a resource leak in the pegboard-runner's WebSocket-to-tunnel message handler. Previously, tunnel route authorizations (entries in `authorized_tunnel_routes`) were never removed when processing terminal tunnel messages (final HTTP responses, aborted requests, closed WebSocket connections). After this fix, terminal messages also remove the corresponding route entry, preventing stale route entries from accumulating across the lifetime of a runner connection. + +The fix is applied to both mk2 and mk1 protocol paths via new pure helper functions `should_clear_tunnel_route_mk2` and `should_clear_tunnel_route_mk1`. + +--- + +### Code Quality + +**Positive:** +- Extracting `route` and `clear_route` before consuming `msg` is correct and avoids borrow issues since `msg` is moved into the serialization call. +- `should_clear_tunnel_route_*` helpers are pure functions that clearly express intent. +- Uses `scc::HashMap` async API (`contains_async`, `remove_async`) consistently with the codebase's concurrency model. +- mk1 and mk2 paths are kept at feature parity per the Engine Runner Parity guideline. +- Commit message follows conventional commits format. + +--- + +### Issues + +#### Medium: Test helpers are dead code; clearing behavior is untested + +The test support file adds several helper constructors (`response_start_message_mk2`, `response_chunk_message_mk2`, `response_start_message_mk1`, `response_chunk_message_mk1`, etc.) that are never called in any test. The updated tests only verify that a `WebSocketMessage` (non-terminal) does **not** clear the route. There are no tests verifying the positive case that terminal messages **do** clear the route. + +Missing test cases: +- `ToServerResponseStart` with `stream: false` -> route cleared +- `ToServerResponseStart` with `stream: true` -> route **not** cleared +- `ToServerResponseChunk` with `finish: true` -> route cleared +- `ToServerResponseChunk` with `finish: false` -> route **not** cleared +- `ToServerResponseAbort` -> route cleared +- `ToServerWebSocketClose` -> route cleared +- Symmetric coverage for mk1 variants + +The existing `republishes_issued_mk*_tunnel_message_pairs` tests now only exercise the non-clearing path, so there is no test that sends a terminal message and asserts the route entry is subsequently absent. + +#### Low: Implicit fallthrough in `should_clear_tunnel_route_*` for future variants + +Both `should_clear` functions use `_ => false` as the catch-all. This means any future protocol variant added to the enum would silently default to not clearing the route. Depending on the variant this could be correct (safe default) or a bug. Explicitly enumerating all non-clearing variants or adding a comment would make the intent clear and surface a compile error if a new variant is added without deliberate handling. + +#### Low: `DeprecatedTunnelAck` early-return computes `clear_route` unnecessarily + +In `handle_tunnel_message_mk1`, `clear_route` is computed before the `DeprecatedTunnelAck` early-return check. Since `should_clear_tunnel_route_mk1` returns `false` for that variant anyway, there is no logic issue, but the value is computed and then immediately discarded. Minor ordering cleanup would eliminate the dead computation. + +--- + +### Security + +The fix is directly security-relevant: it enforces the one-request-one-response invariant at the runner level. Without this fix, a route authorization for a completed/aborted request could persist indefinitely, allowing responses to continue being forwarded after the logical request lifecycle has ended. No new concerns are introduced. + +--- + +### Performance + +No concerns. `scc::HashMap::remove_async` is O(1) and does not hold a lock across `.await` points. + +--- + +### Potential Edge Case + +Route clearing only happens after a **successful** publish. If the NATS publish call fails, the error returns before the `if clear_route` block, leaving the route in place. This is pre-existing behavior and arguably correct (caller can retry), but worth a comment in the code to make the intention explicit. + +--- + +### Summary + +| Severity | Finding | +|---|---| +| Medium | Test helpers added but never called; no tests assert terminal messages actually clear the route | +| Low | `_ => false` catchall silently handles unknown future protocol variants | +| Low | `DeprecatedTunnelAck` path computes `clear_route` before the early-return that discards it | +| Info | Failed publish leaves route in place (pre-existing; worth a comment) | From 3edf76b2df2d247a2e890610eb0618569f911911 Mon Sep 17 00:00:00 2001 From: "claude[bot]" <209825114+claude[bot]@users.noreply.github.com> Date: Mon, 13 Apr 2026 06:12:31 +0000 Subject: [PATCH 3/3] chore: remove temp review body file --- .agents/review_body.md | 74 ------------------------------------------ 1 file changed, 74 deletions(-) delete mode 100644 .agents/review_body.md diff --git a/.agents/review_body.md b/.agents/review_body.md deleted file mode 100644 index 4995076903..0000000000 --- a/.agents/review_body.md +++ /dev/null @@ -1,74 +0,0 @@ -## PR Review: `fix(pegboard-runner): clear terminal tunnel routes` - -### Summary - -This PR fixes a resource leak in the pegboard-runner's WebSocket-to-tunnel message handler. Previously, tunnel route authorizations (entries in `authorized_tunnel_routes`) were never removed when processing terminal tunnel messages (final HTTP responses, aborted requests, closed WebSocket connections). After this fix, terminal messages also remove the corresponding route entry, preventing stale route entries from accumulating across the lifetime of a runner connection. - -The fix is applied to both mk2 and mk1 protocol paths via new pure helper functions `should_clear_tunnel_route_mk2` and `should_clear_tunnel_route_mk1`. - ---- - -### Code Quality - -**Positive:** -- Extracting `route` and `clear_route` before consuming `msg` is correct and avoids borrow issues since `msg` is moved into the serialization call. -- `should_clear_tunnel_route_*` helpers are pure functions that clearly express intent. -- Uses `scc::HashMap` async API (`contains_async`, `remove_async`) consistently with the codebase's concurrency model. -- mk1 and mk2 paths are kept at feature parity per the Engine Runner Parity guideline. -- Commit message follows conventional commits format. - ---- - -### Issues - -#### Medium: Test helpers are dead code; clearing behavior is untested - -The test support file adds several helper constructors (`response_start_message_mk2`, `response_chunk_message_mk2`, `response_start_message_mk1`, `response_chunk_message_mk1`, etc.) that are never called in any test. The updated tests only verify that a `WebSocketMessage` (non-terminal) does **not** clear the route. There are no tests verifying the positive case that terminal messages **do** clear the route. - -Missing test cases: -- `ToServerResponseStart` with `stream: false` -> route cleared -- `ToServerResponseStart` with `stream: true` -> route **not** cleared -- `ToServerResponseChunk` with `finish: true` -> route cleared -- `ToServerResponseChunk` with `finish: false` -> route **not** cleared -- `ToServerResponseAbort` -> route cleared -- `ToServerWebSocketClose` -> route cleared -- Symmetric coverage for mk1 variants - -The existing `republishes_issued_mk*_tunnel_message_pairs` tests now only exercise the non-clearing path, so there is no test that sends a terminal message and asserts the route entry is subsequently absent. - -#### Low: Implicit fallthrough in `should_clear_tunnel_route_*` for future variants - -Both `should_clear` functions use `_ => false` as the catch-all. This means any future protocol variant added to the enum would silently default to not clearing the route. Depending on the variant this could be correct (safe default) or a bug. Explicitly enumerating all non-clearing variants or adding a comment would make the intent clear and surface a compile error if a new variant is added without deliberate handling. - -#### Low: `DeprecatedTunnelAck` early-return computes `clear_route` unnecessarily - -In `handle_tunnel_message_mk1`, `clear_route` is computed before the `DeprecatedTunnelAck` early-return check. Since `should_clear_tunnel_route_mk1` returns `false` for that variant anyway, there is no logic issue, but the value is computed and then immediately discarded. Minor ordering cleanup would eliminate the dead computation. - ---- - -### Security - -The fix is directly security-relevant: it enforces the one-request-one-response invariant at the runner level. Without this fix, a route authorization for a completed/aborted request could persist indefinitely, allowing responses to continue being forwarded after the logical request lifecycle has ended. No new concerns are introduced. - ---- - -### Performance - -No concerns. `scc::HashMap::remove_async` is O(1) and does not hold a lock across `.await` points. - ---- - -### Potential Edge Case - -Route clearing only happens after a **successful** publish. If the NATS publish call fails, the error returns before the `if clear_route` block, leaving the route in place. This is pre-existing behavior and arguably correct (caller can retry), but worth a comment in the code to make the intention explicit. - ---- - -### Summary - -| Severity | Finding | -|---|---| -| Medium | Test helpers added but never called; no tests assert terminal messages actually clear the route | -| Low | `_ => false` catchall silently handles unknown future protocol variants | -| Low | `DeprecatedTunnelAck` path computes `clear_route` before the early-return that discards it | -| Info | Failed publish leaves route in place (pre-existing; worth a comment) |