From cfda19bfd81837e166a9ee975c4689599139adc7 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Thu, 9 Apr 2026 19:40:07 -0700 Subject: [PATCH] fix(pb-envoy): reduce round trips for ws connection --- Cargo.toml | 8 +- engine/artifacts/config-schema.json | 2 +- engine/artifacts/errors/ws.timed_out.json | 5 + .../src/namespaces/runner_configs.rs | 5 + engine/packages/config/src/config/pegboard.rs | 2 + engine/packages/pegboard-envoy/src/conn.rs | 246 +++++++----------- engine/packages/pegboard-envoy/src/errors.rs | 9 +- .../packages/pegboard-envoy/src/ping_task.rs | 10 +- engine/packages/pegboard-envoy/src/utils.rs | 9 + .../pegboard-envoy/src/ws_to_tunnel_task.rs | 104 ++++++-- engine/packages/types/src/runner_configs.rs | 5 + .../src/versioned/namespace_runner_config.rs | 2 + .../sdks/rust/envoy-client/src/connection.rs | 62 ++--- .../sdks/rust/envoy-client/src/stringify.rs | 7 +- .../data/namespace.runner_config.v5.bare | 17 +- engine/sdks/schemas/envoy-protocol/v1.bare | 6 +- .../typescript/envoy-protocol/src/index.ts | 20 +- examples/cursors/package.json | 2 +- pnpm-lock.yaml | 20 +- 19 files changed, 276 insertions(+), 265 deletions(-) create mode 100644 engine/artifacts/errors/ws.timed_out.json diff --git a/Cargo.toml b/Cargo.toml index 54ffd8b44f..feed1aeaf1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -144,6 +144,10 @@ members = [ vergen-gitcl = "1.0.0" reqwest-eventsource = "0.6.0" + [workspace.dependencies.tokio-tungstenite] + version = "0.26.0" + features = [ "rustls-tls-native-roots" ] + [workspace.dependencies.vergen] version = "9.0.4" features = [ "build", "cargo", "rustc" ] @@ -176,10 +180,6 @@ members = [ [workspace.dependencies.pest] version = "2.7" - [workspace.dependencies.tokio-tungstenite] - version = "0.26.0" - features = ["rustls-tls-native-roots"] - [workspace.dependencies.rocksdb] version = "0.24" features = [ "multi-threaded-cf" ] diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index 
ef248614ec..5f0f105378 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -869,7 +869,7 @@ "minimum": 0.0 }, "serverless_drain_grace_period": { - "description": "Drain grace period for serverless runners.\n\nThis time is subtracted from the configured request duration. Once `duration - grace` is reached, the runner is sent stop commands for all of its actors. After the grace period is over (i.e. the full duration is reached) the runner websocket is forcibly closed.\n\nUnit is in milliseconds.", + "description": "**Deprecated** Configure the drain period in the runner config.\n\nDrain grace period for serverless runners.\n\nThis time is subtracted from the configured request duration. Once `duration - grace` is reached, the runner is sent stop commands for all of its actors. After the grace period is over (i.e. the full duration is reached) the runner websocket is forcibly closed.\n\nUnit is in milliseconds.", "type": [ "integer", "null" diff --git a/engine/artifacts/errors/ws.timed_out.json b/engine/artifacts/errors/ws.timed_out.json new file mode 100644 index 0000000000..748f67ea6a --- /dev/null +++ b/engine/artifacts/errors/ws.timed_out.json @@ -0,0 +1,5 @@ +{ + "code": "timed_out", + "group": "ws", + "message": "Ping timed out." +} \ No newline at end of file diff --git a/engine/packages/api-types/src/namespaces/runner_configs.rs b/engine/packages/api-types/src/namespaces/runner_configs.rs index e862064e68..8016002a11 100644 --- a/engine/packages/api-types/src/namespaces/runner_configs.rs +++ b/engine/packages/api-types/src/namespaces/runner_configs.rs @@ -23,6 +23,8 @@ pub enum RunnerConfigKind { /// Seconds. request_lifespan: u32, max_concurrent_actors: Option, + /// Milliseconds. 
+ drain_grace_period: Option, slots_per_runner: u32, min_runners: Option, max_runners: u32, @@ -50,6 +52,7 @@ impl Into for RunnerConfig { headers, request_lifespan, max_concurrent_actors, + drain_grace_period, slots_per_runner, min_runners, max_runners, @@ -60,6 +63,8 @@ impl Into for RunnerConfig { headers: headers.unwrap_or_default(), request_lifespan, max_concurrent_actors: max_concurrent_actors.unwrap_or(max_runners as u64), + // Default to deprecated config value (config.pegboard.serverless_drain_grace_period) + drain_grace_period: drain_grace_period.unwrap_or(10_000), slots_per_runner, min_runners: min_runners.unwrap_or_default(), max_runners, diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index 69a0a7e630..4a841427df 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -138,6 +138,8 @@ pub struct Pegboard { pub envoy_eligible_threshold: Option, // === Serverless Settings === + /// **Deprecated** Configure the drain period in the runner config. + /// /// Drain grace period for serverless runners. /// /// This time is subtracted from the configured request duration. 
Once `duration - grace` is reached, the diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index c1b4bef5c9..20f2c46257 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -3,7 +3,7 @@ use std::{ Arc, atomic::{AtomicI64, AtomicU32}, }, - time::{Duration, Instant}, + time::Instant, }; use anyhow::Context; @@ -11,7 +11,6 @@ use futures_util::StreamExt; use futures_util::TryStreamExt; use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; -use rivet_data::converted::{ActorNameKeyData, MetadataKeyData}; use rivet_envoy_protocol::{self as protocol, versioned}; use rivet_guard_core::WebSocketHandle; use rivet_types::runner_configs::RunnerConfigKind; @@ -19,7 +18,7 @@ use scc::HashMap; use universaldb::prelude::*; use vbare::OwnedVersionedData; -use crate::{errors::WsError, metrics, utils::UrlData}; +use crate::{metrics, utils::UrlData}; pub struct Conn { pub namespace_id: Id, @@ -43,6 +42,7 @@ pub async fn init_conn( namespace, pool_name, envoy_key, + version, }: UrlData, ) -> Result> { let start = Instant::now(); @@ -56,36 +56,6 @@ pub async fn init_conn( tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection"); - let ws_rx = ws_handle.recv(); - let mut ws_rx = ws_rx.lock().await; - - // Receive init packet - let Ok(msg) = tokio::time::timeout(Duration::from_secs(5), ws_rx.next()).await else { - return Err(WsError::TimedOutWaitingForInit.build()); - }; - - let Some(msg) = msg else { - return Err(WsError::ConnectionClosed.build()); - }; - - let buf = match msg? 
{ - Message::Binary(buf) => buf, - Message::Close(_) => return Err(WsError::ConnectionClosed.build()), - msg => { - tracing::debug!(?msg, "invalid initial message"); - return Err(WsError::InvalidInitialPacket("must be a binary blob").build()); - } - }; - - let init = versioned::ToRivet::deserialize(&buf, protocol_version) - .map_err(|err| WsError::InvalidPacket(err.to_string()).build()) - .context("failed to deserialize initial packet from client")?; - - let protocol::ToRivet::ToRivetInit(init) = init else { - tracing::debug!(?init, "invalid initial packet"); - return Err(WsError::InvalidInitialPacket("must be `ToRivet::Init`").build()); - }; - metrics::CONNECTION_TOTAL .with_label_values(&[ namespace.namespace_id.to_string().as_str(), @@ -97,54 +67,60 @@ pub async fn init_conn( .with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name]) .observe(start.elapsed().as_secs_f64()); - let mut conn = Conn { - namespace_id: namespace.namespace_id, - pool_name, - envoy_key, - protocol_version, - ws_handle, - authorized_tunnel_routes: HashMap::new(), - is_serverless: false, - last_rtt: AtomicU32::new(0), - last_ping_ts: AtomicI64::new(util::timestamp::now()), - }; - - handle_init(ctx, &mut conn, init).await?; - - if conn.is_serverless { - report_success(ctx, namespace.namespace_id, &conn.pool_name).await; - } - - Ok(Arc::new(conn)) -} -#[tracing::instrument(skip_all)] -pub async fn handle_init( - ctx: &StandaloneCtx, - conn: &mut Conn, - init: protocol::ToRivetInit, -) -> Result<()> { let udb = ctx.udb()?; - let namespace_id = conn.namespace_id; - let envoy_key = &conn.envoy_key; - let pool_name = &conn.pool_name; - let protocol_version = conn.protocol_version; - let (pool_res, mut missed_commands) = tokio::try_join!( - ctx.op(pegboard::ops::runner_config::get::Input { - runners: vec![(namespace_id, pool_name.clone())], - bypass_cache: false, - }), - // TODO: Move to op + let (is_serverless, mut missed_commands) = tokio::try_join!( + // Send init packet 
as soon as possible + async { + let pool_res = ctx + .op(pegboard::ops::runner_config::get::Input { + runners: vec![(namespace.namespace_id, pool_name.clone())], + bypass_cache: false, + }) + .await?; + + let serverless_drain_grace_period = pool_res.first().and_then(|c| { + if let RunnerConfigKind::Serverless { + drain_grace_period, .. + } = &c.config.kind + { + Some(*drain_grace_period as i64) + } else { + None + } + }); + let pb = ctx.config().pegboard(); + + // Send init packet + let init_msg = versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyInit( + protocol::ToEnvoyInit { + metadata: protocol::ProtocolMetadata { + envoy_lost_threshold: pb.envoy_lost_threshold(), + actor_stop_threshold: pb.actor_stop_threshold(), + serverless_drain_grace_period, + max_response_payload_size: pb.envoy_max_response_payload_size() as u64, + }, + }, + )); + let init_msg_serialized = init_msg.serialize(protocol_version)?; + ws_handle + .send(Message::Binary(init_msg_serialized.into())) + .await?; + + anyhow::Ok(serverless_drain_grace_period.is_some()) + }, udb.run(|tx| { - let init = init.clone(); + let namespace_id = namespace.namespace_id; + let envoy_key = &envoy_key; + let pool_name = &pool_name; async move { let tx = tx.with_subspace(pegboard::keys::subspace()); let create_ts_key = - pegboard::keys::envoy::CreateTsKey::new(namespace_id, envoy_key.clone()); + pegboard::keys::envoy::CreateTsKey::new(namespace_id, envoy_key.to_string()); let last_ping_ts_key = - pegboard::keys::envoy::LastPingTsKey::new(namespace_id, envoy_key.clone()); + pegboard::keys::envoy::LastPingTsKey::new(namespace_id, envoy_key.to_string()); let version_key = - pegboard::keys::envoy::VersionKey::new(namespace_id, envoy_key.clone()); + pegboard::keys::envoy::VersionKey::new(namespace_id, envoy_key.to_string()); // Read existing data let (create_ts_entry, old_last_ping_ts_entry, version_entry) = tokio::try_join!( @@ -155,15 +131,15 @@ pub async fn handle_init( // Write init data tx.write( - 
&pegboard::keys::envoy::PoolNameKey::new(namespace_id, envoy_key.clone()), - pool_name.clone(), + &pegboard::keys::envoy::PoolNameKey::new(namespace_id, envoy_key.to_string()), + pool_name.to_string(), )?; tx.write( - &pegboard::keys::envoy::VersionKey::new(namespace_id, envoy_key.clone()), - init.version, + &pegboard::keys::envoy::VersionKey::new(namespace_id, envoy_key.to_string()), + version, )?; tx.atomic_op( - &pegboard::keys::envoy::SlotsKey::new(namespace_id, envoy_key.clone()), + &pegboard::keys::envoy::SlotsKey::new(namespace_id, envoy_key.to_string()), &0i64.to_le_bytes(), MutationType::Add, ); @@ -176,13 +152,13 @@ pub async fn handle_init( create_ts }; tx.write( - &pegboard::keys::envoy::LastPingTsKey::new(namespace_id, envoy_key.clone()), + &pegboard::keys::envoy::LastPingTsKey::new(namespace_id, envoy_key.to_string()), util::timestamp::now(), )?; tx.write( &pegboard::keys::envoy::ProtocolVersionKey::new( namespace_id, - envoy_key.clone(), + envoy_key.to_string(), ), protocol_version, )?; @@ -195,16 +171,16 @@ pub async fn handle_init( &pegboard::keys::ns::ActiveEnvoyKey::new( namespace_id, create_ts, - envoy_key.clone(), + envoy_key.to_string(), ), (), )?; tx.write( &pegboard::keys::ns::ActiveEnvoyByNameKey::new( namespace_id, - pool_name.clone(), + pool_name.to_string(), create_ts, - envoy_key.clone(), + envoy_key.to_string(), ), (), )?; @@ -213,7 +189,7 @@ pub async fn handle_init( if create_ts_entry.is_some() { tx.delete(&pegboard::keys::envoy::ExpiredTsKey::new( namespace_id, - envoy_key.clone(), + envoy_key.to_string(), )); } @@ -223,10 +199,10 @@ pub async fn handle_init( { let old_lb_key = pegboard::keys::ns::EnvoyLoadBalancerIdxKey::new( namespace_id, - pool_name.clone(), + pool_name.to_string(), version, old_last_ping_ts, - envoy_key.clone(), + envoy_key.to_string(), ); tx.add_conflict_key(&old_lb_key, ConflictRangeType::Read)?; @@ -237,30 +213,14 @@ pub async fn handle_init( tx.write( &pegboard::keys::ns::EnvoyLoadBalancerIdxKey::new( 
namespace_id, - pool_name.clone(), - init.version, + pool_name.to_string(), + version, last_ping_ts, - envoy_key.clone(), + envoy_key.to_string(), ), (), )?; - // Populate actor names if provided - if let Some(actor_names) = &init.prepopulate_actor_names { - // Write each actor name into the namespace actor names list - for (name, data) in actor_names { - let metadata = serde_json::from_str::< - serde_json::Map, - >(&data.metadata) - .unwrap_or_default(); - - tx.write( - &pegboard::keys::ns::ActorNameKey::new(namespace_id, name.clone()), - ActorNameKeyData { metadata }, - )?; - } - } - // Update the pool's protocol version. This is required for serverful pools because normally // the pool's protocol version is updated via the metadata_poller wf but that only runs for // serverless pools. @@ -273,34 +233,10 @@ pub async fn handle_init( protocol_version, )?; - // Write envoy metadata - if let Some(metadata) = &init.metadata { - let metadata = MetadataKeyData { - metadata: - serde_json::from_str::>( - &metadata, - ) - .unwrap_or_default(), - }; - - let metadata_key = - pegboard::keys::envoy::MetadataKey::new(namespace_id, envoy_key.clone()); - - // Clear old metadata - tx.delete_key_subspace(&metadata_key); - - // Write metadata - for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() { - let chunk_key = metadata_key.chunk(i); - - tx.set(&tx.pack(&chunk_key), &chunk); - } - } - let envoy_actor_commands_subspace = pegboard::keys::subspace().subspace( &pegboard::keys::envoy::ActorCommandKey::subspace( namespace_id, - envoy_key.clone(), + envoy_key.to_string(), ), ); @@ -342,32 +278,10 @@ pub async fn handle_init( .await } }) - .custom_instrument(tracing::info_span!("envoy_process_init_tx")), + .custom_instrument(tracing::info_span!("envoy_init_tx")), )?; - conn.is_serverless = pool_res.first().map_or(false, |c| { - matches!(c.config.kind, RunnerConfigKind::Serverless { .. 
}) - }); - let pb = ctx.config().pegboard(); - - // Send init packet - let init_msg = - versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyInit(protocol::ToEnvoyInit { - metadata: protocol::ProtocolMetadata { - envoy_lost_threshold: pb.envoy_lost_threshold(), - actor_stop_threshold: pb.actor_stop_threshold(), - serverless_drain_grace_period: conn - .is_serverless - .then(|| pb.serverless_drain_grace_period() as i64), - max_response_payload_size: pb.envoy_max_response_payload_size() as u64, - }, - })); - let init_msg_serialized = init_msg.serialize(conn.protocol_version)?; - conn.ws_handle - .send(Message::Binary(init_msg_serialized.into())) - .await?; - - // Send missed commands + // Send missed commands (must be after init packet) if !missed_commands.is_empty() { let db = ctx.udb()?; let msg = { @@ -380,9 +294,9 @@ pub async fn handle_init( .context("failed to parse actor_id from missed envoy command")?; let preloaded = pegboard::actor_kv::preload::fetch_preloaded_kv( &db, - pb, + ctx.config().pegboard(), actor_id, - conn.namespace_id, + namespace.namespace_id, &start.config.name, ) .await?; @@ -392,13 +306,27 @@ pub async fn handle_init( versioned::ToEnvoy::wrap_latest(protocol::ToEnvoy::ToEnvoyCommands(missed_commands)) }; - let msg_serialized = msg.serialize(conn.protocol_version)?; - conn.ws_handle + let msg_serialized = msg.serialize(protocol_version)?; + ws_handle .send(Message::Binary(msg_serialized.into())) .await?; } - Ok(()) + if is_serverless { + report_success(ctx, namespace.namespace_id, &pool_name).await; + } + + Ok(Arc::new(Conn { + namespace_id: namespace.namespace_id, + pool_name, + envoy_key, + protocol_version, + ws_handle, + authorized_tunnel_routes: HashMap::new(), + is_serverless, + last_rtt: AtomicU32::new(0), + last_ping_ts: AtomicI64::new(util::timestamp::now()), + })) } /// Report success to the error tracker workflow. 
diff --git a/engine/packages/pegboard-envoy/src/errors.rs b/engine/packages/pegboard-envoy/src/errors.rs index 45c5d39144..8e45ece6b1 100644 --- a/engine/packages/pegboard-envoy/src/errors.rs +++ b/engine/packages/pegboard-envoy/src/errors.rs @@ -4,8 +4,6 @@ use serde::Serialize; #[derive(RivetError, Debug)] #[error("ws")] pub enum WsError { - #[error("connection_closed", "Normal connection close.")] - ConnectionClosed, #[error( "eviction", "The websocket has been evicted and should not attempt to reconnect." @@ -16,11 +14,8 @@ pub enum WsError { "The Rivet Engine is migrating. The websocket should attempt to reconnect as soon as possible." )] GoingAway, - #[error( - "timed_out_waiting_for_init", - "Timed out waiting for the init packet to be sent." - )] - TimedOutWaitingForInit, + #[error("timed_out", "Ping timed out.")] + TimedOut, #[error( "invalid_initial_packet", "The websocket could not process the initial packet.", diff --git a/engine/packages/pegboard-envoy/src/ping_task.rs b/engine/packages/pegboard-envoy/src/ping_task.rs index efb3723a42..281962aa36 100644 --- a/engine/packages/pegboard-envoy/src/ping_task.rs +++ b/engine/packages/pegboard-envoy/src/ping_task.rs @@ -1,4 +1,3 @@ -use anyhow::ensure; use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; use rand::Rng; @@ -8,7 +7,7 @@ use std::time::Duration; use tokio::sync::watch; use vbare::OwnedVersionedData; -use crate::{LifecycleResult, conn::Conn}; +use crate::{LifecycleResult, conn::Conn, errors::WsError}; #[tracing::instrument(name="ping_task", skip_all, fields(ray_id=?ctx.ray_id(), req_id=?ctx.req_id(), envoy_key=%conn.envoy_key, protocol_version=%conn.protocol_version))] pub async fn task( @@ -33,10 +32,9 @@ pub async fn task( // Check if the last ping is past the timeout threshold let last_ping_ts = conn.last_ping_ts.load(Ordering::SeqCst); let now = util::timestamp::now(); - ensure!( - now - last_ping_ts <= ping_timeout_ms, - "envoy ws ping timed out" - ); + if now - last_ping_ts > 
ping_timeout_ms { + return Err(WsError::TimedOut.build()); + } // Update ping ctx.op(pegboard::ops::envoy::update_ping::Input { diff --git a/engine/packages/pegboard-envoy/src/utils.rs b/engine/packages/pegboard-envoy/src/utils.rs index b356865f60..51c9175d09 100644 --- a/engine/packages/pegboard-envoy/src/utils.rs +++ b/engine/packages/pegboard-envoy/src/utils.rs @@ -8,6 +8,7 @@ pub struct UrlData { pub namespace: String, pub pool_name: String, pub envoy_key: String, + pub version: u32, } impl UrlData { @@ -58,11 +59,19 @@ impl UrlData { ); } + let version = url + .query_pairs() + .find_map(|(n, v)| (n == "version").then_some(v)) + .context("missing `version` query parameter")? + .parse::() + .context("invalid `version` query parameter")?; + Ok(UrlData { protocol_version, namespace, pool_name, envoy_key, + version, }) } } diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index b7c12a8194..29f4cba449 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -6,11 +6,13 @@ use gas::prelude::*; use hyper_tungstenite::tungstenite::Message; use pegboard::actor_kv; use pegboard::pubsub_subjects::GatewayReceiverSubject; +use rivet_data::converted::{ActorNameKeyData, MetadataKeyData}; use rivet_envoy_protocol::{self as protocol, PROTOCOL_VERSION, versioned}; use rivet_guard_core::websocket_handle::WebSocketReceiver; use scc::HashMap; use std::sync::{Arc, atomic::Ordering}; use tokio::sync::{Mutex, MutexGuard, watch}; +use universaldb::prelude::*; use universaldb::utils::end_of_key_range; use universalpubsub::PublishOpts; use vbare::OwnedVersionedData; @@ -371,9 +373,8 @@ async fn handle_message( .await .context("failed to handle tunnel message")?; } - // NOTE: Init event is processed in `conn::init_conn` - protocol::ToRivet::ToRivetInit(_) => { - tracing::debug!("received additional init packet, ignoring"); + 
protocol::ToRivet::ToRivetMetadata(metadata) => { + handle_metadata(&ctx, conn.namespace_id, &conn.envoy_key, metadata).await?; } // Forward to demuxer which forwards to actor wf protocol::ToRivet::ToRivetEvents(events) => { @@ -412,33 +413,90 @@ async fn ack_commands( envoy_key: &str, ack: protocol::ToRivetAckCommands, ) -> Result<()> { + let ack = &ack; + ctx.udb()? + .run(|tx| async move { + let tx = tx.with_subspace(pegboard::keys::subspace()); + + for checkpoint in &ack.last_command_checkpoints { + let start = tx.pack( + &pegboard::keys::envoy::ActorCommandKey::subspace_with_actor( + namespace_id, + envoy_key.to_string(), + Id::parse(&checkpoint.actor_id)?, + checkpoint.generation, + ), + ); + let end = end_of_key_range(&tx.pack( + &pegboard::keys::envoy::ActorCommandKey::subspace_with_index( + namespace_id, + envoy_key.to_string(), + Id::parse(&checkpoint.actor_id)?, + checkpoint.generation, + checkpoint.index, + ), + )); + tx.clear_range(&start, &end); + } + + Ok(()) + }) + .await +} + +async fn handle_metadata( + ctx: &StandaloneCtx, + namespace_id: Id, + envoy_key: &str, + metadata: protocol::ToRivetMetadata, +) -> Result<()> { + let metadata = &metadata; ctx.udb()? 
.run(|tx| { - let ack = ack.clone(); async move { let tx = tx.with_subspace(pegboard::keys::subspace()); - for checkpoint in &ack.last_command_checkpoints { - let start = tx.pack( - &pegboard::keys::envoy::ActorCommandKey::subspace_with_actor( - namespace_id, - envoy_key.to_string(), - Id::parse(&checkpoint.actor_id)?, - checkpoint.generation, - ), - ); - let end = end_of_key_range(&tx.pack( - &pegboard::keys::envoy::ActorCommandKey::subspace_with_index( - namespace_id, - envoy_key.to_string(), - Id::parse(&checkpoint.actor_id)?, - checkpoint.generation, - checkpoint.index, - ), - )); - tx.clear_range(&start, &end); + // Populate actor names if provided + if let Some(actor_names) = &metadata.prepopulate_actor_names { + // Write each actor name into the namespace actor names list + for (name, data) in actor_names { + let metadata = serde_json::from_str::< + serde_json::Map, + >(&data.metadata) + .unwrap_or_default(); + + tx.write( + &pegboard::keys::ns::ActorNameKey::new(namespace_id, name.clone()), + ActorNameKeyData { metadata }, + )?; + } } + // Write envoy metadata + if let Some(metadata) = &metadata.metadata { + let metadata = MetadataKeyData { + metadata: + serde_json::from_str::>( + &metadata, + ) + .unwrap_or_default(), + }; + + let metadata_key = pegboard::keys::envoy::MetadataKey::new( + namespace_id, + envoy_key.to_string(), + ); + + // Clear old metadata + tx.delete_key_subspace(&metadata_key); + + // Write metadata + for (i, chunk) in metadata_key.split(metadata)?.into_iter().enumerate() { + let chunk_key = metadata_key.chunk(i); + + tx.set(&tx.pack(&chunk_key), &chunk); + } + } Ok(()) } }) diff --git a/engine/packages/types/src/runner_configs.rs b/engine/packages/types/src/runner_configs.rs index 7011173b72..91e3af6c3b 100644 --- a/engine/packages/types/src/runner_configs.rs +++ b/engine/packages/types/src/runner_configs.rs @@ -23,6 +23,8 @@ pub enum RunnerConfigKind { /// Seconds. request_lifespan: u32, max_concurrent_actors: u64, + /// Milliseconds. 
+ drain_grace_period: u32, /// Deprecated. slots_per_runner: u32, /// Deprecated. @@ -60,6 +62,7 @@ impl From for rivet_data::generated::namespace_runner_config_v5::R headers, request_lifespan, max_concurrent_actors, + drain_grace_period, slots_per_runner, min_runners, max_runners, @@ -72,6 +75,7 @@ impl From for rivet_data::generated::namespace_runner_config_v5::R headers: headers.into(), request_lifespan, max_concurrent_actors, + drain_grace_period, slots_per_runner, min_runners, max_runners, @@ -106,6 +110,7 @@ impl From for R headers: o.headers.into(), request_lifespan: o.request_lifespan, max_concurrent_actors: o.max_concurrent_actors, + drain_grace_period: o.drain_grace_period, slots_per_runner: o.slots_per_runner, min_runners: o.min_runners, max_runners: o.max_runners, diff --git a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs index a3931952f4..767358b2e0 100644 --- a/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs +++ b/engine/sdks/rust/data/src/versioned/namespace_runner_config.rs @@ -307,6 +307,8 @@ impl NamespaceRunnerConfig { request_lifespan: serverless.request_lifespan, // Default to max_runners for v4 -> v5 migration max_concurrent_actors: serverless.max_runners as u64, + // Default to deprecated config value (config.pegboard.serverless_drain_grace_period) + drain_grace_period: 10_000, slots_per_runner: serverless.slots_per_runner, min_runners: serverless.min_runners, max_runners: serverless.max_runners, diff --git a/engine/sdks/rust/envoy-client/src/connection.rs b/engine/sdks/rust/envoy-client/src/connection.rs index 163c3e34b2..30f979192a 100644 --- a/engine/sdks/rust/envoy-client/src/connection.rs +++ b/engine/sdks/rust/envoy-client/src/connection.rs @@ -118,38 +118,37 @@ async fn single_connection( "websocket connected" ); - // Build prepopulate actor names map - let mut prepopulate_map = HashableMap::new(); - for (name, actor) in 
&shared.config.prepopulate_actor_names { - prepopulate_map.insert( - name.clone(), - protocol::ActorName { - metadata: actor.metadata.clone(), - }, - ); - } - - // Serialize metadata HashMap to JSON string for the protocol - let metadata_json = shared - .config - .metadata - .as_ref() - .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "{}".to_string())); - - // Send init - ws_send( - shared, - protocol::ToRivet::ToRivetInit(protocol::ToRivetInit { - envoy_key: shared.envoy_key.clone(), - version: shared.config.version, - prepopulate_actor_names: Some(prepopulate_map), - metadata: metadata_json, - }), - ) - .await; - // Spawn write task + let shared2 = shared.clone(); let write_handle = tokio::spawn(async move { + // Build prepopulate actor names map + let mut prepopulate_map = HashableMap::new(); + for (name, actor) in &shared2.config.prepopulate_actor_names { + prepopulate_map.insert( + name.clone(), + protocol::ActorName { + metadata: actor.metadata.clone(), + }, + ); + } + + // Serialize metadata HashMap to JSON string for the protocol + let metadata_json = shared2 + .config + .metadata + .as_ref() + .map(|m| serde_json::to_string(m).unwrap_or_else(|_| "{}".to_string())); + + // Send metadata + ws_send( + &shared2, + protocol::ToRivet::ToRivetMetadata(protocol::ToRivetMetadata { + prepopulate_actor_names: Some(prepopulate_map), + metadata: metadata_json, + }), + ) + .await; + while let Some(msg) = ws_rx.recv().await { match msg { WsTxMessage::Send(data) => { @@ -269,11 +268,12 @@ fn ws_url(shared: &SharedContext) -> String { let base_url = ws_endpoint.trim_end_matches('/'); format!( - "{}/envoys/connect?protocol_version={}&namespace={}&envoy_key={}&pool_name={}", + "{}/envoys/connect?protocol_version={}&namespace={}&envoy_key={}&version={}&pool_name={}", base_url, protocol::PROTOCOL_VERSION, urlencoding::encode(&shared.config.namespace), urlencoding::encode(&shared.envoy_key), + urlencoding::encode(&shared.config.version.to_string()), 
urlencoding::encode(&shared.config.pool_name), ) } diff --git a/engine/sdks/rust/envoy-client/src/stringify.rs b/engine/sdks/rust/envoy-client/src/stringify.rs index 51bae8d436..0e88a51a3f 100644 --- a/engine/sdks/rust/envoy-client/src/stringify.rs +++ b/engine/sdks/rust/envoy-client/src/stringify.rs @@ -233,12 +233,7 @@ pub fn stringify_event_wrapper(wrapper: &protocol::EventWrapper) -> String { pub fn stringify_to_rivet(message: &protocol::ToRivet) -> String { match message { - protocol::ToRivet::ToRivetInit(val) => { - format!( - "ToRivetInit{{envoyKey: \"{}\", version: {}}}", - val.envoy_key, val.version - ) - } + protocol::ToRivet::ToRivetMetadata(_) => "ToRivetMetadata".to_string(), protocol::ToRivet::ToRivetEvents(events) => { let event_strs: Vec = events.iter().map(stringify_event_wrapper).collect(); format!( diff --git a/engine/sdks/schemas/data/namespace.runner_config.v5.bare b/engine/sdks/schemas/data/namespace.runner_config.v5.bare index 8924d63b18..b141b785f1 100644 --- a/engine/sdks/schemas/data/namespace.runner_config.v5.bare +++ b/engine/sdks/schemas/data/namespace.runner_config.v5.bare @@ -3,13 +3,14 @@ type Json str type Serverless struct { url: str headers: map - request_lifespan: u32 - max_concurrent_actors: u64 - slots_per_runner: u32 - min_runners: u32 - max_runners: u32 - runners_margin: u32 - metadata_poll_interval: optional + requestLifespan: u32 + maxConcurrentActors: u64 + drainGracePeriod: u32 + slotsPerRunner: u32 + minRunners: u32 + maxRunners: u32 + runnersMargin: u32 + metadataPollInterval: optional } type Normal void @@ -22,5 +23,5 @@ type RunnerConfigKind union { type RunnerConfig struct { kind: RunnerConfigKind metadata: optional - drain_on_version_upgrade: bool + drainOnVersionUpgrade: bool } diff --git a/engine/sdks/schemas/envoy-protocol/v1.bare b/engine/sdks/schemas/envoy-protocol/v1.bare index 33d8de9876..c08e30d5c9 100644 --- a/engine/sdks/schemas/envoy-protocol/v1.bare +++ b/engine/sdks/schemas/envoy-protocol/v1.bare @@ 
-351,9 +351,7 @@ type ToEnvoyPing struct { } # MARK: To Rivet -type ToRivetInit struct { - envoyKey: str - version: u32 +type ToRivetMetadata struct { prepopulateActorNames: optional> metadata: optional } @@ -377,7 +375,7 @@ type ToRivetKvRequest struct { } type ToRivet union { - ToRivetInit | + ToRivetMetadata | ToRivetEvents | ToRivetAckCommands | ToRivetStopping | diff --git a/engine/sdks/typescript/envoy-protocol/src/index.ts b/engine/sdks/typescript/envoy-protocol/src/index.ts index 388e77fe35..4b86eaf903 100644 --- a/engine/sdks/typescript/envoy-protocol/src/index.ts +++ b/engine/sdks/typescript/envoy-protocol/src/index.ts @@ -1696,25 +1696,19 @@ function write15(bc: bare.ByteCursor, x: Json | null): void { /** * MARK: To Rivet */ -export type ToRivetInit = { - readonly envoyKey: string - readonly version: u32 +export type ToRivetMetadata = { readonly prepopulateActorNames: ReadonlyMap | null readonly metadata: Json | null } -export function readToRivetInit(bc: bare.ByteCursor): ToRivetInit { +export function readToRivetMetadata(bc: bare.ByteCursor): ToRivetMetadata { return { - envoyKey: bare.readString(bc), - version: bare.readU32(bc), prepopulateActorNames: read14(bc), metadata: read15(bc), } } -export function writeToRivetInit(bc: bare.ByteCursor, x: ToRivetInit): void { - bare.writeString(bc, x.envoyKey) - bare.writeU32(bc, x.version) +export function writeToRivetMetadata(bc: bare.ByteCursor, x: ToRivetMetadata): void { write14(bc, x.prepopulateActorNames) write15(bc, x.metadata) } @@ -1810,7 +1804,7 @@ export function writeToRivetKvRequest(bc: bare.ByteCursor, x: ToRivetKvRequest): } export type ToRivet = - | { readonly tag: "ToRivetInit"; readonly val: ToRivetInit } + | { readonly tag: "ToRivetMetadata"; readonly val: ToRivetMetadata } | { readonly tag: "ToRivetEvents"; readonly val: ToRivetEvents } | { readonly tag: "ToRivetAckCommands"; readonly val: ToRivetAckCommands } | { readonly tag: "ToRivetStopping"; readonly val: ToRivetStopping } @@ -1823,7 
+1817,7 @@ export function readToRivet(bc: bare.ByteCursor): ToRivet { const tag = bare.readU8(bc) switch (tag) { case 0: - return { tag: "ToRivetInit", val: readToRivetInit(bc) } + return { tag: "ToRivetMetadata", val: readToRivetMetadata(bc) } case 1: return { tag: "ToRivetEvents", val: readToRivetEvents(bc) } case 2: @@ -1845,9 +1839,9 @@ export function readToRivet(bc: bare.ByteCursor): ToRivet { export function writeToRivet(bc: bare.ByteCursor, x: ToRivet): void { switch (x.tag) { - case "ToRivetInit": { + case "ToRivetMetadata": { bare.writeU8(bc, 0) - writeToRivetInit(bc, x.val) + writeToRivetMetadata(bc, x.val) break } case "ToRivetEvents": { diff --git a/examples/cursors/package.json b/examples/cursors/package.json index 601e61e3bb..abe255a6ac 100644 --- a/examples/cursors/package.json +++ b/examples/cursors/package.json @@ -30,4 +30,4 @@ }, "stableVersion": "0.8.0", "license": "MIT" -} +} \ No newline at end of file diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index b9eb120cf5..be21a76d89 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -914,9 +914,15 @@ importers: examples/cursors: dependencies: + '@hono/node-server': + specifier: ^1.19.13 + version: 1.19.13(hono@4.11.9) '@rivetkit/react': specifier: workspace:* version: link:../../rivetkit-typescript/packages/react + hono: + specifier: ^4.7.0 + version: 4.11.9 react: specifier: 19.1.0 version: 19.1.0 @@ -7424,6 +7430,12 @@ packages: peerDependencies: hono: ^4 + '@hono/node-server@1.19.13': + resolution: {integrity: sha512-TsQLe4i2gvoTtrHje625ngThGBySOgSK3Xo2XRYOdqGN1teR8+I7vchQC46uLJi8OF62YTYA3AhSpumtkhsaKQ==} + engines: {node: '>=18.14.1'} + peerDependencies: + hono: ^4 + '@hono/node-server@1.19.9': resolution: {integrity: sha512-vHL6w3ecZsky+8P5MD+eFfaGTyCeOHUIFYMGpQGbrBTSmNNoxv0if69rEZ5giu36weC5saFuznL411gRX7bJDw==} engines: {node: '>=18.14.1'} @@ -22702,6 +22714,10 @@ snapshots: dependencies: hono: 4.11.9 + '@hono/node-server@1.19.13(hono@4.11.9)': + dependencies: + hono: 4.11.9 + 
'@hono/node-server@1.19.9(hono@4.11.9)': dependencies: hono: 4.11.9 @@ -23756,7 +23772,7 @@ snapshots: '@modelcontextprotocol/sdk@1.25.3(hono@4.11.9)(zod@3.25.76)': dependencies: - '@hono/node-server': 1.19.9(hono@4.11.9) + '@hono/node-server': 1.19.13(hono@4.11.9) ajv: 8.17.1 ajv-formats: 3.0.1(ajv@8.17.1) content-type: 1.0.5 @@ -23778,7 +23794,7 @@ snapshots: '@modelcontextprotocol/sdk@1.25.3(hono@4.11.9)(zod@4.1.13)': dependencies: - '@hono/node-server': 1.19.9(hono@4.11.9) + '@hono/node-server': 1.19.13(hono@4.11.9) ajv: 8.17.1 ajv-formats: 3.0.1(ajv@8.17.1) content-type: 1.0.5