From a0ffd3d7f68d2887d7d03f721afef573ddc3b4b0 Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Wed, 13 May 2026 16:32:22 -0400 Subject: [PATCH 1/2] Introduce `sled-agent-scrimlet-reconcilers` crate (#10313) This is groundwork for #10167, and introduces the skeleton of network config reconcilers for use within sled-agent. None of this is wired up yet and all the service-specific reconcilers are placeholders, but it does have the real setup for how these tasks get started and how they report status. The PR is pretty big but hopefully not too bad to review; more than half the code falls into either "tests", "status type definitions", or "placeholder/dummy reconcilers". A tentative suggestion for review order is: 1. The crate-level docs in `lib.rs`; these are written assuming #10167 is complete, not based on the current state of the crate. 2. `handle.rs`, particularly `ScrimletReconcilers` - this is the entry point for sled-agent. It will hold a `ScrimletReconcilers` in its set of long-running tasks. 3. `reconciler_task.rs` - this implements the common control flow for all of the service-specific reconcilers in the crate; handling periodic reactivation, activation when the config changes, transitioning to inert if we stop being a scrimlet because the sidecar goes away at runtime, and transitioning out of inert if it comes back. ~~The only production-affecting change here is that the `ThisSledSwitchZoneUnderlayIpAddr` type moved out of sled-agent and into this crate, so sled-agent depends on this crate just for that type.~~ Edit: As of #10340, `ThisSledSwitchZoneUnderlayIpAddr` has moved to `sled-agent-types`, so now this PR uses it from there and makes no changes to sled-agent proper. --- Cargo.lock | 75 +- Cargo.toml | 5 +- sled-agent/scrimlet-reconcilers/Cargo.toml | 36 + .../src/dpd_reconciler.rs | 58 ++ sled-agent/scrimlet-reconcilers/src/handle.rs | 400 ++++++++++ .../scrimlet-reconcilers/src/handle/tests.rs | 426 ++++++++++ sled-agent/scrimlet-reconcilers/src/lib.rs | 67 ++ .../src/mgd_reconciler.rs | 58 ++ .../src/reconciler_task.rs | 315 ++++++++ .../src/reconciler_task/tests.rs | 745 ++++++++++++++++++ sled-agent/scrimlet-reconcilers/src/status.rs | 141 ++++ .../src/switch_zone_slot.rs | 153 ++++ .../src/uplinkd_reconciler.rs | 60 ++ workspace-hack/Cargo.toml | 4 +- 14 files changed, 2510 insertions(+), 33 deletions(-) create mode 100644 sled-agent/scrimlet-reconcilers/Cargo.toml create mode 100644 sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/handle.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/handle/tests.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/lib.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/mgd_reconciler.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/reconciler_task.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/reconciler_task/tests.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/status.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/switch_zone_slot.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/uplinkd_reconciler.rs diff --git a/Cargo.lock b/Cargo.lock index 70a728f0dc7..1e7f21047de 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -131,7 +131,7 @@ version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -142,7 +142,7 @@ checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" dependencies = [ "anstyle", "once_cell_polyfill", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -1704,7 +1704,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] @@ -2230,7 +2230,7 @@ source = "git+https://github.com/oxidecomputer/crucible?rev=7103cd3a3d7b0112d294 dependencies = [ "crucible-workspace-hack", "libc", - "num-derive 0.4.2", + "num-derive", "num-traits", "thiserror 2.0.18", ] @@ -3629,7 +3629,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5760,7 +5760,7 @@ checksum = "3640c1c38b8e4e43584d8df18be5fc6b0aa314ce6ebf51b53313d4306cca8e46" dependencies = [ "hermit-abi 0.5.2", "libc", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -5837,7 +5837,7 @@ dependencies = [ "portable-atomic", "portable-atomic-util", "serde_core", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -6176,12 +6176,12 @@ dependencies = [ [[package]] name = "libscf-sys" -version = "1.1.0" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12f02d0eda38e8cc453c5ec5d49945545d8d9eb0e59cb2ce4152ba6518f373e7" +checksum = "d0d7bd6cfd9b5d32738cebd83a1b68060d96b1ca10d88bf9a5cb10dfac0f1cdf" dependencies = [ "libc", - "num-derive 0.3.3", + "num-derive", "num-traits", ] @@ -7969,7 +7969,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -8028,17 +8028,6 @@ version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" -[[package]] -name = "num-derive" -version = "0.3.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876a53fff98e03a936a674b29568b0e605f06b29372c2489ff4de23f1949743d" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "num-derive" version = "0.4.2" @@ -11439,9 +11428,9 @@ dependencies = [ [[package]] name = "proptest" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "37566cb3fdacef14c0737f9546df7cfeadbfbc9fef10991038bf5015d0c80532" +checksum = "4b45fcc2344c680f5025fe57779faef368840d0bd1f42f216291f0dc4ace4744" dependencies = [ "bit-set 0.8.0", "bit-vec 0.8.0", @@ -12494,7 +12483,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys 0.11.0", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -12597,7 +12586,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -13702,6 +13691,32 @@ dependencies = [ "tufaceous-artifact", ] +[[package]] +name = "sled-agent-scrimlet-reconcilers" +version = "0.1.0" +dependencies = [ + "assert_matches", + "chrono", + "dpd-client 0.1.0 (git+https://github.com/oxidecomputer/dendrite?rev=187aee7de2e50f907099ea06c04aac96c3455665)", + "dropshot 0.17.0", + "gateway-client", + "gateway-messages", + "gateway-test-utils", + "gateway-types", + "httpmock", + "mg-admin-client", + "omicron-common", + "omicron-test-utils", + "omicron-workspace-hack", + "reqwest 0.13.2", + "serde_json", + "sled-agent-types", + "slog", + "slog-error-chain", + "thiserror 2.0.18", + "tokio", +] + [[package]] name = "sled-agent-types" version = "0.1.0" @@ -14129,7 +14144,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3a766e1110788c36f4fa1c2b71b387a7815aa65f88ce0229841826633d93723e" dependencies = [ "libc", - "windows-sys 0.61.2", + "windows-sys 0.60.2", ] [[package]] @@ -14743,7 +14758,7 @@ dependencies = [ "getrandom 0.4.1", "once_cell", "rustix 1.1.3", - "windows-sys 0.61.2", + "windows-sys 0.52.0", ] [[package]] @@ -14763,7 +14778,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d8c27177b12a6399ffc08b98f76f7c9a1f4fe9fc967c784c5a071fa8d93cf7e1" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.59.0", ] [[package]] @@ -17112,7 +17127,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.61.2", + "windows-sys 0.48.0", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 7780221d91f..933bc840dc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -147,6 +147,7 @@ members = [ "sled-agent/bootstrap-agent-lockstep-api", "sled-agent/bootstrap-agent-lockstep-types", "sled-agent/config-reconciler", + "sled-agent/scrimlet-reconcilers", "sled-agent/health-monitor", "sled-agent/measurements", "sled-agent/rack-setup", @@ -333,6 +334,7 @@ default-members = [ "sled-agent/bootstrap-agent-lockstep-api", "sled-agent/bootstrap-agent-lockstep-types", "sled-agent/config-reconciler", + "sled-agent/scrimlet-reconcilers", "sled-agent/health-monitor", "sled-agent/measurements", "sled-agent/rack-setup", @@ -727,7 +729,7 @@ propolis_api_types = { git = "https://github.com/oxidecomputer/propolis", rev = propolis-client = { git = "https://github.com/oxidecomputer/propolis", rev = "bc489ddf0f38f75e0c194b86cf6f0de377f68845" } propolis-mock-server = { git = "https://github.com/oxidecomputer/propolis", rev = "bc489ddf0f38f75e0c194b86cf6f0de377f68845" } # NOTE: see above! -proptest = "1.7.0" +proptest = "1.11.0" qorb = "0.4.1" quote = "1.0" # Some dependencies still require rand 0.8.x. @@ -792,6 +794,7 @@ sled-agent-config-reconciler = { path = "sled-agent/config-reconciler" } sled-agent-health-monitor = { path = "sled-agent/health-monitor" } sled-agent-measurements = { path = "sled-agent/measurements" } sled-agent-rack-setup = { path = "sled-agent/rack-setup" } +sled-agent-scrimlet-reconcilers = { path = "sled-agent/scrimlet-reconcilers" } sled-agent-types = { path = "sled-agent/types" } sled-agent-types-versions = { path = "sled-agent/types/versions" } sled-agent-resolvable-files = { path = "sled-agent/resolvable-files" } diff --git a/sled-agent/scrimlet-reconcilers/Cargo.toml b/sled-agent/scrimlet-reconcilers/Cargo.toml new file mode 100644 index 00000000000..533574a1772 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/Cargo.toml @@ -0,0 +1,36 @@ +[package] +name = "sled-agent-scrimlet-reconcilers" +version = "0.1.0" +edition.workspace = true +license = "MPL-2.0" + +[lints] +workspace = true + +[dependencies] +chrono.workspace = true +dpd-client.workspace = true +gateway-client.workspace = true +gateway-types.workspace = true +mg-admin-client.workspace = true +omicron-common.workspace = true +reqwest.workspace = true +sled-agent-types.workspace = true +slog.workspace = true +slog-error-chain.workspace = true +thiserror.workspace = true +tokio.workspace = true + +omicron-workspace-hack.workspace = true + +[dev-dependencies] +assert_matches.workspace = true +dropshot.workspace = true +gateway-messages.workspace = true +gateway-test-utils.workspace = true +httpmock.workspace = true +omicron-test-utils.workspace = true +serde_json.workspace = true + +[features] +testing = [] diff --git a/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs new file mode 100644 index 00000000000..5360654d0ac --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs @@ -0,0 +1,58 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Reconciler responsible for configuration of `dpd` within a scrimlet's switch +//! zone. + +use crate::handle::ScrimletReconcilersMode; +use crate::reconciler_task::Reconciler; +use crate::switch_zone_slot::ThisSledSwitchSlot; +use dpd_client::Client; +use sled_agent_types::system_networking::SystemNetworkingConfig; +use slog::Logger; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct DpdReconcilerStatus { + pub todo_status: (), +} + +impl slog::KV for DpdReconcilerStatus { + fn serialize( + &self, + _record: &slog::Record<'_>, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("dpd-reconciler".into(), "not yet implemented") + } +} + +#[derive(Debug)] +pub(crate) struct DpdReconciler { + _client: Client, + _switch_slot: ThisSledSwitchSlot, +} + +impl Reconciler for DpdReconciler { + type Status = DpdReconcilerStatus; + + const LOGGER_COMPONENT_NAME: &'static str = "DpdReconciler"; + const RE_RECONCILE_INTERVAL: Duration = Duration::from_secs(30); + + fn new( + mode: ScrimletReconcilersMode, + switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + ) -> Self { + Self { _client: mode.dpd_client(parent_log), _switch_slot: switch_slot } + } + + async fn do_reconciliation( + &mut self, + _system_networking_config: &SystemNetworkingConfig, + _log: &Logger, + ) -> Self::Status { + DpdReconcilerStatus { todo_status: () } + } +} diff --git a/sled-agent/scrimlet-reconcilers/src/handle.rs b/sled-agent/scrimlet-reconcilers/src/handle.rs new file mode 100644 index 00000000000..d4e74f10390 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/handle.rs @@ -0,0 +1,400 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! [`ScrimletReconcilers`] is the entry point into this crate for `sled-agent`; +//! it provides a suitable handle for `sled-agent`'s "long running tasks", and +//! contains a handle to each of the inner service-specific reconcilers. + +use crate::DetermineSwitchSlotStatus; +use crate::dpd_reconciler::DpdReconciler; +use crate::mgd_reconciler::MgdReconciler; +use crate::reconciler_task::ReconcilerTaskHandle; +use crate::status::ScrimletReconcilersStatus; +use crate::status::ScrimletStatus; +use crate::switch_zone_slot::ThisSledSwitchSlot; +use crate::uplinkd_reconciler::UplinkdReconciler; +use omicron_common::address::DENDRITE_PORT; +use omicron_common::address::MGD_PORT; +use omicron_common::address::MGS_PORT; +use sled_agent_types::sled::ThisSledSwitchZoneUnderlayIpAddr; +use sled_agent_types::system_networking::SystemNetworkingConfig; +use slog::Logger; +use slog::info; +use std::net::SocketAddr; +use std::net::SocketAddrV6; +use std::sync::Arc; +use std::sync::OnceLock; +use std::time::Duration; +use tokio::sync::watch; + +/// Mode in which the scrimlet reconcilers should run. +/// +/// This exists to support tests where we don't have a real switch zone like +/// we expect to have on real hardware. The production sled-agent will always +/// pass `SwitchZone(ip)`; in this mode, we'll run reconcilers that talk to +/// services at the provided IP with their well-known ports and that communicate +/// with SMF within the switch zone. In the `Test { .. }` mode, we'll point the +/// reconcilers at the specified addresses for some services, and won't run the +/// SMF-based reconcilers at all. +#[derive(Debug, Clone, Copy)] +pub enum ScrimletReconcilersMode { + SwitchZone(ThisSledSwitchZoneUnderlayIpAddr), + #[cfg(any(test, feature = "testing"))] + Test { + mgs_addr: SocketAddr, + dpd_addr: SocketAddr, + mgd_addr: SocketAddr, + }, +} + +impl ScrimletReconcilersMode { + // Build a `reqwest` client with different timeout settings depending on + // whether we're expecting to contact a real service or a test one. + fn reqwest_client(&self) -> reqwest::Client { + match self { + ScrimletReconcilersMode::SwitchZone(_) => { + // Build a custom reqwest client, primarily to set a lower + // `pool_idle_timeout`. dropshot's default connection timeout is + // 30 seconds. We want to ensure we don't hit + // in any + // reconcilers that try to re-reconcile on a 30 second interval, + // so we choose a much lower `pool_idle_timeout`: 10 seconds is + // long enough to reuse a connection for all the requests made + // during one reconciliation pass, but is short enough we should + // discard it before the server wants to time us out. + // + // The 15 second connect and read timeout are consistent with + // progenitor's normal defaults. + reqwest::ClientBuilder::new() + .connect_timeout(Duration::from_secs(15)) + .pool_idle_timeout(Duration::from_secs(10)) + .build() + .expect("reqwest parameters are valid") + } + #[cfg(any(test, feature = "testing"))] + ScrimletReconcilersMode::Test { .. } => { + // Some of our tests use tokio's paused time. We want to + // construct a reqwest client that does not specify any timeouts + // at all, allowing it to wait forever; this plays nicely with + // paused time. (Paused time + timeouts cause the timeouts to + // elapse instantly, which doesn't mesh well with establishing + // TCP connections.) + reqwest::Client::new() + } + } + } + + pub(crate) fn mgs_client( + self, + parent_log: &Logger, + ) -> gateway_client::Client { + let addr: SocketAddr = match self { + ScrimletReconcilersMode::SwitchZone(ip) => { + SocketAddrV6::new(ip.into(), MGS_PORT, 0, 0).into() + } + #[cfg(any(test, feature = "testing"))] + ScrimletReconcilersMode::Test { mgs_addr, .. } => mgs_addr, + }; + let baseurl = format!("http://{addr}"); + gateway_client::Client::new_with_client( + &baseurl, + self.reqwest_client(), + parent_log + .new(slog::o!("component" => "ThisSledSwitchSlotMgsClient")), + ) + } + + pub(crate) fn dpd_client(self, parent_log: &Logger) -> dpd_client::Client { + use omicron_common::OMICRON_DPD_TAG; + + let addr: SocketAddr = match self { + ScrimletReconcilersMode::SwitchZone(ip) => { + SocketAddrV6::new(ip.into(), DENDRITE_PORT, 0, 0).into() + } + #[cfg(any(test, feature = "testing"))] + ScrimletReconcilersMode::Test { dpd_addr, .. } => dpd_addr, + }; + let baseurl = format!("http://{addr}"); + dpd_client::Client::new_with_client( + &baseurl, + self.reqwest_client(), + dpd_client::ClientState { + tag: OMICRON_DPD_TAG.to_owned(), + log: parent_log + .new(slog::o!("component" => "DpdReconcilerClient")), + }, + ) + } + + pub(crate) fn mgd_client( + self, + parent_log: &Logger, + ) -> mg_admin_client::Client { + let addr: SocketAddr = match self { + ScrimletReconcilersMode::SwitchZone(ip) => { + SocketAddrV6::new(ip.into(), MGD_PORT, 0, 0).into() + } + #[cfg(any(test, feature = "testing"))] + ScrimletReconcilersMode::Test { mgd_addr, .. } => mgd_addr, + }; + let baseurl = format!("http://{addr}"); + mg_admin_client::Client::new_with_client( + &baseurl, + self.reqwest_client(), + parent_log.new(slog::o!("component" => "MgdReconcilerClient")), + ) + } +} + +/// Information required to enable all the scrimlet reconciler tasks provided +/// by this crate. +#[derive(Debug, Clone)] +pub struct SledAgentNetworkingInfo { + pub system_networking_config_rx: watch::Receiver, + pub mode: ScrimletReconcilersMode, +} + +/// Handle to tasks that reconcile network configuration with services within a +/// scrimlet's local switch zone. +/// +/// [`ScrimletReconcilers`] has a two- or three-phase initialization process +/// (depending on whether the sled is a non-scrimlet or a scrimlet) to support +/// being included in `sled-agent`'s set of "long running tasks". +/// [`ScrimletReconcilers::new()`] can be constructed at any time (in +/// particular: very soon after `sled-agent` starts). +/// +/// After a [`ScrimletReconcilers`] has been created, `sled-agent` is +/// responsible for calling +/// [`ScrimletReconcilers::set_sled_agent_networking_info_once()`] exactly one +/// time: this provides the reconcilers with a watch channel to receive the +/// current [`SystemNetworkingConfig`] and the IP address of this sled's switch +/// zone (should it have one). +/// +/// After `sled-agent` has provided those prerequisites, on all sleds, +/// [`ScrimletReconcilers`] spawns a tokio task that waits until both of these +/// have occurred: +/// +/// 1. [`ScrimletReconcilers::set_scrimlet_status()`] is called with +/// [`ScrimletStatus::Scrimlet`]. +/// 2. We successfully contact MGS within our switch zone to determine which +/// switch slot we are. +/// +/// On non-scrimlet sleds, step 1 never happens, so the reconciler tasks are +/// never spawned. +/// +/// Once both of these are satisfied on scrimlets, all reconciliation tasks are +/// spawned and begin running. They will reactivate periodically and in response +/// to any changes to the [`SystemNetworkingConfig`] from sled-agent, and will +/// go inert if [`ScrimletReconcilers::set_scrimlet_status()`] is called with +/// [`ScrimletStatus::NotScrimlet`] (remaining inert until we are told we are a +/// scrimlet again). +pub struct ScrimletReconcilers { + // Sending half of the channel used to communicate to all the reconcilers + // whether we're still a scrimlet. + scrimlet_status_tx: watch::Sender, + + // These once locks hold the second and third phases of initialization + // described in the doc comment above. + determining_switch_slot: + OnceLock>, + running_reconcilers: Arc>, + + parent_log: Logger, +} + +impl ScrimletReconcilers { + pub fn new(parent_log: &Logger) -> Self { + // We discard the receiver here, and create new subscribers if and when + // we spawn tasks that need to consume it. + let (scrimlet_status_tx, _scrimlet_status_rx) = + watch::channel(ScrimletStatus::NotScrimlet); + + Self { + scrimlet_status_tx, + determining_switch_slot: OnceLock::new(), + running_reconcilers: Arc::new(OnceLock::new()), + parent_log: parent_log.clone(), + } + } + + pub fn status(&self) -> ScrimletReconcilersStatus { + // Do we have running reconcilers? If so, report their status. + if let Some(running) = self.running_reconcilers.get() { + let RunningReconcilers { + dpd_reconciler, + mgd_reconciler, + uplinkd_reconciler, + } = running; + ScrimletReconcilersStatus::Running { + dpd_reconciler: dpd_reconciler.status(), + mgd_reconciler: mgd_reconciler.status(), + uplinkd_reconciler: uplinkd_reconciler.status(), + } + } + // Otherwise, have we started determining our switch slot? + else if let Some(status_rx) = self.determining_switch_slot.get() { + ScrimletReconcilersStatus::DeterminingSwitchSlot( + status_rx.borrow().clone(), + ) + } + // Otherwise, we're still waiting for the networking info. + else { + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + } + } + + /// Set whether this sled is a scrimlet or not. + /// + /// This doesn't change _much_ at runtime, but it can: we may start out "not + /// a scrimlet" and then discover an attached switch later after boot, at + /// which point we become a scrimlet, or we may become "not a scrimlet" if + /// the switch is detached at runtime. + pub fn set_scrimlet_status(&self, status: ScrimletStatus) { + self.scrimlet_status_tx.send_if_modified(|prev| { + if *prev == status { + false + } else { + *prev = status; + true + } + }); + } + + /// Provide the networking information necessary to start the scrimlet + /// reconciler tasks. + /// + /// # Panics + /// + /// This method panics if called more than once; this is considered a + /// programmer error. + /// + /// One of the bits inside `info` is a watch channel: receiving a second + /// channel is a sign of control flow gone very wrong, as all the tasks will + /// already be operating based on the first channel received. + pub fn set_sled_agent_networking_info_once( + &self, + info: SledAgentNetworkingInfo, + ) { + let (determining_switch_slot_tx, determining_switch_slot_rx) = + watch::channel(DetermineSwitchSlotStatus::NotScrimlet); + + // Ensure we're only called once. + if self.determining_switch_slot.set(determining_switch_slot_rx).is_err() + { + panic!( + "set_sled_agent_networking_info_once() called more than \ + once - scrimlet reconcilers are already set up and \ + running based on the initial information provided!" + ); + } + + // We now know this is the one and only time we've been called; spawn a + // task that waits until we're a scrimlet, then waits until we can + // determine our switch slot, then spawns all the running reconcilers + // (populating `self.running_reconcilers`). + // + // We don't hang on to the join handle from this task; it exits either + // when it populates `self.running_reconcilers`, or when we're dropped + // (because it will exit when `self.scrimlet_status_tx` is closed). + tokio::spawn(determine_switch_slot( + Arc::clone(&self.running_reconcilers), + determining_switch_slot_tx, + self.scrimlet_status_tx.subscribe(), + info.mode.mgs_client(&self.parent_log), + info, + self.parent_log.clone(), + )); + } +} + +async fn determine_switch_slot( + running_reconcilers: Arc>, + determining_switch_slot_tx: watch::Sender, + mut scrimlet_status_rx: watch::Receiver, + mgs_client: gateway_client::Client, + networking_info: SledAgentNetworkingInfo, + parent_log: Logger, +) { + let log = parent_log + .new(slog::o!("component" => "ThisSledSwitchSlotDetermination")); + + // Block until either we successfully contact MGS at our switch zone + // underlay IP or the sending half of `scrimlet_status_rx` is closed (which + // means our parent `ScrimletReconcilers` was dropped - this only happens in + // tests). + let this_sled_switch_slot = + match ThisSledSwitchSlot::determine_retrying_forever( + determining_switch_slot_tx, + &mut scrimlet_status_rx, + &mgs_client, + &log, + ) + .await + { + Ok(slot) => slot, + Err(_recv_error) => { + return; + } + }; + info!( + log, "determined this sled's switch slot"; + "slot" => ?this_sled_switch_slot, + ); + + // We know `running_reconcilers` must be unset, because we're the only one + // that sets it, and we're only spawned from + // `set_sled_agent_networking_info_once()` (which itself guarantees it's + // only called once). + running_reconcilers + .set(RunningReconcilers::spawn_all( + scrimlet_status_rx, + networking_info, + this_sled_switch_slot, + &parent_log, + )) + .expect("running reconcilers is only set once"); +} + +#[derive(Debug)] +struct RunningReconcilers { + dpd_reconciler: ReconcilerTaskHandle, + mgd_reconciler: ReconcilerTaskHandle, + uplinkd_reconciler: ReconcilerTaskHandle, +} + +impl RunningReconcilers { + fn spawn_all( + scrimlet_status_rx: watch::Receiver, + networking_info: SledAgentNetworkingInfo, + this_sled_switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + ) -> Self { + let dpd_reconciler = ReconcilerTaskHandle::::spawn( + scrimlet_status_rx.clone(), + networking_info.system_networking_config_rx.clone(), + networking_info.mode, + this_sled_switch_slot, + parent_log, + ); + let mgd_reconciler = ReconcilerTaskHandle::::spawn( + scrimlet_status_rx.clone(), + networking_info.system_networking_config_rx.clone(), + networking_info.mode, + this_sled_switch_slot, + parent_log, + ); + let uplinkd_reconciler = + ReconcilerTaskHandle::::spawn( + scrimlet_status_rx, + networking_info.system_networking_config_rx, + networking_info.mode, + this_sled_switch_slot, + parent_log, + ); + Self { dpd_reconciler, mgd_reconciler, uplinkd_reconciler } + } +} + +#[cfg(test)] +mod tests; diff --git a/sled-agent/scrimlet-reconcilers/src/handle/tests.rs b/sled-agent/scrimlet-reconcilers/src/handle/tests.rs new file mode 100644 index 00000000000..fd78c9826db --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/handle/tests.rs @@ -0,0 +1,426 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use std::net::Ipv6Addr; +use std::time::Duration; + +use super::*; +use assert_matches::assert_matches; +use dropshot::ConfigLogging; +use dropshot::ConfigLoggingLevel; +use dropshot::test_util::LogContext; +use gateway_messages::SpPort; +use gateway_test_utils::setup::GatewayTestContext; +use httpmock::Mock; +use httpmock::MockServer; +use omicron_test_utils::dev; +use sled_agent_types::early_networking::RackNetworkConfig; + +// For "happy path" tests, we spin up a real MGS instances (pointed at a +// simulated SP). +// +// For "sad path" tests, we use a mock MGS so we can inject failures. +trait MgsFlavor { + fn address(&self) -> SocketAddr; + async fn teardown(self); +} + +impl MgsFlavor for GatewayTestContext { + fn address(&self) -> SocketAddr { + SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, self.port, 0, 0)) + } + + async fn teardown(self) { + GatewayTestContext::teardown(self).await + } +} + +impl MgsFlavor for MockServer { + fn address(&self) -> SocketAddr { + *MockServer::address(self) + } + + async fn teardown(self) {} +} + +struct Harness { + handle: ScrimletReconcilers, + networking_config_tx: watch::Sender, + mgs: T, + logctx: LogContext, +} + +impl Harness { + fn new_common(logctx: LogContext, mgs: T) -> Self { + let (networking_config_tx, _) = + watch::channel(SystemNetworkingConfig { + rack_network_config: RackNetworkConfig { + rack_subnet: "fd00:1122:3344:0100::/56".parse().unwrap(), + infra_ip_first: "192.0.2.10".parse().unwrap(), + infra_ip_last: "192.0.2.100".parse().unwrap(), + ports: Vec::new(), + bgp: Vec::new(), + bfd: Vec::new(), + }, + blueprint_external_networking_config: None, + }); + + let handle = ScrimletReconcilers::new(&logctx.log); + + Self { handle, networking_config_tx, mgs, logctx } + } + + async fn teardown(self) { + self.logctx.cleanup_successful(); + self.mgs.teardown().await; + } + + fn sled_agent_networking_info(&self) -> SledAgentNetworkingInfo { + let dummy_addr = "0.0.0.0:0".parse().unwrap(); + SledAgentNetworkingInfo { + system_networking_config_rx: self.networking_config_tx.subscribe(), + mode: ScrimletReconcilersMode::Test { + mgs_addr: self.mgs.address(), + dpd_addr: dummy_addr, + mgd_addr: dummy_addr, + }, + } + } + + async fn wait_for_task_status(&self, matches: F) + where + F: Fn(&ScrimletReconcilersStatus) -> bool, + { + let mut status = self.handle.status(); + let start = tokio::time::Instant::now(); + while start.elapsed() < Duration::from_secs(30) { + if matches(&status) { + return; + } + tokio::time::sleep(Duration::from_millis(10)).await; + status = self.handle.status(); + } + panic!("timeout waiting for task status (got {status:?})"); + } +} + +impl Harness { + async fn new_real_mgs(logctx: LogContext) -> Self { + let ctx = gateway_test_utils::setup::test_setup( + logctx.test_name(), + SpPort::One, + ) + .await; + Self::new_common(logctx, ctx) + } +} + +impl Harness { + fn new_mock_mgs(logctx: LogContext) -> Self { + Self::new_common(logctx, MockServer::start()) + } + + fn mock_mgs_set_switch_slot(&self, slot: u16) -> Mock<'_> { + let body = + serde_json::json!({ "type": "switch", "slot": slot }).to_string(); + self.mgs.mock(|when, then| { + when.method(httpmock::Method::GET).path("/local/switch-id"); + then.status(200) + .header("content-type", "application/json") + .body(body); + }) + } + + fn mock_mgs_set_503(&self) -> Mock<'_> { + self.mgs.mock(|when, then| { + when.method(httpmock::Method::GET).path("/local/switch-id"); + then.status(503).header("content-type", "application/json").body( + serde_json::json!({ + "request_id": "test", + "message": "service unavailable", + }) + .to_string(), + ); + }) + } + + async fn wait_for_mock_to_be_called(&self, mock: &Mock<'_>) { + // Tricky bit: we use tokio's paused time in tests below both for + // consistency and test speed, but we need to wait for _real_ time for + // httpmock to receive requests. Use `std::time::Instant` here so we + // wait for 10 real seconds for a request to be received. + let start = std::time::Instant::now(); + while start.elapsed() < Duration::from_secs(10) { + if mock.calls_async().await == 0 { + // advance paused time... + tokio::time::sleep(Duration::from_secs(1)).await; + // and also _really_ sleep briefly to avoid spinning 100% CPU + std::thread::sleep(Duration::from_millis(10)); + continue; + } + + // We got at least 1 call; assert and return. + return mock.assert_async().await; + } + + panic!("timed out wait for mock to be called"); + } +} + +#[tokio::test] +#[should_panic( + expected = "set_sled_agent_networking_info_once() called more than once" +)] +async fn calling_set_sled_agent_networking_info_once_multiple_times_panics() { + // Set up a stderr logger - we're going to panic and won't have the + // opportunity to clean up a file-based one. + let log_config = + ConfigLogging::StderrTerminal { level: ConfigLoggingLevel::Trace }; + let logctx = LogContext::new( + "calling_set_sled_agent_networking_info_once_multiple_times_panics", + &log_config, + ); + let harness = Harness::new_mock_mgs(logctx); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); +} + +// Happy-path test for non-scrimlets. +#[tokio::test] +async fn non_scrimlet_two_phase_initialization() { + let logctx = dev::test_setup_log("non_scrimlet_two_phase_initialization"); + let harness = Harness::new_mock_mgs(logctx); + + // initial status + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + ); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + + // terminal status for non-scrimlets + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::NotScrimlet + ) + ); + + harness.teardown().await; +} + +// Happy-path test for scrimlets where we find out we're a scrimlet after +// getting our networking info. +#[tokio::test] +async fn scrimlet_three_phase_initialization_info_then_scrimlet() { + let logctx = dev::test_setup_log( + "scrimlet_three_phase_initialization_info_then_scrimlet", + ); + let harness = Harness::new_real_mgs(logctx).await; + + // initial status + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + ); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + + // status before we become a scrimlet + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::NotScrimlet + ) + ); + + harness.handle.set_scrimlet_status(ScrimletStatus::Scrimlet); + + harness + .wait_for_task_status(|status| { + matches!(status, ScrimletReconcilersStatus::Running { .. }) + }) + .await; + + harness.teardown().await; +} + +// Happy-path test for scrimlets where we find out we're a scrimlet before +// getting our networking info. +#[tokio::test] +async fn scrimlet_three_phase_initialization_scrimlet_then_info() { + let logctx = dev::test_setup_log( + "scrimlet_three_phase_initialization_scrimlet_then_info", + ); + let harness = Harness::new_real_mgs(logctx).await; + + // Become a scrimlet right away. + harness.handle.set_scrimlet_status(ScrimletStatus::Scrimlet); + + // initial status + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + ); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + + // We're already a scrimlet, so we contact MGS then transition to Running. + harness + .wait_for_task_status(|status| { + matches!(status, ScrimletReconcilersStatus::Running { .. }) + }) + .await; + + harness.teardown().await; +} + +// Scrimlet test case where MGS fails the first time we ask then succeeds later. +#[tokio::test(start_paused = true)] +async fn scrimlet_mgs_fails_first_attempt() { + let logctx = dev::test_setup_log("scrimlet_mgs_fails_first_attempt"); + let harness = Harness::new_mock_mgs(logctx); + + // Configure our mock MGS to return an HTTP 503. + let mut error_mock = harness.mock_mgs_set_503(); + + // initial status + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + ); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + + // status before we become a scrimlet + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::NotScrimlet + ) + ); + + harness.handle.set_scrimlet_status(ScrimletStatus::Scrimlet); + + // Wait for our mock to receive a request and return a 503; we should then + // see the `WaitingToRetry` state with an error. + harness.wait_for_mock_to_be_called(&error_mock).await; + // We should first see the "waiting to retry" status... + harness + .wait_for_task_status(|status| { + matches!( + status, + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::WaitingToRetry { + prev_attempt_err, + } + ) if prev_attempt_err.contains("status: 503") + ) + }) + .await; + + // Changing MGS to return success should transition us to `Running`. + error_mock.delete(); + let success_mock = harness.mock_mgs_set_switch_slot(0); + + harness.wait_for_mock_to_be_called(&success_mock).await; + harness + .wait_for_task_status(|status| { + matches!(status, ScrimletReconcilersStatus::Running { .. }) + }) + .await; + + harness.teardown().await; +} + +// Scrimlet test case where MGS fails, then we become "not a scrimlet", then we +// become a scrimlet again and MGS succeeds. +#[tokio::test(start_paused = true)] +async fn scrimlet_mgs_fails_then_we_become_not_a_scrimlet() { + let logctx = + dev::test_setup_log("scrimlet_mgs_fails_then_we_become_not_a_scrimlet"); + let harness = Harness::new_mock_mgs(logctx); + + // Configure our mock MGS to return an HTTP 503. + let mut error_mock = harness.mock_mgs_set_503(); + + // initial status + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::WaitingForSledAgentNetworkingInfo + ); + + harness.handle.set_sled_agent_networking_info_once( + harness.sled_agent_networking_info(), + ); + + // status before we become a scrimlet + assert_matches!( + harness.handle.status(), + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::NotScrimlet + ) + ); + + harness.handle.set_scrimlet_status(ScrimletStatus::Scrimlet); + + // Wait until we've failed to determine our switch slot. + harness.wait_for_mock_to_be_called(&error_mock).await; + harness + .wait_for_task_status(|status| { + matches!( + status, + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::WaitingToRetry { + prev_attempt_err, + } + ) if prev_attempt_err.contains("status: 503") + ) + }) + .await; + + // Now sled-agent tells us we're not a scrimlet; we should transition back + // to `NotScrimlet`. + harness.handle.set_scrimlet_status(ScrimletStatus::NotScrimlet); + harness + .wait_for_task_status(|status| { + matches!( + status, + ScrimletReconcilersStatus::DeterminingSwitchSlot( + DetermineSwitchSlotStatus::NotScrimlet + ) + ) + }) + .await; + + // Reconfigure MGS to succeed. + error_mock.delete(); + let success_mock = harness.mock_mgs_set_switch_slot(0); + + // Become a scrimlet again - we should transition through `ContactingMgs` + // (with no previous error) then to `Running`. + harness.handle.set_scrimlet_status(ScrimletStatus::Scrimlet); + harness.wait_for_mock_to_be_called(&success_mock).await; + harness + .wait_for_task_status(|status| { + matches!(status, ScrimletReconcilersStatus::Running { .. }) + }) + .await; + + harness.teardown().await; +} diff --git a/sled-agent/scrimlet-reconcilers/src/lib.rs b/sled-agent/scrimlet-reconcilers/src/lib.rs new file mode 100644 index 00000000000..0a5de4a3fe8 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/lib.rs @@ -0,0 +1,67 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! This crate implements long-running reconciler tasks responsible for +//! configuration of services within a scrimlet's switch zone. +//! +//! These tasks _only_ talk to services on the same sled as the sled-agent +//! executing these tasks; we attempt to ensure this at runtime via types like +//! [`ThisSledSwitchZoneUnderlayIpAddr`]. A scrimlet running these tasks should +//! never attempt to talk to another scrimlet's switch zone, and a non-scrimlet +//! running these tasks should never attempt to talk to anything. (Non-scrimlet +//! sleds still create a [`ScrimletReconcilers`] handle, as sled-agent can't +//! easily tell the difference between "not a scrimlet because we're in a +//! different cubby" and "not a scrimlet because we should have a switch but it +//! isn't connected / isn't powered on / etc.", but the reconciliation tasks are +//! only spawned under conditions that only scrimlets can satisfy; see +//! [`ScrimletReconcilers`] for more detail.) +//! +//! These tasks are responsible for applying system-level networking, including: +//! +//! * Configuration of uplink ports within `dpd` +//! * Configuration of NAT entries for system-level services (boundary NTP, +//! Nexus, external DNS - notably _not_ instances) within `dpd` +//! * Configuration of BGP within `mgd` +//! * Configuration of BFD within `mgd` +//! * Configuration of static routes within `mgd` +//! * Configuration of SMF properties for `uplinkd` and `lldpd` +//! +//! The specific configuration that should be applied comes from Nexus (or RSS, +//! at rack setup time) and is sent to `sled-agent` via the bootstore. +//! +//! In the past, responsibility for this configuration was split: sled-agent was +//! responsible for applying an initial config on sled boot (required for cold +//! boot of the rack), and Nexus was responsible for continuously keeping the +//! config in sync afterwards. This had a variety of problems; see +//! . The split is now +//! that Nexus is responsible for maintaining what the configuration should be, +//! and each scrimlet is responsible for applying that configuration to its own +//! switch zone's services; the latter is implemented via this crate. +//! +//! [`ThisSledSwitchZoneUnderlayIpAddr`]: +//! sled_agent_types::sled::ThisSledSwitchZoneUnderlayIpAddr + +mod dpd_reconciler; +mod handle; +mod mgd_reconciler; +mod reconciler_task; +mod status; +mod switch_zone_slot; +mod uplinkd_reconciler; + +pub use dpd_reconciler::DpdReconcilerStatus; +pub use handle::ScrimletReconcilers; +pub use handle::ScrimletReconcilersMode; +pub use handle::SledAgentNetworkingInfo; +pub use mgd_reconciler::MgdReconcilerStatus; +pub use status::DetermineSwitchSlotStatus; +pub use status::ReconcilerActivationReason; +pub use status::ReconcilerCurrentStatus; +pub use status::ReconcilerInertReason; +pub use status::ReconcilerRunningStatus; +pub use status::ReconcilerStatus; +pub use status::ReconciliationCompletedStatus; +pub use status::ScrimletReconcilersStatus; +pub use status::ScrimletStatus; +pub use uplinkd_reconciler::UplinkdReconcilerStatus; diff --git a/sled-agent/scrimlet-reconcilers/src/mgd_reconciler.rs b/sled-agent/scrimlet-reconcilers/src/mgd_reconciler.rs new file mode 100644 index 00000000000..fac5d4f0dca --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/mgd_reconciler.rs @@ -0,0 +1,58 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Reconciler responsible for configuration of `mgd` within a scrimlet's switch +//! zone. + +use crate::ScrimletReconcilersMode; +use crate::reconciler_task::Reconciler; +use crate::switch_zone_slot::ThisSledSwitchSlot; +use mg_admin_client::Client; +use sled_agent_types::system_networking::SystemNetworkingConfig; +use slog::Logger; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct MgdReconcilerStatus { + pub todo_status: (), +} + +impl slog::KV for MgdReconcilerStatus { + fn serialize( + &self, + _record: &slog::Record<'_>, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("mgd-reconciler".into(), "not yet implemented") + } +} + +#[derive(Debug)] +pub(crate) struct MgdReconciler { + _client: Client, + _switch_slot: ThisSledSwitchSlot, +} + +impl Reconciler for MgdReconciler { + type Status = MgdReconcilerStatus; + + const LOGGER_COMPONENT_NAME: &'static str = "MgdReconciler"; + const RE_RECONCILE_INTERVAL: Duration = Duration::from_secs(30); + + fn new( + mode: ScrimletReconcilersMode, + switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + ) -> Self { + Self { _client: mode.mgd_client(parent_log), _switch_slot: switch_slot } + } + + async fn do_reconciliation( + &mut self, + _system_networking_config: &SystemNetworkingConfig, + _log: &Logger, + ) -> Self::Status { + MgdReconcilerStatus { todo_status: () } + } +} diff --git a/sled-agent/scrimlet-reconcilers/src/reconciler_task.rs b/sled-agent/scrimlet-reconcilers/src/reconciler_task.rs new file mode 100644 index 00000000000..988e4651a3b --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/reconciler_task.rs @@ -0,0 +1,315 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! The general framework for one of the service-specific reconciler tasks +//! implemented in this crate. +//! +//! Each reconciler follows the same structure: +//! +//! 1. If we ever stop being a scrimlet (i.e., the attached switch goes away), +//! go inert until we become a scrimlet again (i.e., the switch reappears). +//! This can happen during sidecar updates if it powers off briefly to reset +//! internal FPGAs, or in a variety of other less common and more rainy-day +//! situations. +//! 2. Periodically or when the `SystemNetworkingConfig` changes, perform +//! service-specific reconciliation. This is provided by implementors of the +//! [`Reconciler`] trait elsewhere in this crate. +//! 3. Report status of this task in an output watch channel, suitable for +//! reporting in the sled-agent inventory. +//! +//! [`ReconcilerTask::run()`] handles 1 and 3, and service-specific +//! implementations of [`Reconciler`] provide 2. + +use crate::handle::ScrimletReconcilersMode; +use crate::status::ReconcilerActivationReason; +use crate::status::ReconcilerCurrentStatus; +use crate::status::ReconcilerInertReason; +use crate::status::ReconcilerRunningStatus; +use crate::status::ReconcilerStatus; +use crate::status::ReconciliationCompletedStatus; +use crate::status::ScrimletStatus; +use crate::switch_zone_slot::ThisSledSwitchSlot; +use chrono::SecondsFormat; +use chrono::Utc; +use sled_agent_types::system_networking::SystemNetworkingConfig; +use slog::Logger; +use slog::error; +use slog::info; +use std::convert::Infallible; +use std::time::Duration; +use std::time::Instant; +use tokio::sync::watch; +use tokio::sync::watch::error::RecvError; +use tokio::task::JoinHandle; + +/// Trait that should be implemented by the service-specific reconciler tasks +/// elsewhere in this crate. +pub(crate) trait Reconciler: Send + 'static { + type Status: slog::KV + Clone + Send + Sync + 'static; + + const LOGGER_COMPONENT_NAME: &'static str; + const RE_RECONCILE_INTERVAL: Duration; + + /// Construct a new instance of this `Reconciler`. + /// + /// Typically builds a client for the relevant service based on `mode` and + /// record `switch_slot` for use inside future calls to + /// `do_reconciliation()`. + fn new( + mode: ScrimletReconcilersMode, + switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + ) -> Self; + + /// Perform any required reconciliation based on the current contents of + /// `system_networking_config`. + /// + /// This method is infallible; any errors must be described by + /// `Self::Status`. + fn do_reconciliation( + &mut self, + system_networking_config: &SystemNetworkingConfig, + log: &Logger, + ) -> impl Future + Send; +} + +#[derive(Debug)] +pub(crate) struct ReconcilerTaskHandle { + status_rx: watch::Receiver>, + + // We never wait on this task: the only way it exits is by panicking (which + // results in a process abort, since we build sled-agent with panic=abort) + // or if the watch channels it's reading are dropped, which itself can only + // happen by a panic elsewhere. + _task: JoinHandle<()>, +} + +impl ReconcilerTaskHandle { + pub(crate) fn spawn( + scrimlet_status_rx: watch::Receiver, + system_networking_config_rx: watch::Receiver, + mode: ScrimletReconcilersMode, + this_sled_switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + ) -> Self { + Self::spawn_impl( + scrimlet_status_rx, + system_networking_config_rx, + mode, + this_sled_switch_slot, + parent_log, + T::new, + ) + } + + // Separate, private function that allows unit tests to customize how `T` is + // constructed. Production passes `T::new` as `inner_constructor`; i.e., + // just call the constructor we know exists from the `Reconciler` trait. + fn spawn_impl( + scrimlet_status_rx: watch::Receiver, + system_networking_config_rx: watch::Receiver, + mode: ScrimletReconcilersMode, + this_sled_switch_slot: ThisSledSwitchSlot, + parent_log: &Logger, + inner_constructor: F, + ) -> Self + where + F: FnOnce(ScrimletReconcilersMode, ThisSledSwitchSlot, &Logger) -> T + + Send + + Sync + + 'static, + { + let (status_tx, status_rx) = watch::channel(ReconcilerStatus { + current_status: ReconcilerCurrentStatus::Idle, + last_completion: None, + }); + + let log = parent_log + .new(slog::o!("scrimlet_reconciler" => T::LOGGER_COMPONENT_NAME)); + + let mut inner_task = ReconcilerTask { + scrimlet_status_rx, + system_networking_config_rx, + status_tx, + inner: inner_constructor(mode, this_sled_switch_slot, parent_log), + log, + }; + + let task = tokio::spawn(async move { + match inner_task.run().await { + // `inner_task.run()` runs forever... + Ok(never_returns) => match never_returns {}, + + // ... unless one of its input watch channels has closed. + Err(_recv_error) => { + inner_task.status_tx.send_modify(|status| { + status.current_status = ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly, + ); + }); + error!( + inner_task.log, + "exited due to watch channel closure \ + (unexpected except during shutdown in tests)" + ); + } + } + }); + + Self { status_rx, _task: task } + } + + pub(crate) fn status(&self) -> ReconcilerStatus { + self.status_rx.borrow().clone() + } +} + +struct ReconcilerTask { + scrimlet_status_rx: watch::Receiver, + system_networking_config_rx: watch::Receiver, + status_tx: watch::Sender>, + inner: T, + log: Logger, +} + +impl ReconcilerTask { + async fn run(&mut self) -> Result { + let mut activation_reason = ReconcilerActivationReason::Startup; + let mut activation_count: u64 = 0; + + loop { + // We know we _were_ a scrimlet at some point, because we determined + // our switch slot by contacting MGS within our own switch zone. But + // it's possible we could become "not a scrimlet" in the future + // (e.g., if the switch disappears out from under us). In such a + // case, block until it comes back. + self.wait_if_this_sled_is_no_longer_a_scrimlet().await?; + + // We _are_ a scrimlet; perform reconciliation. + let start_instant = Instant::now(); + let start_time = Utc::now(); + info!( + self.log, "starting reconciliation attempt"; + "activation_reason" => ?activation_reason, + "activation_count" => activation_count, + ); + + // Snapshot the current networking config so we hold the watch + // channel as little as possible. + let system_networking_config = + self.system_networking_config_rx.borrow_and_update().clone(); + let running_status = + ReconcilerRunningStatus::new(activation_reason); + + // Update our status. + self.status_tx.send_modify(|status| { + status.current_status = + ReconcilerCurrentStatus::Running(running_status); + }); + + // Actually perform reconciliation. + let status_result = self + .inner + .do_reconciliation(&system_networking_config, &self.log) + .await; + + // Update our output watch channel with the result. + info!( + self.log, "reconciliation attempt complete"; + "activation_reason" => ?activation_reason, + "activation_count" => activation_count, + "started_at" => start_time.to_rfc3339_opts( + SecondsFormat::Millis, + /* use_z */ true, + ), + "elapsed" => ?start_instant.elapsed(), + &status_result, + ); + self.status_tx.send_modify(|status| { + status.current_status = ReconcilerCurrentStatus::Idle; + status.last_completion = + Some(Box::new(ReconciliationCompletedStatus { + activation_reason, + completed_at_time: Utc::now(), + ran_for: running_status.elapsed_since_start(), + activation_count, + status: status_result, + })); + }); + activation_count = activation_count.wrapping_add(1); + + // Wait until we should perform reconciliation again: our + // re-reconciliation periodic timer fires or one of our input watch + // channels changes. + // + // All arms are cancel-safe and we do not `.await` within the body + // of any arm, avoiding any opportunity for futurelock. + activation_reason = tokio::select! { + () = tokio::time::sleep(T::RE_RECONCILE_INTERVAL) => { + ReconcilerActivationReason::PeriodicTimer + } + + result = self.system_networking_config_rx.changed() => { + () = result?; + ReconcilerActivationReason::SystemNetworkingConfigChanged + } + + result = self.scrimlet_status_rx.changed() => { + () = result?; + ReconcilerActivationReason::ScrimletStatusChanged + } + }; + } + } + + async fn wait_if_this_sled_is_no_longer_a_scrimlet( + &mut self, + ) -> Result<(), RecvError> { + let mut logged_not_scrimlet = false; + + loop { + let status = *self.scrimlet_status_rx.borrow_and_update(); + match status { + ScrimletStatus::Scrimlet => { + return Ok(()); + } + ScrimletStatus::NotScrimlet => { + if !logged_not_scrimlet { + info!( + self.log, + "not a scrimlet - reconciler going inert" + ); + logged_not_scrimlet = true; + } + self.status_tx.send_modify(|status| { + status.current_status = ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::NoLongerAScrimlet, + ); + }); + + // Select over both input channels so we can detect channel + // closure and exit cleanly if either channel goes away. If + // the rack network config changes here, we'll spuriously + // loop around and reread the scrimlet status, but that's no + // big deal. + // + // Both arms are cancel-safe and we do not `.await` within + // the body of any arm, avoiding any opportunity for + // futurelock. + tokio::select! { + result = self.scrimlet_status_rx.changed() => { + () = result?; + } + result = self.system_networking_config_rx.changed() => { + () = result?; + } + } + } + } + } + } +} + +#[cfg(test)] +mod tests; diff --git a/sled-agent/scrimlet-reconcilers/src/reconciler_task/tests.rs b/sled-agent/scrimlet-reconcilers/src/reconciler_task/tests.rs new file mode 100644 index 00000000000..b0cce407532 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/reconciler_task/tests.rs @@ -0,0 +1,745 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use super::*; +use assert_matches::assert_matches; +use sled_agent_types::early_networking::RackNetworkConfig; +use std::mem; +use std::sync::Arc; +use std::sync::Mutex; +use tokio::sync::mpsc; +use tokio::time::Instant; + +struct MockReconciler { + do_reconciliation_calls: Arc>>, + do_reconciliation_results: mpsc::UnboundedReceiver, +} + +#[derive(Debug, Clone)] +struct MockReconcilerStatus(String); + +impl slog::KV for MockReconcilerStatus { + fn serialize( + &self, + _record: &slog::Record<'_>, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("mock-reconciler".into(), &self.0) + } +} + +impl Reconciler for MockReconciler { + type Status = MockReconcilerStatus; + + const LOGGER_COMPONENT_NAME: &'static str = "MockReconciler"; + const RE_RECONCILE_INTERVAL: Duration = Duration::from_secs(30); + + fn new( + _mode: ScrimletReconcilersMode, + _switch_slot: ThisSledSwitchSlot, + _parent_log: &Logger, + ) -> Self { + unimplemented!("not called by tests") + } + + async fn do_reconciliation( + &mut self, + system_networking_config: &SystemNetworkingConfig, + _log: &Logger, + ) -> Self::Status { + self.do_reconciliation_calls + .lock() + .unwrap() + .push(system_networking_config.clone()); + MockReconcilerStatus( + self.do_reconciliation_results + .recv() + .await + .expect("test never closes sending side of channel"), + ) + } +} + +fn test_system_networking_config_1() -> SystemNetworkingConfig { + SystemNetworkingConfig { + rack_network_config: RackNetworkConfig { + rack_subnet: "fd00:1122:3344:0100::/56".parse().unwrap(), + infra_ip_first: "192.0.2.10".parse().unwrap(), + infra_ip_last: "192.0.2.100".parse().unwrap(), + ports: Vec::new(), + bgp: Vec::new(), + bfd: Vec::new(), + }, + blueprint_external_networking_config: None, + } +} + +fn test_system_networking_config_2() -> SystemNetworkingConfig { + SystemNetworkingConfig { + rack_network_config: RackNetworkConfig { + rack_subnet: "fd00:aabb:ccdd:0200::/56".parse().unwrap(), + infra_ip_first: "192.0.2.20".parse().unwrap(), + infra_ip_last: "192.0.2.200".parse().unwrap(), + ports: Vec::new(), + bgp: Vec::new(), + bfd: Vec::new(), + }, + blueprint_external_networking_config: None, + } +} + +struct Harness { + task: ReconcilerTaskHandle, + scrimlet_status_tx: watch::Sender, + system_networking_config_tx: watch::Sender, + do_reconciliation_results_tx: mpsc::UnboundedSender, + do_reconciliation_calls: Arc>>, +} + +impl Harness { + const WAIT_FOR_STATUS_CHECK_INTERVAL: Duration = Duration::from_millis(100); + + fn new(log: &Logger) -> Self { + let (scrimlet_status_tx, scrimlet_status_rx) = + watch::channel(ScrimletStatus::Scrimlet); + let (system_networking_config_tx, system_networking_config_rx) = + watch::channel(test_system_networking_config_1()); + + let (do_reconciliation_results_tx, do_reconciliation_results) = + mpsc::unbounded_channel(); + let do_reconciliation_calls = Arc::new(Mutex::new(Vec::new())); + + let task = { + let do_reconciliation_calls = Arc::clone(&do_reconciliation_calls); + let dummy_addr = "0.0.0.0:0".parse().unwrap(); + ReconcilerTaskHandle::spawn_impl( + scrimlet_status_rx, + system_networking_config_rx, + ScrimletReconcilersMode::Test { + mgs_addr: dummy_addr, + dpd_addr: dummy_addr, + mgd_addr: dummy_addr, + }, + ThisSledSwitchSlot::TEST_FAKE, + log, + |_ip, _slot, _log| MockReconciler { + do_reconciliation_calls, + do_reconciliation_results, + }, + ) + }; + + Self { + task, + scrimlet_status_tx, + system_networking_config_tx, + do_reconciliation_results_tx, + do_reconciliation_calls, + } + } + + fn set_scrimlet_status(&self, status: ScrimletStatus) { + self.scrimlet_status_tx.send_modify(|s| { + *s = status; + }); + } + + async fn wait_for_do_reconciliation_call_count(&self, count: usize) { + let mut last_seen = self.do_reconciliation_calls.lock().unwrap().len(); + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(5) { + if last_seen == count { + return; + } + tokio::time::sleep(Self::WAIT_FOR_STATUS_CHECK_INTERVAL).await; + last_seen = self.do_reconciliation_calls.lock().unwrap().len(); + } + panic!( + "timeout waiting for do_reconciliation call count {count} \ + (got {last_seen})" + ); + } + + async fn wait_for_task_status( + &self, + description: &str, + matches: F, + ) -> ReconcilerStatus + where + F: Fn(&ReconcilerCurrentStatus) -> bool, + { + let mut status = self.task.status(); + let start = Instant::now(); + while start.elapsed() < Duration::from_secs(5) { + if matches(&status.current_status) { + return status; + } + tokio::time::sleep(Self::WAIT_FOR_STATUS_CHECK_INTERVAL).await; + status = self.task.status(); + } + panic!( + "timeout waiting for task status {description} (got {status:?})" + ); + } + + async fn wait_for_task_status_no_longer_a_scrimlet( + &self, + ) -> ReconcilerStatus { + self.wait_for_task_status("Inert(NoLongerAScrimlet)", |status| { + matches!( + status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::NoLongerAScrimlet + ) + ) + }) + .await + } + + async fn wait_for_task_status_idle( + &self, + ) -> ReconcilerStatus { + self.wait_for_task_status("Idle", |status| { + matches!(status, ReconcilerCurrentStatus::Idle) + }) + .await + } + + async fn shutdown_cleanly(self) { + let Self { + task, scrimlet_status_tx, do_reconciliation_results_tx, .. + } = self; + + // Dropping this watch channel should cause the task to exit. + mem::drop(scrimlet_status_tx); + + task._task.await.expect("task didn't panic"); + let final_status = task.status_rx.borrow(); + assert_matches!( + final_status.current_status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly + ) + ); + + // Explicitly drop this _after_ the task exits so we're guaranteed + // not to hit the `.expect()` in + // `MockReconciler::do_reconciliation()` above. + mem::drop(do_reconciliation_results_tx); + } +} + +// Test the first activation. +#[tokio::test(start_paused = true)] +async fn first_activation() { + let logctx = omicron_test_utils::dev::test_setup_log("first_activation"); + let harness = Harness::new(&logctx.log); + + // Confirm we start in Idle. + assert_matches!( + harness.task.status().current_status, + ReconcilerCurrentStatus::Idle + ); + + // Task should call do_reconciliation() for the first time. + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + + let completion = + status.last_completion.expect("last_completion should be preserved"); + assert_eq!(completion.activation_count, 0); + assert_eq!(completion.status.0, "first"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::Startup + ); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: after completing a reconciliation and reaching the select!, +// setting NotScrimlet causes the task to loop back to +// wait_if_this_sled_is_no_longer_a_scrimlet and go inert — without +// performing another reconciliation. +#[tokio::test(start_paused = true)] +async fn scrimlet_becomes_not_scrimlet_during_select() { + let logctx = omicron_test_utils::dev::test_setup_log( + "scrimlet_becomes_not_scrimlet_during_select", + ); + let harness = Harness::new(&logctx.log); + + // Complete the first reconciliation so the task reaches the select!. + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + harness.wait_for_task_status_idle().await; + + // Become NotScrimlet → the select! fires with ScrimletStatusChanged, + // the loop goes back to wait_if_this_sled_is_no_longer_a_scrimlet, and the + // task becomes Inert(NoLongerAScrimlet). + harness.set_scrimlet_status(ScrimletStatus::NotScrimlet); + let status = harness.wait_for_task_status_no_longer_a_scrimlet().await; + + // No additional do_reconciliation call should have happened: the + // ScrimletStatusChanged activation saw NotScrimlet and went inert + // instead of reconciling. + assert_eq!(harness.do_reconciliation_calls.lock().unwrap().len(), 1); + + // last_completion from the prior run should still be present. + let completion = + status.last_completion.expect("last_completion should be preserved"); + assert_eq!(completion.activation_count, 0); + assert_eq!(completion.status.0, "first"); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: after the first reconciliation completes, changing the +// SystemNetworkingConfig triggers a second reconciliation with +// activation_reason = SystemNetworkingConfigChanged and the new config. +#[tokio::test(start_paused = true)] +async fn system_networking_config_change_triggers_re_reconciliation() { + let logctx = omicron_test_utils::dev::test_setup_log( + "system_networking_config_change_triggers_re_reconciliation", + ); + let harness = Harness::new(&logctx.log); + + // Wait for the first do_reconciliation call (Startup). + harness.wait_for_do_reconciliation_call_count(1).await; + + // Complete the first reconciliation. + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + harness.wait_for_task_status_idle().await; + + // Send a new SystemNetworkingConfig. + let first_config = test_system_networking_config_1(); + let second_config = test_system_networking_config_2(); + assert_ne!(first_config, second_config); + harness.system_networking_config_tx.send(second_config.clone()).unwrap(); + + // Wait for the second do_reconciliation call. + harness.wait_for_do_reconciliation_call_count(2).await; + + // The second call should have received the new config. + let received_configs = + harness.do_reconciliation_calls.lock().unwrap().clone(); + assert_eq!(received_configs.len(), 2); + assert_eq!(received_configs[0], first_config); + assert_eq!(received_configs[1], second_config); + + // Status should be Running with SystemNetworkingConfigChanged. + let status = harness.task.status(); + match &status.current_status { + ReconcilerCurrentStatus::Running(running) => { + assert_matches!( + running.activation_reason(), + ReconcilerActivationReason::SystemNetworkingConfigChanged + ); + } + other => panic!("expected Running status, got {other:?}"), + } + + // Complete the second reconciliation. + harness.do_reconciliation_results_tx.send("second".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + + let completion = + status.last_completion.expect("should have last_completion"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::SystemNetworkingConfigChanged + ); + assert_eq!(completion.activation_count, 1); + assert_eq!(completion.status.0, "second"); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: after the first reconciliation completes, the periodic timer +// fires after RE_RECONCILE_INTERVAL and triggers a second +// reconciliation with activation_reason = PeriodicTimer. +#[tokio::test(start_paused = true)] +async fn periodic_timer_triggers_re_reconciliation() { + let logctx = omicron_test_utils::dev::test_setup_log( + "periodic_timer_triggers_re_reconciliation", + ); + let harness = Harness::new(&logctx.log); + + // Complete the first reconciliation (Startup). + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + harness.wait_for_task_status_idle().await; + + // Advance time just short of the interval — no second call yet. + tokio::time::advance( + MockReconciler::RE_RECONCILE_INTERVAL - Duration::from_millis(1), + ) + .await; + assert_eq!(harness.do_reconciliation_calls.lock().unwrap().len(), 1); + + // Advance past the interval — periodic timer fires. + tokio::time::advance(Duration::from_millis(1)).await; + harness.wait_for_do_reconciliation_call_count(2).await; + + // Status should be Running with PeriodicTimer. + let status = harness.task.status(); + match &status.current_status { + ReconcilerCurrentStatus::Running(running) => { + assert_matches!( + running.activation_reason(), + ReconcilerActivationReason::PeriodicTimer + ); + } + other => panic!("expected Running status, got {other:?}"), + } + + // Complete and verify. + harness.do_reconciliation_results_tx.send("periodic".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + let completion = + status.last_completion.expect("should have last_completion"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::PeriodicTimer + ); + assert_eq!(completion.activation_count, 1); + assert_eq!(completion.status.0, "periodic"); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: if the SystemNetworkingConfig changes while do_reconciliation is +// in-flight, the task should notice when it reaches the select! and +// immediately perform another reconciliation with +// activation_reason = SystemNetworkingConfigChanged using the latest config. +#[tokio::test(start_paused = true)] +async fn config_change_during_inflight_reconciliation() { + let logctx = omicron_test_utils::dev::test_setup_log( + "config_change_during_inflight_reconciliation", + ); + let harness = Harness::new(&logctx.log); + + // Wait for the first do_reconciliation call (Startup) to be entered. + harness.wait_for_do_reconciliation_call_count(1).await; + + // While the first reconciliation is still in-flight, change the + // config. The task won't see this until it finishes and hits the + // select!. + harness + .system_networking_config_tx + .send(test_system_networking_config_2()) + .unwrap(); + + // Complete the first reconciliation. + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + + // The task should immediately start a second reconciliation because + // system_networking_config_rx.changed() fires in the select!. Because we + // have time paused, the elapsed time should be exactly one check interval + // of our test harness. + let before = tokio::time::Instant::now(); + harness.wait_for_do_reconciliation_call_count(2).await; + assert_eq!(before.elapsed(), Harness::WAIT_FOR_STATUS_CHECK_INTERVAL); + + // The second call should have received the new config (via + // borrow_and_update()). + let received_configs = + harness.do_reconciliation_calls.lock().unwrap().clone(); + assert_eq!(received_configs[0], test_system_networking_config_1()); + assert_eq!(received_configs[1], test_system_networking_config_2()); + + // Status should be Running with SystemNetworkingConfigChanged. + let status = harness.task.status(); + match &status.current_status { + ReconcilerCurrentStatus::Running(running) => { + assert_matches!( + running.activation_reason(), + ReconcilerActivationReason::SystemNetworkingConfigChanged + ); + } + other => panic!("expected Running status, got {other:?}"), + } + + // Complete the second reconciliation and verify. + harness.do_reconciliation_results_tx.send("second".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + let completion = + status.last_completion.expect("should have last_completion"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::SystemNetworkingConfigChanged + ); + assert_eq!(completion.activation_count, 1); + assert_eq!(completion.status.0, "second"); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: full scrimlet status round-trip. Start as scrimlet, complete +// reconciliation #0 (Startup). Set NotScrimlet → task goes inert. Set +// Scrimlet again → reconciliation #1 fires with activation_reason = +// ScrimletStatusChanged and activation_count = 1. +#[tokio::test(start_paused = true)] +async fn scrimlet_status_round_trip() { + let logctx = + omicron_test_utils::dev::test_setup_log("scrimlet_status_round_trip"); + let harness = Harness::new(&logctx.log); + + // First reconciliation (Startup). + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("first".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + let completion = + status.last_completion.expect("should have last_completion"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::Startup + ); + assert_eq!(completion.activation_count, 0); + + // Become NotScrimlet → task should go inert. + harness.set_scrimlet_status(ScrimletStatus::NotScrimlet); + harness.wait_for_task_status_no_longer_a_scrimlet().await; + + // No additional do_reconciliation call should have happened. + assert_eq!(harness.do_reconciliation_calls.lock().unwrap().len(), 1); + + // Become Scrimlet again → reconciliation #1 fires. + harness.set_scrimlet_status(ScrimletStatus::Scrimlet); + harness.wait_for_do_reconciliation_call_count(2).await; + + // Status should be Running with ScrimletStatusChanged. + let status = harness.task.status(); + match &status.current_status { + ReconcilerCurrentStatus::Running(running) => { + assert_matches!( + running.activation_reason(), + ReconcilerActivationReason::ScrimletStatusChanged + ); + } + other => panic!("expected Running status, got {other:?}"), + } + + // Complete the second reconciliation and check last_completion. + harness.do_reconciliation_results_tx.send("second".to_string()).unwrap(); + let status = harness.wait_for_task_status_idle().await; + let completion = + status.last_completion.expect("should have last_completion"); + assert_matches!( + completion.activation_reason, + ReconcilerActivationReason::ScrimletStatusChanged + ); + assert_eq!(completion.activation_count, 1); + assert_eq!(completion.status.0, "second"); + + harness.shutdown_cleanly().await; + logctx.cleanup_successful(); +} + +// Test: dropping the system_networking_config sender while the task is in +// the select! (after completing a reconciliation) causes the task to +// exit with TaskExitedUnexpectedly. +#[tokio::test(start_paused = true)] +async fn channel_closure_system_networking_config_during_select() { + let logctx = omicron_test_utils::dev::test_setup_log( + "channel_closure_system_networking_config_during_select", + ); + let harness = Harness::new(&logctx.log); + + // Complete one reconciliation so the task reaches the select!. + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("done".to_string()).unwrap(); + harness.wait_for_task_status_idle().await; + + // Drop the system_networking_config sender. This closes the watch channel, + // which causes the `system_networking_config_rx.changed()` arm in the + // select! to return Err(RecvError), causing the task to exit. + let Harness { + task, + scrimlet_status_tx, + system_networking_config_tx, + do_reconciliation_results_tx, + do_reconciliation_calls, + } = harness; + + mem::drop(system_networking_config_tx); + + // Wait for the task to exit and verify the final status. + task._task.await.expect("task didn't panic"); + let final_status = task.status_rx.borrow(); + assert_matches!( + final_status.current_status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly + ) + ); + + // do_reconciliation should have been called exactly once (the initial + // Startup reconciliation); no second call after the channel closed. + assert_eq!(do_reconciliation_calls.lock().unwrap().len(), 1); + + mem::drop(scrimlet_status_tx); + mem::drop(do_reconciliation_results_tx); + + logctx.cleanup_successful(); +} + +// Test: dropping the scrimlet_status sender while the task is in the +// select! (after completing a reconciliation) causes the task to exit +// with TaskExitedUnexpectedly. +#[tokio::test(start_paused = true)] +async fn channel_closure_scrimlet_status_during_select() { + let logctx = omicron_test_utils::dev::test_setup_log( + "channel_closure_scrimlet_status_during_select", + ); + let harness = Harness::new(&logctx.log); + + // Complete one reconciliation so the task reaches the select!. + harness.wait_for_do_reconciliation_call_count(1).await; + harness.do_reconciliation_results_tx.send("done".to_string()).unwrap(); + harness.wait_for_task_status_idle().await; + + // Destructure the harness so we can drop the scrimlet_status sender + // while still holding the other pieces we need. + let Harness { + task, + scrimlet_status_tx, + do_reconciliation_results_tx, + do_reconciliation_calls, + .. + } = harness; + + // Drop the scrimlet_status sender. This closes the watch channel, + // which causes the `scrimlet_status_rx.changed()` arm in the + // select! to return Err(RecvError), causing the task to exit. + mem::drop(scrimlet_status_tx); + + // Wait for the task to exit and verify the final status. + task._task.await.expect("task didn't panic"); + let final_status = task.status_rx.borrow(); + assert_matches!( + final_status.current_status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly + ) + ); + + // do_reconciliation should have been called exactly once (the initial + // Startup reconciliation); no second call after the channel closed. + assert_eq!(do_reconciliation_calls.lock().unwrap().len(), 1); + + // Explicitly drop after the task exits so we can't hit the .expect() + // in MockReconciler::do_reconciliation(). + mem::drop(do_reconciliation_results_tx); + + logctx.cleanup_successful(); +} + +// Test: dropping the scrimlet_status sender while the task is blocked in +// wait_if_this_sled_is_no_longer_a_scrimlet() waiting for the switch slot +// causes the task to exit with TaskExitedUnexpectedly. +#[tokio::test(start_paused = true)] +async fn channel_closure_scrimlet_status_during_not_scrimlet() { + let logctx = omicron_test_utils::dev::test_setup_log( + "channel_closure_scrimlet_status_during_not_scrimlet", + ); + let harness = Harness::new(&logctx.log); + + // Set ourselves as "not a scrimlet". + harness.set_scrimlet_status(ScrimletStatus::NotScrimlet); + + // Wait for the task to reach Inert(NoLongerAScrimlet). + harness.wait_for_task_status_no_longer_a_scrimlet().await; + + // Destructure the harness so we can drop the scrimlet_status sender + // while still holding the other pieces we need. + let Harness { + task, + scrimlet_status_tx, + system_networking_config_tx, + do_reconciliation_results_tx, + do_reconciliation_calls, + } = harness; + + // Drop the scrimlet_status sender. This closes the watch channel, + // which causes scrimlet_status_rx.changed().await to return + // Err(RecvError) inside wait_if_this_sled_is_no_longer_a_scrimlet(), + // causing the task to exit. + mem::drop(scrimlet_status_tx); + + // Wait for the task to exit and verify the final status. + task._task.await.expect("task didn't panic"); + let final_status = task.status_rx.borrow(); + assert_matches!( + final_status.current_status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly + ) + ); + + // do_reconciliation() should never have been called. + assert_eq!(do_reconciliation_calls.lock().unwrap().len(), 0); + + // Explicitly drop after the task exits. + mem::drop(do_reconciliation_results_tx); + mem::drop(system_networking_config_tx); + + logctx.cleanup_successful(); +} + +// Test: dropping the system_networking_config sender while the task is blocked +// in wait_if_this_sled_is_no_longer_a_scrimlet() causes the task to exit with +// TaskExitedUnexpectedly. +#[tokio::test(start_paused = true)] +async fn channel_closure_system_networking_config_during_not_scrimlet() { + let logctx = omicron_test_utils::dev::test_setup_log( + "channel_closure_system_networking_config_during_not_scrimlet", + ); + let harness = Harness::new(&logctx.log); + + // Set ourselves as "not a scrimlet". + harness.set_scrimlet_status(ScrimletStatus::NotScrimlet); + + // Wait for the task to reach Inert(NoLongerAScrimlet). + harness.wait_for_task_status_no_longer_a_scrimlet().await; + + // Destructure the harness so we can drop the system_networking_config + // sender while still holding the other pieces we need. + let Harness { + task, + scrimlet_status_tx, + system_networking_config_tx, + do_reconciliation_results_tx, + do_reconciliation_calls, + } = harness; + + // Drop the system_networking_config sender. The task is currently blocked + // waiting to become a scrimlet again, but also monitors this channel for + // closure; it should notice and exit. + mem::drop(system_networking_config_tx); + + // Wait for the task to exit and verify the final status. + task._task.await.expect("task didn't panic"); + let final_status = task.status_rx.borrow(); + assert_matches!( + final_status.current_status, + ReconcilerCurrentStatus::Inert( + ReconcilerInertReason::TaskExitedUnexpectedly + ) + ); + + // do_reconciliation() should never have been called. + assert_eq!(do_reconciliation_calls.lock().unwrap().len(), 0); + + // Explicitly drop after the task exits. + mem::drop(do_reconciliation_results_tx); + mem::drop(scrimlet_status_tx); + + logctx.cleanup_successful(); +} diff --git a/sled-agent/scrimlet-reconcilers/src/status.rs b/sled-agent/scrimlet-reconcilers/src/status.rs new file mode 100644 index 00000000000..d433745b4d6 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/status.rs @@ -0,0 +1,141 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Types for the status and results of the reconcilers in this crate. + +use crate::DpdReconcilerStatus; +use crate::MgdReconcilerStatus; +use crate::UplinkdReconcilerStatus; +use chrono::DateTime; +use chrono::Utc; +use std::time::Duration; +use std::time::Instant; + +/// Whether or not this sled is a scrimlet. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ScrimletStatus { + Scrimlet, + NotScrimlet, +} + +/// Status of attempting to determine this sled's switch slot via MGS within +/// this sled's switch zone. +#[derive(Debug, Clone)] +pub enum DetermineSwitchSlotStatus { + /// We're not attempting to contact MGS because we're not a scrimlet. + NotScrimlet, + + /// We're currently attempting to contact MGS. + /// + /// If this is not the first attempt, `prev_attempt_err` contains the error + /// we encountered the last time. (If the last time succeeded, we'd be + /// done!) + ContactingMgs { prev_attempt_err: Option }, + + /// We're currently idle waiting for a timeout to retry due to a previous + /// failure. + WaitingToRetry { prev_attempt_err: String }, +} + +/// Why a reconciler task has gone inert. +#[derive(Debug, Clone, Copy)] +pub enum ReconcilerInertReason { + /// The reconciler task started when this sled was a scrimlet, but it has + /// since become "not a scrimlet" (e.g., because the attached switch has + /// gone away). + NoLongerAScrimlet, + + /// The reconciler task exited. This is not expected except in tests; the + /// task runs forever as long as sled-agent holds on to the channels used to + /// communicate with it. + TaskExitedUnexpectedly, +} + +/// Why a reconciler task was activated. +#[derive(Debug, Clone, Copy)] +pub enum ReconcilerActivationReason { + /// Each reconciler runs once on startup. + Startup, + /// The task was activated due to its periodic timer firing. + PeriodicTimer, + /// The task was activated in response to a change in the networking config. + SystemNetworkingConfigChanged, + /// The task was activated in response to the sled becoming a scrimlet again + /// (after previously transitioning to "not a scrimlet"). + ScrimletStatusChanged, +} + +#[derive(Debug, Clone)] +pub struct ReconciliationCompletedStatus { + pub activation_reason: ReconcilerActivationReason, + pub completed_at_time: DateTime, + pub ran_for: Duration, + pub activation_count: u64, + pub status: T, +} + +#[derive(Debug, Clone, Copy)] +pub struct ReconcilerRunningStatus { + activation_reason: ReconcilerActivationReason, + started_at_time: DateTime, + started_at_instant: Instant, +} + +impl ReconcilerRunningStatus { + pub(crate) fn new(activation_reason: ReconcilerActivationReason) -> Self { + Self { + activation_reason, + started_at_time: Utc::now(), + started_at_instant: Instant::now(), + } + } + + pub fn activation_reason(&self) -> ReconcilerActivationReason { + self.activation_reason + } + + pub fn started_at(&self) -> DateTime { + self.started_at_time + } + + pub fn elapsed_since_start(&self) -> Duration { + self.started_at_instant.elapsed() + } +} + +#[derive(Debug, Clone)] +pub enum ReconcilerCurrentStatus { + /// The reconciler is inert: it will not or cannot run for some reason. + Inert(ReconcilerInertReason), + /// The reconciler is currently running. + Running(ReconcilerRunningStatus), + /// The reconciler is not currently running. + Idle, +} + +#[derive(Debug, Clone)] +pub struct ReconcilerStatus { + /// Status of the task at this moment. + pub current_status: ReconcilerCurrentStatus, + /// Final status of the most recent activation of this task. + // Box the inner status to avoid clippy complaining about + // `ScrimletReconcilersStatus::Running { ... }` being overly large. + pub last_completion: Option>>, +} + +#[derive(Debug, Clone)] +pub enum ScrimletReconcilersStatus { + /// `sled-agent` has not yet provided underlay networking information. + WaitingForSledAgentNetworkingInfo, + + /// We're attempting to determine our switch slot. + DeterminingSwitchSlot(DetermineSwitchSlotStatus), + + /// We are a scrimlet and the individual reconcilers are running. + Running { + dpd_reconciler: ReconcilerStatus, + mgd_reconciler: ReconcilerStatus, + uplinkd_reconciler: ReconcilerStatus, + }, +} diff --git a/sled-agent/scrimlet-reconcilers/src/switch_zone_slot.rs b/sled-agent/scrimlet-reconcilers/src/switch_zone_slot.rs new file mode 100644 index 00000000000..d802446fce4 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/switch_zone_slot.rs @@ -0,0 +1,153 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use crate::DetermineSwitchSlotStatus; +use crate::ScrimletStatus; +use gateway_client::Client; +use gateway_client::ClientInfo; +use gateway_types::component::SpType; +use sled_agent_types::early_networking::SwitchSlot; +use slog::Logger; +use slog::error; +use slog::warn; +use slog_error_chain::InlineErrorChain; +use std::time::Duration; +use tokio::sync::watch; +use tokio::sync::watch::error::RecvError; + +/// Newtype wrapper around [`SwitchSlot`]. This type is always the physical slot +/// of our own, local switch. +/// +/// This information can only be determined by asking MGS inside our own switch +/// zone. An instance of this type can only be created if we are indeed a +/// scrimlet. +#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, PartialOrd, Ord)] +pub(crate) struct ThisSledSwitchSlot(SwitchSlot); + +impl PartialEq for ThisSledSwitchSlot { + fn eq(&self, other: &SwitchSlot) -> bool { + self.0 == *other + } +} + +impl PartialEq for SwitchSlot { + fn eq(&self, other: &ThisSledSwitchSlot) -> bool { + *self == other.0 + } +} + +impl ThisSledSwitchSlot { + const MGS_RETRY_TIMEOUT: Duration = Duration::from_secs(5); + + #[cfg(test)] + pub(crate) const TEST_FAKE: Self = Self(SwitchSlot::Switch0); + + /// Attempt to determine this sled's switch slot via `client`, which _must_ + /// be an MGS client pointed at the IP address of our switch zone. (We take + /// this as a `Client` instead of a more strict type to allow tests to call + /// this function with a `client` pointed at a non-switch-zone address; the + /// function is `pub(crate)` and we expect callers to respect this + /// requirement in non-test paths.) + /// + /// This function blocks until it either succeeds or the + /// `scrimlet_status_rx` channel is closed. It will retry indefinitely on + /// any failures to communicate via `client`. + pub(crate) async fn determine_retrying_forever( + determine_status_tx: watch::Sender, + scrimlet_status_rx: &mut watch::Receiver, + client: &Client, + log: &Logger, + ) -> Result { + loop { + // Wait until we become a scrimlet; there's no point in trying to + // contact our switch zone if it doesn't exist. + loop { + let scrimlet_status = *scrimlet_status_rx.borrow_and_update(); + match scrimlet_status { + ScrimletStatus::Scrimlet => break, + ScrimletStatus::NotScrimlet => { + determine_status_tx.send_modify(|status| { + *status = DetermineSwitchSlotStatus::NotScrimlet; + }); + scrimlet_status_rx.changed().await?; + continue; + } + } + } + + // Update to our status to `ContactingMgs`, and carry forward any + // error from a previous attempt. + determine_status_tx.send_if_modified(|status| match status { + DetermineSwitchSlotStatus::ContactingMgs { .. } => false, + DetermineSwitchSlotStatus::NotScrimlet => { + *status = DetermineSwitchSlotStatus::ContactingMgs { + prev_attempt_err: None, + }; + true + } + DetermineSwitchSlotStatus::WaitingToRetry { + prev_attempt_err, + } => { + *status = DetermineSwitchSlotStatus::ContactingMgs { + prev_attempt_err: Some(prev_attempt_err.clone()), + }; + true + } + }); + + // We are a scrimlet - see if we know our own slot yet. + let err = match client + .sp_local_switch_id() + .await + .map(|resp| resp.into_inner()) + { + Ok(identity) => match (identity.type_, identity.slot) { + (SpType::Switch, 0) => { + return Ok(ThisSledSwitchSlot(SwitchSlot::Switch0)); + } + (SpType::Switch, 1) => { + return Ok(ThisSledSwitchSlot(SwitchSlot::Switch1)); + } + (sp_type, sp_slot) => { + // We should never get any other response; if we do, + // something has gone very wrong with MGS. It's not + // likely retrying will fix this, but there isn't + // anything else we can do. + error!( + log, + "failed to determine this sled's switch slot: got \ + unexpected identity; will retry"; + "sp_type" => ?sp_type, + "sp_slot" => sp_slot, + ); + format!( + "received invalid SP type/slot combo from MGS {}: \ + {sp_type:?}/{sp_slot}", + client.baseurl() + ) + } + }, + Err(err) => { + let err = InlineErrorChain::new(&err); + warn!( + log, + "failed to determine this sled's switch slot; \ + will retry"; + &err, + ); + err.to_string() + } + }; + + determine_status_tx.send_modify(|status| { + *status = DetermineSwitchSlotStatus::WaitingToRetry { + prev_attempt_err: err, + }; + }); + + // Sleep briefly before retrying. + tokio::time::sleep(Self::MGS_RETRY_TIMEOUT).await; + } + } +} diff --git a/sled-agent/scrimlet-reconcilers/src/uplinkd_reconciler.rs b/sled-agent/scrimlet-reconcilers/src/uplinkd_reconciler.rs new file mode 100644 index 00000000000..4c3fe678e12 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/uplinkd_reconciler.rs @@ -0,0 +1,60 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Reconciler for configuration of `uplinkd` within a scrimlet's switch zone. +//! +//! Unlike most reconcilers in this crate, `uplinkd`'s configuration is managed +//! via SMF, not a dropshot server. + +use crate::ScrimletReconcilersMode; +use crate::reconciler_task::Reconciler; +use crate::switch_zone_slot::ThisSledSwitchSlot; +use sled_agent_types::system_networking::SystemNetworkingConfig; +use slog::Logger; +use std::time::Duration; + +#[derive(Debug, Clone)] +pub struct UplinkdReconcilerStatus { + pub todo_status: (), +} + +impl slog::KV for UplinkdReconcilerStatus { + fn serialize( + &self, + _record: &slog::Record<'_>, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + serializer.emit_str("uplinkd-reconciler".into(), "not yet implemented") + } +} + +#[derive(Debug)] +pub(crate) struct UplinkdReconciler { + _switch_slot: ThisSledSwitchSlot, +} + +impl Reconciler for UplinkdReconciler { + type Status = UplinkdReconcilerStatus; + + const LOGGER_COMPONENT_NAME: &'static str = "UplinkdReconciler"; + const RE_RECONCILE_INTERVAL: std::time::Duration = Duration::from_secs(30); + + fn new( + _mode: ScrimletReconcilersMode, + switch_slot: ThisSledSwitchSlot, + _parent_log: &Logger, + ) -> Self { + // TODO: Remain inert if `mode` is `ScrimletReconcilersMode::Test`, + // since that indicates there's no real zone to connect to. + Self { _switch_slot: switch_slot } + } + + async fn do_reconciliation( + &mut self, + _system_networking_config: &SystemNetworkingConfig, + _log: &Logger, + ) -> Self::Status { + UplinkdReconcilerStatus { todo_status: () } + } +} diff --git a/workspace-hack/Cargo.toml b/workspace-hack/Cargo.toml index a2e3bbe507f..64ff04d4ad6 100644 --- a/workspace-hack/Cargo.toml +++ b/workspace-hack/Cargo.toml @@ -102,7 +102,7 @@ postgres-types = { version = "0.2.12", default-features = false, features = ["wi ppv-lite86 = { version = "0.2.21", default-features = false, features = ["simd", "std"] } predicates = { version = "3.1.4" } proc-macro2 = { version = "1.0.106" } -proptest = { version = "1.10.0" } +proptest = { version = "1.11.0" } quote = { version = "1.0.45" } rand-274715c4dabd11b0 = { package = "rand", version = "0.9.2" } rand-c38e5c1d305a1b54 = { package = "rand", version = "0.8.6" } @@ -250,7 +250,7 @@ postgres-types = { version = "0.2.12", default-features = false, features = ["wi ppv-lite86 = { version = "0.2.21", default-features = false, features = ["simd", "std"] } predicates = { version = "3.1.4" } proc-macro2 = { version = "1.0.106" } -proptest = { version = "1.10.0" } +proptest = { version = "1.11.0" } quote = { version = "1.0.45" } rand-274715c4dabd11b0 = { package = "rand", version = "0.9.2" } rand-c38e5c1d305a1b54 = { package = "rand", version = "0.8.6" } From 996069239d018989cb195449f733a456e8f4c8bd Mon Sep 17 00:00:00 2001 From: John Gallagher Date: Thu, 14 May 2026 10:50:27 -0400 Subject: [PATCH 2/2] scrimlet-reconcilers: flesh out dpd service NAT reconciler (#10442) This is almost all new code (and about 60/40 "real code" / "tests"). Today NAT entry reconciliation is inverted from what this PR implements: dpd fetches NAT entries from Nexus and does reconciliation internally. In this PR, sled-agent does the reconciliation. This requires fetching _all_ NAT entries from dpd (to decide what to remove, if anything). My biggest concerns here are: 1. Is it correct? 2. Can we ship this, or do we need to block on https://github.com/oxidecomputer/dendrite/issues/255 or something like it? This is targeted at the feature / integration branch that's building up the complete `sled-agent-scrimlet-reconcilers` crate. --- Cargo.lock | 7 + gateway-test-utils/src/setup.rs | 6 +- .../execution/src/test_utils.rs | 3 +- nexus/src/app/sagas/instance_start.rs | 4 +- nexus/src/app/sagas/instance_update/mod.rs | 12 +- nexus/test-utils/src/nexus_test.rs | 8 +- nexus/test-utils/src/starter.rs | 9 +- sled-agent/scrimlet-reconcilers/Cargo.toml | 8 + .../src/dpd_reconciler.rs | 40 +- .../src/dpd_reconciler/nat.rs | 647 ++++++++++++++++++ .../src/dpd_reconciler/nat/tests.rs | 411 +++++++++++ .../scrimlet-reconcilers/src/handle/tests.rs | 6 +- sled-agent/scrimlet-reconcilers/src/lib.rs | 2 + .../system_networking.rs | 28 +- .../versions/src/impls/system_networking.rs | 16 + test-utils/src/dev/dendrite.rs | 6 +- 16 files changed, 1174 insertions(+), 39 deletions(-) create mode 100644 sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat.rs create mode 100644 sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat/tests.rs diff --git a/Cargo.lock b/Cargo.lock index 1e7f21047de..9edd61e5495 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -13695,24 +13695,31 @@ dependencies = [ name = "sled-agent-scrimlet-reconcilers" version = "0.1.0" dependencies = [ + "anyhow", "assert_matches", "chrono", + "daft", "dpd-client 0.1.0 (git+https://github.com/oxidecomputer/dendrite?rev=187aee7de2e50f907099ea06c04aac96c3455665)", "dropshot 0.17.0", + "futures", "gateway-client", "gateway-messages", "gateway-test-utils", "gateway-types", "httpmock", + "macaddr", "mg-admin-client", "omicron-common", "omicron-test-utils", + "omicron-uuid-kinds", "omicron-workspace-hack", + "proptest", "reqwest 0.13.2", "serde_json", "sled-agent-types", "slog", "slog-error-chain", + "test-strategy", "thiserror 2.0.18", "tokio", ] diff --git a/gateway-test-utils/src/setup.rs b/gateway-test-utils/src/setup.rs index ab5a1dc0fd9..e51f8b83d66 100644 --- a/gateway-test-utils/src/setup.rs +++ b/gateway-test-utils/src/setup.rs @@ -38,7 +38,7 @@ pub const DEFAULT_SP_SIM_CONFIG: &str = pub struct GatewayTestContext { pub client: gateway_client::Client, pub server: omicron_gateway::Server, - pub port: u16, + port: u16, pub simrack: SimRack, pub logctx: LogContext, pub gateway_id: Uuid, @@ -47,6 +47,10 @@ pub struct GatewayTestContext { } impl GatewayTestContext { + pub fn address(&self) -> SocketAddrV6 { + SocketAddrV6::new(Ipv6Addr::LOCALHOST, self.port, 0, 0) + } + pub fn mgs_backends(&self) -> watch::Receiver { self.resolver_backends.clone() } diff --git a/nexus/reconfigurator/execution/src/test_utils.rs b/nexus/reconfigurator/execution/src/test_utils.rs index cd46adacd0b..5070e51b691 100644 --- a/nexus/reconfigurator/execution/src/test_utils.rs +++ b/nexus/reconfigurator/execution/src/test_utils.rs @@ -109,7 +109,8 @@ pub fn overridables_for_test( for (id_str, switch_slot) in scrimlets { let sled_id = id_str.parse().unwrap(); let ip = Ipv6Addr::LOCALHOST; - let mgs_port = cptestctx.gateway.get(&switch_slot).unwrap().port; + let mgs_port = + cptestctx.gateway.get(&switch_slot).unwrap().address().port(); let dendrite_port = cptestctx.dendrite.read().unwrap().get(&switch_slot).unwrap().port; let mgd_port = cptestctx.mgd.get(&switch_slot).unwrap().port; diff --git a/nexus/src/app/sagas/instance_start.rs b/nexus/src/app/sagas/instance_start.rs index 4e6e428f885..5a4f822a2f4 100644 --- a/nexus/src/app/sagas/instance_start.rs +++ b/nexus/src/app/sagas/instance_start.rs @@ -1155,7 +1155,6 @@ async fn sis_ensure_running( #[cfg(test)] mod test { use core::time::Duration; - use std::net::SocketAddrV6; use crate::app::sagas::disk_delete::test::ExpungeTestHarness; use crate::app::sagas::disk_delete::test::create_disk; @@ -1493,8 +1492,7 @@ mod test { // Reuse the port number from the removed Switch0 to start a new dendrite instance let nexus_address = cptestctx.internal_client.bind_address; let mgs = cptestctx.gateway.get(&SwitchSlot::Switch0).unwrap(); - let mgs_address = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); + let mgs_address = mgs.address().into(); // Test fault recovery for nat propogation // Start a new dendrite instance for switch0 diff --git a/nexus/src/app/sagas/instance_update/mod.rs b/nexus/src/app/sagas/instance_update/mod.rs index 2a85f888d8e..7be235e1fda 100644 --- a/nexus/src/app/sagas/instance_update/mod.rs +++ b/nexus/src/app/sagas/instance_update/mod.rs @@ -2777,13 +2777,13 @@ mod test { cptestctx: &ControlPlaneTestContext, switch0_port: u16, ) { - use std::net::Ipv6Addr; - use std::net::SocketAddrV6; - let nexus_address = cptestctx.internal_client.bind_address; - let mgs = cptestctx.gateway.get(&SwitchSlot::Switch0).unwrap(); - let mgs_address = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); + let mgs_address = cptestctx + .gateway + .get(&SwitchSlot::Switch0) + .unwrap() + .address() + .into(); let new_switch0 = omicron_test_utils::dev::dendrite::DendriteInstance::start( diff --git a/nexus/test-utils/src/nexus_test.rs b/nexus/test-utils/src/nexus_test.rs index 693aea88732..733ec41a5af 100644 --- a/nexus/test-utils/src/nexus_test.rs +++ b/nexus/test-utils/src/nexus_test.rs @@ -259,13 +259,7 @@ impl ControlPlaneTestContext { }; let mgs = self.gateway.get(&switch_slot).unwrap(); - let mgs_addr = std::net::SocketAddrV6::new( - std::net::Ipv6Addr::LOCALHOST, - mgs.port, - 0, - 0, - ) - .into(); + let mgs_addr = mgs.address().into(); let dendrite = omicron_test_utils::dev::dendrite::DendriteInstance::start( diff --git a/nexus/test-utils/src/starter.rs b/nexus/test-utils/src/starter.rs index 3822be75d1f..5f54d487e0f 100644 --- a/nexus/test-utils/src/starter.rs +++ b/nexus/test-utils/src/starter.rs @@ -422,8 +422,7 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { let log = &self.logctx.log; debug!(log, "Starting Dendrite"; "switch_slot" => ?switch_slot); let mgs = self.gateway.get(&switch_slot).unwrap(); - let mgs_addr = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); + let mgs_addr = mgs.address().into(); // Set up a stub instance of dendrite let dendrite = dev::dendrite::DendriteInstance::start( @@ -448,9 +447,7 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { pub async fn start_mgd(&mut self, switch_slot: SwitchSlot) { let log = &self.logctx.log; debug!(log, "Starting mgd"; "switch_slot" => ?switch_slot); - let mgs = self.gateway.get(&switch_slot).unwrap(); - let mgs_addr = - SocketAddrV6::new(Ipv6Addr::LOCALHOST, mgs.port, 0, 0).into(); + let mgs_addr = self.gateway.get(&switch_slot).unwrap().address().into(); // Set up an instance of mgd let mgd = @@ -484,7 +481,7 @@ impl<'a, N: NexusServer> ControlPlaneStarter<'a, N> { sled_id, Ipv6Addr::LOCALHOST, self.dendrite.read().unwrap().get(&switch_slot).unwrap().port, - self.gateway.get(&switch_slot).unwrap().port, + self.gateway.get(&switch_slot).unwrap().address().port(), self.mgd.get(&switch_slot).unwrap().port, ) .unwrap() diff --git a/sled-agent/scrimlet-reconcilers/Cargo.toml b/sled-agent/scrimlet-reconcilers/Cargo.toml index 533574a1772..6a6aa3e52a5 100644 --- a/sled-agent/scrimlet-reconcilers/Cargo.toml +++ b/sled-agent/scrimlet-reconcilers/Cargo.toml @@ -9,11 +9,15 @@ workspace = true [dependencies] chrono.workspace = true +daft.workspace = true dpd-client.workspace = true +futures.workspace = true gateway-client.workspace = true gateway-types.workspace = true +macaddr.workspace = true mg-admin-client.workspace = true omicron-common.workspace = true +omicron-uuid-kinds.workspace = true reqwest.workspace = true sled-agent-types.workspace = true slog.workspace = true @@ -24,13 +28,17 @@ tokio.workspace = true omicron-workspace-hack.workspace = true [dev-dependencies] +anyhow.workspace = true assert_matches.workspace = true dropshot.workspace = true gateway-messages.workspace = true gateway-test-utils.workspace = true httpmock.workspace = true omicron-test-utils.workspace = true +proptest.workspace = true serde_json.workspace = true +sled-agent-types = { workspace = true, features = ["testing"] } +test-strategy.workspace = true [features] testing = [] diff --git a/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs index 5360654d0ac..a7428fddcd6 100644 --- a/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs +++ b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler.rs @@ -11,26 +11,35 @@ use crate::switch_zone_slot::ThisSledSwitchSlot; use dpd_client::Client; use sled_agent_types::system_networking::SystemNetworkingConfig; use slog::Logger; +use slog::info; use std::time::Duration; +mod nat; + +pub use nat::DpdNatReconcilerStatus; +pub use nat::DpdNatReconcilerStatusNatEntry; +pub use nat::DpdNatReconcilerStatusNatEntryFailure; + #[derive(Debug, Clone)] pub struct DpdReconcilerStatus { - pub todo_status: (), + /// Result of reconciling service zone NAT entries + pub nat_status: DpdNatReconcilerStatus, } impl slog::KV for DpdReconcilerStatus { fn serialize( &self, - _record: &slog::Record<'_>, + record: &slog::Record<'_>, serializer: &mut dyn slog::Serializer, ) -> slog::Result { - serializer.emit_str("dpd-reconciler".into(), "not yet implemented") + let Self { nat_status } = self; + nat_status.serialize(record, serializer) } } #[derive(Debug)] pub(crate) struct DpdReconciler { - _client: Client, + client: Client, _switch_slot: ThisSledSwitchSlot, } @@ -45,14 +54,29 @@ impl Reconciler for DpdReconciler { switch_slot: ThisSledSwitchSlot, parent_log: &Logger, ) -> Self { - Self { _client: mode.dpd_client(parent_log), _switch_slot: switch_slot } + Self { client: mode.dpd_client(parent_log), _switch_slot: switch_slot } } async fn do_reconciliation( &mut self, - _system_networking_config: &SystemNetworkingConfig, - _log: &Logger, + system_networking_config: &SystemNetworkingConfig, + log: &Logger, ) -> Self::Status { - DpdReconcilerStatus { todo_status: () } + let nat_status = if let Some(nat_entries) = system_networking_config + .blueprint_external_networking_config + .as_ref() + .map(|config| &config.service_zone_nat_entries) + { + nat::reconcile(&self.client, nat_entries, log).await + } else { + DpdNatReconcilerStatus::NoNatEntriesConfig + }; + + info!( + log, "dpd reconciliation completed"; + &nat_status, + ); + + DpdReconcilerStatus { nat_status } } } diff --git a/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat.rs b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat.rs new file mode 100644 index 00000000000..aa211a14ba4 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat.rs @@ -0,0 +1,647 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +//! Reconciliation of Omicron service NAT entries. +//! +//! Does not modify non-service NAT entries. + +use daft::Diffable; +use dpd_client::Client; +use futures::Stream; +use futures::TryStreamExt; +use macaddr::MacAddr6; +use omicron_common::api::external::MacAddr; +use omicron_common::api::external::Vni; +use omicron_uuid_kinds::OmicronZoneUuid; +use sled_agent_types::system_networking::ServiceZoneNatEntries; +use sled_agent_types::system_networking::ServiceZoneNatEntry; +use slog::Logger; +use slog::info; +use slog::warn; +use slog_error_chain::InlineErrorChain; +use std::collections::BTreeMap; +use std::collections::BTreeSet; +use std::net::IpAddr; +use std::net::Ipv6Addr; +use std::num::NonZeroU32; + +type DpdClientError = dpd_client::Error; + +const SINGLE_REQUEST_LIMIT: Option = + Some(NonZeroU32::new(128).unwrap()); + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct DpdNatReconcilerStatusNatEntry { + pub external_ip: IpAddr, + pub first_port: u16, + pub last_port: u16, +} + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord)] +pub struct DpdNatReconcilerStatusNatEntryFailure { + pub entry: DpdNatReconcilerStatusNatEntry, + pub error: String, +} + +/// Status of reconciling service zone NAT entries with `dpd`. +#[derive(Debug, Clone)] +pub enum DpdNatReconcilerStatus { + /// Reconciliation was skipped because the bootstore contains no NAT entry + /// config information. + NoNatEntriesConfig, + + /// Reconciliation failed while attempting to read the current set of + /// entries from `dpd`. + FailedReadingCurrentDpdNatEntries(String), + + /// Reconciliation failed because the bootstore config contained an illegal + /// combination of entries (e.g., two zones with identical NAT entries). + InvalidSystemNetworkingConfig(String), + + /// Reconciliation completed successfully. + Success { + /// Set of zone IDs whose NAT entries were already correct in `dpd` and + /// left unchanged. + unchanged: BTreeSet, + + /// List of NAT entries removed. + removed: Vec, + + /// Map of zone NAT entries created. + created: BTreeMap, + }, + + /// Reconciliation completed but had at least one failure. + PartialSuccess { + /// Set of zone IDs whose NAT entries were already correct in `dpd` and + /// left unchanged. + unchanged: BTreeSet, + + /// List of NAT entries successfully removed. + removed: Vec, + + /// List of NAT entries we tried but failed to remove. + remove_failures: Vec, + + /// Map of zone NAT entries successfully created. + created: BTreeMap, + + /// Map of zone NAT entries we tried but failed to create. + create_failures: + BTreeMap, + }, +} + +impl slog::KV for DpdNatReconcilerStatus { + fn serialize( + &self, + _record: &slog::Record<'_>, + serializer: &mut dyn slog::Serializer, + ) -> slog::Result { + match self { + DpdNatReconcilerStatus::NoNatEntriesConfig => serializer.emit_str( + "nat-reconciler-skipped".into(), + "no NAT entries present in config", + ), + DpdNatReconcilerStatus::FailedReadingCurrentDpdNatEntries( + reason, + ) => serializer.emit_str("nat-reconciler-failed".into(), reason), + DpdNatReconcilerStatus::InvalidSystemNetworkingConfig(reason) => { + serializer.emit_arguments( + "nat-reconciler-failed".into(), + &format_args!("invalid system networking config: {reason}"), + ) + } + DpdNatReconcilerStatus::Success { unchanged, removed, created } => { + // Only show a summary count; we have individual log statements + // for each create/remove. + for (key, val) in [ + ("nat-entries-unchanged", unchanged.len()), + ("nat-entries-successfully-removed", removed.len()), + ("nat-entries-failed-to-remove", 0), + ("nat-entries-successfully-created", created.len()), + ("nat-entries-failed-to-create", 0), + ] { + serializer.emit_usize(key.into(), val)?; + } + Ok(()) + } + DpdNatReconcilerStatus::PartialSuccess { + unchanged, + removed, + remove_failures, + created, + create_failures, + } => { + // Only show a summary count; we have individual log statements + // for each create/remove. + for (key, val) in [ + ("nat-entries-unchanged", unchanged.len()), + ("nat-entries-successfully-removed", removed.len()), + ("nat-entries-failed-to-remove", remove_failures.len()), + ("nat-entries-successfully-created", created.len()), + ("nat-entries-failed-to-create", create_failures.len()), + ] { + serializer.emit_usize(key.into(), val)?; + } + Ok(()) + } + } + } +} + +/// Perform reconciliation. +/// +/// On successful completion, the `dpd` reachable via `Client` will contain all +/// the service NAT entries described by `desired_nat_entries`, and no other NAT +/// entries in the service zone VNI(s). +pub(super) async fn reconcile( + client: &Client, + desired_nat_entries: &ServiceZoneNatEntries, + log: &Logger, +) -> DpdNatReconcilerStatus { + let dpd_current_entries = match CurrentDpdEntriesAssembler::assemble( + client, + desired_nat_entries, + log, + ) + .await + { + Ok(entries) => entries, + Err(err) => { + return DpdNatReconcilerStatus::FailedReadingCurrentDpdNatEntries( + format!( + "failed to read current NAT entries from dpd: {}", + InlineErrorChain::new(&err), + ), + ); + } + }; + + let plan = match ReconciliationPlan::new( + &dpd_current_entries, + desired_nat_entries, + log, + ) { + Ok(plan) => plan, + Err(err) => { + return DpdNatReconcilerStatus::InvalidSystemNetworkingConfig(err); + } + }; + + apply_plan(client, plan, log).await +} + +/// Apply the contents of `plan` to dpd via `client`. +/// +/// This requires `plan.to_remove.len() + plan.to_create.len()` independent +/// calls to `dpd`. We do not short circuit on failure: we'll always attempt to +/// make every call required. This may not be the right choice, but some +/// arguments in favor: +/// +/// * We'd like to eventually replace this with fewer calls, if we add different +/// APIs to `dpd` for applying NAT settings in bulk. +/// +/// * In practice we expect the number of calls here to be small. On startup we +/// expect ~10 `to_create` calls (one for each service, which is typically 2 +/// boundary NTP, 3 Nexus, and 1-5 external DNS), and for every reconciliation +/// attempt after that we expect 0-1 (either no changes, or a single new +/// service has been added or removed; it's possible we'll see multiple, but +/// unlikely given we re-reconcile on every networking config change). +/// * We always want to report the status of every step described by `plan`, and +/// implementing stop-on-first-failure means we'd need to record a "didn't +/// attempt because of an earlier failure" status for some steps. That's +/// doable but annoying. +async fn apply_plan( + client: &Client, + plan: ReconciliationPlan, + log: &Logger, +) -> DpdNatReconcilerStatus { + let ReconciliationPlan { unchanged, to_remove, to_create } = plan; + + // Always remove first. DPD keys NAT entries by IP address and lower port; + // if we've changed which zone is associated with a given IP/port pair, we + // need to ensure we remove the old entry before attempting to create a new + // one. + // + // If we fail to remove an entry that shares an IP/port with an entry in + // `to_create`, the create will also fail. We could optimize this by + // skipping creates if they had a corresponding remove failure, but that's + // quite a lot of bookkeeping for a rare case that isn't very problematic + // anyway (remove failure means the create will fail, but if the remove + // failed the create very likely could have failed anyway!). + let mut removed = Vec::new(); + let mut remove_failures = Vec::new(); + for entry in to_remove { + let result = match entry.target_ip { + IpAddr::V4(ip) => { + client.nat_ipv4_delete(&ip, entry.first_port).await + } + IpAddr::V6(ip) => { + client.nat_ipv6_delete(&ip, entry.first_port).await + } + }; + + match result { + Ok(_) => { + info!(log, "successfully removed NAT entry"; "entry" => ?entry); + removed.push(entry.into()); + } + Err(err) => { + let err = InlineErrorChain::new(&err); + warn!( + log, "failed to remove NAT entry"; + "entry" => ?entry, + &err, + ); + remove_failures.push(DpdNatReconcilerStatusNatEntryFailure { + entry: entry.into(), + error: err.to_string(), + }); + } + } + } + + let mut created = BTreeMap::new(); + let mut create_failures = BTreeMap::new(); + for (zone_id, entry) in to_create { + match create_nat_entry(client, &entry).await { + Ok(()) => { + info!( + log, "successfully created NAT entry"; + "zone-id" => %zone_id, + "entry" => ?entry, + ); + created.insert(zone_id, entry.into()); + } + Err(err) => { + let err = InlineErrorChain::new(&err); + warn!( + log, "failed to create NAT entry"; + "zone-id" => %zone_id, + "entry" => ?entry, + &err, + ); + create_failures.insert( + zone_id, + DpdNatReconcilerStatusNatEntryFailure { + entry: entry.into(), + error: err.to_string(), + }, + ); + } + } + } + + if remove_failures.is_empty() && create_failures.is_empty() { + DpdNatReconcilerStatus::Success { unchanged, removed, created } + } else { + DpdNatReconcilerStatus::PartialSuccess { + unchanged, + removed, + remove_failures, + created, + create_failures, + } + } +} + +#[derive(Debug, PartialEq, Eq)] +struct ReconciliationPlan { + // Set of zones whose NAT entries already exist in DPD. + unchanged: BTreeSet, + + // Set of NAT entries that exist in DPD but not our desired set; each of + // these should be removed. + to_remove: BTreeSet, + + // Set of NAT entries that don't exist in DPD but are in our desired set; + // each of these should be created. + to_create: BTreeMap, +} + +impl ReconciliationPlan { + /// Construct a new plan by diffing the current entries against the desired + /// entries. + /// + /// # Errors + /// + /// Fails if `service_nat_entries` contains invalid data (this should be + /// impossible). + fn new( + dpd_current_entries: &BTreeSet, + service_nat_entries: &ServiceZoneNatEntries, + log: &Logger, + ) -> Result { + // Convert `service_nat_entries` into both a set of `NatEntry`s (so we + // can diff it against `dpd_current_entries` via `daft`) and a map of + // `NatEntry` back to the zone ID that needs it (for our status + // reporting). + let mut desired_nat_entries = BTreeSet::new(); + let mut nat_to_zone_id = BTreeMap::new(); + for entry in service_nat_entries.iter() { + let zone_id = entry.zone_id; + let entry = NatEntry::from(entry); + + // We should have no duplicates; if we do, we have two different + // zones that want the same NAT entry. Refuse to reconcile. This + // should be impossible by construction: `ServiceZoneNatEntries` + // rejects overlapping entries. We double-check here in case that + // changes or is buggy. + if let Some(prev_zone_id) = nat_to_zone_id.insert(entry, zone_id) { + return Err(format!( + "invalid SystemNetworkingConfig: zones {zone_id} and \ + {prev_zone_id} want the same NAT entry: {entry:?}", + )); + } + + // We don't have to check again for duplicates here; we just + // confirmed every `entry` is unique. + desired_nat_entries.insert(entry); + } + + let nat_entry_diff = dpd_current_entries.diff(&desired_nat_entries); + + let unchanged = nat_entry_diff + .common + .into_iter() + .map(|entry| { + nat_to_zone_id + .get(entry) + .copied() + .expect("nat_to_zone_id has a value for every common entry") + }) + .collect::>(); + let to_remove = nat_entry_diff + .removed + .into_iter() + .copied() + .collect::>(); + let to_create = nat_entry_diff + .added + .into_iter() + .map(|entry| { + let zone_id = nat_to_zone_id + .get(entry) + .copied() + .expect("nat_to_zone_id has a value for every added entry"); + (zone_id, *entry) + }) + .collect::>(); + + info!( + log, + "generated NAT reconciliation plan"; + "entries-unchanged" => unchanged.len(), + "entries-to-remove" => to_remove.len(), + "entries-to-create" => to_create.len(), + ); + + Ok(Self { unchanged, to_remove, to_create }) + } +} + +async fn create_nat_entry( + client: &Client, + entry: &NatEntry, +) -> Result<(), DpdClientError> { + let nat_target = dpd_client::types::NatTarget { + inner_mac: dpd_client::types::MacAddr { a: entry.nic_mac.into_array() }, + internal_ip: entry.sled_underlay_ip, + vni: entry.vni.as_u32().into(), + }; + match entry.target_ip { + IpAddr::V4(ip) => client + .nat_ipv4_create( + &ip, + entry.first_port, + entry.last_port, + &nat_target, + ) + .await + .map(|response| response.into_inner()), + IpAddr::V6(ip) => client + .nat_ipv6_create( + &ip, + entry.first_port, + entry.last_port, + &nat_target, + ) + .await + .map(|response| response.into_inner()), + } +} + +#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Diffable)] +#[cfg_attr(test, derive(test_strategy::Arbitrary))] +struct NatEntry { + sled_underlay_ip: Ipv6Addr, + target_ip: IpAddr, + #[cfg_attr(test, strategy(proptest::strategy::Just(0)))] + first_port: u16, + #[cfg_attr(test, strategy(proptest::strategy::Just(65535)))] + last_port: u16, + nic_mac: MacAddr, + vni: Vni, +} + +impl From for DpdNatReconcilerStatusNatEntry { + fn from(value: NatEntry) -> Self { + Self { + external_ip: value.target_ip, + first_port: value.first_port, + last_port: value.last_port, + } + } +} + +impl From<&'_ ServiceZoneNatEntry> for NatEntry { + fn from(value: &'_ ServiceZoneNatEntry) -> Self { + let (first_port, last_port) = value.kind.nat_port_range(); + Self { + sled_underlay_ip: value.sled_underlay_ip, + target_ip: value.kind.external_ip(), + first_port, + last_port, + nic_mac: value.nic_mac, + vni: value.vni, + } + } +} + +#[derive(Debug, Clone, Copy, thiserror::Error)] +#[error("invalid VNI: {0}")] +struct BadVni(u32); + +impl TryFrom for NatEntry { + type Error = BadVni; + + fn try_from( + value: dpd_client::types::Ipv4Nat, + ) -> Result { + let vni = Vni::try_from(value.target.vni.0) + .map_err(|_| BadVni(value.target.vni.0))?; + Ok(Self { + sled_underlay_ip: value.target.internal_ip, + target_ip: value.external.into(), + nic_mac: MacAddr6::from(value.target.inner_mac.a).into(), + first_port: value.low, + last_port: value.high, + vni, + }) + } +} + +impl TryFrom for NatEntry { + type Error = BadVni; + + fn try_from( + value: dpd_client::types::Ipv6Nat, + ) -> Result { + let vni = Vni::try_from(value.target.vni.0) + .map_err(|_| BadVni(value.target.vni.0))?; + Ok(Self { + sled_underlay_ip: value.target.internal_ip, + target_ip: value.external.into(), + nic_mac: MacAddr6::from(value.target.inner_mac.a).into(), + first_port: value.low, + last_port: value.high, + vni, + }) + } +} + +/// Helper to assemble all IPv4 and IPv6 NAT entries in the relevant VNI(s). +/// +/// Currently this requires listing _all_ NAT entries in `dpd` and filtering on +/// our side down to just the VNIs we care about. We'd like a nicer API on the +/// dpd side: . +struct CurrentDpdEntriesAssembler<'a> { + client: &'a Client, + service_vnis: BTreeSet, + current_entries: BTreeSet, +} + +struct RelevantEntryCount { + service_vni: u64, + non_service_vni: u64, +} + +impl<'a> CurrentDpdEntriesAssembler<'a> { + async fn assemble( + client: &'a Client, + desired_nat_entries: &ServiceZoneNatEntries, + log: &Logger, + ) -> Result, DpdClientError> { + // We want to reconcile "all service NAT entries", but + // `desired_nat_entries` only tells us what should exist, not what needs + // to be removed. Build a set of the `Vni`s used in all our services; in + // practice, we expect this to always be a set of length one containing + // the `Vni::SERVICES_VNI` constant, because all services use that Vni. + // But we've written this as we have so that if we decide to split each + // kind of service into a separate Vni, this reconciliation still works. + // + // We assume there are never any service NAT entries in `dpd` that have + // a Vni other than one of the ones present in `desired_nat_entries`. + // `ServiceZoneNatEntries`, by construction, enforces that it's not + // empty, which guarantees that this set will be nonempty too. + let service_vnis: BTreeSet = + desired_nat_entries.iter().map(|entry| entry.vni).collect(); + let mut builder = + Self { client, service_vnis, current_entries: BTreeSet::new() }; + builder.read_ipv4_entries(log).await?; + builder.read_ipv6_entries(log).await?; + Ok(builder.current_entries) + } + + async fn read_ipv4_entries( + &mut self, + log: &Logger, + ) -> Result<(), DpdClientError> { + let mut stream_addresses = + self.client.nat_ipv4_addresses_list_stream(SINGLE_REQUEST_LIMIT); + + let mut counts = + RelevantEntryCount { service_vni: 0, non_service_vni: 0 }; + + while let Some(ip) = stream_addresses.try_next().await? { + self.assemble_entries_from_stream( + self.client.nat_ipv4_list_stream(&ip, SINGLE_REQUEST_LIMIT), + &mut counts, + ) + .await?; + } + + info!( + log, + "finished fetching current ipv4 NAT entries from dpd"; + "service-nat-entries" => counts.service_vni, + "non-service-nat-entries" => counts.non_service_vni, + "service-vnis" => ?self.service_vnis, + ); + + Ok(()) + } + + async fn read_ipv6_entries( + &mut self, + log: &Logger, + ) -> Result<(), DpdClientError> { + let mut stream_addresses = + self.client.nat_ipv6_addresses_list_stream(SINGLE_REQUEST_LIMIT); + + let mut counts = + RelevantEntryCount { service_vni: 0, non_service_vni: 0 }; + + while let Some(ip) = stream_addresses.try_next().await? { + self.assemble_entries_from_stream( + self.client.nat_ipv6_list_stream(&ip, SINGLE_REQUEST_LIMIT), + &mut counts, + ) + .await?; + } + + info!( + log, + "finished fetching current ipv6 NAT entries from dpd"; + "service-nat-entries" => counts.service_vni, + "non-service-nat-entries" => counts.non_service_vni, + "service-vnis" => ?self.service_vnis, + ); + + Ok(()) + } + + async fn assemble_entries_from_stream( + &mut self, + mut stream: S, + counts: &mut RelevantEntryCount, + ) -> Result<(), DpdClientError> + where + S: Stream> + Unpin, + T: TryInto, + { + while let Some(entry) = stream.try_next().await? { + // The only way we can fail to convert a dpd `NatEntry` is if the + // Vni from dpd isn't a valid omicron Vni (the `BadVni` in our + // generic bound). This should never happen, but if it does, we know + // this isn't an entry we care about: we're only looking for entries + // that match our services' Vni(s). + match entry.try_into() { + Ok(entry) if self.service_vnis.contains(&entry.vni) => { + self.current_entries.insert(entry); + counts.service_vni += 1; + } + Ok(_) | Err(BadVni(_)) => { + counts.non_service_vni += 1; + } + } + } + + Ok(()) + } +} + +#[cfg(test)] +mod tests; diff --git a/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat/tests.rs b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat/tests.rs new file mode 100644 index 00000000000..262ce2bad28 --- /dev/null +++ b/sled-agent/scrimlet-reconcilers/src/dpd_reconciler/nat/tests.rs @@ -0,0 +1,411 @@ +// This Source Code Form is subject to the terms of the Mozilla Public +// License, v. 2.0. If a copy of the MPL was not distributed with this +// file, You can obtain one at https://mozilla.org/MPL/2.0/. + +use super::*; +use anyhow::Context; +use gateway_messages::SpPort; +use omicron_common::OMICRON_DPD_TAG; +use omicron_test_utils::dev; +use proptest::prelude::any; +use proptest::prelude::proptest as proptest_macro; +use proptest::strategy::Strategy; +use test_strategy::Arbitrary; +use test_strategy::proptest; +use tokio::task::block_in_place; + +#[proptest] +fn proptest_plan_all_unchanged(service_nat_entries: ServiceZoneNatEntries) { + let logctx = + omicron_test_utils::dev::test_setup_log("proptest_plan_all_unchanged"); + let log = &logctx.log; + + let dpd_current_entries = + service_nat_entries.iter().map(NatEntry::from).collect(); + let plan = ReconciliationPlan::new( + &dpd_current_entries, + &service_nat_entries, + log, + ) + .expect("planning should succeed"); + + let expected = ReconciliationPlan { + unchanged: service_nat_entries.iter().map(|e| e.zone_id).collect(), + to_remove: BTreeSet::new(), + to_create: BTreeMap::new(), + }; + + assert_eq!(plan, expected); + + logctx.cleanup_successful(); +} + +#[proptest] +fn proptest_plan_create_all(service_nat_entries: ServiceZoneNatEntries) { + let logctx = + omicron_test_utils::dev::test_setup_log("proptest_plan_create_all"); + let log = &logctx.log; + + let dpd_current_entries = BTreeSet::new(); + let plan = ReconciliationPlan::new( + &dpd_current_entries, + &service_nat_entries, + log, + ) + .expect("planning should succeed"); + + let expected = ReconciliationPlan { + unchanged: BTreeSet::new(), + to_remove: BTreeSet::new(), + to_create: service_nat_entries + .iter() + .map(|e| (e.zone_id, NatEntry::from(e))) + .collect(), + }; + + assert_eq!(plan, expected); + + logctx.cleanup_successful(); +} + +#[derive(Debug, Clone, Arbitrary)] +struct TestInput { + // Complete set of desired `ServiceZoneNatEntries` we want to exist after + // reconciliation. This is divided into two: the first + // 0..preexisting_service_nat_entries entries should already exist in dpd, + // and the remaining should not (until we perform reconciliation). + desired_service_nat_entries: ServiceZoneNatEntries, + #[strategy(0..=#desired_service_nat_entries.len())] + preexisting_service_nat_entries: usize, + + // Stale service entries. We should remove all of these. + #[strategy(proptest::collection::btree_set( + any::(), + 0..32, + ).prop_filter( + "no overlap with desired_service_nat_entries", + move |entries| { + filter_out_overlapping_arbitrary_service_entries( + &#desired_service_nat_entries, + entries, + ) + } + ))] + stale_service_entries: BTreeSet, + + // Non-service entries. We should never touch these. + #[strategy(proptest::collection::btree_set( + any::().prop_filter( + "non-service VNI", + |entry| entry.vni != Vni::SERVICES_VNI + ), + 0..32, + ).prop_filter( + "no duplicate IPs", + move |entries| filter_out_overlapping_arbitrary_non_service_entries( + &#desired_service_nat_entries, + &#stale_service_entries, + entries, + ), + ))] + preexisting_non_service_entries: BTreeSet, +} + +// Helper to collapse ipv4-mapped-ipv6 (generated by proptests sometimes) +// down to just ipv4. +fn collapse_ipv4_mapped(ip: IpAddr) -> IpAddr { + match ip { + ip @ IpAddr::V4(_) => ip, + ip @ IpAddr::V6(ip6) => match ip6.to_ipv4_mapped() { + Some(ip4) => IpAddr::V4(ip4), + None => ip, + }, + } +} + +// Proptest filter to ensure we don't get conflicts from dpd on external IPs: +// return false if `entries` contains duplicate IPs or any IP also used by +// `desired_service_nat_entries`. +fn filter_out_overlapping_arbitrary_service_entries( + desired_service_nat_entries: &ServiceZoneNatEntries, + entries: &BTreeSet, +) -> bool { + let mut entry_ips = BTreeSet::new(); + for entry in entries { + if desired_service_nat_entries.contains_key(&entry.zone_id) { + return false; + } + if !entry_ips.insert(collapse_ipv4_mapped(entry.kind.external_ip())) { + return false; + } + } + + desired_service_nat_entries.iter().all(|other| { + !entry_ips.contains(&collapse_ipv4_mapped(other.kind.external_ip())) + }) +} + +// Proptest filter to ensure we don't get conflicts from dpd on external IPs: +// return false if `entries` contains duplicate IPs or any IP also used by +// `desired_service_nat_entries` or `stale_service_entries`. +fn filter_out_overlapping_arbitrary_non_service_entries( + desired_service_nat_entries: &ServiceZoneNatEntries, + stale_service_entries: &BTreeSet, + entries: &BTreeSet, +) -> bool { + let prior = desired_service_nat_entries + .iter() + .chain(stale_service_entries.iter()) + .map(|e| collapse_ipv4_mapped(e.kind.external_ip())) + .collect::>(); + let mut entry_ips = BTreeSet::new(); + for e in entries { + if !entry_ips.insert(collapse_ipv4_mapped(e.target_ip)) { + return false; + } + } + prior.intersection(&entry_ips).next().is_none() +} + +impl TestInput { + // All entries we should set up in dpd before reconciling. + fn entries_that_should_exist_at_start_of_test( + &self, + ) -> impl Iterator + '_ { + self.desired_service_nat_entries + .iter() + .take(self.preexisting_service_nat_entries) + .chain(self.stale_service_entries.iter()) + .map(NatEntry::from) + .chain(self.preexisting_non_service_entries.iter().copied()) + } + + // All entries we expect to exist in dpd after reconciling. + fn entries_that_should_exist_after_reconciliation( + &self, + ) -> impl Iterator + '_ { + self.desired_service_nat_entries + .iter() + .map(NatEntry::from) + .chain(self.preexisting_non_service_entries.iter().copied()) + } + + async fn apply_initial_to_dpd( + &self, + client: &dpd_client::Client, + ) -> anyhow::Result<()> { + // Reset from any previous test. (One `dendrite` instance is used across + // multiple proptest invocations). + self.erase_all_nat_entries(client) + .await + .context("failed to erase all NAT entries at start of test")?; + + for entry in self.entries_that_should_exist_at_start_of_test() { + create_nat_entry(client, &entry).await.with_context(|| { + format!("failed to create initial entry {entry:?}") + })?; + } + + Ok(()) + } + + async fn validate_post_reconciliation( + &self, + client: &dpd_client::Client, + ) -> anyhow::Result<()> { + let dpd_entries = self.collect_all_dpd_entries(client).await?; + let expected_entries = self + .entries_that_should_exist_after_reconciliation() + .collect::>(); + assert_eq!(expected_entries, dpd_entries); + Ok(()) + } + + async fn collect_all_dpd_entries( + &self, + client: &dpd_client::Client, + ) -> anyhow::Result> { + let mut all_entries = BTreeSet::new(); + + // Collect IPv4 + let mut stream_addresses = + client.nat_ipv4_addresses_list_stream(SINGLE_REQUEST_LIMIT); + while let Some(ip) = stream_addresses + .try_next() + .await + .context("failed while streaming ipv4 addrs")? + { + let mut entries = + client.nat_ipv4_list_stream(&ip, SINGLE_REQUEST_LIMIT); + while let Some(e) = entries + .try_next() + .await + .context("failed while streaming ipv4 NAT entries")? + { + all_entries.insert( + NatEntry::try_from(e.clone()).with_context(|| { + format!("failed to convert {e:?} to NatEntry") + })?, + ); + } + } + + // Collect IPv6 + let mut stream_addresses = + client.nat_ipv6_addresses_list_stream(SINGLE_REQUEST_LIMIT); + while let Some(ip) = stream_addresses + .try_next() + .await + .context("failed while streaming ipv6 addrs")? + { + let mut entries = + client.nat_ipv6_list_stream(&ip, SINGLE_REQUEST_LIMIT); + while let Some(e) = entries + .try_next() + .await + .context("failed while streaming ipv6 NAT entries")? + { + all_entries.insert( + NatEntry::try_from(e.clone()).with_context(|| { + format!("failed to convert {e:?} to NatEntry") + })?, + ); + } + } + + Ok(all_entries) + } + + async fn erase_all_nat_entries( + &self, + client: &dpd_client::Client, + ) -> anyhow::Result<()> { + for e in self.collect_all_dpd_entries(client).await? { + match e.target_ip { + IpAddr::V4(ip) => { + client + .nat_ipv4_delete(&ip, e.first_port) + .await + .with_context(|| format!("failed to delete {e:?}"))?; + } + IpAddr::V6(ip) => { + client + .nat_ipv6_delete(&ip, e.first_port) + .await + .with_context(|| format!("failed to delete {e:?}"))?; + } + } + } + + Ok(()) + } +} + +#[proptest] +fn proptest_plan_mix(input: TestInput) { + let logctx = omicron_test_utils::dev::test_setup_log("proptest_plan_mix"); + let log = &logctx.log; + + // We expect to remove all stale service entries. + let expected_to_remove = input + .stale_service_entries + .iter() + .map(NatEntry::from) + .collect::>(); + + // The "current entries" includes all stale entries plus a subset of the + // desired entries; start with the stale ones, and we add the subset in the + // loop below. + let mut dpd_current_entries = expected_to_remove.clone(); + + // Now add the number of common entries we expect. + let mut expected_unchanged = BTreeSet::new(); + let mut expected_to_create = BTreeMap::new(); + for (i, entry) in input.desired_service_nat_entries.iter().enumerate() { + if i < input.preexisting_service_nat_entries { + dpd_current_entries.insert(NatEntry::from(entry)); + expected_unchanged.insert(entry.zone_id); + } else { + expected_to_create.insert(entry.zone_id, NatEntry::from(entry)); + } + } + + let plan = ReconciliationPlan::new( + &dpd_current_entries, + &input.desired_service_nat_entries, + log, + ) + .expect("planning should succeed"); + + let expected = ReconciliationPlan { + unchanged: expected_unchanged, + to_remove: expected_to_remove, + to_create: expected_to_create, + }; + + assert_eq!(plan, expected); + + logctx.cleanup_successful(); +} + +#[tokio::test(flavor = "multi_thread")] +async fn proptest_full_reconciliation() { + let logctx = + omicron_test_utils::dev::test_setup_log("proptest_full_reconciliation"); + let mgsctx = gateway_test_utils::setup::test_setup( + "proptest_full_reconciliation", + SpPort::One, + ) + .await; + let mut dpdctx = dev::dendrite::DendriteInstance::start( + 0, + None, + Some(mgsctx.address().into()), + ) + .await + .expect("started dendrite"); + let client = dpd_client::Client::new( + &format!("http://{}", dpdctx.address()), + dpd_client::ClientState { + tag: OMICRON_DPD_TAG.to_owned(), + log: logctx.log.clone(), + }, + ); + let rt = tokio::runtime::Handle::current(); + + let one_test_invocation = async |input: TestInput| { + input + .apply_initial_to_dpd(&client) + .await + .expect("applied initial setup to dpd"); + + let status = + reconcile(&client, &input.desired_service_nat_entries, &logctx.log) + .await; + + match status { + DpdNatReconcilerStatus::NoNatEntriesConfig + | DpdNatReconcilerStatus::FailedReadingCurrentDpdNatEntries(_) + | DpdNatReconcilerStatus::InvalidSystemNetworkingConfig(_) + | DpdNatReconcilerStatus::PartialSuccess { .. } => { + panic!("unexpected reconciler status: {status:?}"); + } + DpdNatReconcilerStatus::Success { .. } => { + input + .validate_post_reconciliation(&client) + .await + .expect("validated post reconciliation entries"); + } + } + }; + + proptest_macro!(|(input: TestInput)| { + // Do a little dance to call our async `one_test_invocation` within the + // non-async `proptest_macro!()` context. + block_in_place(|| rt.block_on(one_test_invocation(input))); + }); + + dpdctx.cleanup().await.expect("dpd cleanup succeeded"); + mgsctx.teardown().await; + logctx.cleanup_successful(); +} diff --git a/sled-agent/scrimlet-reconcilers/src/handle/tests.rs b/sled-agent/scrimlet-reconcilers/src/handle/tests.rs index fd78c9826db..f8ee5184130 100644 --- a/sled-agent/scrimlet-reconcilers/src/handle/tests.rs +++ b/sled-agent/scrimlet-reconcilers/src/handle/tests.rs @@ -2,9 +2,6 @@ // License, v. 2.0. If a copy of the MPL was not distributed with this // file, You can obtain one at https://mozilla.org/MPL/2.0/. -use std::net::Ipv6Addr; -use std::time::Duration; - use super::*; use assert_matches::assert_matches; use dropshot::ConfigLogging; @@ -16,6 +13,7 @@ use httpmock::Mock; use httpmock::MockServer; use omicron_test_utils::dev; use sled_agent_types::early_networking::RackNetworkConfig; +use std::time::Duration; // For "happy path" tests, we spin up a real MGS instances (pointed at a // simulated SP). @@ -28,7 +26,7 @@ trait MgsFlavor { impl MgsFlavor for GatewayTestContext { fn address(&self) -> SocketAddr { - SocketAddr::V6(SocketAddrV6::new(Ipv6Addr::LOCALHOST, self.port, 0, 0)) + SocketAddr::V6(GatewayTestContext::address(self)) } async fn teardown(self) { diff --git a/sled-agent/scrimlet-reconcilers/src/lib.rs b/sled-agent/scrimlet-reconcilers/src/lib.rs index 0a5de4a3fe8..2810d70ca3c 100644 --- a/sled-agent/scrimlet-reconcilers/src/lib.rs +++ b/sled-agent/scrimlet-reconcilers/src/lib.rs @@ -50,6 +50,8 @@ mod status; mod switch_zone_slot; mod uplinkd_reconciler; +pub use dpd_reconciler::DpdNatReconcilerStatusNatEntry; +pub use dpd_reconciler::DpdNatReconcilerStatusNatEntryFailure; pub use dpd_reconciler::DpdReconcilerStatus; pub use handle::ScrimletReconcilers; pub use handle::ScrimletReconcilersMode; diff --git a/sled-agent/types/versions/src/bootstore_service_nat/system_networking.rs b/sled-agent/types/versions/src/bootstore_service_nat/system_networking.rs index 46e9a80e8e9..d6d6c4a5025 100644 --- a/sled-agent/types/versions/src/bootstore_service_nat/system_networking.rs +++ b/sled-agent/types/versions/src/bootstore_service_nat/system_networking.rs @@ -39,7 +39,17 @@ use std::collections::btree_map::Entry; use std::net::IpAddr; use std::net::Ipv6Addr; -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, JsonSchema)] +#[derive( + Clone, + Debug, + Deserialize, + Serialize, + PartialEq, + Eq, + PartialOrd, + Ord, + JsonSchema, +)] #[serde(tag = "kind", rename_all = "snake_case")] #[cfg_attr(any(test, feature = "testing"), derive(test_strategy::Arbitrary))] pub enum ServiceZoneNatKind { @@ -48,12 +58,26 @@ pub enum ServiceZoneNatKind { Nexus { external_ip: IpAddr }, } -#[derive(Clone, Debug, Deserialize, Serialize, PartialEq, Eq, JsonSchema)] +#[derive( + Clone, + Debug, + Deserialize, + Serialize, + PartialEq, + Eq, + PartialOrd, + Ord, + JsonSchema, +)] #[cfg_attr(any(test, feature = "testing"), derive(test_strategy::Arbitrary))] pub struct ServiceZoneNatEntry { pub zone_id: OmicronZoneUuid, pub sled_underlay_ip: Ipv6Addr, pub nic_mac: MacAddr, + #[cfg_attr( + any(test, feature = "testing"), + strategy(proptest::strategy::Just(Vni::SERVICES_VNI)) + )] pub vni: Vni, pub kind: ServiceZoneNatKind, } diff --git a/sled-agent/types/versions/src/impls/system_networking.rs b/sled-agent/types/versions/src/impls/system_networking.rs index e9fe0689947..38ee85c8798 100644 --- a/sled-agent/types/versions/src/impls/system_networking.rs +++ b/sled-agent/types/versions/src/impls/system_networking.rs @@ -102,6 +102,22 @@ impl proptest::arbitrary::Arbitrary for ServiceZoneNatEntries { for e in extra { entries.insert_overwrite(e); } + // Filter out sets that contain overlapping IPs. + let mut unique_ips = std::collections::BTreeSet::new(); + for e in &entries { + let ip = match e.kind.external_ip() { + ip @ IpAddr::V4(_) => ip, + ip @ IpAddr::V6(ip6) => { + match ip6.to_ipv4_mapped() { + Some(ip4) => IpAddr::V4(ip4), + None => ip, + } + } + }; + if !unique_ips.insert(ip) { + return None; + } + } ServiceZoneNatEntries::try_from(entries).ok() }, ) diff --git a/test-utils/src/dev/dendrite.rs b/test-utils/src/dev/dendrite.rs index 7992fcf48f6..dce6b4689d1 100644 --- a/test-utils/src/dev/dendrite.rs +++ b/test-utils/src/dev/dendrite.rs @@ -4,7 +4,7 @@ //! Tools for managing Dendrite during development -use std::net::SocketAddr; +use std::net::{Ipv6Addr, SocketAddr, SocketAddrV6}; use std::path::{Path, PathBuf}; use std::process::Stdio; use std::time::Duration; @@ -96,6 +96,10 @@ impl DendriteInstance { Ok(Self { port, args, child: Some(child), data_dir: Some(temp_dir) }) } + pub fn address(&self) -> SocketAddrV6 { + SocketAddrV6::new(Ipv6Addr::LOCALHOST, self.port, 0, 0) + } + pub async fn cleanup(&mut self) -> Result<(), anyhow::Error> { if let Some(mut child) = self.child.take() { child.start_kill().context("Sending SIGKILL to child")?;