Skip to content

Commit 4c5f562

Browse files
fix(hub): close transport after IO loop to prevent sub-agent process leak (#93)
* fix(hub): close transport after agent IO loop exits to prevent sub-agent process leak (#91)

  Sub-agent processes (`--serve`) hung indefinitely after completion due to a stdin pipe deadlock: the Hub never closed its write end, so the child's `tokio::io::stdin()` blocking read never received EOF.

  - Close transport in `finish_and_deliver` after emit/unregister/delivery, giving the child EOF on stdin so the process can exit
  - Break IPC reader task on `tx.send()` failure instead of silently continuing
  - Add `AgentProcess::wait_or_kill(timeout)` utility for bounded reaping

* fix: rustfmt long-line formatting in connection reader

* fix: rustfmt test files
1 parent be75aaf commit 4c5f562

8 files changed

Lines changed: 392 additions & 8 deletions

File tree

crates/loopal-agent-client/src/process.rs

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -88,6 +88,29 @@ impl AgentProcess {
8888
self.child.wait().await
8989
}
9090

91+
/// Wait for the child to exit with a timeout, then SIGKILL if it doesn't.
92+
///
93+
/// Intended for callers that need a bounded wait after signalling the child
94+
/// to exit (e.g. after closing the transport). Not currently used by the
95+
/// default spawn path — kept as a utility for future callers.
96+
pub async fn wait_or_kill(mut self, timeout: Duration) {
97+
match tokio::time::timeout(timeout, self.child.wait()).await {
98+
Ok(Ok(status)) => {
99+
info!(?status, "agent child exited");
100+
}
101+
Ok(Err(e)) => {
102+
warn!("error waiting for agent child: {e}");
103+
}
104+
Err(_) => {
105+
warn!("agent child did not exit within grace period, killing");
106+
if let Err(e) = self.child.kill().await {
107+
warn!("failed to kill agent child: {e}");
108+
}
109+
let _ = self.child.wait().await;
110+
}
111+
}
112+
}
113+
91114
/// Graceful shutdown: close the transport writer (signals EOF to child),
92115
/// wait for exit, then SIGKILL if the grace period expires.
93116
pub async fn shutdown(mut self) -> anyhow::Result<()> {

crates/loopal-agent-hub/src/agent_io.rs

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -158,8 +158,8 @@ pub fn start_agent_io(
158158
}
159159
crate::spawn_manager::spawn_completion_bridge(&n, conn3, completion_rx);
160160
info!(agent = %n, "agent registered in Hub");
161-
let output = agent_io_loop(hub2, conn, rx, n.clone()).await;
162-
finish_and_deliver(&hub, &n2, output).await;
161+
let output = agent_io_loop(hub2, conn.clone(), rx, n.clone()).await;
162+
finish_and_deliver(&hub, &n2, output, &conn).await;
163163
info!(agent = %n2, "agent IO loop ended");
164164
});
165165
}
@@ -175,8 +175,8 @@ pub fn spawn_io_loop(
175175
let n = name.to_string();
176176
let n2 = name.to_string();
177177
tokio::spawn(async move {
178-
let output = agent_io_loop(hub2, conn, rx, n.clone()).await;
179-
finish_and_deliver(&hub, &n2, output).await;
178+
let output = agent_io_loop(hub2, conn.clone(), rx, n.clone()).await;
179+
finish_and_deliver(&hub, &n2, output, &conn).await;
180180
info!(agent = %n2, "agent IO loop ended");
181181
});
182182
}

crates/loopal-agent-hub/src/finish.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -4,15 +4,22 @@ use std::sync::Arc;
44

55
use tokio::sync::Mutex;
66

7+
use loopal_ipc::connection::Connection;
78
use loopal_protocol::{Envelope, QualifiedAddress};
89

910
use crate::hub::Hub;
1011

11-
/// Emit agent finished, unregister, and deliver completion to parent.
12+
/// Emit agent finished, unregister, deliver completion to parent, and close the
13+
/// connection so the child process receives EOF on stdin and can exit.
1214
///
1315
/// Handles both local parents (via completion_tx) and remote parents
1416
/// (via MetaHub uplink). Called after the agent IO loop exits.
15-
pub(crate) async fn finish_and_deliver(hub: &Arc<Mutex<Hub>>, name: &str, output: Option<String>) {
17+
pub(crate) async fn finish_and_deliver(
18+
hub: &Arc<Mutex<Hub>>,
19+
name: &str,
20+
output: Option<String>,
21+
conn: &Arc<Connection>,
22+
) {
1623
let output_text = output.as_deref().unwrap_or("(no output)").to_string();
1724

1825
let (pending, uplink, parent_name) = {
@@ -47,4 +54,9 @@ pub(crate) async fn finish_and_deliver(hub: &Arc<Mutex<Hub>>, name: &str, output
4754
}
4855
}
4956
}
57+
58+
// Close the transport writer so the child process receives EOF on stdin.
59+
// This must happen AFTER delivery — the child's blocking stdin read will
60+
// return, allowing the process to exit cleanly.
61+
conn.close().await;
5062
}

crates/loopal-agent-hub/tests/suite.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,5 +29,7 @@ mod race_condition_test;
2929
mod relay_test;
3030
#[path = "suite/spawn_lifecycle_test.rs"]
3131
mod spawn_lifecycle_test;
32+
#[path = "suite/transport_close_test.rs"]
33+
mod transport_close_test;
3234
#[path = "suite/wait_nonblocking_test.rs"]
3335
mod wait_nonblocking_test;

crates/loopal-agent-hub/tests/suite/transport_close_test.rs

Lines changed: 276 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,276 @@
1+
//! Tests that the IO loop closes the transport after agent completion.
2+
//!
3+
//! Verifies the fix for the sub-agent process leak: after receiving
4+
//! `agent/completed`, the Hub must close the transport writer so the
5+
//! child process's blocking stdin read gets EOF and the process can exit.
6+
7+
use std::sync::Arc;
8+
use std::time::Duration;
9+
10+
use tokio::sync::{Mutex, mpsc};
11+
12+
use loopal_agent_hub::Hub;
13+
use loopal_agent_hub::spawn_manager::register_agent_connection;
14+
use loopal_ipc::connection::Connection;
15+
use loopal_ipc::protocol::methods;
16+
use loopal_protocol::AgentEvent;
17+
use serde_json::json;
18+
19+
fn make_hub() -> (Arc<Mutex<Hub>>, mpsc::Receiver<AgentEvent>) {
20+
let (tx, rx) = mpsc::channel::<AgentEvent>(64);
21+
(Arc::new(Mutex::new(Hub::new(tx))), rx)
22+
}
23+
24+
/// After `agent/completed`, the Hub-side transport must be disconnected.
25+
/// This is the critical fix: without `conn.close()`, the child process
26+
/// would hang forever on a blocking stdin read.
27+
#[tokio::test]
28+
async fn transport_closed_after_agent_completes() {
29+
let (hub, _event_rx) = make_hub();
30+
31+
let (agent_transport, hub_transport) = loopal_ipc::duplex_pair();
32+
let hub_transport_ref = hub_transport.clone();
33+
34+
let agent_conn = Arc::new(Connection::new(agent_transport));
35+
let server_conn = Arc::new(Connection::new(hub_transport));
36+
37+
let _agent_rx = agent_conn.start();
38+
let server_rx = server_conn.start();
39+
40+
register_agent_connection(
41+
hub.clone(),
42+
"worker",
43+
server_conn,
44+
server_rx,
45+
None,
46+
None,
47+
None,
48+
)
49+
.await;
50+
tokio::time::sleep(Duration::from_millis(50)).await;
51+
52+
// Agent sends agent/completed — triggers IO loop exit + conn.close()
53+
agent_conn
54+
.send_notification(
55+
methods::AGENT_COMPLETED.name,
56+
json!({"reason": "end_turn", "result": "done"}),
57+
)
58+
.await
59+
.unwrap();
60+
61+
// Wait for IO loop to process completion and close transport
62+
tokio::time::sleep(Duration::from_millis(200)).await;
63+
64+
assert!(
65+
!hub_transport_ref.is_connected(),
66+
"Hub-side transport must be disconnected after agent/completed"
67+
);
68+
}
69+
70+
/// The agent (child) side must receive EOF after Hub closes the transport,
71+
/// enabling the child process to exit its reader loop.
72+
#[tokio::test]
73+
async fn agent_receives_eof_after_hub_closes_transport() {
74+
let (hub, _event_rx) = make_hub();
75+
76+
let (agent_transport, hub_transport) = loopal_ipc::duplex_pair();
77+
let agent_transport_ref = agent_transport.clone();
78+
79+
let agent_conn = Arc::new(Connection::new(agent_transport));
80+
let server_conn = Arc::new(Connection::new(hub_transport));
81+
82+
let _agent_rx = agent_conn.start();
83+
let server_rx = server_conn.start();
84+
85+
register_agent_connection(
86+
hub.clone(),
87+
"worker",
88+
server_conn,
89+
server_rx,
90+
None,
91+
None,
92+
None,
93+
)
94+
.await;
95+
tokio::time::sleep(Duration::from_millis(50)).await;
96+
97+
// Agent sends completion
98+
agent_conn
99+
.send_notification(
100+
methods::AGENT_COMPLETED.name,
101+
json!({"reason": "end_turn", "result": "done"}),
102+
)
103+
.await
104+
.unwrap();
105+
106+
// Wait for Hub to close the transport
107+
tokio::time::sleep(Duration::from_millis(200)).await;
108+
109+
// Agent's reader should now get EOF when trying to read.
110+
// recv() returns Ok(None) on EOF.
111+
let recv_result =
112+
tokio::time::timeout(Duration::from_secs(2), agent_transport_ref.recv()).await;
113+
114+
match recv_result {
115+
Ok(Ok(None)) => {} // EOF — correct, Hub closed its writer
116+
Ok(Ok(Some(_))) => panic!("should not receive data after Hub closed transport"),
117+
Ok(Err(_)) => {} // read error is also acceptable (broken pipe)
118+
Err(_) => panic!("agent recv should not timeout — Hub must close transport"),
119+
}
120+
}
121+
122+
/// Result is fully delivered to the parent before transport close.
123+
#[tokio::test]
124+
async fn result_delivered_before_transport_close() {
125+
let (hub, _event_rx) = make_hub();
126+
127+
let (agent_transport, hub_transport) = loopal_ipc::duplex_pair();
128+
let agent_conn = Arc::new(Connection::new(agent_transport));
129+
let server_conn = Arc::new(Connection::new(hub_transport));
130+
131+
let _agent_rx = agent_conn.start();
132+
let server_rx = server_conn.start();
133+
134+
register_agent_connection(
135+
hub.clone(),
136+
"worker",
137+
server_conn,
138+
server_rx,
139+
None,
140+
None,
141+
None,
142+
)
143+
.await;
144+
// Set up a completion watcher before the agent finishes
145+
let mut watcher = {
146+
let mut h = hub.lock().await;
147+
h.registry.watch_completion("worker")
148+
};
149+
tokio::time::sleep(Duration::from_millis(50)).await;
150+
151+
// Agent sends completion with result
152+
agent_conn
153+
.send_notification(
154+
methods::AGENT_COMPLETED.name,
155+
json!({"reason": "end_turn", "result": "the answer is 42"}),
156+
)
157+
.await
158+
.unwrap();
159+
160+
// Wait for the watcher to receive the result (set by emit_agent_finished,
161+
// called in finish_and_deliver BEFORE conn.close)
162+
let result = tokio::time::timeout(Duration::from_secs(2), watcher.changed()).await;
163+
assert!(result.is_ok(), "watcher should be notified");
164+
assert_eq!(
165+
watcher.borrow().as_deref(),
166+
Some("the answer is 42"),
167+
"result must be delivered before transport close"
168+
);
169+
}
170+
171+
/// When the child process crashes (closes its connection without sending
172+
/// `agent/completed`), the Hub must still close the transport so the
173+
/// `agent_proc.wait()` background task can reap the child.
174+
#[tokio::test]
175+
async fn child_crash_triggers_transport_close() {
176+
let (hub, _event_rx) = make_hub();
177+
178+
let (agent_transport, hub_transport) = loopal_ipc::duplex_pair();
179+
let hub_transport_ref = hub_transport.clone();
180+
181+
let agent_conn = Arc::new(Connection::new(agent_transport));
182+
let server_conn = Arc::new(Connection::new(hub_transport));
183+
184+
let _agent_rx = agent_conn.start();
185+
let server_rx = server_conn.start();
186+
187+
register_agent_connection(
188+
hub.clone(),
189+
"crasher",
190+
server_conn,
191+
server_rx,
192+
None,
193+
None,
194+
None,
195+
)
196+
.await;
197+
tokio::time::sleep(Duration::from_millis(50)).await;
198+
199+
// Simulate child crash: close the agent-side writer (child's stdout closes).
200+
// Hub's reader will get EOF → IO loop exits → finish_and_deliver → conn.close.
201+
agent_conn.close().await;
202+
203+
// Wait for Hub to detect EOF and close its side
204+
tokio::time::sleep(Duration::from_millis(300)).await;
205+
206+
assert!(
207+
!hub_transport_ref.is_connected(),
208+
"Hub must close transport even when child crashes without agent/completed"
209+
);
210+
211+
// Agent should be unregistered from Hub
212+
assert!(
213+
hub.lock()
214+
.await
215+
.registry
216+
.get_agent_connection("crasher")
217+
.is_none(),
218+
"crashed agent must be unregistered"
219+
);
220+
}
221+
222+
/// After completion and transport close, the agent must no longer be
223+
/// routable in the Hub registry.
224+
#[tokio::test]
225+
async fn agent_unregistered_after_completion() {
226+
let (hub, _event_rx) = make_hub();
227+
228+
let (agent_transport, hub_transport) = loopal_ipc::duplex_pair();
229+
let agent_conn = Arc::new(Connection::new(agent_transport));
230+
let server_conn = Arc::new(Connection::new(hub_transport));
231+
232+
let _agent_rx = agent_conn.start();
233+
let server_rx = server_conn.start();
234+
235+
register_agent_connection(
236+
hub.clone(),
237+
"ephemeral",
238+
server_conn,
239+
server_rx,
240+
None,
241+
None,
242+
None,
243+
)
244+
.await;
245+
tokio::time::sleep(Duration::from_millis(50)).await;
246+
247+
// Verify registered before completion
248+
assert!(
249+
hub.lock()
250+
.await
251+
.registry
252+
.get_agent_connection("ephemeral")
253+
.is_some(),
254+
"agent should be registered before completion"
255+
);
256+
257+
agent_conn
258+
.send_notification(
259+
methods::AGENT_COMPLETED.name,
260+
json!({"reason": "end_turn", "result": "ok"}),
261+
)
262+
.await
263+
.unwrap();
264+
265+
tokio::time::sleep(Duration::from_millis(200)).await;
266+
267+
// Must be unregistered after completion
268+
assert!(
269+
hub.lock()
270+
.await
271+
.registry
272+
.get_agent_connection("ephemeral")
273+
.is_none(),
274+
"agent must be unregistered after completion"
275+
);
276+
}

crates/loopal-ipc/src/connection.rs

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,10 +83,24 @@ impl Connection {
8383
}
8484
}
8585
IncomingMessage::Request { id, method, params } => {
86-
let _ = tx.send(Incoming::Request { id, method, params }).await;
86+
if tx
87+
.send(Incoming::Request { id, method, params })
88+
.await
89+
.is_err()
90+
{
91+
debug!("IPC reader: incoming channel closed, exiting");
92+
break;
93+
}
8794
}
8895
IncomingMessage::Notification { method, params } => {
89-
let _ = tx.send(Incoming::Notification { method, params }).await;
96+
if tx
97+
.send(Incoming::Notification { method, params })
98+
.await
99+
.is_err()
100+
{
101+
debug!("IPC reader: incoming channel closed, exiting");
102+
break;
103+
}
90104
}
91105
}
92106
}

0 commit comments

Comments
 (0)