From 09fa7c1bdfddc620c94c292c40d3aea9853cbbe7 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Mon, 6 Apr 2026 16:29:11 -0700 Subject: [PATCH] fix: misc token fixes --- Cargo.lock | 1 + engine/packages/api-public/Cargo.toml | 1 + engine/packages/api-public/src/ctx.rs | 20 +++++-- engine/packages/guard/src/routing/envoy.rs | 7 ++- engine/packages/pegboard-envoy/src/conn.rs | 13 +++++ .../packages/pegboard-kv-channel/src/lib.rs | 19 +++---- engine/packages/pegboard-outbound/src/lib.rs | 57 ++++++++++--------- .../pegboard/src/ops/envoy/update_ping.rs | 4 +- .../envoy-client/src/tasks/envoy/index.ts | 19 ++++++- .../packages/sqlite-native/src/channel.rs | 2 +- 10 files changed, 90 insertions(+), 53 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 8e5a6b3311..88113125e0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4370,6 +4370,7 @@ dependencies = [ "rivet-util", "serde", "serde_json", + "subtle", "tokio", "tower-http", "tracing", diff --git a/engine/packages/api-public/Cargo.toml b/engine/packages/api-public/Cargo.toml index 1d84a529b2..22f8f20b91 100644 --- a/engine/packages/api-public/Cargo.toml +++ b/engine/packages/api-public/Cargo.toml @@ -28,6 +28,7 @@ rivet-types.workspace = true rivet-util.workspace = true serde_json.workspace = true serde.workspace = true +subtle.workspace = true tokio.workspace = true tower-http.workspace = true tracing.workspace = true diff --git a/engine/packages/api-public/src/ctx.rs b/engine/packages/api-public/src/ctx.rs index be9d6a165e..94af879edb 100644 --- a/engine/packages/api-public/src/ctx.rs +++ b/engine/packages/api-public/src/ctx.rs @@ -1,3 +1,4 @@ +use anyhow::Result; use std::{ ops::Deref, sync::{ @@ -5,8 +6,7 @@ use std::{ atomic::{AtomicBool, Ordering}, }, }; - -use anyhow::Result; +use subtle::ConstantTimeEq; #[derive(Clone)] pub struct ApiCtx { @@ -31,11 +31,19 @@ impl ApiCtx { self.authentication_handled.store(true, Ordering::Relaxed); - if self.token.as_ref() == Some(auth.admin_token.read()) { - Ok(()) - } else { - Err(rivet_api_builder::ApiForbidden.build()) + let Some(token) = &self.token else { + return Err(rivet_api_builder::ApiForbidden.build()); + }; + + if token + .as_bytes() + .ct_ne(auth.admin_token.read().as_bytes()) + .into() + { + return Err(rivet_api_builder::ApiForbidden.build()); } + + Ok(()) } pub fn skip_auth(&self) { diff --git a/engine/packages/guard/src/routing/envoy.rs b/engine/packages/guard/src/routing/envoy.rs index 5ee4cedf47..f2f6ae6ea7 100644 --- a/engine/packages/guard/src/routing/envoy.rs +++ b/engine/packages/guard/src/routing/envoy.rs @@ -2,6 +2,7 @@ use anyhow::Result; use gas::prelude::*; use rivet_guard_core::{RoutingOutput, request_context::RequestContext}; use std::sync::Arc; +use subtle::ConstantTimeEq; use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN, validate_regional_host}; @@ -81,7 +82,11 @@ async fn route_envoy_internal( }; // Validate token - if token != auth.admin_token.read() { + if token + .as_bytes() + .ct_ne(auth.admin_token.read().as_bytes()) + .into() + { return Err(rivet_api_builder::ApiForbidden.build()); } diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 95e0fc1d7d..723fb6be30 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -228,6 +228,7 @@ pub async fn handle_init( ); tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?; + tx.delete(&old_lb_key); } // Insert into LB @@ -258,6 +259,18 @@ pub async fn handle_init( } } + // Update the pool's protocol version. This is required for serverful pools because normally + // the pool's protocol version is updated via the metadata_poller wf but that only runs for + // serverless pools. + tx.write( + &pegboard::keys::runner_config::ProtocolVersionKey::new( + namespace_id, + pool_name.clone(), + ), + protocol_version, + )?; + + // Write envoy metadata if let Some(metadata) = &init.metadata { let metadata = MetadataKeyData { metadata: diff --git a/engine/packages/pegboard-kv-channel/src/lib.rs b/engine/packages/pegboard-kv-channel/src/lib.rs index d403838f4a..6354ca2d15 100644 --- a/engine/packages/pegboard-kv-channel/src/lib.rs +++ b/engine/packages/pegboard-kv-channel/src/lib.rs @@ -94,14 +94,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe { // Parse URL params. let url = url::Url::parse(&format!("ws://placeholder{}", req_ctx.path())) .context("failed to parse WebSocket URL")?; - let params: HashMap = url - .query_pairs() - .map(|(k, v)| (k.to_string(), v.to_string())) - .collect(); // Validate protocol version. - let protocol_version: u32 = params - .get("protocol_version") + let protocol_version: u32 = url + .query_pairs() + .find_map(|(n, v)| (n == "protocol_version").then_some(v)) .context("missing protocol_version query param")? .parse() .context("invalid protocol_version")?; @@ -112,10 +109,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe { ); // Resolve namespace. - let namespace_name = params - .get("namespace") + let namespace_name = url + .query_pairs() + .find_map(|(n, v)| (n == "namespace").then_some(v)) .context("missing namespace query param")? - .clone(); + .to_string(); let namespace = ctx .op(namespace::ops::resolve_for_name_global::Input { name: namespace_name.clone(), @@ -820,9 +818,6 @@ async fn handle_kv_delete_range( /// Look up an actor by ID and return the parsed ID and actor name. /// /// Defense-in-depth: verifies the actor belongs to the authenticated namespace. -/// The admin_token is a global credential, so this is not strictly necessary -/// today, but prevents cross-namespace access if a less-privileged auth -/// mechanism is introduced in the future. async fn resolve_actor( ctx: &StandaloneCtx, actor_id: &str, diff --git a/engine/packages/pegboard-outbound/src/lib.rs b/engine/packages/pegboard-outbound/src/lib.rs index 3cf2d9ba58..d2dfb2ae6e 100644 --- a/engine/packages/pegboard-outbound/src/lib.rs +++ b/engine/packages/pegboard-outbound/src/lib.rs @@ -166,16 +166,15 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> tracing::debug!(?namespace_id, %pool_name, ?actor_id, ?generation, "received outbound request"); - // Check pool let db = ctx.udb()?; - let (pool_res, namespace_res, preloaded_kv) = tokio::try_join!( + let (namespace_res, pool_res, preloaded_kv) = tokio::try_join!( + ctx.op(namespace::ops::get_global::Input { + namespace_ids: vec![namespace_id], + }), ctx.op(pegboard::ops::runner_config::get::Input { runners: vec![(namespace_id, pool_name.clone())], bypass_cache: false, }), - ctx.op(namespace::ops::get_global::Input { - namespace_ids: vec![namespace_id], - }), pegboard::actor_kv::preload::fetch_preloaded_kv( &db, ctx.config().pegboard(), @@ -184,10 +183,6 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> &actor_config.name, ), )?; - let Some(pool) = pool_res.into_iter().next() else { - tracing::debug!("pool does not exist, ending outbound handler"); - return Ok(()); - }; let Some(namespace) = namespace_res.into_iter().next() else { tracing::error!("namespace not found, ending outbound handler"); report_error( @@ -199,20 +194,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> .await; return Ok(()); }; - - let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![ - protocol::CommandWrapper { - checkpoint, - inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { - config: actor_config, - // Empty because request ids are ephemeral. This is intercepted by guard and - // populated before it reaches the envoy - hibernating_requests: Vec::new(), - preloaded_kv, - }), - }, - ])) - .serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?; + let Some(pool) = pool_res.into_iter().next() else { + tracing::debug!("pool does not exist, ending outbound handler"); + return Ok(()); + }; let RunnerConfigKind::Serverless { url, @@ -228,6 +213,20 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> return Ok(()); }; + let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![ + protocol::CommandWrapper { + checkpoint, + inner: protocol::Command::CommandStartActor(protocol::CommandStartActor { + config: actor_config, + // Empty because request ids are ephemeral. This is intercepted by guard and + // populated before it reaches the envoy + hibernating_requests: Vec::new(), + preloaded_kv, + }), + }, + ])) + .serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?; + // Send ack to actor wf before starting an outbound req ctx.signal(pegboard::workflows::actor2::Allocated { generation }) .to_workflow::() @@ -250,6 +249,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()> &url, headers, request_lifespan, + ctx.config() + .auth + .as_ref() + .map(|a| a.admin_token.read().as_str()), ) .await; @@ -272,15 +275,13 @@ async fn serverless_outbound_req( url: &str, headers: HashMap, request_lifespan: u32, + token: Option<&str>, ) -> Result<()> { let current_dc = ctx.config().topology().current_dc()?; let mut term_signal = TermSignal::get(); - let token = if let Some(auth) = &ctx.config().auth { - Some(( - X_RIVET_TOKEN, - HeaderValue::try_from(auth.admin_token.read())?, - )) + let token = if let Some(token) = token { + Some((X_RIVET_TOKEN, HeaderValue::try_from(token)?)) } else { None }; diff --git a/engine/packages/pegboard/src/ops/envoy/update_ping.rs b/engine/packages/pegboard/src/ops/envoy/update_ping.rs index b0c2466bb8..949ad540f1 100644 --- a/engine/packages/pegboard/src/ops/envoy/update_ping.rs +++ b/engine/packages/pegboard/src/ops/envoy/update_ping.rs @@ -68,10 +68,8 @@ pub async fn pegboard_envoy_update_ping(ctx: &OperationCtx, input: &Input) -> Re input.envoy_key.clone(), ); - // Add read conflict - tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?; - // Clear old key + tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?; tx.delete(&old_lb_key); tx.write( diff --git a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts index efd0a19087..b11e18f94a 100644 --- a/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts +++ b/engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts @@ -33,6 +33,7 @@ import { BufferMap, EnvoyShutdownError } from "@/utils.js"; import { stringifyToEnvoy } from "@/stringify.js"; let GLOBAL_ENVOY: EnvoyHandle | undefined = undefined; +let GLOBAL_SHARED_CTX: SharedContext | undefined = undefined; export interface EnvoyContext { shared: SharedContext; @@ -96,7 +97,14 @@ export async function startEnvoy(config: EnvoyConfig): Promise { // Must manually wait for envoy to start. export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { - if (!config.notGlobal && GLOBAL_ENVOY) return GLOBAL_ENVOY; + if (!config.notGlobal && GLOBAL_ENVOY && GLOBAL_SHARED_CTX) { + // Copy the token when called multiple times. This is done for serverless envoys where the token + // normally expires around the same time as the /start request expires. The envoy persists longer + // than the /start request so it needs an up to date token. + GLOBAL_SHARED_CTX.config.token = config.token; + + return GLOBAL_ENVOY; + } const [envoyTx, envoyRx] = unboundedChannel(); const [startTx, startRx] = watch(void 0); @@ -110,6 +118,8 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { handle: null as any, }; + if (!config.notGlobal) GLOBAL_SHARED_CTX = shared; + startConnection(shared); const ctx: EnvoyContext = { @@ -126,7 +136,7 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { const handle = createHandle(ctx, startRx); shared.handle = handle; - GLOBAL_ENVOY = handle; + if (!config.notGlobal) GLOBAL_ENVOY = handle; // Register signal handlers const onSignal = () => { @@ -191,6 +201,11 @@ export function startEnvoySync(config: EnvoyConfig): EnvoyHandle { msg: "envoy stopped", }); + if (!ctx.shared.config.notGlobal) { + GLOBAL_ENVOY = undefined; + GLOBAL_SHARED_CTX = undefined; + } + ctx.shared.config.onShutdown(); }); diff --git a/rivetkit-typescript/packages/sqlite-native/src/channel.rs b/rivetkit-typescript/packages/sqlite-native/src/channel.rs index 8848a693a3..b15786510e 100644 --- a/rivetkit-typescript/packages/sqlite-native/src/channel.rs +++ b/rivetkit-typescript/packages/sqlite-native/src/channel.rs @@ -170,7 +170,7 @@ impl std::error::Error for ChannelError {} pub struct KvChannelConfig { /// Base WebSocket endpoint URL (e.g., "ws://localhost:6420"). pub url: String, - /// Authentication token. Engine uses admin_token, manager uses config.token. + /// Authentication token. pub token: Option, /// Namespace for actor scoping. pub namespace: String,