From fe41d02a8dbf4850113a75d416b84fb7fdcd247c Mon Sep 17 00:00:00 2001 From: Aljoscha Krettek Date: Sun, 3 May 2026 07:28:19 +0000 Subject: [PATCH] adapter: rename staged_cancellation to connection_cancel_watches The map of per-connection cancel watch channels was named for its historical sole consumer (staged sequencing in `sequence_staged` / `handle_spawn`), but the mechanism itself is generic: any code path that wants a connection-scoped "has this been cancelled?" signal can install/remove an entry. `handle_privileged_cancel` already pokes the sender from outside the staged path. Rename the field to `connection_cancel_watches` and rewrite the doc comment to describe the abstraction (a per-connection watch consumed by whatever has cancellable work in flight) rather than naming a single caller. Also tighten the docs on `sequence_staged` / `handle_spawn` to point at the renamed map. Work towards https://github.com/MaterializeInc/database-issues/issues/6686 Co-Authored-By: Claude Opus 4.7 (1M context) --- src/adapter/src/coord.rs | 13 +++++++++---- src/adapter/src/coord/command_handler.rs | 2 +- src/adapter/src/coord/sequencer/inner.rs | 18 +++++++++++------- src/adapter/src/coord/sql.rs | 2 +- 4 files changed, 22 insertions(+), 13 deletions(-) diff --git a/src/adapter/src/coord.rs b/src/adapter/src/coord.rs index 6f484913e5fb1..5a0673e44c70d 100644 --- a/src/adapter/src/coord.rs +++ b/src/adapter/src/coord.rs @@ -1885,9 +1885,14 @@ pub struct Coordinator { /// to stage Batches in Persist that we will then link into the shard. active_copies: BTreeMap, - /// A map from connection ids to a watch channel that is set to `true` if the connection - /// received a cancel request. - staged_cancellation: BTreeMap, watch::Receiver)>, + /// Connection-scoped cancellation watches. + /// + /// Each entry is a watch channel whose value is `false` until cancellation + /// is requested for that connection, at which point it is set to `true`. + /// + /// Consumers install/remove these watches while they have cancellable work + /// in flight. + connection_cancel_watches: BTreeMap, watch::Receiver)>, /// Active introspection subscribes. introspection_subscribes: BTreeMap, @@ -4708,7 +4713,7 @@ pub fn serve( active_compute_sinks: BTreeMap::new(), active_webhooks: BTreeMap::new(), active_copies: BTreeMap::new(), - staged_cancellation: BTreeMap::new(), + connection_cancel_watches: BTreeMap::new(), introspection_subscribes: BTreeMap::new(), write_locks: BTreeMap::new(), deferred_write_ops: BTreeMap::new(), diff --git a/src/adapter/src/coord/command_handler.rs b/src/adapter/src/coord/command_handler.rs index c7e9dea44264e..c224db5ff3a88 100644 --- a/src/adapter/src/coord/command_handler.rs +++ b/src/adapter/src/coord/command_handler.rs @@ -1878,7 +1878,7 @@ impl Coordinator { self.cancel_cluster_reconfigurations_for_conn(&conn_id) .await; self.cancel_pending_copy(&conn_id); - if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) { + if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) { let _ = tx.send(true); } } diff --git a/src/adapter/src/coord/sequencer/inner.rs b/src/adapter/src/coord/sequencer/inner.rs index 7f9d5c4bfb20a..01cea854d15b4 100644 --- a/src/adapter/src/coord/sequencer/inner.rs +++ b/src/adapter/src/coord/sequencer/inner.rs @@ -159,10 +159,12 @@ struct CreateSourceInner { } impl Coordinator { - /// Sequences the next staged of a [Staged] plan. This is designed for use with plans that - /// execute both on and off of the coordinator thread. Stages can either produce another stage - /// to execute or a final response. An explicit [Span] is passed to allow for convenient - /// tracing. + /// Sequences a [Staged] plan. + /// + /// This is designed for plans that execute both on and off the coordinator + /// thread. Stages can either produce another stage to execute or a final + /// response. Maintains the connection-scoped cancel watch in + /// `connection_cancel_watches` while a stage is cancelable. pub(crate) async fn sequence_staged( &mut self, mut ctx: S::Ctx, @@ -180,7 +182,7 @@ impl Coordinator { // Channel to await cancellation. Insert a new channel, but check if the previous one // was already canceled. if let Some((_prev_tx, prev_rx)) = self - .staged_cancellation + .connection_cancel_watches .insert(session.conn_id().clone(), watch::channel(false)) { let was_canceled = *prev_rx.borrow(); @@ -192,7 +194,7 @@ impl Coordinator { } else { // If no cancel allowed, remove it so handle_spawn doesn't observe any previous value // when cancel_enabled may have been true on an earlier stage. - self.staged_cancellation.remove(session.conn_id()); + self.connection_cancel_watches.remove(session.conn_id()); } } else { cancel_enabled = false @@ -225,6 +227,8 @@ impl Coordinator { } } + /// Waits for either the spawned stage work to complete or cancellation to + /// be signaled through the connection-scoped cancel watch. fn handle_spawn( &self, ctx: C, @@ -238,7 +242,7 @@ impl Coordinator { { let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx .session() - .and_then(|session| self.staged_cancellation.get(session.conn_id())) + .and_then(|session| self.connection_cancel_watches.get(session.conn_id())) { let mut rx = rx.clone(); Box::pin(async move { diff --git a/src/adapter/src/coord/sql.rs b/src/adapter/src/coord/sql.rs index a7382591a10e1..5f7431ad45e09 100644 --- a/src/adapter/src/coord/sql.rs +++ b/src/adapter/src/coord/sql.rs @@ -217,7 +217,7 @@ impl Coordinator { /// Clears coordinator state for a connection. pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) { - self.staged_cancellation.remove(conn_id); + self.connection_cancel_watches.remove(conn_id); self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished) .await; self.retire_cluster_reconfigurations_for_conn(conn_id).await;