Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions engine/packages/api-public/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ rivet-types.workspace = true
rivet-util.workspace = true
serde_json.workspace = true
serde.workspace = true
subtle.workspace = true
tokio.workspace = true
tower-http.workspace = true
tracing.workspace = true
Expand Down
20 changes: 14 additions & 6 deletions engine/packages/api-public/src/ctx.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
use anyhow::Result;
use std::{
ops::Deref,
sync::{
Arc,
atomic::{AtomicBool, Ordering},
},
};

use anyhow::Result;
use subtle::ConstantTimeEq;

#[derive(Clone)]
pub struct ApiCtx {
Expand All @@ -31,11 +31,19 @@ impl ApiCtx {

self.authentication_handled.store(true, Ordering::Relaxed);

if self.token.as_ref() == Some(auth.admin_token.read()) {
Ok(())
} else {
Err(rivet_api_builder::ApiForbidden.build())
let Some(token) = &self.token else {
return Err(rivet_api_builder::ApiForbidden.build());
};

if token
.as_bytes()
.ct_ne(auth.admin_token.read().as_bytes())
.into()
{
return Err(rivet_api_builder::ApiForbidden.build());
}

Ok(())
}

pub fn skip_auth(&self) {
Expand Down
7 changes: 6 additions & 1 deletion engine/packages/guard/src/routing/envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use anyhow::Result;
use gas::prelude::*;
use rivet_guard_core::{RoutingOutput, request_context::RequestContext};
use std::sync::Arc;
use subtle::ConstantTimeEq;

use super::{SEC_WEBSOCKET_PROTOCOL, WS_PROTOCOL_TOKEN, X_RIVET_TOKEN, validate_regional_host};

Expand Down Expand Up @@ -81,7 +82,11 @@ async fn route_envoy_internal(
};

// Validate token
if token != auth.admin_token.read() {
if token
.as_bytes()
.ct_ne(auth.admin_token.read().as_bytes())
.into()
{
return Err(rivet_api_builder::ApiForbidden.build());
}

Expand Down
13 changes: 13 additions & 0 deletions engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,7 @@ pub async fn handle_init(
);

tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
tx.delete(&old_lb_key);
}

// Insert into LB
Expand Down Expand Up @@ -258,6 +259,18 @@ pub async fn handle_init(
}
}

// Update the pool's protocol version. This is required for serverful pools because normally
// the pool's protocol version is updated via the metadata_poller wf but that only runs for
// serverless pools.
tx.write(
&pegboard::keys::runner_config::ProtocolVersionKey::new(
namespace_id,
pool_name.clone(),
),
protocol_version,
)?;

// Write envoy metadata
if let Some(metadata) = &init.metadata {
let metadata = MetadataKeyData {
metadata:
Expand Down
19 changes: 7 additions & 12 deletions engine/packages/pegboard-kv-channel/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -94,14 +94,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe {
// Parse URL params.
let url = url::Url::parse(&format!("ws://placeholder{}", req_ctx.path()))
.context("failed to parse WebSocket URL")?;
let params: HashMap<String, String> = url
.query_pairs()
.map(|(k, v)| (k.to_string(), v.to_string()))
.collect();

// Validate protocol version.
let protocol_version: u32 = params
.get("protocol_version")
let protocol_version: u32 = url
.query_pairs()
.find_map(|(n, v)| (n == "protocol_version").then_some(v))
.context("missing protocol_version query param")?
.parse()
.context("invalid protocol_version")?;
Expand All @@ -112,10 +109,11 @@ impl CustomServeTrait for PegboardKvChannelCustomServe {
);

// Resolve namespace.
let namespace_name = params
.get("namespace")
let namespace_name = url
.query_pairs()
.find_map(|(n, v)| (n == "namespace").then_some(v))
.context("missing namespace query param")?
.clone();
.to_string();
let namespace = ctx
.op(namespace::ops::resolve_for_name_global::Input {
name: namespace_name.clone(),
Expand Down Expand Up @@ -820,9 +818,6 @@ async fn handle_kv_delete_range(
/// Look up an actor by ID and return the parsed ID and actor name.
///
/// Defense-in-depth: verifies the actor belongs to the authenticated namespace.
/// The admin_token is a global credential, so this is not strictly necessary
/// today, but prevents cross-namespace access if a less-privileged auth
/// mechanism is introduced in the future.
async fn resolve_actor(
ctx: &StandaloneCtx,
actor_id: &str,
Expand Down
57 changes: 29 additions & 28 deletions engine/packages/pegboard-outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,16 +166,15 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>

tracing::debug!(?namespace_id, %pool_name, ?actor_id, ?generation, "received outbound request");

// Check pool
let db = ctx.udb()?;
let (pool_res, namespace_res, preloaded_kv) = tokio::try_join!(
let (namespace_res, pool_res, preloaded_kv) = tokio::try_join!(
ctx.op(namespace::ops::get_global::Input {
namespace_ids: vec![namespace_id],
}),
ctx.op(pegboard::ops::runner_config::get::Input {
runners: vec![(namespace_id, pool_name.clone())],
bypass_cache: false,
}),
ctx.op(namespace::ops::get_global::Input {
namespace_ids: vec![namespace_id],
}),
pegboard::actor_kv::preload::fetch_preloaded_kv(
&db,
ctx.config().pegboard(),
Expand All @@ -184,10 +183,6 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
&actor_config.name,
),
)?;
let Some(pool) = pool_res.into_iter().next() else {
tracing::debug!("pool does not exist, ending outbound handler");
return Ok(());
};
let Some(namespace) = namespace_res.into_iter().next() else {
tracing::error!("namespace not found, ending outbound handler");
report_error(
Expand All @@ -199,20 +194,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
.await;
return Ok(());
};

let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
protocol::CommandWrapper {
checkpoint,
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
config: actor_config,
// Empty because request ids are ephemeral. This is intercepted by guard and
// populated before it reaches the envoy
hibernating_requests: Vec::new(),
preloaded_kv,
}),
},
]))
.serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;
let Some(pool) = pool_res.into_iter().next() else {
tracing::debug!("pool does not exist, ending outbound handler");
return Ok(());
};

let RunnerConfigKind::Serverless {
url,
Expand All @@ -228,6 +213,20 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
return Ok(());
};

let payload = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(vec![
protocol::CommandWrapper {
checkpoint,
inner: protocol::Command::CommandStartActor(protocol::CommandStartActor {
config: actor_config,
// Empty because request ids are ephemeral. This is intercepted by guard and
// populated before it reaches the envoy
hibernating_requests: Vec::new(),
preloaded_kv,
}),
},
]))
.serialize_with_embedded_version(pool.protocol_version.unwrap_or(PROTOCOL_VERSION))?;

// Send ack to actor wf before starting an outbound req
ctx.signal(pegboard::workflows::actor2::Allocated { generation })
.to_workflow::<pegboard::workflows::actor2::Workflow>()
Expand All @@ -250,6 +249,10 @@ async fn handle(ctx: &StandaloneCtx, packet: protocol::ToOutbound) -> Result<()>
&url,
headers,
request_lifespan,
ctx.config()
.auth
.as_ref()
.map(|a| a.admin_token.read().as_str()),
)
.await;

Expand All @@ -272,15 +275,13 @@ async fn serverless_outbound_req(
url: &str,
headers: HashMap<String, String>,
request_lifespan: u32,
token: Option<&str>,
) -> Result<()> {
let current_dc = ctx.config().topology().current_dc()?;
let mut term_signal = TermSignal::get();

let token = if let Some(auth) = &ctx.config().auth {
Some((
X_RIVET_TOKEN,
HeaderValue::try_from(auth.admin_token.read())?,
))
let token = if let Some(token) = token {
Some((X_RIVET_TOKEN, HeaderValue::try_from(token)?))
} else {
None
};
Expand Down
4 changes: 1 addition & 3 deletions engine/packages/pegboard/src/ops/envoy/update_ping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,10 +68,8 @@ pub async fn pegboard_envoy_update_ping(ctx: &OperationCtx, input: &Input) -> Re
input.envoy_key.clone(),
);

// Add read conflict
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;

// Clear old key
tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?;
tx.delete(&old_lb_key);

tx.write(
Expand Down
19 changes: 17 additions & 2 deletions engine/sdks/typescript/envoy-client/src/tasks/envoy/index.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rivetkit-typescript/packages/sqlite-native/src/channel.rs
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ impl std::error::Error for ChannelError {}
pub struct KvChannelConfig {
/// Base WebSocket endpoint URL (e.g., "ws://localhost:6420").
pub url: String,
/// Authentication token. Engine uses admin_token, manager uses config.token.
/// Authentication token.
pub token: Option<String>,
/// Namespace for actor scoping.
pub namespace: String,
Expand Down
Loading