Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions engine/artifacts/errors/ws.no_runner_config.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

59 changes: 30 additions & 29 deletions engine/packages/pegboard-envoy/src/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use scc::HashMap;
use universaldb::prelude::*;
use vbare::OwnedVersionedData;

use crate::{metrics, utils::UrlData};
use crate::{errors, metrics, utils::UrlData};

pub struct Conn {
pub namespace_id: Id,
Expand Down Expand Up @@ -54,6 +54,20 @@ pub async fn init_conn(
.ok_or_else(|| namespace::errors::Namespace::NotFound.build())
.with_context(|| format!("namespace not found: {}", namespace_name))?;

let pool_res = ctx
.op(pegboard::ops::runner_config::get::Input {
runners: vec![(namespace.namespace_id, pool_name.clone())],
bypass_cache: false,
})
.await?;

let Some(pool) = pool_res.into_iter().next() else {
return Err(errors::WsError::NoRunnerConfig {
pool_name: pool_name.clone(),
}
.build());
};

tracing::debug!(namespace_id=?namespace.namespace_id, "new envoy connection");

metrics::CONNECTION_TOTAL
Expand All @@ -67,27 +81,20 @@ pub async fn init_conn(
.with_label_values(&[namespace.namespace_id.to_string().as_str(), &pool_name])
.observe(start.elapsed().as_secs_f64());

let serverless_drain_grace_period = if let RunnerConfigKind::Serverless {
drain_grace_period,
..
} = &pool.config.kind
{
Some(*drain_grace_period as i64)
} else {
None
};

let udb = ctx.udb()?;
let (is_serverless, mut missed_commands, _) = tokio::try_join!(
let (_, mut missed_commands) = tokio::try_join!(
// Send init packet as soon as possible
async {
let pool_res = ctx
.op(pegboard::ops::runner_config::get::Input {
runners: vec![(namespace.namespace_id, pool_name.clone())],
bypass_cache: false,
})
.await?;

let serverless_drain_grace_period = pool_res.first().and_then(|c| {
if let RunnerConfigKind::Serverless {
drain_grace_period, ..
} = &c.config.kind
{
Some(*drain_grace_period as i64)
} else {
None
}
});
let pb = ctx.config().pegboard();

// Send init packet
Expand All @@ -104,9 +111,7 @@ pub async fn init_conn(
let init_msg_serialized = init_msg.serialize(protocol_version)?;
ws_handle
.send(Message::Binary(init_msg_serialized.into()))
.await?;

anyhow::Ok(serverless_drain_grace_period.is_some())
.await
},
udb.run(|tx| {
let namespace_id = namespace.namespace_id;
Expand Down Expand Up @@ -279,12 +284,6 @@ pub async fn init_conn(
}
})
.custom_instrument(tracing::info_span!("envoy_init_tx")),
ctx.op(
pegboard::ops::runner_config::ensure_normal_if_missing::Input {
namespace_id: namespace.namespace_id,
name: pool_name.clone(),
}
),
)?;

// Send missed commands (must be after init packet)
Expand Down Expand Up @@ -318,6 +317,8 @@ pub async fn init_conn(
.await?;
}

let is_serverless = serverless_drain_grace_period.is_some();

if is_serverless {
report_success(ctx, namespace.namespace_id, &pool_name).await;
}
Expand All @@ -329,7 +330,7 @@ pub async fn init_conn(
protocol_version,
ws_handle,
authorized_tunnel_routes: HashMap::new(),
is_serverless: false,
is_serverless,
last_rtt: AtomicU32::new(0),
last_ping_ts: AtomicI64::new(util::timestamp::now()),
}))
Expand Down
5 changes: 5 additions & 0 deletions engine/packages/pegboard-envoy/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ pub enum WsError {
"The Rivet Engine is migrating. The websocket should attempt to reconnect as soon as possible."
)]
GoingAway,
#[error(
"no_runner_config",
"Must create a runner config before connecting an envoy with pool name {pool_name:?}."
)]
NoRunnerConfig { pool_name: String },
#[error("timed_out", "Ping timed out.")]
TimedOut,
#[error(
Expand Down
Loading