diff --git a/engine/packages/pegboard/src/workflows/serverless/conn.rs b/engine/packages/pegboard/src/workflows/serverless/conn.rs index ed0485ab4d..56a90282e8 100644 --- a/engine/packages/pegboard/src/workflows/serverless/conn.rs +++ b/engine/packages/pegboard/src/workflows/serverless/conn.rs @@ -156,16 +156,13 @@ enum OutboundReqOutput { #[timeout = u64::MAX] async fn outbound_req(ctx: &ActivityCtx, input: &OutboundReqInput) -> Result { let mut term_signal = TermSignal::get(); - let mut drain_sub = ctx - .subscribe::(("workflow_id", ctx.workflow_id())) - .await?; loop { metrics::SERVERLESS_OUTBOUND_REQ_ACTIVE .with_label_values(&[&input.namespace_id.to_string(), &input.runner_name]) .inc(); - let res = outbound_req_inner(ctx, input, &mut term_signal, &mut drain_sub).await; + let res = outbound_req_inner(ctx, input, &mut term_signal).await; metrics::SERVERLESS_OUTBOUND_REQ_ACTIVE .with_label_values(&[&input.namespace_id.to_string(), &input.runner_name]) @@ -203,7 +200,6 @@ async fn outbound_req_inner( ctx: &ActivityCtx, input: &OutboundReqInput, term_signal: &mut TermSignal, - drain_sub: &mut message::SubscriptionHandle, ) -> Result { if is_runner_draining(ctx, input.receiver_wf_id).await? { return Ok(OutboundReqOutput::Draining { drain_sent: false }); @@ -438,7 +434,6 @@ async fn outbound_req_inner( } }, _ = tokio::time::sleep(sleep_until_drain) => {} - _ = drain_sub.next() => {} _ = term_signal.recv() => {} }; diff --git a/engine/packages/pegboard/src/workflows/serverless/receiver.rs b/engine/packages/pegboard/src/workflows/serverless/receiver.rs index 6f0af7bc16..579f385263 100644 --- a/engine/packages/pegboard/src/workflows/serverless/receiver.rs +++ b/engine/packages/pegboard/src/workflows/serverless/receiver.rs @@ -48,11 +48,7 @@ pub async fn pegboard_serverless_receiver(ctx: &mut WorkflowCtx, input: &Input) .send() .await?; - // if the connection is currently running an outbound req, this will be received - ctx.msg(conn::Drain {}) - .topic(("workflow_id", conn_wf_id)) - .send() - .await?; + ctx.removed::>().await?; // Wait for connection wf to complete so this wf's state remains readable ctx.workflow::(conn_wf_id).output().await?;