From a308ebc060bde165bee6a652228297be5a6afa55 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Sun, 12 Apr 2026 14:17:55 -0700 Subject: [PATCH] fix(envoy): require runner config for ws conn --- .../artifacts/errors/ws.no_runner_config.json | 5 ++ engine/packages/pegboard-envoy/src/conn.rs | 59 ++++++++++--------- engine/packages/pegboard-envoy/src/errors.rs | 5 ++ 3 files changed, 40 insertions(+), 29 deletions(-) create mode 100644 engine/artifacts/errors/ws.no_runner_config.json diff --git a/engine/artifacts/errors/ws.no_runner_config.json b/engine/artifacts/errors/ws.no_runner_config.json new file mode 100644 index 0000000000..944252a4fc --- /dev/null +++ b/engine/artifacts/errors/ws.no_runner_config.json @@ -0,0 +1,5 @@ +{ + "code": "no_runner_config", + "group": "ws", + "message": "Must create a runner config before connecting an envoy with pool name {pool_name:?}." +} \ No newline at end of file diff --git a/engine/packages/pegboard-envoy/src/conn.rs b/engine/packages/pegboard-envoy/src/conn.rs index 40a1c9635e..c281b35502 100644 --- a/engine/packages/pegboard-envoy/src/conn.rs +++ b/engine/packages/pegboard-envoy/src/conn.rs @@ -18,7 +18,7 @@ use scc::HashMap; use universaldb::prelude::*; use vbare::OwnedVersionedData; -use crate::{metrics, utils::UrlData}; +use crate::{errors, metrics, utils::UrlData}; pub struct Conn { pub namespace_id: Id, @@ -54,6 +54,20 @@ pub async fn init_conn( .ok_or_else(|| namespace::errors::Namespace::NotFound.build()) .with_context(|| format!("namespace not found: {}", namespace_name))?; + let pool_res = ctx + .op(pegboard::ops::runner_config::get::Input { + runners: vec![(namespace.namespace_id, pool_name.clone())], + bypass_cache: false, + }) + .await?; + + let Some(pool) = pool_res.into_iter().next() else { + return Err(errors::WsError::NoRunnerConfig { + pool_name: pool_name.clone(), + } + .build()); + }; + tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection"); metrics::CONNECTION_TOTAL @@ -67,27 +81,20 @@ pub async fn init_conn( .with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name]) .observe(start.elapsed().as_secs_f64()); + let serverless_drain_grace_period = if let RunnerConfigKind::Serverless { + drain_grace_period, + .. + } = &pool.config.kind + { + Some(*drain_grace_period as i64) + } else { + None + }; + let udb = ctx.udb()?; - let (is_serverless, mut missed_commands, _) = tokio::try_join!( + let (_, mut missed_commands) = tokio::try_join!( // Send init packet as soon as possible async { - let pool_res = ctx - .op(pegboard::ops::runner_config::get::Input { - runners: vec![(namespace.namespace_id, pool_name.clone())], - bypass_cache: false, - }) - .await?; - - let serverless_drain_grace_period = pool_res.first().and_then(|c| { - if let RunnerConfigKind::Serverless { - drain_grace_period, .. - } = &c.config.kind - { - Some(*drain_grace_period as i64) - } else { - None - } - }); let pb = ctx.config().pegboard(); // Send init packet @@ -104,9 +111,7 @@ pub async fn init_conn( let init_msg_serialized = init_msg.serialize(protocol_version)?; ws_handle .send(Message::Binary(init_msg_serialized.into())) - .await?; - - anyhow::Ok(serverless_drain_grace_period.is_some()) + .await }, udb.run(|tx| { let namespace_id = namespace.namespace_id; @@ -279,12 +284,6 @@ pub async fn init_conn( } }) .custom_instrument(tracing::info_span!("envoy_init_tx")), - ctx.op( - pegboard::ops::runner_config::ensure_normal_if_missing::Input { - namespace_id: namespace.namespace_id, - name: pool_name.clone(), - } - ), )?; // Send missed commands (must be after init packet) @@ -318,6 +317,8 @@ pub async fn init_conn( .await?; } + let is_serverless = serverless_drain_grace_period.is_some(); + if is_serverless { report_success(ctx, namespace.namespace_id, &pool_name).await; } @@ -329,7 +330,7 @@ pub async fn init_conn( protocol_version, ws_handle, authorized_tunnel_routes: HashMap::new(), - is_serverless: false, + is_serverless, last_rtt: AtomicU32::new(0), last_ping_ts: AtomicI64::new(util::timestamp::now()), })) diff --git a/engine/packages/pegboard-envoy/src/errors.rs b/engine/packages/pegboard-envoy/src/errors.rs index 8e45ece6b1..07f9ea5e86 100644 --- a/engine/packages/pegboard-envoy/src/errors.rs +++ b/engine/packages/pegboard-envoy/src/errors.rs @@ -14,6 +14,11 @@ pub enum WsError { "The Rivet Engine is migrating. The websocket should attempt to reconnect as soon as possible." )] GoingAway, + #[error( + "no_runner_config", + "Must create a runner config before connecting an envoy with pool name {pool_name:?}." + )] + NoRunnerConfig { pool_name: String }, #[error("timed_out", "Ping timed out.")] TimedOut, #[error(