Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 9 additions & 4 deletions src/adapter/src/coord.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1885,9 +1885,14 @@ pub struct Coordinator {
/// to stage Batches in Persist that we will then link into the shard.
active_copies: BTreeMap<ConnectionId, ActiveCopyFrom>,

/// A map from connection ids to a watch channel that is set to `true` if the connection
/// received a cancel request.
staged_cancellation: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
/// Connection-scoped cancellation watches.
///
/// Each entry is a watch channel whose value is `false` until cancellation
/// is requested for that connection, at which point it is set to `true`.
///
/// Consumers install/remove these watches while they have cancellable work
/// in flight.
connection_cancel_watches: BTreeMap<ConnectionId, (watch::Sender<bool>, watch::Receiver<bool>)>,
/// Active introspection subscribes.
introspection_subscribes: BTreeMap<GlobalId, IntrospectionSubscribe>,

Expand Down Expand Up @@ -4708,7 +4713,7 @@ pub fn serve(
active_compute_sinks: BTreeMap::new(),
active_webhooks: BTreeMap::new(),
active_copies: BTreeMap::new(),
staged_cancellation: BTreeMap::new(),
connection_cancel_watches: BTreeMap::new(),
introspection_subscribes: BTreeMap::new(),
write_locks: BTreeMap::new(),
deferred_write_ops: BTreeMap::new(),
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/command_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1878,7 +1878,7 @@ impl Coordinator {
self.cancel_cluster_reconfigurations_for_conn(&conn_id)
.await;
self.cancel_pending_copy(&conn_id);
if let Some((tx, _rx)) = self.staged_cancellation.get_mut(&conn_id) {
if let Some((tx, _rx)) = self.connection_cancel_watches.get_mut(&conn_id) {
let _ = tx.send(true);
}
}
Expand Down
18 changes: 11 additions & 7 deletions src/adapter/src/coord/sequencer/inner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,10 +159,12 @@ struct CreateSourceInner {
}

impl Coordinator {
/// Sequences the next staged of a [Staged] plan. This is designed for use with plans that
/// execute both on and off of the coordinator thread. Stages can either produce another stage
/// to execute or a final response. An explicit [Span] is passed to allow for convenient
/// tracing.
/// Sequences a [Staged] plan.
///
/// This is designed for plans that execute both on and off the coordinator
/// thread. Stages can either produce another stage to execute or a final
/// response. Maintains the connection-scoped cancel watch in
/// `connection_cancel_watches` while a stage is cancelable.
pub(crate) async fn sequence_staged<S>(
&mut self,
mut ctx: S::Ctx,
Expand All @@ -180,7 +182,7 @@ impl Coordinator {
// Channel to await cancellation. Insert a new channel, but check if the previous one
// was already canceled.
if let Some((_prev_tx, prev_rx)) = self
.staged_cancellation
.connection_cancel_watches
.insert(session.conn_id().clone(), watch::channel(false))
{
let was_canceled = *prev_rx.borrow();
Expand All @@ -192,7 +194,7 @@ impl Coordinator {
} else {
// If no cancel allowed, remove it so handle_spawn doesn't observe any previous value
// when cancel_enabled may have been true on an earlier stage.
self.staged_cancellation.remove(session.conn_id());
self.connection_cancel_watches.remove(session.conn_id());
}
} else {
cancel_enabled = false
Expand Down Expand Up @@ -225,6 +227,8 @@ impl Coordinator {
}
}

/// Waits for either the spawned stage work to complete or cancellation to
/// be signaled through the connection-scoped cancel watch.
fn handle_spawn<C, T, F>(
&self,
ctx: C,
Expand All @@ -238,7 +242,7 @@ impl Coordinator {
{
let rx: BoxFuture<()> = if let Some((_tx, rx)) = ctx
.session()
.and_then(|session| self.staged_cancellation.get(session.conn_id()))
.and_then(|session| self.connection_cancel_watches.get(session.conn_id()))
{
let mut rx = rx.clone();
Box::pin(async move {
Expand Down
2 changes: 1 addition & 1 deletion src/adapter/src/coord/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ impl Coordinator {

/// Clears coordinator state for a connection.
pub(crate) async fn clear_connection(&mut self, conn_id: &ConnectionId) {
self.staged_cancellation.remove(conn_id);
self.connection_cancel_watches.remove(conn_id);
self.retire_compute_sinks_for_conn(conn_id, ActiveComputeSinkRetireReason::Finished)
.await;
self.retire_cluster_reconfigurations_for_conn(conn_id).await;
Expand Down
Loading