diff --git a/engine/packages/pegboard/src/ops/runner_config/ensure_normal_if_missing.rs b/engine/packages/pegboard/src/ops/runner_config/ensure_normal_if_missing.rs index f1c579b0c5..6339995123 100644 --- a/engine/packages/pegboard/src/ops/runner_config/ensure_normal_if_missing.rs +++ b/engine/packages/pegboard/src/ops/runner_config/ensure_normal_if_missing.rs @@ -13,16 +13,25 @@ pub async fn pegboard_runner_config_ensure_normal_if_missing( ctx: &OperationCtx, input: &Input, ) -> Result<()> { - ctx.op(crate::ops::runner_config::upsert::Input { - namespace_id: input.namespace_id, - name: input.name.clone(), - config: rivet_types::runner_configs::RunnerConfig { - kind: rivet_types::runner_configs::RunnerConfigKind::Normal {}, - metadata: None, - drain_on_version_upgrade: false, - }, - }) - .await?; + let pool_res = ctx + .op(crate::ops::runner_config::get::Input { + runners: vec![(input.namespace_id, input.name.clone())], + bypass_cache: true, + }) + .await?; + + if !pool_res.is_empty() { + ctx.op(crate::ops::runner_config::upsert::Input { + namespace_id: input.namespace_id, + name: input.name.clone(), + config: rivet_types::runner_configs::RunnerConfig { + kind: rivet_types::runner_configs::RunnerConfigKind::Normal {}, + metadata: None, + drain_on_version_upgrade: false, + }, + }) + .await?; + } Ok(()) } diff --git a/engine/sdks/rust/envoy-client/src/envoy.rs b/engine/sdks/rust/envoy-client/src/envoy.rs index d9ec6dea02..1add31f76e 100644 --- a/engine/sdks/rust/envoy-client/src/envoy.rs +++ b/engine/sdks/rust/envoy-client/src/envoy.rs @@ -404,30 +404,20 @@ async fn handle_shutdown(ctx: &mut EnvoyContext) { ws_send(&ctx.shared, protocol::ToRivet::ToRivetStopping).await; - // Check if any actors are still active - let has_actors = ctx + // Wait for all actors to finish. The process manager (Docker, + // k8s, etc.) provides the ultimate shutdown deadline. + let actor_handles: Vec> = ctx .actors .values() - .any(|gens| gens.values().any(|entry| !entry.handle.is_closed())); + .flat_map(|gens| gens.values()) + .filter(|entry| !entry.handle.is_closed()) + .map(|entry| entry.handle.clone()) + .collect(); - if !has_actors { - let _ = ctx.shared.envoy_tx.send(ToEnvoyMessage::Stop); - } else { - // Wait for all actors to finish. The process manager (Docker, - // k8s, etc.) provides the ultimate shutdown deadline. - let actor_handles: Vec> = ctx - .actors - .values() - .flat_map(|gens| gens.values()) - .filter(|entry| !entry.handle.is_closed()) - .map(|entry| entry.handle.clone()) - .collect(); - - let envoy_tx = ctx.shared.envoy_tx.clone(); - tokio::spawn(async move { - futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await; - tracing::debug!("all actors stopped during graceful shutdown"); - let _ = envoy_tx.send(ToEnvoyMessage::Stop); - }); - } + let envoy_tx = ctx.shared.envoy_tx.clone(); + tokio::spawn(async move { + futures_util::future::join_all(actor_handles.iter().map(|h| h.closed())).await; + tracing::debug!("all actors stopped during graceful shutdown"); + let _ = envoy_tx.send(ToEnvoyMessage::Stop); + }); } diff --git a/engine/sdks/rust/envoy-client/src/handle.rs b/engine/sdks/rust/envoy-client/src/handle.rs index aa1232bfa1..873fd4da90 100644 --- a/engine/sdks/rust/envoy-client/src/handle.rs +++ b/engine/sdks/rust/envoy-client/src/handle.rs @@ -17,6 +17,7 @@ pub struct EnvoyHandle { impl EnvoyHandle { pub fn shutdown(&self, immediate: bool) { self.shared.shutting_down.store(true, Ordering::Release); + if immediate { let _ = self.shared.envoy_tx.send(ToEnvoyMessage::Stop); } else {