From d9d06d3ecc3a534ac888a1befa0d4c3bd9708b57 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 20:47:41 -0400
Subject: [PATCH 01/17] feat(events): define push-event JSON schema

Adds the Event enum and RemovedReason for the push-event socket.
Wire format is JSONL: snapshot reuses shpool_protocol::Session so the
schema matches `shpool list --json`; deltas are flat objects tagged
with `type`.
---
 libshpool/src/events.rs | 123 ++++++++++++++++++++++++++++++++++++++++
 libshpool/src/lib.rs    |   1 +
 2 files changed, 124 insertions(+)
 create mode 100644 libshpool/src/events.rs

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
new file mode 100644
index 00000000..27794135
--- /dev/null
+++ b/libshpool/src/events.rs
@@ -0,0 +1,123 @@
+//! Push-event protocol for the daemon.
+//!
+//! Events are published to subscribers connected to a sibling Unix socket
+//! next to the main shpool socket. The wire format is JSON, one event per
+//! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix
+//! socket and a JSON parser to consume the stream.
+//!
+//! On connect, a subscriber receives a `snapshot` event reflecting the
+//! current session table, atomically with respect to the table mutations
+//! that produce subsequent delta events. After the snapshot, the subscriber
+//! receives delta events as the session table changes. To force a re-sync,
+//! a subscriber may simply reconnect.
+//!
+//! The `sessions` field of a snapshot event uses the same schema as the
+//! `sessions` field of `shpool list --json`, so the two surfaces stay in
+//! sync by construction.
+
+use serde_derive::Serialize;
+use shpool_protocol::Session;
+
+/// An event published on the events socket.
+#[derive(Serialize, Debug)]
+#[serde(tag = "type")]
+pub enum Event {
+    /// Sent as the first message after a subscriber connects, reflecting
+    /// the current session table.
+    #[serde(rename = "snapshot")]
+    Snapshot { sessions: Vec<Session> },
+
+    /// A new session was created.
+    #[serde(rename = "session.created")]
+    SessionCreated { name: String, started_at_unix_ms: i64 },
+
+    /// A client attached to an existing session.
+    #[serde(rename = "session.attached")]
+    SessionAttached { name: String, last_connected_at_unix_ms: i64 },
+
+    /// A client detached from a session that is still alive.
+    #[serde(rename = "session.detached")]
+    SessionDetached { name: String, last_disconnected_at_unix_ms: i64 },
+
+    /// A session was removed from the session table.
+    #[serde(rename = "session.removed")]
+    SessionRemoved { name: String, reason: RemovedReason },
+}
+
+#[derive(Serialize, Debug)]
+#[serde(rename_all = "lowercase")]
+pub enum RemovedReason {
+    /// The shell process exited on its own.
+    Exited,
+    /// The session was killed by an explicit `shpool kill` request.
+ Killed, +} + +#[cfg(test)] +mod tests { + use super::*; + use shpool_protocol::SessionStatus; + + fn json(event: &Event) -> String { + serde_json::to_string(event).unwrap() + } + + #[test] + fn snapshot_serializes_with_sessions_array() { + let event = Event::Snapshot { + sessions: vec![Session { + name: "main".into(), + started_at_unix_ms: 100, + last_connected_at_unix_ms: Some(200), + last_disconnected_at_unix_ms: None, + status: SessionStatus::Attached, + }], + }; + assert_eq!( + json(&event), + r#"{"type":"snapshot","sessions":[{"name":"main","started_at_unix_ms":100,"last_connected_at_unix_ms":200,"last_disconnected_at_unix_ms":null,"status":"Attached"}]}"# + ); + } + + #[test] + fn session_created_serializes_flat() { + let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 }; + assert_eq!(json(&event), r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#); + } + + #[test] + fn session_attached_serializes_flat() { + let event = + Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 }; + assert_eq!( + json(&event), + r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"# + ); + } + + #[test] + fn session_detached_serializes_flat() { + let event = + Event::SessionDetached { name: "main".into(), last_disconnected_at_unix_ms: 42 }; + assert_eq!( + json(&event), + r#"{"type":"session.detached","name":"main","last_disconnected_at_unix_ms":42}"# + ); + } + + #[test] + fn session_removed_serializes_with_reason() { + let exited = + Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited }; + assert_eq!( + json(&exited), + r#"{"type":"session.removed","name":"main","reason":"exited"}"# + ); + let killed = + Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed }; + assert_eq!( + json(&killed), + r#"{"type":"session.removed","name":"main","reason":"killed"}"# + ); + } +} diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs index 354ba851..d0694d1e 100644 --- a/libshpool/src/lib.rs +++ b/libshpool/src/lib.rs @@ -36,6 +36,7 @@ mod daemon; mod daemonize; mod detach; mod duration; +mod events; mod hooks; mod kill; mod list; From d4f2a3f0a70088efe3285b338362c2f3d6e13a49 Mon Sep 17 00:00:00 2001 From: Geoffrey Churchill Date: Sun, 26 Apr 2026 21:01:28 -0400 Subject: [PATCH 02/17] feat(events): add EventBus and sibling events socket listener The daemon binds a sibling Unix socket (events.socket) next to the main shpool socket and accepts long-lived subscribers. Each new subscriber receives a snapshot of the session table as its first message, built under the shells lock so subsequent deltas (published in a follow-up change) cannot race the registration. Per-subscriber writer threads with bounded channels and a write timeout isolate slow or stuck consumers from the daemon's hot path; subscribers that fall behind are dropped and re-sync by reconnecting. Extracts collect_sessions from handle_list so the snapshot and the existing `shpool list --json` output share one schema-producing path. 
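As a sanity check on the wire contract, a subscriber needs nothing but
a Unix socket and a line-oriented JSON parser. A minimal external
consumer might look like the following sketch (std-only Rust; the
socket path shown is hypothetical and depends on the runtime dir):

    use std::io::{BufRead, BufReader};
    use std::os::unix::net::UnixStream;

    fn main() -> std::io::Result<()> {
        // The first line is the snapshot; every later line is a delta.
        let stream = UnixStream::connect("/run/shpool/events.socket")?;
        for line in BufReader::new(stream).lines() {
            println!("event: {}", line?);
        }
        Ok(())
    }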
---
 libshpool/src/daemon/mod.rs    |   4 +-
 libshpool/src/daemon/server.rs | 101 +++++++-----
 libshpool/src/events.rs        | 271 +++++++++++++++++++++++++++++
 3 files changed, 341 insertions(+), 35 deletions(-)

diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index 7b52ec71..2df4ea7a 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -17,7 +17,7 @@ use std::{env, os::unix::net::UnixListener, path::PathBuf};
 use anyhow::Context;
 use tracing::{info, instrument};

-use crate::{config, consts, hooks};
+use crate::{config, consts, events, hooks};

 mod etc_environment;
 mod exit_notify;
@@ -84,6 +84,8 @@ pub fn run(
     // spawn the signal handler thread in the background
     signals::Handler::new(cleanup_socket.clone()).spawn()?;

+    let _events_guard = server.start_events_listener(events::socket_path(&socket))?;
+
     server::Server::serve(server, listener)?;

     if let Some(sock) = cleanup_socket {
diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 8698595c..9ab59237 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -49,7 +49,7 @@ use crate::{
         etc_environment, exit_notify::ExitNotifier, hooks, pager, pager::PagerError, prompt,
         shell, show_motd, ttl_reaper,
     },
-    protocol, test_hooks, tty, user,
+    events, protocol, test_hooks, tty, user,
 };

 const DEFAULT_INITIAL_SHELL_PATH: &str = "/usr/bin:/bin:/usr/sbin:/sbin";
@@ -74,6 +74,7 @@ pub struct Server {
     runtime_dir: PathBuf,
     register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
     hooks: Box<dyn hooks::Hooks + Send + Sync>,
+    pub events_bus: Arc<events::EventBus>,
     daily_messenger: Arc<show_motd::DailyMessenger>,
     log_level_handle: tracing_subscriber::reload::Handle<
         tracing_subscriber::filter::LevelFilter,
@@ -110,11 +111,38 @@ impl Server {
             runtime_dir,
             register_new_reapable_session: new_sess_tx,
             hooks,
+            events_bus: events::EventBus::new(),
             daily_messenger,
             log_level_handle,
         }))
     }

+    /// Bind the events socket and spawn the accept thread. Each accepted
+    /// connection is handed a snapshot built under the session-table lock
+    /// so subsequent deltas (published under the same lock) cannot race.
+    /// The returned guard unlinks the socket file on drop.
+    pub fn start_events_listener(
+        self: &Arc<Self>,
+        socket_path: PathBuf,
+    ) -> anyhow::Result<events::ListenerGuard> {
+        let server = Arc::clone(self);
+        events::start_listener(socket_path, move |stream| {
+            server.handle_events_subscriber(stream)
+        })
+    }
+
+    fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
+        let receiver = {
+            let _s = span!(Level::INFO, "events_subscribe_lock(shells)").entered();
+            let shells = self.shells.lock().unwrap();
+            let sessions =
+                collect_sessions(&shells).context("collecting snapshot for events subscriber")?;
+            let snapshot = events::Event::Snapshot { sessions };
+            self.events_bus.register(&snapshot)
+        };
+        events::spawn_writer(stream, receiver)
+    }
+
     #[instrument(skip_all)]
     pub fn serve(server: Arc<Server>, listener: UnixListener) -> anyhow::Result<()> {
         test_hooks::emit("daemon-about-to-listen");
@@ -646,40 +674,8 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock().unwrap();
-
-        let sessions: anyhow::Result<Vec<Session>> = shells
-            .iter()
-            .map(|(k, v)| {
-                let status = match v.inner.try_lock() {
-                    Ok(_) => SessionStatus::Disconnected,
-                    Err(_) => SessionStatus::Attached,
-                };
-
-                let timestamps = v.lifecycle_timestamps.lock().unwrap();
-                let last_connected_at_unix_ms = timestamps
-                    .last_connected_at
-                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                    .transpose()?;
-
-                let last_disconnected_at_unix_ms = timestamps
-                    .last_disconnected_at
-                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                    .transpose()?;
-
-                Ok(Session {
-                    name: k.to_string(),
-                    started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
-                        as i64,
-                    last_connected_at_unix_ms,
-                    last_disconnected_at_unix_ms,
-                    status,
-                })
-            })
-            .collect();
-        let sessions = sessions.context("collecting running session metadata")?;
-
+        let sessions = collect_sessions(&shells)?;
         write_reply(&mut stream, ListReply { sessions })?;
-
         Ok(())
     }
@@ -1143,6 +1139,43 @@ impl Server {
     }
 }

+/// Collect a snapshot of the session table for `list` replies and event
+/// snapshots. The caller must hold the shells lock for the duration of
+/// the call so the resulting list is consistent with concurrent mutators.
+fn collect_sessions(
+    shells: &HashMap<String, Box<shell::Session>>,
+) -> anyhow::Result<Vec<Session>> {
+    shells
+        .iter()
+        .map(|(k, v)| {
+            let status = match v.inner.try_lock() {
+                Ok(_) => SessionStatus::Disconnected,
+                Err(_) => SessionStatus::Attached,
+            };
+
+            let timestamps = v.lifecycle_timestamps.lock().unwrap();
+            let last_connected_at_unix_ms = timestamps
+                .last_connected_at
+                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                .transpose()?;
+            let last_disconnected_at_unix_ms = timestamps
+                .last_disconnected_at
+                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                .transpose()?;
+
+            Ok(Session {
+                name: k.to_string(),
+                started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
+                    as i64,
+                last_connected_at_unix_ms,
+                last_disconnected_at_unix_ms,
+                status,
+            })
+        })
+        .collect::<anyhow::Result<Vec<Session>>>()
+        .context("collecting running session metadata")
+}
+
 // HACK: this is not a good way to detect shells that don't support our
 // sentinel injection approach, but it is better than just hanging when a
 // user tries to start one.
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 27794135..a77c693d 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -15,8 +15,31 @@
 //! `sessions` field of `shpool list --json`, so the two surfaces stay in
 //! sync by construction.

+use std::{
+    io::Write,
+    os::unix::net::{UnixListener, UnixStream},
+    path::{Path, PathBuf},
+    sync::{
+        mpsc::{self, Receiver, SyncSender, TrySendError},
+        Arc, Mutex,
+    },
+    thread,
+    time::Duration,
+};
+
+use anyhow::Context;
 use serde_derive::Serialize;
 use shpool_protocol::Session;
+use tracing::{error, info, warn};
+
+/// Per-subscriber outbound queue depth. Subscribers that fall this far
+/// behind are dropped; reconnection re-syncs them via a fresh snapshot.
+const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
+
+/// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
+/// this elapses on a blocked write, the writer thread exits and the
+/// subscriber is implicitly dropped on the next publish.
+const WRITE_TIMEOUT: Duration = Duration::from_secs(5);

 /// An event published on the events socket.
 #[derive(Serialize, Debug)]
@@ -53,6 +76,149 @@ pub enum RemovedReason {
     Killed,
 }

+/// Fans out events to all connected subscribers.
+///
+/// Lock ordering: callers that publish under another lock (e.g. the session
+/// table) must take that lock before [`EventBus::publish`] takes its own
+/// internal lock. [`EventBus::register`] follows the same order, so a
+/// subscriber registered while the session-table lock is held cannot miss
+/// a delta from a mutation that committed under that lock.
+pub struct EventBus {
+    subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
+}
+
+impl EventBus {
+    pub fn new() -> Arc<Self> {
+        Arc::new(Self { subscribers: Mutex::new(Vec::new()) })
+    }
+
+    /// Broadcast `event` to all current subscribers. Subscribers whose
+    /// queues are full or whose receivers have hung up are dropped.
+    pub fn publish(&self, event: &Event) {
+        let line = match serialize_line(event) {
+            Some(line) => line,
+            None => return,
+        };
+        let mut subs = self.subscribers.lock().unwrap();
+        subs.retain(|tx| match tx.try_send(line.clone()) {
+            Ok(()) => true,
+            Err(TrySendError::Full(_)) => {
+                warn!("dropping events subscriber: queue full");
+                false
+            }
+            Err(TrySendError::Disconnected(_)) => false,
+        });
+    }
+
+    /// Register a new subscriber with `snapshot` as the first message in
+    /// its queue. Returns the receiver to be handed to a writer thread.
+    pub fn register(&self, snapshot: &Event) -> Receiver<Arc<str>> {
+        let line = serialize_line(snapshot).expect("snapshot serialization");
+        let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
+        tx.try_send(line).expect("seeding empty channel cannot fail");
+        self.subscribers.lock().unwrap().push(tx);
+        rx
+    }
+}
+
+fn serialize_line(event: &Event) -> Option<Arc<str>> {
+    match serde_json::to_string(event) {
+        Ok(s) => Some(format!("{s}\n").into()),
+        Err(e) => {
+            error!("serializing event {:?}: {:?}", event, e);
+            None
+        }
+    }
+}
+
+/// Sibling events socket path next to the main shpool socket.
+pub fn socket_path(main_socket: &Path) -> PathBuf {
+    let mut path = main_socket.to_path_buf();
+    path.set_file_name("events.socket");
+    path
+}
+
+/// Owns the events socket file. Dropping the guard unlinks the socket
+/// path so a fresh daemon doesn't trip on stale files. The accept thread
+/// is not stopped — daemon shutdown takes the process down.
+pub struct ListenerGuard {
+    path: PathBuf,
+}
+
+impl Drop for ListenerGuard {
+    fn drop(&mut self) {
+        match std::fs::remove_file(&self.path) {
+            Ok(()) => {}
+            Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+            Err(e) => warn!("removing events socket {:?}: {:?}", self.path, e),
+        }
+    }
+}
+
+/// Bind the events socket and spawn the accept thread. For each accepted
+/// connection, `on_accept` is invoked with the stream; it is expected to
+/// register the subscriber with the bus and spawn a writer thread (see
+/// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
+pub fn start_listener<F>(
+    socket_path: PathBuf,
+    on_accept: F,
+) -> anyhow::Result<ListenerGuard>
+where
+    F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
+{
+    if socket_path.exists() {
+        std::fs::remove_file(&socket_path)
+            .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
+    }
+    let listener = UnixListener::bind(&socket_path)
+        .with_context(|| format!("binding events socket {:?}", socket_path))?;
+    info!("events socket listening at {:?}", socket_path);
+    thread::Builder::new()
+        .name("events-accept".into())
+        .spawn(move || run_accept_loop(listener, on_accept))
+        .context("spawning events accept thread")?;
+    Ok(ListenerGuard { path: socket_path })
+}
+
+fn run_accept_loop<F>(listener: UnixListener, on_accept: F)
+where
+    F: Fn(UnixStream) -> anyhow::Result<()>,
+{
+    for stream in listener.incoming() {
+        match stream {
+            Ok(stream) => {
+                if let Err(e) = on_accept(stream) {
+                    warn!("accepting events subscriber: {:?}", e);
+                }
+            }
+            Err(e) => {
+                error!("events listener accept failed: {:?}", e);
+                break;
+            }
+        }
+    }
+}
+
+/// Set the write timeout and spawn a thread that drains `receiver` to
+/// `stream` until either side closes or a write times out.
+pub fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
+    stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?;
+    thread::Builder::new()
+        .name("events-writer".into())
+        .spawn(move || run_writer(stream, receiver))
+        .context("spawning events writer thread")?;
+    Ok(())
+}
+
+fn run_writer(mut stream: UnixStream, receiver: Receiver<Arc<str>>) {
+    while let Ok(line) = receiver.recv() {
+        if let Err(e) = stream.write_all(line.as_bytes()) {
+            info!("events subscriber gone: {:?}", e);
+            break;
+        }
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use super::*;
@@ -105,6 +271,111 @@ mod tests {
         );
     }

+    #[test]
+    fn bus_publish_with_no_subscribers_is_a_noop() {
+        let bus = EventBus::new();
+        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 1 });
+    }
+
+    #[test]
+    fn bus_register_seeds_receiver_with_snapshot() {
+        let bus = EventBus::new();
+        let snapshot = Event::Snapshot { sessions: vec![] };
+        let rx = bus.register(&snapshot);
+        let line = rx.try_recv().unwrap();
+        assert_eq!(&*line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
+    }
+
+    #[test]
+    fn bus_publish_reaches_subscriber_after_snapshot() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 7 });
+        let snapshot_line = rx.recv().unwrap();
+        let delta_line = rx.recv().unwrap();
+        assert_eq!(&*snapshot_line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
+        assert_eq!(
+            &*delta_line,
+            "{\"type\":\"session.created\",\"name\":\"main\",\"started_at_unix_ms\":7}\n"
+        );
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_queue_is_full() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        // Fill the channel to capacity (the snapshot already used 1 slot).
+        for i in 0..(SUBSCRIBER_QUEUE_DEPTH - 1) {
+            bus.publish(&Event::SessionCreated {
+                name: format!("s{i}"),
+                started_at_unix_ms: i as i64,
+            });
+        }
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 1);
+        // One more publish overflows and the subscriber is dropped.
+        bus.publish(&Event::SessionCreated { name: "overflow".into(), started_at_unix_ms: 0 });
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
+        // The receiver still has the buffered events; the channel is not
+        // closed for it from the receiving side.
+        drop(rx);
+    }
+
+    #[test]
+    fn bus_drops_subscriber_whose_receiver_hung_up() {
+        let bus = EventBus::new();
+        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        drop(rx);
+        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 0 });
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
+    }
+
+    #[test]
+    fn bus_publish_reaches_every_subscriber() {
+        let bus = EventBus::new();
+        let rx_a = bus.register(&Event::Snapshot { sessions: vec![] });
+        let rx_b = bus.register(&Event::Snapshot { sessions: vec![] });
+        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 1 });
+        for rx in [&rx_a, &rx_b] {
+            let _snapshot = rx.recv().unwrap();
+            let delta = rx.recv().unwrap();
+            assert!(delta.contains(r#""type":"session.created""#));
+            assert!(delta.contains(r#""name":"main""#));
+        }
+    }
+
+    #[test]
+    fn writer_exits_when_peer_closes_stream() {
+        let (a, b) = UnixStream::pair().unwrap();
+        let (tx, rx) = mpsc::sync_channel::<Arc<str>>(8);
+        let handle = thread::spawn(move || run_writer(a, rx));
+        drop(b);
+        // The send may succeed (kernel buffered) or fail; what matters is
+        // that closing the channel unblocks the writer thread on the next
+        // recv, regardless of write outcome.
+        let _ = tx.try_send("ignored\n".into());
+        drop(tx);
+        handle.join().unwrap();
+    }
+
+    #[test]
+    fn spawn_writer_sets_write_timeout() {
+        let (a, _b) = UnixStream::pair().unwrap();
+        let probe = a.try_clone().unwrap();
+        let (_tx, rx) = mpsc::sync_channel(1);
+        spawn_writer(a, rx).unwrap();
+        assert_eq!(probe.write_timeout().unwrap(), Some(WRITE_TIMEOUT));
+    }
+
+    #[test]
+    fn listener_guard_unlinks_socket_on_drop() {
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let guard = start_listener(path.clone(), |_| Ok(())).unwrap();
+        assert!(path.exists(), "socket file should exist while guard is alive");
+        drop(guard);
+        assert!(!path.exists(), "socket file should be unlinked on guard drop");
+    }
+
     #[test]
     fn session_removed_serializes_with_reason() {
         let exited =

From ea78579bebccac30a4fa02fde75942f1f2e9a44c Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:35:12 -0400
Subject: [PATCH 03/17] feat(events): publish deltas from session lifecycle
 mutations

Emits session.created, session.attached, session.detached, and
session.removed (with reason exited|killed) at the seven mutation
sites that change the session table:

* select_shell_desc create path: created + attached
* select_shell_desc reattach path: attached
* handle_attach client-disconnect path: detached
* handle_attach shell-exit path: removed{exited}
* handle_detach: detached
* handle_kill: removed{killed}
* ttl_reaper expiry: removed{killed}

Each publish runs inside the same shells-lock scope as its mutation,
so wire-order matches causal-order for any subscriber. Reaping is
surfaced as `killed` for now; a dedicated reason can be added later
if a use case appears.
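To make the ordering concrete, a subscriber that connects to an empty
daemon and then watches one session get created, detached, and killed
sees a stream shaped like this (event shapes match the patch 01 unit
tests; timestamps are illustrative):

    {"type":"snapshot","sessions":[]}
    {"type":"session.created","name":"main","started_at_unix_ms":1700000000000}
    {"type":"session.attached","name":"main","last_connected_at_unix_ms":1700000000000}
    {"type":"session.detached","name":"main","last_disconnected_at_unix_ms":1700000001000}
    {"type":"session.removed","name":"main","reason":"killed"}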
---
 libshpool/src/daemon/server.rs     | 53 +++++++++++++++++++++++++-----
 libshpool/src/daemon/ttl_reaper.rs |  9 +++++
 2 files changed, 54 insertions(+), 8 deletions(-)

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 9ab59237..d99993f8 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -94,12 +94,14 @@ impl Server {
         >,
     ) -> anyhow::Result<Arc<Self>> {
         let shells = Arc::new(Mutex::new(HashMap::new()));
+        let events_bus = events::EventBus::new();
         // buffered so that we are unlikely to block when setting up a
         // new session
         let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
         let shells_tab = Arc::clone(&shells);
+        let reaper_bus = Arc::clone(&events_bus);
         thread::spawn(move || {
-            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab) {
+            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) {
                 warn!("ttl reaper exited with error: {:?}", e);
             }
         });
@@ -111,7 +113,7 @@ impl Server {
             runtime_dir,
             register_new_reapable_session: new_sess_tx,
             hooks,
-            events_bus: events::EventBus::new(),
+            events_bus,
             daily_messenger,
             log_level_handle,
         }))
@@ -339,6 +341,10 @@ impl Server {
             let _s = span!(Level::INFO, "2_lock(shells)").entered();
             let mut shells = self.shells.lock().unwrap();
             shells.remove(&header.name);
+            self.events_bus.publish(&events::Event::SessionRemoved {
+                name: header.name.clone(),
+                reason: events::RemovedReason::Exited,
+            });
         }

         // The child shell has exited, so the shell->client thread should
@@ -357,8 +363,13 @@ impl Server {
             let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
             let shells = self.shells.lock().unwrap();
             if let Some(session) = shells.get(&header.name) {
+                let now = time::SystemTime::now();
                 session.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
-                    Some(time::SystemTime::now());
+                    Some(now);
+                self.events_bus.publish(&events::Event::SessionDetached {
+                    name: header.name.clone(),
+                    last_disconnected_at_unix_ms: unix_ms(now),
+                });
             }
         }
         if let Err(err) = self.hooks.on_client_disconnect(&header.name) {
@@ -420,8 +431,13 @@ impl Server {
                     // the channel is still open so the subshell is still running
                     info!("taking over existing session inner");
                     inner.client_stream = Some(stream.try_clone()?);
+                    let now = time::SystemTime::now();
                     session.lifecycle_timestamps.lock().unwrap().last_connected_at =
-                        Some(time::SystemTime::now());
+                        Some(now);
+                    self.events_bus.publish(&events::Event::SessionAttached {
+                        name: header.name.clone(),
+                        last_connected_at_unix_ms: unix_ms(now),
+                    });

                     if inner
                         .shell_to_client_join_h
@@ -494,13 +510,22 @@ impl Server {
                 matches!(motd, MotdDisplayMode::Dump),
             )?;

-            session.lifecycle_timestamps.lock().unwrap().last_connected_at =
-                Some(time::SystemTime::now());
+            let now = time::SystemTime::now();
+            session.lifecycle_timestamps.lock().unwrap().last_connected_at = Some(now);
+            let started_at = session.started_at;
             {
                 // we unwrap to propagate the poison as an unwind
                 let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
                 let mut shells = self.shells.lock().unwrap();
                 shells.insert(header.name.clone(), Box::new(session));
+                self.events_bus.publish(&events::Event::SessionCreated {
+                    name: header.name.clone(),
+                    started_at_unix_ms: unix_ms(started_at),
+                });
+                self.events_bus.publish(&events::Event::SessionAttached {
+                    name: header.name.clone(),
+                    last_connected_at_unix_ms: unix_ms(now),
+                });
             }
             // fallthrough to bidi streaming
         } else if let Err(err) = self.hooks.on_reattach(&header.name) {
@@ -600,8 +625,12 @@ impl Server {
                 if let shell::ClientConnectionStatus::DetachNone = status {
                     not_attached_sessions.push(session);
                 } else {
-                    s.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
-                        Some(time::SystemTime::now());
+                    let now = time::SystemTime::now();
+                    s.lifecycle_timestamps.lock().unwrap().last_disconnected_at = Some(now);
+                    self.events_bus.publish(&events::Event::SessionDetached {
+                        name: session.clone(),
+                        last_disconnected_at_unix_ms: unix_ms(now),
+                    });
                 }
             } else {
                 not_found_sessions.push(session);
@@ -659,6 +688,10 @@ impl Server {

         for session in to_remove.iter() {
             shells.remove(session);
+            self.events_bus.publish(&events::Event::SessionRemoved {
+                name: session.clone(),
+                reason: events::RemovedReason::Killed,
+            });
         }
         if !to_remove.is_empty() {
             test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -1139,6 +1172,10 @@ impl Server {
     }
 }

+fn unix_ms(t: time::SystemTime) -> i64 {
+    t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64).unwrap_or(0)
+}
+
 /// Collect a snapshot of the session table for `list` replies and event
 /// snapshots. The caller must hold the shells lock for the duration of
 /// the call so the resulting list is consistent with concurrent mutators.
diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs
index f5664b75..810519e9 100644
--- a/libshpool/src/daemon/ttl_reaper.rs
+++ b/libshpool/src/daemon/ttl_reaper.rs
@@ -30,12 +30,14 @@ use std::{
 use tracing::{info, span, warn, Level};

 use super::shell;
+use crate::events;

 /// Run the reaper thread loop. Should be invoked in a dedicated
 /// thread.
 pub fn run(
     new_sess: crossbeam_channel::Receiver<(String, Instant)>,
     shells: Arc<Mutex<HashMap<String, Box<shell::Session>>>>,
+    events_bus: Arc<events::EventBus>,
 ) -> anyhow::Result<()> {
     let _s = span!(Level::INFO, "ttl_reaper").entered();

@@ -115,6 +117,13 @@ pub fn run(
                     continue;
                 }
                 shells.remove(&reapable.session_name);
+                // Reaping is a daemon-initiated termination; surfaced to
+                // subscribers as `killed` until we have a use case for a
+                // dedicated reason.
+                events_bus.publish(&events::Event::SessionRemoved {
+                    name: reapable.session_name.clone(),
+                    reason: events::RemovedReason::Killed,
+                });
             }
         }
     }

From a4558beb12d009d0a544e12685bfdb11ee001ca5 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:37:54 -0400
Subject: [PATCH 04/17] feat(events): add `shpool events` subcommand

Connects to the daemon's events socket and prints each JSON line to
stdout, flushing per line so the stream is pipeline-friendly:

    shpool events | jq 'select(.type == "session.removed")'

The first line is a snapshot of the current session table; subsequent
lines are deltas. Reconnect to force a fresh snapshot.
---
 libshpool/src/events.rs | 19 ++++++++++++++++++-
 libshpool/src/lib.rs    | 10 ++++++++++
 2 files changed, 28 insertions(+), 1 deletion(-)

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index a77c693d..b81f0f83 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -16,7 +16,7 @@ //! sync by construction.

 use std::{
-    io::Write,
+    io::{BufRead, BufReader, Write},
     os::unix::net::{UnixListener, UnixStream},
     path::{Path, PathBuf},
     sync::{
         mpsc::{self, Receiver, SyncSender, TrySendError},
@@ -131,6 +131,23 @@ fn serialize_line(event: &Event) -> Option<Arc<str>> {
     }
 }

+/// Connect to the events socket, copy each line to stdout, and flush per
+/// line so the stream is usable in pipes (`shpool events | jq`). Returns
+/// when the daemon closes the connection.
+pub fn subscribe_to_stdout(socket_path: &Path) -> anyhow::Result<()> {
+    let stream = UnixStream::connect(socket_path)
+        .with_context(|| format!("connecting to events socket {:?}", socket_path))?;
+    let reader = BufReader::new(stream);
+    let stdout = std::io::stdout();
+    let mut out = stdout.lock();
+    for line in reader.lines() {
+        let line = line.context("reading event")?;
+        writeln!(out, "{line}").context("writing event")?;
+        out.flush().context("flushing stdout")?;
+    }
+    Ok(())
+}
+
 /// Sibling events socket path next to the main shpool socket.
 pub fn socket_path(main_socket: &Path) -> PathBuf {
     let mut path = main_socket.to_path_buf();
diff --git a/libshpool/src/lib.rs b/libshpool/src/lib.rs
index d0694d1e..2b728d48 100644
--- a/libshpool/src/lib.rs
+++ b/libshpool/src/lib.rs
@@ -210,6 +210,15 @@ needs debugging, but would be clobbered by a restart.")]
         #[clap(help = "new log level")]
         level: shpool_protocol::LogLevel,
     },
+
+    #[clap(about = "Subscribe to the daemon's push-event stream
+
+Connects to the events socket and writes each event (one JSON object
+per line) to stdout, flushing after every line so the stream is
+pipeline-friendly (e.g. `shpool events | jq`). The first line is a
+snapshot of the current session table; subsequent lines are deltas.
+Reconnect to force a fresh snapshot.")]
+    Events,
 }

 impl Args {
@@ -382,6 +391,7 @@ pub fn run(args: Args, hooks: Option<Box<dyn hooks::Hooks + Send + Sync>>) -> an
         Commands::Kill { sessions } => kill::run(sessions, socket),
         Commands::List { json } => list::run(socket, json),
         Commands::SetLogLevel { level } => set_log_level::run(level, socket),
+        Commands::Events => events::subscribe_to_stdout(&events::socket_path(&socket)),
     };

     if let Err(err) = res {

From 464b6fec5679a025c29895d24739ef6711cc25c2 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Sun, 26 Apr 2026 21:52:12 -0400
Subject: [PATCH 05/17] test(events): integration tests for the events socket
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Three end-to-end tests, exercising the full daemon → events socket →
JSON wire path:

* snapshot_then_lifecycle: snapshot, then session.created / .attached
  / .detached / .removed{killed} as a session is created, detached
  (via background mode), and killed.

* snapshot_includes_existing_sessions: a subscriber that connects
  after a session already exists receives that session in the
  snapshot.

* multiple_subscribers_each_get_independent_streams: two concurrent
  subscribers both receive the snapshot and full delta sequence.
---
 shpool/tests/events.rs | 146 +++++++++++++++++++++++++++++++++++++++++
 1 file changed, 146 insertions(+)
 create mode 100644 shpool/tests/events.rs

diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
new file mode 100644
index 00000000..aac6fbc6
--- /dev/null
+++ b/shpool/tests/events.rs
@@ -0,0 +1,146 @@
+use std::{
+    io::{BufRead, BufReader},
+    os::unix::net::UnixStream,
+    path::PathBuf,
+    time::Duration,
+};
+
+use anyhow::{anyhow, Context};
+use ntest::timeout;
+use serde_json::Value;
+
+mod support;
+
+use crate::support::daemon::{AttachArgs, DaemonArgs, Proc};
+
+fn events_socket_path(daemon: &Proc) -> PathBuf {
+    daemon.socket_path.with_file_name("events.socket")
+}
+
+fn connect_events(daemon: &Proc) -> anyhow::Result<BufReader<UnixStream>> {
+    let path = events_socket_path(daemon);
+    let mut sleep_dur = Duration::from_millis(5);
+    for _ in 0..12 {
+        if let Ok(stream) = UnixStream::connect(&path) {
+            return Ok(BufReader::new(stream));
+        }
+        std::thread::sleep(sleep_dur);
+        sleep_dur *= 2;
+    }
+    Err(anyhow!("events socket never became available at {:?}", path))
+}
+
+fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<Value> {
+    let mut line = String::new();
+    let n = reader.read_line(&mut line).context("reading event line")?;
+    if n == 0 {
+        return Err(anyhow!("events socket closed unexpectedly"));
+    }
+    serde_json::from_str(&line).with_context(|| format!("parsing event JSON: {line:?}"))
+}
+
+#[test]
+#[timeout(30000)]
+fn snapshot_then_lifecycle() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+
+    let snap = next_event(&mut sub)?;
+    assert_eq!(snap["type"], "snapshot");
+    assert_eq!(snap["sessions"].as_array().unwrap().len(), 0);
+
+    // Background attach: client connects, daemon publishes created+attached;
+    // the client immediately detaches, triggering the detached event.
+    let _attach = daemon
+        .attach(
+            "s1",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    let created = next_event(&mut sub)?;
+    assert_eq!(created["type"], "session.created");
+    assert_eq!(created["name"], "s1");
+    assert!(created["started_at_unix_ms"].is_number());
+
+    let attached = next_event(&mut sub)?;
+    assert_eq!(attached["type"], "session.attached");
+    assert_eq!(attached["name"], "s1");
+
+    let detached = next_event(&mut sub)?;
+    assert_eq!(detached["type"], "session.detached");
+    assert_eq!(detached["name"], "s1");
+
+    let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?;
+    assert!(kill_out.status.success(), "kill failed: {:?}", kill_out);
+
+    let removed = next_event(&mut sub)?;
+    assert_eq!(removed["type"], "session.removed");
+    assert_eq!(removed["name"], "s1");
+    assert_eq!(removed["reason"], "killed");
+
+    Ok(())
+}
+
+#[test]
+#[timeout(30000)]
+fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+
+    let _attach = daemon
+        .attach(
+            "pre-existing",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    // Wait for the session to land in the table before subscribing.
+    daemon.wait_until_list_matches(|out| out.contains("pre-existing"))?;
+
+    let mut sub = connect_events(&daemon)?;
+    let snap = next_event(&mut sub)?;
+    assert_eq!(snap["type"], "snapshot");
+    let sessions = snap["sessions"].as_array().unwrap();
+    assert_eq!(sessions.len(), 1);
+    assert_eq!(sessions[0]["name"], "pre-existing");
+
+    Ok(())
+}
+
+#[test]
+#[timeout(30000)]
+fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
+    let mut daemon = Proc::new(
+        "norc.toml",
+        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
+    )
+    .context("starting daemon proc")?;
+    let mut sub_a = connect_events(&daemon)?;
+    let mut sub_b = connect_events(&daemon)?;
+
+    assert_eq!(next_event(&mut sub_a)?["type"], "snapshot");
+    assert_eq!(next_event(&mut sub_b)?["type"], "snapshot");
+
+    let _attach = daemon
+        .attach(
+            "shared",
+            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
+        )
+        .context("starting attach proc")?;
+
+    for sub in [&mut sub_a, &mut sub_b] {
+        assert_eq!(next_event(sub)?["type"], "session.created");
+        assert_eq!(next_event(sub)?["type"], "session.attached");
+        assert_eq!(next_event(sub)?["type"], "session.detached");
+    }
+
+    Ok(())
+}

From 68694a263174312906963b796a5f064bde0f5005 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Mon, 27 Apr 2026 09:09:34 -0400
Subject: [PATCH 06/17] fix(events): de-duplicate lifecycle deltas; clean
 socket on signal exit

- Gate the SessionRemoved{Exited} publish in handle_attach's
  shell-exit branch on shells.remove() actually returning Some, so a
  concurrent kill or reaper that already removed the entry doesn't
  produce a duplicate removal event.

- Drop the eager SessionDetached publish in handle_detach. The
  bidi-loop unwind path in handle_attach already publishes the
  matching event with its own timestamp; emitting it twice was
  observable to subscribers. Keep the eager last_disconnected_at
  write so concurrent list() callers still see fresh state
  immediately.

- In select_shell_desc, defer the reattach SessionAttached publish
  past the is_finished() check so it isn't emitted for a session
  about to be implicitly clobbered by the create path. Have the
  create path publish SessionRemoved{Exited} when it overwrites an
  existing entry, so the replacement is explicit on the wire.

- Thread the events socket path through signals::Handler so signal
  exits clean it up alongside the main socket. Switch to a
  Vec<PathBuf> to handle both. Tolerate NotFound on cleanup since the
  events socket may not have been bound yet when a signal arrives.

- Drop the unnecessary `pub` on Server.events_bus.

- Apply nightly rustfmt.

Three new integration tests:

- explicit_detach_publishes_one_event: pins the no-duplicate-detached
  invariant by using a kill as a known-next-event fence; a duplicate
  detached would surface as the next read instead of session.removed.

- signal_exit_unlinks_sockets: SIGTERM the daemon and assert both
  socket files are gone.

- reattach_emits_attached_only: regression guard for the reattach
  path.
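For the clobber case in particular, the intended wire sequence when a
new attach replaces a stale entry for a session `s` is, in order
(field values elided for brevity):

    {"type":"session.removed","name":"s","reason":"exited"}
    {"type":"session.created","name":"s", ...}
    {"type":"session.attached","name":"s", ...}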
---
 libshpool/src/daemon/mod.rs     |  12 ++-
 libshpool/src/daemon/server.rs  |  58 ++++++++------
 libshpool/src/daemon/signals.rs |  16 ++--
 libshpool/src/events.rs         |  29 +++----
 shpool/tests/events.rs          | 138 +++++++++++++++++++++++++++-----
 5 files changed, 181 insertions(+), 72 deletions(-)

diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index 2df4ea7a..d23e8357 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -81,10 +81,16 @@ pub fn run(
             (Some(socket.clone()), UnixListener::bind(&socket).context("binding to socket")?)
         }
     };
-    // spawn the signal handler thread in the background
-    signals::Handler::new(cleanup_socket.clone()).spawn()?;
+    let events_socket = events::socket_path(&socket);

-    let _events_guard = server.start_events_listener(events::socket_path(&socket))?;
+    // spawn the signal handler thread in the background. Both sockets need
+    // explicit cleanup on signal exit because the process exits before any
+    // RAII guard can run.
+    let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
+    socks_to_clean.push(events_socket.clone());
+    signals::Handler::new(socks_to_clean).spawn()?;
+
+    let _events_guard = server.start_events_listener(events_socket)?;

     server::Server::serve(server, listener)?;

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index d99993f8..c8f156c9 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -74,7 +74,7 @@ pub struct Server {
     runtime_dir: PathBuf,
     register_new_reapable_session: crossbeam_channel::Sender<(String, Instant)>,
     hooks: Box<dyn hooks::Hooks + Send + Sync>,
-    pub events_bus: Arc<events::EventBus>,
+    events_bus: Arc<events::EventBus>,
     daily_messenger: Arc<show_motd::DailyMessenger>,
     log_level_handle: tracing_subscriber::reload::Handle<
         tracing_subscriber::filter::LevelFilter,
@@ -128,9 +128,7 @@ impl Server {
         socket_path: PathBuf,
     ) -> anyhow::Result<events::ListenerGuard> {
         let server = Arc::clone(self);
-        events::start_listener(socket_path, move |stream| {
-            server.handle_events_subscriber(stream)
-        })
+        events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream))
     }

     fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
@@ -340,11 +338,14 @@ impl Server {
         {
             let _s = span!(Level::INFO, "2_lock(shells)").entered();
             let mut shells = self.shells.lock().unwrap();
-            shells.remove(&header.name);
-            self.events_bus.publish(&events::Event::SessionRemoved {
-                name: header.name.clone(),
-                reason: events::RemovedReason::Exited,
-            });
+            // Gated: a concurrent kill or reaper may have already
+            // removed the entry and published its own removal.
+            if shells.remove(&header.name).is_some() {
+                self.events_bus.publish(&events::Event::SessionRemoved {
+                    name: header.name.clone(),
+                    reason: events::RemovedReason::Exited,
+                });
+            }
         }

         // The child shell has exited, so the shell->client thread should
@@ -434,10 +435,6 @@ impl Server {
                     // the channel is still open so the subshell is still running
                     info!("taking over existing session inner");
                     inner.client_stream = Some(stream.try_clone()?);
                     let now = time::SystemTime::now();
                     session.lifecycle_timestamps.lock().unwrap().last_connected_at =
                         Some(now);
-                    self.events_bus.publish(&events::Event::SessionAttached {
-                        name: header.name.clone(),
-                        last_connected_at_unix_ms: unix_ms(now),
-                    });

                     if inner
                         .shell_to_client_join_h
@@ -449,9 +446,14 @@ impl Server {
                             "child_exited chan unclosed, but shell->client thread has exited, clobbering with new subshell"
                         );
                         status = AttachStatus::Created { warnings };
+                    } else {
+                        // Reattach confirmed; the create path won't run
+                        // and clobber the entry, so it's safe to publish.
+ self.events_bus.publish(&events::Event::SessionAttached { + name: header.name.clone(), + last_connected_at_unix_ms: unix_ms(now), + }); } - - // status is already attached } Some(exit_status) => { // the channel is closed so we know the subshell exited @@ -517,7 +519,17 @@ impl Server { // we unwrap to propagate the poison as an unwind let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered(); let mut shells = self.shells.lock().unwrap(); + // If we're replacing a stale entry whose shell process is + // gone, surface that to subscribers before announcing the + // replacement. + let clobbered = shells.contains_key(&header.name); shells.insert(header.name.clone(), Box::new(session)); + if clobbered { + self.events_bus.publish(&events::Event::SessionRemoved { + name: header.name.clone(), + reason: events::RemovedReason::Exited, + }); + } self.events_bus.publish(&events::Event::SessionCreated { name: header.name.clone(), started_at_unix_ms: unix_ms(started_at), @@ -625,12 +637,12 @@ impl Server { if let shell::ClientConnectionStatus::DetachNone = status { not_attached_sessions.push(session); } else { - let now = time::SystemTime::now(); - s.lifecycle_timestamps.lock().unwrap().last_disconnected_at = Some(now); - self.events_bus.publish(&events::Event::SessionDetached { - name: session.clone(), - last_disconnected_at_unix_ms: unix_ms(now), - }); + // The bidi-loop unwind in handle_attach owns the + // SessionDetached publish (with its own timestamp); + // we just update last_disconnected_at eagerly so a + // concurrent list() reflects the detach immediately. + s.lifecycle_timestamps.lock().unwrap().last_disconnected_at = + Some(time::SystemTime::now()); } } else { not_found_sessions.push(session); @@ -1179,9 +1191,7 @@ fn unix_ms(t: time::SystemTime) -> i64 { /// Collect a snapshot of the session table for `list` replies and event /// snapshots. The caller must hold the shells lock for the duration of /// the call so the resulting list is consistent with concurrent mutators. 
-fn collect_sessions(
-    shells: &HashMap<String, Box<shell::Session>>,
-) -> anyhow::Result<Vec<Session>> {
+fn collect_sessions(shells: &HashMap<String, Box<shell::Session>>) -> anyhow::Result<Vec<Session>> {
     shells
         .iter()
         .map(|(k, v)| {
diff --git a/libshpool/src/daemon/signals.rs b/libshpool/src/daemon/signals.rs
index 3359883d..5e533f04 100644
--- a/libshpool/src/daemon/signals.rs
+++ b/libshpool/src/daemon/signals.rs
@@ -23,11 +23,11 @@ use signal_hook::{consts::TERM_SIGNALS, flag, iterator::Signals};
 use tracing::{error, info};

 pub struct Handler {
-    sock: Option<PathBuf>,
+    socks: Vec<PathBuf>,
 }

 impl Handler {
-    pub fn new(sock: Option<PathBuf>) -> Self {
-        Handler { sock }
+    pub fn new(socks: Vec<PathBuf>) -> Self {
+        Handler { socks }
     }

     pub fn spawn(self) -> anyhow::Result<()> {
@@ -57,10 +57,12 @@ impl Handler {
         for signal in &mut signals {
             assert!(TERM_SIGNALS.contains(&signal));

-            info!("term sig handler: cleaning up socket");
-            if let Some(sock) = self.sock {
-                if let Err(e) = std::fs::remove_file(sock).context("cleaning up socket") {
-                    error!("error cleaning up socket file: {}", e);
+            info!("term sig handler: cleaning up sockets");
+            for sock in &self.socks {
+                match std::fs::remove_file(sock) {
+                    Ok(()) => {}
+                    Err(e) if e.kind() == std::io::ErrorKind::NotFound => {}
+                    Err(e) => error!("error cleaning up socket {:?}: {}", sock, e),
                 }
             }

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index b81f0f83..6ce9e034 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -176,10 +176,7 @@ impl Drop for ListenerGuard {
 /// connection, `on_accept` is invoked with the stream; it is expected to
 /// register the subscriber with the bus and spawn a writer thread (see
 /// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
-pub fn start_listener<F>(
-    socket_path: PathBuf,
-    on_accept: F,
-) -> anyhow::Result<ListenerGuard>
+pub fn start_listener<F>(socket_path: PathBuf, on_accept: F) -> anyhow::Result<ListenerGuard>
 where
     F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
 {
@@ -265,13 +262,15 @@ mod tests {
     #[test]
     fn session_created_serializes_flat() {
         let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 };
-        assert_eq!(json(&event), r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#);
+        assert_eq!(
+            json(&event),
+            r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#
+        );
     }

     #[test]
     fn session_attached_serializes_flat() {
-        let event =
-            Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
+        let event = Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
         assert_eq!(
             json(&event),
             r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"#
@@ -395,17 +394,9 @@ mod tests {

     #[test]
     fn session_removed_serializes_with_reason() {
-        let exited =
-            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
-        assert_eq!(
-            json(&exited),
-            r#"{"type":"session.removed","name":"main","reason":"exited"}"#
-        );
-        let killed =
-            Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
-        assert_eq!(
-            json(&killed),
-            r#"{"type":"session.removed","name":"main","reason":"killed"}"#
-        );
+        let exited = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
+        assert_eq!(json(&exited), r#"{"type":"session.removed","name":"main","reason":"exited"}"#);
+        let killed = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
+        assert_eq!(json(&killed), r#"{"type":"session.removed","name":"main","reason":"killed"}"#);
     }
 }
diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
index aac6fbc6..cbda498d 100644
--- a/shpool/tests/events.rs
+++ b/shpool/tests/events.rs
@@ -42,11 +42,9 @@ fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<Value> {
 #[test]
 #[timeout(30000)]
 fn snapshot_then_lifecycle() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
     let mut sub = connect_events(&daemon)?;

     let snap = next_event(&mut sub)?;
@@ -56,10 +54,7 @@ fn snapshot_then_lifecycle() -> anyhow::Result<()> {
     // Background attach: client connects, daemon publishes created+attached;
     // the client immediately detaches, triggering the detached event.
     let _attach = daemon
-        .attach(
-            "s1",
-            AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() },
-        )
+        .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
         .context("starting attach proc")?;

     let created = next_event(&mut sub)?;
@@ -89,11 +84,9 @@ fn snapshot_then_lifecycle() -> anyhow::Result<()> {
 #[test]
 #[timeout(30000)]
 fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;

     let _attach = daemon
         .attach(
@@ -115,14 +108,121 @@ fn snapshot_includes_existing_sessions() -> anyhow::Result<()> {
     Ok(())
 }

+// `shpool detach` triggers two code paths that both updated the session:
+// the explicit handler and, asynchronously, the bidi-loop unwind in the
+// attach worker. An earlier version emitted SessionDetached from both,
+// producing a duplicate event with two different timestamps. This test
+// pins exactly one detached event per detach by using a kill as a
+// known-next-event fence — if a duplicate detached were buffered, the
+// next read would return it instead of `session.removed`.
+#[test]
+#[timeout(30000)]
+fn explicit_detach_publishes_one_event() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let mut sub = connect_events(&daemon)?;
+    let _snap = next_event(&mut sub)?;
+
+    // Foreground attach (no `background`) keeps the session attached.
+    let _attach = daemon
+        .attach("s", AttachArgs { null_stdin: true, ..AttachArgs::default() })
+        .context("starting attach proc")?;
+    assert_eq!(next_event(&mut sub)?["type"], "session.created");
+    assert_eq!(next_event(&mut sub)?["type"], "session.attached");
+
+    let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?;
+    assert!(detach_out.status.success(), "detach failed: {:?}", detach_out);
+
+    let detached = next_event(&mut sub)?;
+    assert_eq!(detached["type"], "session.detached");
+    assert_eq!(detached["name"], "s");
+
+    // Wait for the unwind path to complete (session shows disconnected in
+    // the list output) so any duplicate detached event would already be
+    // queued by the time we issue the kill below.
+ daemon.wait_until_list_matches(|out| out.contains("disconnected"))?; + + let kill_out = daemon.kill(vec!["s".into()]).context("running kill")?; + assert!(kill_out.status.success(), "kill failed: {:?}", kill_out); + + let next = next_event(&mut sub)?; + assert_eq!( + next["type"], "session.removed", + "expected next event to be removed, got {next} — possible duplicate detached" + ); + assert_eq!(next["reason"], "killed"); + + Ok(()) +} + +// Reattach should produce a single `session.attached` for the existing +// session, with no `session.created`. Catches regressions where the +// reattach path accidentally falls through to the create path. +#[test] +#[timeout(30000)] +fn reattach_emits_attached_only() -> anyhow::Result<()> { + let mut daemon = + Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) + .context("starting daemon proc")?; + let mut sub = connect_events(&daemon)?; + let _snap = next_event(&mut sub)?; + + let _attach1 = daemon + .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) + .context("first attach")?; + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + daemon.wait_until_list_matches(|out| out.contains("disconnected"))?; + + let _attach2 = daemon + .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) + .context("reattach")?; + + let attached = next_event(&mut sub)?; + assert_eq!( + attached["type"], "session.attached", + "expected attached on reattach, got {attached}" + ); + assert_eq!(attached["name"], "s"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); + + Ok(()) +} + +// SIGTERM should clean up both sockets via the signal handler, since +// process::exit bypasses any RAII guard. 
+#[test]
+#[timeout(30000)]
+fn signal_exit_unlinks_sockets() -> anyhow::Result<()> {
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
+    let main_sock = daemon.socket_path.clone();
+    let events_sock = events_socket_path(&daemon);
+    assert!(main_sock.exists(), "main socket should exist while daemon runs");
+    assert!(events_sock.exists(), "events socket should exist while daemon runs");
+
+    let pid = nix::unistd::Pid::from_raw(
+        daemon.proc.as_ref().expect("daemon process handle").id() as i32
+    );
+    nix::sys::signal::kill(pid, nix::sys::signal::SIGTERM).context("sending SIGTERM")?;
+    daemon.proc_wait().context("waiting for daemon to exit")?;
+
+    assert!(!main_sock.exists(), "main socket should be unlinked on signal exit");
+    assert!(!events_sock.exists(), "events socket should be unlinked on signal exit");
+
+    Ok(())
+}
+
 #[test]
 #[timeout(30000)]
 fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
-    let mut daemon = Proc::new(
-        "norc.toml",
-        DaemonArgs { listen_events: false, ..DaemonArgs::default() },
-    )
-    .context("starting daemon proc")?;
+    let mut daemon =
+        Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
+            .context("starting daemon proc")?;
     let mut sub_a = connect_events(&daemon)?;
     let mut sub_b = connect_events(&daemon)?;

From 4b7153e636d27e6583f0ca850b148a76af2b34d0 Mon Sep 17 00:00:00 2001
From: Ethan Pailes
Date: Mon, 27 Apr 2026 08:30:39 -0600
Subject: [PATCH 07/17] fix: improve error messages on chunk read failures
 (#351)

This would have helped for debugging
https://github.com/shell-pool/shpool/issues/335. Let's add it now,
better late than never.
---
 libshpool/src/protocol.rs | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git a/libshpool/src/protocol.rs b/libshpool/src/protocol.rs
index 6051db8e..e4f64a4a 100644
--- a/libshpool/src/protocol.rs
+++ b/libshpool/src/protocol.rs
@@ -100,14 +100,14 @@ impl<'data> ChunkExt<'data> for Chunk<'data> {
     where
         R: std::io::Read,
     {
-        let kind = r.read_u8()?;
-        let kind = ChunkKind::try_from(kind)?;
+        let kind = r.read_u8().context("reading chunk kind")?;
+        let kind = ChunkKind::try_from(kind).context("parsing chunk kind")?;

         if let ChunkKind::ExitStatus = kind {
             if 4 > buf.len() {
                 return Err(anyhow!("chunk of size 4 exceeds size limit of {} bytes", buf.len()));
             }
-            r.read_exact(&mut buf[..4])?;
+            r.read_exact(&mut buf[..4]).context("reading exit status payload")?;
             Ok(Chunk { kind, buf: &buf[..4] })
         } else {
             let len = r.read_u32::<LittleEndian>()? as usize;
             if len > buf.len() {
                 return Err(anyhow!(
                     "chunk of size {} exceeds size limit of {} bytes",
                     len,
                     buf.len()
                 ));
             }
-            r.read_exact(&mut buf[..len])?;
+            r.read_exact(&mut buf[..len]).context("reading chunk payload")?;
             Ok(Chunk { kind, buf: &buf[..len] })
         }
     }

From 5af8c628625a9b565a22e65145b66a3735f335fa Mon Sep 17 00:00:00 2001
From: Ethan Pailes
Date: Mon, 27 Apr 2026 12:10:13 -0600
Subject: [PATCH 08/17] fix: reduce global lock contention when resizing (#355)

This patch improves lock contention on the global sessions lock by
releasing it when sending resize and detach messages to the
shell->client thread. Previously, we could lock up the global table
for up to SESSION_MSG_TIMEOUT, which is quite bad.
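The fix follows the standard clone-the-Arc-under-the-lock shape: hold
the global lock only long enough to grab a per-session handle, then do
the slow channel I/O against the per-session lock. A minimal sketch of
the pattern (hypothetical names, not shpool's real types):

    use std::collections::HashMap;
    use std::sync::{Arc, Mutex};

    struct Session {
        // stand-in for the per-session shell_to_client_ctl handle
        ctl: Arc<Mutex<()>>,
    }

    fn with_session_ctl(table: &Mutex<HashMap<String, Session>>, name: &str) {
        // Take the global lock only to clone the Arc, then drop it.
        let ctl = {
            let table = table.lock().unwrap();
            match table.get(name) {
                Some(session) => Arc::clone(&session.ctl),
                None => return,
            }
        };
        // The potentially slow send/recv happens without the global lock.
        let _ctl = ctl.lock().unwrap();
        // send_timeout / recv_timeout work would go here.
    }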
---
 libshpool/src/daemon/server.rs | 137 +++++++++++++++++++--------------
 1 file changed, 80 insertions(+), 57 deletions(-)

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 8698595c..97f39aca 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -689,68 +689,91 @@ impl Server {
         mut stream: UnixStream,
         header: SessionMessageRequest,
     ) -> anyhow::Result<()> {
-        // create a slot to store our reply so we can do
-        // our IO without the lock held.
-        let reply = {
-            let _s = span!(Level::INFO, "lock(shells)").entered();
-            let shells = self.shells.lock().unwrap();
-            if let Some(session) = shells.get(&header.session_name) {
-                match header.payload {
-                    SessionMessageRequestPayload::Resize(resize_request) => {
-                        let _s = span!(Level::INFO, "lock(pager_ctl)").entered();
-                        let pager_ctl = session.pager_ctl.lock().unwrap();
-                        if let Some(pager_ctl) = pager_ctl.as_ref() {
-                            info!("resizing pager");
-                            pager_ctl
-                                .tty_size_change
-                                .send_timeout(resize_request.tty_size.clone(), SESSION_MSG_TIMEOUT)
-                                .context("sending tty size change to pager")?;
-                            pager_ctl
-                                .tty_size_change_ack
-                                .recv_timeout(SESSION_MSG_TIMEOUT)
-                                .context("recving tty size change ack from pager")?;
+        let reply = self.dispatch_session_message(header)?;
+
+        write_reply(&mut stream, reply).context("handle_session_message: writing reply")?;
+
+        Ok(())
+    }
+
+    fn dispatch_session_message(
+        &self,
+        header: SessionMessageRequest,
+    ) -> anyhow::Result<SessionMessageReply> {
+        match header.payload {
+            SessionMessageRequestPayload::Resize(resize_request) => {
+                let pager_ctl = {
+                    let _s = span!(Level::INFO, "resize_lock_1(shells)").entered();
+                    let shells = self.shells.lock().unwrap();
+                    if let Some(session) = shells.get(&header.session_name) {
+                        Arc::clone(&session.pager_ctl)
+                    } else {
+                        return Ok(SessionMessageReply::NotFound);
+                    }
+                };
+                let _s = span!(Level::INFO, "lock(pager_ctl)").entered();
+                let pager_ctl = pager_ctl.lock().unwrap();
+
+                if let Some(pager_ctl) = pager_ctl.as_ref() {
+                    info!("resizing pager");
+                    pager_ctl
+                        .tty_size_change
+                        .send_timeout(resize_request.tty_size.clone(), SESSION_MSG_TIMEOUT)
+                        .context("sending tty size change to pager")?;
+                    pager_ctl
+                        .tty_size_change_ack
+                        .recv_timeout(SESSION_MSG_TIMEOUT)
+                        .context("recving tty size change ack from pager")?;
+                } else {
+                    let shell_to_client_ctl = {
+                        let _s = span!(Level::INFO, "resize_lock_2(shells)").entered();
+                        let shells = self.shells.lock().unwrap();
+                        if let Some(session) = shells.get(&header.session_name) {
+                            Arc::clone(&session.shell_to_client_ctl)
                         } else {
-                            let _s =
-                                span!(Level::INFO, "resize_lock(shell_to_client_ctl)").entered();
-                            let shell_to_client_ctl = session.shell_to_client_ctl.lock().unwrap();
-                            shell_to_client_ctl
-                                .tty_size_change
-                                .send_timeout(resize_request.tty_size, SESSION_MSG_TIMEOUT)
-                                .context("sending tty size change to shell->client")?;
-                            shell_to_client_ctl
-                                .tty_size_change_ack
-                                .recv_timeout(SESSION_MSG_TIMEOUT)
-                                .context("recving tty size ack")?;
+                            return Ok(SessionMessageReply::NotFound);
                         }
+                    };
+                    let _s = span!(Level::INFO, "lock(shell_to_client_ctl)").entered();
+                    let shell_to_client_ctl = shell_to_client_ctl.lock().unwrap();

-                        SessionMessageReply::Resize(ResizeReply::Ok)
-                    }
-                    SessionMessageRequestPayload::Detach => {
-                        let _s = span!(Level::INFO, "detach_lock(shell_to_client_ctl)").entered();
-                        let shell_to_client_ctl = session.shell_to_client_ctl.lock().unwrap();
-                        shell_to_client_ctl
-                            .client_connection
-                            .send_timeout(
-                                shell::ClientConnectionMsg::Disconnect,
SESSION_MSG_TIMEOUT, - ) - .context("sending client detach to shell->client")?; - let status = shell_to_client_ctl - .client_connection_ack - .recv_timeout(SESSION_MSG_TIMEOUT) - .context("getting client conn ack")?; - info!("detached session({}), status = {:?}", header.session_name, status); - SessionMessageReply::Detach(SessionMessageDetachReply::Ok) - } + shell_to_client_ctl + .tty_size_change + .send_timeout(resize_request.tty_size, SESSION_MSG_TIMEOUT) + .context("sending tty size change to shell->client")?; + shell_to_client_ctl + .tty_size_change_ack + .recv_timeout(SESSION_MSG_TIMEOUT) + .context("recving tty size ack")?; } - } else { - SessionMessageReply::NotFound - } - }; - - write_reply(&mut stream, reply).context("handle_session_message: writing reply")?; - Ok(()) + Ok(SessionMessageReply::Resize(ResizeReply::Ok)) + } + SessionMessageRequestPayload::Detach => { + let shell_to_client_ctl = { + let _s = span!(Level::INFO, "detach_lock(shells)").entered(); + let shells = self.shells.lock().unwrap(); + if let Some(session) = shells.get(&header.session_name) { + Arc::clone(&session.shell_to_client_ctl) + } else { + return Ok(SessionMessageReply::NotFound); + } + }; + let _s = span!(Level::INFO, "lock(shell_to_client_ctl)").entered(); + let shell_to_client_ctl = shell_to_client_ctl.lock().unwrap(); + + shell_to_client_ctl + .client_connection + .send_timeout(shell::ClientConnectionMsg::Disconnect, SESSION_MSG_TIMEOUT) + .context("sending client detach to shell->client")?; + let status = shell_to_client_ctl + .client_connection_ack + .recv_timeout(SESSION_MSG_TIMEOUT) + .context("getting client conn ack")?; + info!("detached session({}), status = {:?}", header.session_name, status); + Ok(SessionMessageReply::Detach(SessionMessageDetachReply::Ok)) + } + } } /// Spawn a subshell and return the sessession descriptor for it. The From 3c1615cedc99a2e19076c6fe98d33bfeecac57ca Mon Sep 17 00:00:00 2001 From: Ethan Pailes Date: Mon, 27 Apr 2026 12:18:02 -0600 Subject: [PATCH 09/17] fix: bug in shell startup loop when handling EOF (#354) This patch fixes an issue where we were not exiting on EOF correctly when scanning for the shell startup sentinel. 
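For context, a minimal sketch of the failure mode (illustrative code, not the
shpool source): a scan loop that treats a zero-length read as "no data yet"
spins forever once the shell exits, because `read` keeps returning `Ok(0)` at
EOF. Returning instead of continuing is the fix.

```rust
use std::io::Read;

// Scan `r` for a (non-empty) `sentinel`, stopping at EOF. Treating `Ok(0)` as
// "try again" (the old `continue`) would spin forever after the child exits,
// since a closed pty/pipe keeps returning zero-length reads.
fn scan_for_sentinel<R: Read>(mut r: R, sentinel: &[u8]) -> std::io::Result<bool> {
    let mut buf = [0u8; 4096];
    let mut window: Vec<u8> = Vec::new();
    loop {
        let len = r.read(&mut buf)?;
        if len == 0 {
            // EOF: the shell exited before printing the sentinel.
            return Ok(false);
        }
        window.extend_from_slice(&buf[..len]);
        if window.windows(sentinel.len()).any(|w| w == sentinel) {
            return Ok(true);
        }
    }
}
```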
---
 libshpool/src/daemon/prompt.rs                |  2 +-
 ...shell.toml.tmpl => custom_shell.toml.tmpl} |  0
 shpool/tests/regression.rs                    | 40 ++++++++++++++++++-
 3 files changed, 39 insertions(+), 3 deletions(-)
 rename shpool/tests/data/{hang_shell.toml.tmpl => custom_shell.toml.tmpl} (100%)

diff --git a/libshpool/src/daemon/prompt.rs b/libshpool/src/daemon/prompt.rs
index 0ed29147..16e8cf5f 100644
--- a/libshpool/src/daemon/prompt.rs
+++ b/libshpool/src/daemon/prompt.rs
@@ -174,7 +174,7 @@ fn wait_for_startup(pty_master: &mut shpool_pty::fork::Master) -> anyhow::Result
         let len = pty_master.read(&mut buf).context("reading chunk to scan for startup")?;
         if len == 0 {
-            continue;
+            return Err(anyhow!("EOF during shell startup"));
         }
         let buf = &buf[..len];
         debug!("buf='{}'", String::from_utf8_lossy(buf));
diff --git a/shpool/tests/data/hang_shell.toml.tmpl b/shpool/tests/data/custom_shell.toml.tmpl
similarity index 100%
rename from shpool/tests/data/hang_shell.toml.tmpl
rename to shpool/tests/data/custom_shell.toml.tmpl
diff --git a/shpool/tests/regression.rs b/shpool/tests/regression.rs
index b69cdeb8..9e17800a 100644
--- a/shpool/tests/regression.rs
+++ b/shpool/tests/regression.rs
@@ -23,10 +23,10 @@ use crate::support::{daemon::DaemonArgs, tmpdir};
 fn list_not_blocked_by_slow_shell_spawn() -> anyhow::Result<()> {
     let tmp_dir = tmpdir::Dir::new("/tmp/shpool-test")?;
 
-    let config_tmpl = fs::read_to_string(support::testdata_file("hang_shell.toml.tmpl"))?;
+    let config_tmpl = fs::read_to_string(support::testdata_file("custom_shell.toml.tmpl"))?;
     let config_contents = config_tmpl
         .replace("SHELL", support::testdata_file("hang_shell.sh").to_string_lossy().as_ref());
-    let config_file = tmp_dir.path().join("motd_dump.toml");
+    let config_file = tmp_dir.path().join("custom_shell.toml");
     {
         let mut f = fs::File::create(&config_file)?;
         f.write_all(config_contents.as_bytes())?;
@@ -83,3 +83,39 @@ fn list_not_blocked_by_slow_shell_spawn() -> anyhow::Result<()> {
         }
     }
 }
+
+/// Regression test for a bug where shpool would loop forever if the shell
+/// exited immediately during startup while we were waiting for the
+/// startup sentinel.
+#[test]
+#[timeout(10000)]
+fn no_loop_on_shell_exit_during_startup() -> anyhow::Result<()> {
+    let tmp_dir = tmpdir::Dir::new("/tmp/shpool-test")?;
+
+    let config_tmpl = fs::read_to_string(support::testdata_file("custom_shell.toml.tmpl"))?;
+    // Use /bin/true as the shell so it exits immediately.
+    // We need to trigger wait_for_startup, which happens when prompt_prefix is set.
+    let config_contents = config_tmpl.replace("SHELL", "/bin/true");
+    let config_file = tmp_dir.path().join("exit_shell.toml");
+    {
+        let mut f = fs::File::create(&config_file)?;
+        f.write_all(config_contents.as_bytes())?;
+    }
+
+    let mut daemon_proc = support::daemon::Proc::new(&config_file, DaemonArgs::default())
+        .context("starting daemon proc")?;
+
+    // Try to attach. This should trigger wait_for_startup because prompt_prefix is
+    // set in custom_shell.toml.tmpl.
+    // The shell (/bin/true) will exit immediately, causing wait_for_startup to get
+    // EOF. On BUGGY code: this loops forever in the daemon.
+    // On FIXED code: this returns an error in the daemon, and the attach proc
+    // finishes.
+    let mut attach_proc =
+        daemon_proc.attach("sh1", Default::default()).context("starting attach proc")?;
+
+    // Wait for the attach process to exit.
+ let _status = attach_proc.proc.wait().context("waiting for attach proc")?; + + Ok(()) +} From 7a790f555f039bc26296eb9065077de33f42745e Mon Sep 17 00:00:00 2001 From: "release-plz-for-shpool[bot]" <175248994+release-plz-for-shpool[bot]@users.noreply.github.com> Date: Mon, 27 Apr 2026 12:22:59 -0600 Subject: [PATCH 10/17] chore: release (#334) Co-authored-by: release-plz-for-shpool[bot] <175248994+release-plz-for-shpool[bot]@users.noreply.github.com> --- Cargo.lock | 6 +++--- debian/changelog | 15 +++++++++++++++ libshpool/Cargo.toml | 4 ++-- shpool-protocol/CHANGELOG | 8 ++++++++ shpool-protocol/Cargo.toml | 2 +- shpool/Cargo.toml | 4 ++-- 6 files changed, 31 insertions(+), 8 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index e759c433..7ae606bf 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -537,7 +537,7 @@ dependencies = [ [[package]] name = "libshpool" -version = "0.9.5" +version = "0.9.6" dependencies = [ "anyhow", "assert_matches", @@ -992,7 +992,7 @@ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64" [[package]] name = "shpool" -version = "0.9.5" +version = "0.9.6" dependencies = [ "anyhow", "clap", @@ -1009,7 +1009,7 @@ dependencies = [ [[package]] name = "shpool-protocol" -version = "0.3.4" +version = "0.3.5" dependencies = [ "anyhow", "clap", diff --git a/debian/changelog b/debian/changelog index 1bdab469..f9c16c91 100644 --- a/debian/changelog +++ b/debian/changelog @@ -1,4 +1,19 @@ +shpool (0.9.6) unstable; urgency=low + + Fixed + + * bug in shell startup loop when handling EOF ([#354](https://github.com/shell-pool/shpool/pull/354)) + * wait for shell deadlock ([#348](https://github.com/shell-pool/shpool/pull/348)) + * reduce global lock contention when resizing ([#355](https://github.com/shell-pool/shpool/pull/355)) + * improve error messages on chunk read failures ([#351](https://github.com/shell-pool/shpool/pull/351)) + + Other + + * add Homebrew installation instructions for macOS ([#333](https://github.com/shell-pool/shpool/pull/333)) + + -- Shpool Authors Mon, 27 Apr 2026 18:18:49 +0000 + shpool (0.9.5) unstable; urgency=low Added diff --git a/libshpool/Cargo.toml b/libshpool/Cargo.toml index 486bd44c..5f72dd14 100644 --- a/libshpool/Cargo.toml +++ b/libshpool/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "libshpool" -version = "0.9.5" +version = "0.9.6" edition = "2024" repository = "https://github.com/shell-pool/shpool" authors = ["Ethan Pailes "] @@ -45,7 +45,7 @@ strip-ansi-escapes = "0.2.0" # cleaning up strings for pager display notify = { version = "7", features = ["crossbeam-channel"] } # watch config file for updates libproc = "0.14.8" # sniffing shells by examining the subprocess daemonize = "0.5" # autodaemonization -shpool-protocol = { version = "0.3.4", path = "../shpool-protocol" } # client-server protocol +shpool-protocol = { version = "0.3.5", path = "../shpool-protocol" } # client-server protocol # rusty wrapper for unix apis [dependencies.nix] diff --git a/shpool-protocol/CHANGELOG b/shpool-protocol/CHANGELOG index 21f4f5b5..e8032b91 100644 --- a/shpool-protocol/CHANGELOG +++ b/shpool-protocol/CHANGELOG @@ -1,4 +1,12 @@ +shpool-protocol (0.3.5) unstable; urgency=low + + Other + + * add Homebrew installation instructions for macOS ([#333](https://github.com/shell-pool/shpool/pull/333)) + + -- Shpool Authors Mon, 27 Apr 2026 18:18:49 +0000 + shpool-protocol (0.3.4) unstable; urgency=low Other diff --git a/shpool-protocol/Cargo.toml b/shpool-protocol/Cargo.toml index 575d1834..28e49aec 100644 --- a/shpool-protocol/Cargo.toml +++ 
b/shpool-protocol/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "shpool-protocol"
-version = "0.3.4"
+version = "0.3.5"
 edition = "2024"
 authors = ["Ethan Pailes"]
 repository = "https://github.com/shell-pool/shpool"
diff --git a/shpool/Cargo.toml b/shpool/Cargo.toml
index 123f18bf..7cb23dab 100644
--- a/shpool/Cargo.toml
+++ b/shpool/Cargo.toml
@@ -1,6 +1,6 @@
 [package]
 name = "shpool"
-version = "0.9.5"
+version = "0.9.6"
 edition = "2024"
 authors = ["Ethan Pailes"]
 repository = "https://github.com/shell-pool/shpool"
@@ -21,7 +21,7 @@ test_hooks = ["libshpool/test_hooks"]
 [dependencies]
 clap = { version = "4", features = ["derive"] } # cli parsing
 anyhow = "1" # dynamic, unstructured errors
-libshpool = { version = "0.9.5", path = "../libshpool" }
+libshpool = { version = "0.9.6", path = "../libshpool" }
 
 [dev-dependencies]
 lazy_static = "1" # globals

From 9aa05cd38487372a92293646077af82576eb24ca Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Mon, 27 Apr 2026 21:40:30 -0400
Subject: [PATCH 11/17] refactor(events): trim event payloads to type fields
 only

Each event now serializes as `{"type":"session.<event>"}` with no other
fields. To learn what the event refers to, subscribers follow up with
`shpool list` (or send `ConnectHeader::List` over the main socket).

- Drop the welcome `snapshot` event; subscribers do their own bootstrap
  list call after connecting.
- Drop `name`, timestamps, and `reason` from the lifecycle events.
- Remove `RemovedReason`; reaped/killed/exited share `session.removed`.
- Inline `collect_sessions` back into `handle_list` (the snapshot was its
  only other consumer) and drop the now-unused `unix_ms` helper.
---
 libshpool/src/daemon/server.rs     | 126 ++++++++--------------
 libshpool/src/daemon/ttl_reaper.rs |   8 +-
 libshpool/src/events.rs            | 166 +++++++----------------------
 shpool/tests/events.rs             |  72 ++-----------
 4 files changed, 93 insertions(+), 279 deletions(-)

diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index c8f156c9..897d38a3 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -119,10 +119,8 @@ impl Server {
         }))
     }
 
-    /// Bind the events socket and spawn the accept thread. Each accepted
-    /// connection is handed a snapshot built under the session-table lock
-    /// so subsequent deltas (published under the same lock) cannot race.
-    /// The returned guard unlinks the socket file on drop.
+    /// Bind the events socket and spawn the accept thread. The returned
+    /// guard unlinks the socket file on drop.
     pub fn start_events_listener(
         self: &Arc<Self>,
         socket_path: PathBuf,
@@ -132,14 +130,7 @@ impl Server {
     }
 
     fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
-        let receiver = {
-            let _s = span!(Level::INFO, "events_subscribe_lock(shells)").entered();
-            let shells = self.shells.lock().unwrap();
-            let sessions =
-                collect_sessions(&shells).context("collecting snapshot for events subscriber")?;
-            let snapshot = events::Event::Snapshot { sessions };
-            self.events_bus.register(&snapshot)
-        };
+        let receiver = self.events_bus.register();
         events::spawn_writer(stream, receiver)
     }
 
@@ -341,10 +332,7 @@ impl Server {
                // Gated: a concurrent kill or reaper may have already
                // removed the entry and published its own removal.
if shells.remove(&header.name).is_some() { - self.events_bus.publish(&events::Event::SessionRemoved { - name: header.name.clone(), - reason: events::RemovedReason::Exited, - }); + self.events_bus.publish(&events::Event::SessionRemoved); } } @@ -367,10 +355,7 @@ impl Server { let now = time::SystemTime::now(); session.lifecycle_timestamps.lock().unwrap().last_disconnected_at = Some(now); - self.events_bus.publish(&events::Event::SessionDetached { - name: header.name.clone(), - last_disconnected_at_unix_ms: unix_ms(now), - }); + self.events_bus.publish(&events::Event::SessionDetached); } } if let Err(err) = self.hooks.on_client_disconnect(&header.name) { @@ -449,10 +434,7 @@ impl Server { } else { // Reattach confirmed; the create path won't run // and clobber the entry, so it's safe to publish. - self.events_bus.publish(&events::Event::SessionAttached { - name: header.name.clone(), - last_connected_at_unix_ms: unix_ms(now), - }); + self.events_bus.publish(&events::Event::SessionAttached); } } Some(exit_status) => { @@ -514,7 +496,6 @@ impl Server { let now = time::SystemTime::now(); session.lifecycle_timestamps.lock().unwrap().last_connected_at = Some(now); - let started_at = session.started_at; { // we unwrap to propagate the poison as an unwind let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered(); @@ -525,19 +506,10 @@ impl Server { let clobbered = shells.contains_key(&header.name); shells.insert(header.name.clone(), Box::new(session)); if clobbered { - self.events_bus.publish(&events::Event::SessionRemoved { - name: header.name.clone(), - reason: events::RemovedReason::Exited, - }); + self.events_bus.publish(&events::Event::SessionRemoved); } - self.events_bus.publish(&events::Event::SessionCreated { - name: header.name.clone(), - started_at_unix_ms: unix_ms(started_at), - }); - self.events_bus.publish(&events::Event::SessionAttached { - name: header.name.clone(), - last_connected_at_unix_ms: unix_ms(now), - }); + self.events_bus.publish(&events::Event::SessionCreated); + self.events_bus.publish(&events::Event::SessionAttached); } // fallthrough to bidi streaming } else if let Err(err) = self.hooks.on_reattach(&header.name) { @@ -638,9 +610,9 @@ impl Server { not_attached_sessions.push(session); } else { // The bidi-loop unwind in handle_attach owns the - // SessionDetached publish (with its own timestamp); - // we just update last_disconnected_at eagerly so a - // concurrent list() reflects the detach immediately. + // SessionDetached publish; we just update + // last_disconnected_at eagerly so a concurrent list() + // reflects the detach immediately. 
                    s.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
                        Some(time::SystemTime::now());
                }
@@ -700,10 +672,7 @@ impl Server {
 
         for session in to_remove.iter() {
             shells.remove(session);
-            self.events_bus.publish(&events::Event::SessionRemoved {
-                name: session.clone(),
-                reason: events::RemovedReason::Killed,
-            });
+            self.events_bus.publish(&events::Event::SessionRemoved);
         }
         if !to_remove.is_empty() {
             test_hooks::emit("daemon-handle-kill-removed-shells");
@@ -719,7 +688,35 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock().unwrap();
-        let sessions = collect_sessions(&shells)?;
+        let sessions = shells
+            .iter()
+            .map(|(k, v)| {
+                let status = match v.inner.try_lock() {
+                    Ok(_) => SessionStatus::Disconnected,
+                    Err(_) => SessionStatus::Attached,
+                };
+
+                let timestamps = v.lifecycle_timestamps.lock().unwrap();
+                let last_connected_at_unix_ms = timestamps
+                    .last_connected_at
+                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                    .transpose()?;
+                let last_disconnected_at_unix_ms = timestamps
+                    .last_disconnected_at
+                    .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
+                    .transpose()?;
+
+                Ok(Session {
+                    name: k.to_string(),
+                    started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
+                        as i64,
+                    last_connected_at_unix_ms,
+                    last_disconnected_at_unix_ms,
+                    status,
+                })
+            })
+            .collect::<anyhow::Result<Vec<Session>>>()
+            .context("collecting running session metadata")?;
         write_reply(&mut stream, ListReply { sessions })?;
         Ok(())
     }
@@ -1184,45 +1181,6 @@ impl Server {
     }
 }
 
-fn unix_ms(t: time::SystemTime) -> i64 {
-    t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64).unwrap_or(0)
-}
-
-/// Collect a snapshot of the session table for `list` replies and event
-/// snapshots. The caller must hold the shells lock for the duration of
-/// the call so the resulting list is consistent with concurrent mutators.
-fn collect_sessions(shells: &HashMap<String, Box<shell::Session>>) -> anyhow::Result<Vec<Session>> {
-    shells
-        .iter()
-        .map(|(k, v)| {
-            let status = match v.inner.try_lock() {
-                Ok(_) => SessionStatus::Disconnected,
-                Err(_) => SessionStatus::Attached,
-            };
-
-            let timestamps = v.lifecycle_timestamps.lock().unwrap();
-            let last_connected_at_unix_ms = timestamps
-                .last_connected_at
-                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                .transpose()?;
-            let last_disconnected_at_unix_ms = timestamps
-                .last_disconnected_at
-                .map(|t| t.duration_since(time::UNIX_EPOCH).map(|d| d.as_millis() as i64))
-                .transpose()?;
-
-            Ok(Session {
-                name: k.to_string(),
-                started_at_unix_ms: v.started_at.duration_since(time::UNIX_EPOCH)?.as_millis()
-                    as i64,
-                last_connected_at_unix_ms,
-                last_disconnected_at_unix_ms,
-                status,
-            })
-        })
-        .collect::<anyhow::Result<Vec<Session>>>()
-        .context("collecting running session metadata")
-}
-
 // HACK: this is not a good way to detect shells that don't support our
 // sentinel injection approach, but it is better than just hanging when a
 // user tries to start one.
diff --git a/libshpool/src/daemon/ttl_reaper.rs b/libshpool/src/daemon/ttl_reaper.rs
index 810519e9..5b4bfe4a 100644
--- a/libshpool/src/daemon/ttl_reaper.rs
+++ b/libshpool/src/daemon/ttl_reaper.rs
@@ -117,13 +117,7 @@ pub fn run(
                     continue;
                 }
                 shells.remove(&reapable.session_name);
-                // Reaping is a daemon-initiated termination; surfaced to
-                // subscribers as `killed` until we have a use case for a
-                // dedicated reason.
-                events_bus.publish(&events::Event::SessionRemoved {
-                    name: reapable.session_name.clone(),
-                    reason: events::RemovedReason::Killed,
-                });
+                events_bus.publish(&events::Event::SessionRemoved);
             }
         }
     }
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 6ce9e034..4d9a6077 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -5,15 +5,10 @@
 //! line (newline-delimited; aka JSONL). Non-Rust clients only need a Unix
 //! socket and a JSON parser to consume the stream.
 //!
-//! On connect, a subscriber receives a `snapshot` event reflecting the
-//! current session table, atomically with respect to the table mutations
-//! that produce subsequent delta events. After the snapshot, the subscriber
-//! receives delta events as the session table changes. To force a re-sync,
-//! a subscriber may simply reconnect.
-//!
-//! The `sessions` field of a snapshot event uses the same schema as the
-//! `sessions` field of `shpool list --json`, so the two surfaces stay in
-//! sync by construction.
+//! Events carry no payload beyond their type — they signal that *something*
+//! changed in the session table. Subscribers learn the new state by calling
+//! `shpool list` (or the equivalent over the main socket). Subscribers that
+//! fall too far behind are dropped and may simply reconnect.
 
 use std::{
     io::{BufRead, BufReader, Write},
@@ -29,11 +24,10 @@ use std::{
 
 use anyhow::Context;
 use serde_derive::Serialize;
-use shpool_protocol::Session;
 use tracing::{error, info, warn};
 
 /// Per-subscriber outbound queue depth. Subscribers that fall this far
-/// behind are dropped; reconnection re-syncs them via a fresh snapshot.
+/// behind are dropped and must reconnect.
 const SUBSCRIBER_QUEUE_DEPTH: usize = 64;
 
 /// Write timeout for stuck subscribers (e.g. suspended via Ctrl-Z). After
@@ -44,45 +38,24 @@ const WRITE_TIMEOUT: Duration = Duration::from_secs(5);
 /// An event published on the events socket.
 #[derive(Serialize, Debug)]
 #[serde(tag = "type")]
+#[allow(clippy::enum_variant_names)]
 pub enum Event {
-    /// Sent as the first message after a subscriber connects, reflecting
-    /// the current session table.
-    #[serde(rename = "snapshot")]
-    Snapshot { sessions: Vec<Session> },
-
-    /// A new session was created.
     #[serde(rename = "session.created")]
-    SessionCreated { name: String, started_at_unix_ms: i64 },
-
-    /// A client attached to an existing session.
+    SessionCreated,
     #[serde(rename = "session.attached")]
-    SessionAttached { name: String, last_connected_at_unix_ms: i64 },
-
-    /// A client detached from a session that is still alive.
+    SessionAttached,
     #[serde(rename = "session.detached")]
-    SessionDetached { name: String, last_disconnected_at_unix_ms: i64 },
-
-    /// A session was removed from the session table.
+    SessionDetached,
     #[serde(rename = "session.removed")]
-    SessionRemoved { name: String, reason: RemovedReason },
-}
-
-#[derive(Serialize, Debug)]
-#[serde(rename_all = "lowercase")]
-pub enum RemovedReason {
-    /// The shell process exited on its own.
-    Exited,
-    /// The session was killed by an explicit `shpool kill` request.
-    Killed,
+    SessionRemoved,
 }
 
 /// Fans out events to all connected subscribers.
 ///
 /// Lock ordering: callers that publish under another lock (e.g. the session
 /// table) must take that lock before [`EventBus::publish`] takes its own
-/// internal lock. [`EventBus::register`] follows the same order, so a
-/// subscriber registered while the session-table lock is held cannot miss
-/// a delta from a mutation that committed under that lock.
+/// internal lock. Publishing under the table lock keeps wire-order =
+/// causal-order across mutators.
 pub struct EventBus {
     subscribers: Mutex<Vec<SyncSender<Arc<str>>>>,
 }
@@ -110,12 +83,10 @@ impl EventBus {
         });
     }
 
-    /// Register a new subscriber with `snapshot` as the first message in
-    /// its queue. Returns the receiver to be handed to a writer thread.
-    pub fn register(&self, snapshot: &Event) -> Receiver<Arc<str>> {
-        let line = serialize_line(snapshot).expect("snapshot serialization");
+    /// Register a new subscriber. Returns the receiver to be handed to a
+    /// writer thread.
+    pub fn register(&self) -> Receiver<Arc<str>> {
         let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
-        tx.try_send(line).expect("seeding empty channel cannot fail");
         self.subscribers.lock().unwrap().push(tx);
         rx
     }
@@ -236,126 +207,77 @@
 #[cfg(test)]
 mod tests {
     use super::*;
-    use shpool_protocol::SessionStatus;
 
     fn json(event: &Event) -> String {
         serde_json::to_string(event).unwrap()
     }
 
     #[test]
-    fn snapshot_serializes_with_sessions_array() {
-        let event = Event::Snapshot {
-            sessions: vec![Session {
-                name: "main".into(),
-                started_at_unix_ms: 100,
-                last_connected_at_unix_ms: Some(200),
-                last_disconnected_at_unix_ms: None,
-                status: SessionStatus::Attached,
-            }],
-        };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"snapshot","sessions":[{"name":"main","started_at_unix_ms":100,"last_connected_at_unix_ms":200,"last_disconnected_at_unix_ms":null,"status":"Attached"}]}"#
-        );
+    fn session_created_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionCreated), r#"{"type":"session.created"}"#);
     }
 
     #[test]
-    fn session_created_serializes_flat() {
-        let event = Event::SessionCreated { name: "main".into(), started_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.created","name":"main","started_at_unix_ms":42}"#
-        );
+    fn session_attached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionAttached), r#"{"type":"session.attached"}"#);
     }
 
     #[test]
-    fn session_attached_serializes_flat() {
-        let event = Event::SessionAttached { name: "main".into(), last_connected_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.attached","name":"main","last_connected_at_unix_ms":42}"#
-        );
+    fn session_detached_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionDetached), r#"{"type":"session.detached"}"#);
     }
 
     #[test]
-    fn session_detached_serializes_flat() {
-        let event =
-            Event::SessionDetached { name: "main".into(), last_disconnected_at_unix_ms: 42 };
-        assert_eq!(
-            json(&event),
-            r#"{"type":"session.detached","name":"main","last_disconnected_at_unix_ms":42}"#
-        );
+    fn session_removed_serializes_with_only_type() {
+        assert_eq!(json(&Event::SessionRemoved), r#"{"type":"session.removed"}"#);
     }
 
     #[test]
     fn bus_publish_with_no_subscribers_is_a_noop() {
         let bus = EventBus::new();
-        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 1 });
-    }
-
-    #[test]
-    fn bus_register_seeds_receiver_with_snapshot() {
-        let bus = EventBus::new();
-        let snapshot = Event::Snapshot { sessions: vec![] };
-        let rx = bus.register(&snapshot);
-        let line = rx.try_recv().unwrap();
-        assert_eq!(&*line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
+        bus.publish(&Event::SessionCreated);
     }
 
     #[test]
-    fn bus_publish_reaches_subscriber_after_snapshot() {
+    fn bus_publish_reaches_subscriber() {
         let bus = EventBus::new();
-        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
-        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 7 });
-        let snapshot_line = rx.recv().unwrap();
-        let delta_line = rx.recv().unwrap();
-        assert_eq!(&*snapshot_line, "{\"type\":\"snapshot\",\"sessions\":[]}\n");
-        assert_eq!(
-            &*delta_line,
-            "{\"type\":\"session.created\",\"name\":\"main\",\"started_at_unix_ms\":7}\n"
-        );
+        let rx = bus.register();
+        bus.publish(&Event::SessionCreated);
+        let line = rx.recv().unwrap();
+        assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
     }
 
     #[test]
     fn bus_drops_subscriber_whose_queue_is_full() {
         let bus = EventBus::new();
-        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
-        // Fill the channel to capacity (the snapshot already used 1 slot).
-        for i in 0..(SUBSCRIBER_QUEUE_DEPTH - 1) {
-            bus.publish(&Event::SessionCreated {
-                name: format!("s{i}"),
-                started_at_unix_ms: i as i64,
-            });
+        let rx = bus.register();
+        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
+            bus.publish(&Event::SessionCreated);
        }
         assert_eq!(bus.subscribers.lock().unwrap().len(), 1);
-        // One more publish overflows and the subscriber is dropped.
-        bus.publish(&Event::SessionCreated { name: "overflow".into(), started_at_unix_ms: 0 });
+        bus.publish(&Event::SessionCreated);
         assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
-        // The receiver still has the buffered events; the channel is not
-        // closed for it from the receiving side.
         drop(rx);
     }
 
     #[test]
     fn bus_drops_subscriber_whose_receiver_hung_up() {
         let bus = EventBus::new();
-        let rx = bus.register(&Event::Snapshot { sessions: vec![] });
+        let rx = bus.register();
         drop(rx);
-        bus.publish(&Event::SessionCreated { name: "x".into(), started_at_unix_ms: 0 });
+        bus.publish(&Event::SessionCreated);
         assert_eq!(bus.subscribers.lock().unwrap().len(), 0);
     }
 
     #[test]
     fn bus_publish_reaches_every_subscriber() {
         let bus = EventBus::new();
-        let rx_a = bus.register(&Event::Snapshot { sessions: vec![] });
-        let rx_b = bus.register(&Event::Snapshot { sessions: vec![] });
-        bus.publish(&Event::SessionCreated { name: "main".into(), started_at_unix_ms: 1 });
+        let rx_a = bus.register();
+        let rx_b = bus.register();
+        bus.publish(&Event::SessionCreated);
         for rx in [&rx_a, &rx_b] {
-            let _snapshot = rx.recv().unwrap();
-            let delta = rx.recv().unwrap();
-            assert!(delta.contains(r#""type":"session.created""#));
-            assert!(delta.contains(r#""name":"main""#));
+            let line = rx.recv().unwrap();
+            assert_eq!(&*line, "{\"type\":\"session.created\"}\n");
         }
     }
 
@@ -391,12 +313,4 @@ mod tests {
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");
     }
-
-    #[test]
-    fn session_removed_serializes_with_reason() {
-        let exited = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Exited };
-        assert_eq!(json(&exited), r#"{"type":"session.removed","name":"main","reason":"exited"}"#);
-        let killed = Event::SessionRemoved { name: "main".into(), reason: RemovedReason::Killed };
-        assert_eq!(json(&killed), r#"{"type":"session.removed","name":"main","reason":"killed"}"#);
-    }
 }
diff --git a/shpool/tests/events.rs b/shpool/tests/events.rs
index cbda498d..7c61f2cc 100644
--- a/shpool/tests/events.rs
+++ b/shpool/tests/events.rs
@@ -41,69 +41,26 @@ fn next_event(reader: &mut BufReader<UnixStream>) -> anyhow::Result<serde_json::Value> {
 
 #[test]
 #[timeout(30000)]
-fn snapshot_then_lifecycle() -> anyhow::Result<()> {
+fn lifecycle() -> anyhow::Result<()> {
     let mut daemon =
         Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
             .context("starting daemon proc")?;
     let mut sub = connect_events(&daemon)?;
 
-    let snap = next_event(&mut sub)?;
-
assert_eq!(snap["type"], "snapshot"); - assert_eq!(snap["sessions"].as_array().unwrap().len(), 0); - // Background attach: client connects, daemon publishes created+attached; // the client immediately detaches, triggering the detached event. let _attach = daemon .attach("s1", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }) .context("starting attach proc")?; - let created = next_event(&mut sub)?; - assert_eq!(created["type"], "session.created"); - assert_eq!(created["name"], "s1"); - assert!(created["started_at_unix_ms"].is_number()); - - let attached = next_event(&mut sub)?; - assert_eq!(attached["type"], "session.attached"); - assert_eq!(attached["name"], "s1"); - - let detached = next_event(&mut sub)?; - assert_eq!(detached["type"], "session.detached"); - assert_eq!(detached["name"], "s1"); + assert_eq!(next_event(&mut sub)?["type"], "session.created"); + assert_eq!(next_event(&mut sub)?["type"], "session.attached"); + assert_eq!(next_event(&mut sub)?["type"], "session.detached"); let kill_out = daemon.kill(vec!["s1".into()]).context("running kill")?; assert!(kill_out.status.success(), "kill failed: {:?}", kill_out); - let removed = next_event(&mut sub)?; - assert_eq!(removed["type"], "session.removed"); - assert_eq!(removed["name"], "s1"); - assert_eq!(removed["reason"], "killed"); - - Ok(()) -} - -#[test] -#[timeout(30000)] -fn snapshot_includes_existing_sessions() -> anyhow::Result<()> { - let mut daemon = - Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) - .context("starting daemon proc")?; - - let _attach = daemon - .attach( - "pre-existing", - AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() }, - ) - .context("starting attach proc")?; - - // Wait for the session to land in the table before subscribing. - daemon.wait_until_list_matches(|out| out.contains("pre-existing"))?; - - let mut sub = connect_events(&daemon)?; - let snap = next_event(&mut sub)?; - assert_eq!(snap["type"], "snapshot"); - let sessions = snap["sessions"].as_array().unwrap(); - assert_eq!(sessions.len(), 1); - assert_eq!(sessions[0]["name"], "pre-existing"); + assert_eq!(next_event(&mut sub)?["type"], "session.removed"); Ok(()) } @@ -111,10 +68,10 @@ fn snapshot_includes_existing_sessions() -> anyhow::Result<()> { // `shpool detach` triggers two code paths that both updated the session: // the explicit handler and, asynchronously, the bidi-loop unwind in the // attach worker. An earlier version emitted SessionDetached from both, -// producing a duplicate event with two different timestamps. This test -// pins exactly one detached event per detach by using a kill as a -// known-next-event fence — if a duplicate detached were buffered, the -// next read would return it instead of `session.removed`. +// producing a duplicate event. This test pins exactly one detached event +// per detach by using a kill as a known-next-event fence — if a duplicate +// detached were buffered, the next read would return it instead of +// `session.removed`. #[test] #[timeout(30000)] fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { @@ -122,7 +79,6 @@ fn explicit_detach_publishes_one_event() -> anyhow::Result<()> { Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() }) .context("starting daemon proc")?; let mut sub = connect_events(&daemon)?; - let _snap = next_event(&mut sub)?; // Foreground attach (no `background`) keeps the session attached. 
     let _attach = daemon
@@ -134,9 +90,7 @@ fn explicit_detach_publishes_one_event() -> anyhow::Result<()> {
     let detach_out = daemon.detach(vec!["s".into()]).context("running detach")?;
     assert!(detach_out.status.success(), "detach failed: {:?}", detach_out);
 
-    let detached = next_event(&mut sub)?;
-    assert_eq!(detached["type"], "session.detached");
-    assert_eq!(detached["name"], "s");
+    assert_eq!(next_event(&mut sub)?["type"], "session.detached");
 
     // Wait for the unwind path to complete (session shows disconnected in
     // the list output) so any duplicate detached event would already be
@@ -151,7 +105,6 @@
         next["type"], "session.removed",
         "expected next event to be removed, got {next} — possible duplicate detached"
     );
-    assert_eq!(next["reason"], "killed");
 
     Ok(())
 }
@@ -166,7 +119,6 @@ fn reattach_emits_attached_only() -> anyhow::Result<()> {
         Proc::new("norc.toml", DaemonArgs { listen_events: false, ..DaemonArgs::default() })
             .context("starting daemon proc")?;
     let mut sub = connect_events(&daemon)?;
-    let _snap = next_event(&mut sub)?;
 
     let _attach1 = daemon
         .attach("s", AttachArgs { background: true, null_stdin: true, ..AttachArgs::default() })
@@ -186,7 +138,6 @@
         attached["type"], "session.attached",
         "expected attached on reattach, got {attached}"
     );
-    assert_eq!(attached["name"], "s");
 
     assert_eq!(next_event(&mut sub)?["type"], "session.detached");
 
     Ok(())
 }
@@ -226,9 +177,6 @@ fn multiple_subscribers_each_get_independent_streams() -> anyhow::Result<()> {
     let mut sub_a = connect_events(&daemon)?;
     let mut sub_b = connect_events(&daemon)?;
 
-    assert_eq!(next_event(&mut sub_a)?["type"], "snapshot");
-    assert_eq!(next_event(&mut sub_b)?["type"], "snapshot");
-
     let _attach = daemon
         .attach(
             "shared",

From bb2ec68cb32ba36e479c46683d2b739e5dd7ae7d Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Mon, 27 Apr 2026 21:40:55 -0400
Subject: [PATCH 12/17] docs(events): add EVENTS.md describing the events
 protocol

Cover the sibling-socket transport, the four event types, the JSONL wire
format, the `shpool events` CLI helper plus direct-socket use for
heavier-duty consumers, the ordering guarantee (publish under the
session-table lock so wire-order matches causal-order), and the
slow-subscriber drop policy.
---
 EVENTS.md | 67 +++++++++++++++++++++++++++++++++++++++++++++++++++
 1 file changed, 67 insertions(+)
 create mode 100644 EVENTS.md

diff --git a/EVENTS.md b/EVENTS.md
new file mode 100644
index 00000000..58379280
--- /dev/null
+++ b/EVENTS.md
@@ -0,0 +1,67 @@
+# Events
+
+`shpool` exposes an event stream so that external programs can react to changes
+without polling. This way, a program (e.g. a TUI) can call `shpool list` (or the
+equivalent `ConnectHeader::List` request over the main socket; see the
+[`shpool-protocol`](./shpool-protocol) crate) after each event so that its model
+is always consistent with shpool's state.
+
+## The events socket
+
+The daemon binds a sibling Unix socket next to the main shpool socket:
+
+```bash
+<run_dir>/shpool/shpool.socket   # main RPC socket
+<run_dir>/shpool/events.socket   # events socket (this protocol)
+```
+
+A subscriber connects to `events.socket` and reads events. The daemon ignores
+anything written to the events socket, so for subscribers it's effectively
+read-only.
+
+## Event types
+
+| `type`             | Meaning                                                  |
+| ------------------ | -------------------------------------------------------- |
+| `session.created`  | A new session was added to the table.                    |
+| `session.attached` | A client attached or reattached to a session.            |
+| `session.detached` | A client disconnected from a still-running session.      |
+| `session.removed`  | A session was removed (shell exited, killed, or reaped). |
+
+Subscribers should ignore unknown `type` values so that future event types do
+not break older consumers.
+
+## Wire format
+
+The daemon writes one JSON object per line (JSONL). Each event looks like:
+
+```json
+{"type":"<type>"}
+```
+
+There are no other fields. To learn what the event refers to (which session,
+when, etc.), call `shpool list` (or use `ConnectHeader::List`).
+
+## Subscribing
+
+For ad-hoc use, `shpool events` connects to the events socket and prints each
+event line to stdout, flushing after each line:
+
+```bash
+shpool events | while read -r ev; do
+  echo "got: $ev"
+  shpool list
+done
+
+shpool events | jq .
+```
+
+## Ordering
+
+Events are published while the daemon holds its session-table lock, so the order
+events appear on the wire matches the order in which they happened.
+
+## Slow subscribers
+
+Each subscriber has a bounded outbound queue.
--- Cargo.lock | 4 ++-- libshpool/Cargo.toml | 2 +- libshpool/src/daemon/pager.rs | 9 +++------ libshpool/src/daemon/prompt.rs | 4 ++-- libshpool/src/daemon/server.rs | 4 +--- libshpool/src/daemon/shell.rs | 14 +++++--------- 6 files changed, 14 insertions(+), 23 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 7ae606bf..3e7e188a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1035,9 +1035,9 @@ dependencies = [ [[package]] name = "shpool_pty" -version = "0.3.2" +version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "78cc0114fa23588602dbb02375ef5eead7bfcbdc0f1bed9b54c5e0722e197df8" +checksum = "2829e0f3a285c7ec45f47c7990f67ec808cb5117aad814e09d8fd7e0fe922f46" dependencies = [ "errno 0.2.8", "libc", diff --git a/libshpool/Cargo.toml b/libshpool/Cargo.toml index 5f72dd14..06e6a2a9 100644 --- a/libshpool/Cargo.toml +++ b/libshpool/Cargo.toml @@ -28,7 +28,7 @@ serde_json = "1" # JSON output for list command toml = "0.8" # config parsing byteorder = "1" # endianness signal-hook = "0.3" # signal handling -shpool_pty = "0.3.2" # spawning shells in ptys +shpool_pty = "0.4.0" # spawning shells in ptys lazy_static = "1" # globals crossbeam-channel = "0.5" # channels libc = "0.2" # basic libc types diff --git a/libshpool/src/daemon/pager.rs b/libshpool/src/daemon/pager.rs index b0f340ae..88109d75 100644 --- a/libshpool/src/daemon/pager.rs +++ b/libshpool/src/daemon/pager.rs @@ -176,7 +176,7 @@ impl Pager { // spawn a background thread to handle tty size change events, // setting it up to go away when _ctl_guard removes the ctl // handle. - let pty_master_fd = pty_master.raw_fd().ok_or(anyhow!("no fd for pty master"))?; + let pty_master_fd = pty_master.raw_fd(); init_tty_size.set_fd(pty_master_fd).context("setting init tty size")?; let tty_size = Arc::new(Mutex::new(init_tty_size.clone())); let tty_size_ref = Arc::clone(&tty_size); @@ -209,16 +209,13 @@ impl Pager { let mut last_heartbeat_at = Instant::now(); let mut buf = vec![0; consts::BUF_SIZE]; - let watchable_master = pty_master; + let watchable_master = pty_master.clone(); let watchable_client_stream = client_stream.try_clone().context("could not clone client stream")?; loop { // wake up when there is data for us going in either direction let mut poll_fds = [ - poll::PollFd::new( - watchable_master.borrow_fd().ok_or(anyhow!("no master fd"))?, - poll::PollFlags::POLLIN, - ), + poll::PollFd::new(watchable_master.borrow_fd(), poll::PollFlags::POLLIN), poll::PollFd::new(watchable_client_stream.as_fd(), poll::PollFlags::POLLIN), ]; let nready = poll::poll(&mut poll_fds, POLL_MS).context("polling both streams")?; diff --git a/libshpool/src/daemon/prompt.rs b/libshpool/src/daemon/prompt.rs index 16e8cf5f..74a62bf6 100644 --- a/libshpool/src/daemon/prompt.rs +++ b/libshpool/src/daemon/prompt.rs @@ -145,9 +145,9 @@ fn wait_for_startup(pty_master: &mut shpool_pty::fork::Master) -> anyhow::Result .write_all(startup_sentinel_cmd.as_bytes()) .context("running startup sentinel script")?; - let watchable_master = *pty_master; + let watchable_master = pty_master.clone(); let mut poll_fds = [poll::PollFd::new( - watchable_master.borrow_fd().ok_or(anyhow!("no master fd"))?, + watchable_master.borrow_fd(), PollFlags::POLLIN | PollFlags::POLLHUP | PollFlags::POLLERR, )]; diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs index 97f39aca..adbeb622 100644 --- a/libshpool/src/daemon/server.rs +++ b/libshpool/src/daemon/server.rs @@ -888,9 +888,7 @@ impl Server { let mut fork = 
shpool_pty::fork::Fork::from_ptmx().context("forking pty")?;
         if let Ok(slave) = fork.is_child() {
             if noecho {
-                if let Some(fd) = slave.borrow_fd() {
-                    tty::disable_echo(fd).context("disabling echo on pty")?;
-                }
+                tty::disable_echo(slave.borrow_fd()).context("disabling echo on pty")?;
             }
             for fd in consts::STDERR_FD + 1..(nix::unistd::SysconfVar::OPEN_MAX as i32) {
                 let _ = nix::unistd::close(fd);
diff --git a/libshpool/src/daemon/shell.rs b/libshpool/src/daemon/shell.rs
index 3d6ec59b..61b9a394 100644
--- a/libshpool/src/daemon/shell.rs
+++ b/libshpool/src/daemon/shell.rs
@@ -232,7 +232,7 @@ impl SessionInner {
         let mut needs_initial_motd_dump = self.needs_initial_motd_dump;
 
         let mut pty_master = self.pty_master.is_parent()?;
-        let watchable_master = pty_master;
+        let watchable_master = pty_master.clone();
         let name = self.name.clone();
         let config = self.config.clone();
         let closure = move || {
@@ -242,7 +242,7 @@ impl SessionInner {
                 session_restore::new(config, &args.tty_size, args.scrollback_lines);
             let mut buf: Vec<u8> = vec![0; consts::BUF_SIZE];
             let mut poll_fds = [poll::PollFd::new(
-                watchable_master.borrow_fd().ok_or(anyhow!("no master fd"))?,
+                watchable_master.borrow_fd(),
                 PollFlags::POLLIN | PollFlags::POLLHUP | PollFlags::POLLERR,
             )];
 
@@ -292,7 +292,7 @@ impl SessionInner {
                             xpixel: conn.size.xpixel,
                             ypixel: conn.size.ypixel,
                         };
-                        oversize.set_fd(pty_master.raw_fd().ok_or(anyhow!("no master fd"))?)?;
+                        oversize.set_fd(pty_master.raw_fd())?;
 
                         // Prepare a resize command for pty to execute later.
                         resize_cmd = Some(ResizeCmd {
@@ -404,11 +404,7 @@ impl SessionInner {
                     if resize_cmd.when.saturating_duration_since(time::Instant::now())
                         == time::Duration::ZERO
                     {
-                        let status = pty_master
-                            .raw_fd()
-                            .ok_or(anyhow!("no master fd"))
-                            .and_then(|fd| resize_cmd.size.set_fd(fd));
-                        if let Err(e) = status {
+                        if let Err(e) = resize_cmd.size.set_fd(pty_master.raw_fd()) {
                             warn!("error resizing pty: {}", e);
                         }
                         executed_resize = true;
@@ -751,7 +747,7 @@ impl SessionInner {
             span!(Level::INFO, "client->shell", s = self.name, cid = conn_id).entered();
 
         let mut bindings = bindings.context("compiling keybindings engine")?;
-        let mut master_writer = *pty_master;
+        let mut master_writer = pty_master.clone();
 
         let mut snip_sections = vec![]; // (<start_idx>, <end_idx>)
         let mut keep_sections = vec![]; // (<start_idx>, <end_idx>)

From 2a73f1e0a922dc76c87fea7da59c5239fa747648 Mon Sep 17 00:00:00 2001
From: Ethan Pailes
Date: Fri, 24 Apr 2026 22:34:20 +0000
Subject: [PATCH 14/17] chore: tweak AI policy

This patch tweaks the AI policy again.
---
 .github/pull_request_template.md |  6 ++++++
 HACKING.md                       | 36 +++++++++++++++++++-------------
 2 files changed, 27 insertions(+), 15 deletions(-)

diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 20803d5e..bd5cd0df 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -5,4 +5,10 @@
 Please ack that you have read the
 [AI Policy](https://github.com/shell-pool/shpool/blob/master/HACKING.md#ai-policy)
 and explain your use of AI to generate this PR.
 
+### This PR was:
+
+* [ ] mostly or completely vibe coded
+* [ ] mostly or completely meat coded
+* [ ] bit of both
+
 ## Description
diff --git a/HACKING.md b/HACKING.md
index 6ea049da..aff54fc3 100644
--- a/HACKING.md
+++ b/HACKING.md
@@ -6,7 +6,8 @@ Some tips for working on shpool.
 ## AI Policy
 
 Do not use AI to generate issues or bug reports. Instead, just write what
-you prompted the AI with.
+you prompted the AI with. It's fine (and encouraged!) to use AI to generate
+example code to include in an issue to spark a discussion.
 
 When creating PRs, it's fine to use AI to write tests and understand the
 codebase. Application code can be fine as well, but make sure to review carefully
@@ -14,6 +15,11 @@ and prevent the accumulation of bloat or needless abstraction.
 If you do use AI to generate code, you must review and test it carefully
 yourself. Do not send patches you don't understand.
 
+If you submit a PR that feels substantially AI generated, it may be re-written
+to match the style of the existing codebase at the discretion of the
+maintainers. That should not discourage you from submitting such PRs, as
+they can be very useful even if we just use them for inspiration.
+
 When creating PRs or filing issues, disclose your AI usage.
 
 ## Installing From Source
@@ -212,20 +218,6 @@ $ rr replay --debugger=rust-gdb --onprocess=<pid>
 
 where `<pid>` is taken from the output of `rr ps`.
 
-## Preserving Logs in Tests
-
-By default, tests will clean up log files emitted by the various
-shpool subprocesses they spawn. In order get the tests to leave
-log files around for later inspection, you can set the
-`SHPOOL_LEAVE_TEST_LOGS` environment variable to `true`.
-
-For example to run `happy_path` from the `attach` suite and
-leave log files in place you might run
-
-```
-$ SHPOOL_LEAVE_TEST_LOGS=true cargo test --test attach happy_path -- --nocapture
-```
-
 ## Running Tests on macOS
 
 Some tests are skipped on macOS due to platform differences:
@@ -245,3 +237,17 @@ and will be skipped automatically when running `cargo test` on macOS.
 Some tests use hard-coded wait times. This leads to timing failures in
 some environments. macOS seems particularly sensitive to this, so be
 aware that some of those tests are currently a bit flaky there.
+
+## Preserving Logs in Tests
+
+By default, tests will clean up log files emitted by the various
+shpool subprocesses they spawn. In order to get the tests to leave
+log files around for later inspection, you can set the
+`SHPOOL_LEAVE_TEST_LOGS` environment variable to `true`.
+
+For example to run `happy_path` from the `attach` suite and
+leave log files in place you might run
+
+```
+$ SHPOOL_LEAVE_TEST_LOGS=true cargo test --test attach happy_path -- --nocapture
+```

From 7de8ee272f353af2d8d6947d5116731aa4aa2a83 Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Wed, 29 Apr 2026 22:52:54 -0400
Subject: [PATCH 15/17] EVENTS.md: fix main socket description and remove
 "Ordering" section

Per https://github.com/shell-pool/shpool/pull/352#pullrequestreview-4198152162
---
 EVENTS.md | 7 +------
 1 file changed, 1 insertion(+), 6 deletions(-)

diff --git a/EVENTS.md b/EVENTS.md
index 58379280..4d122561 100644
--- a/EVENTS.md
+++ b/EVENTS.md
@@ -11,7 +11,7 @@ is always consistent with shpool's state.
 The daemon binds a sibling Unix socket next to the main shpool socket:
 
 ```bash
-<run_dir>/shpool/shpool.socket   # main RPC socket
+<run_dir>/shpool/shpool.socket   # main socket
 <run_dir>/shpool/events.socket   # events socket (this protocol)
 ```
 
@@ -54,11 +54,6 @@ done
 
 shpool events | jq .
 ```
 
-## Ordering
-
-Events are published while the daemon holds its session-table lock, so the order
-events appear on the wire matches the order in which they happened.
-
 ## Slow subscribers
 
 Each subscriber has a bounded outbound queue. A subscriber that falls too far

From d1f3cb2819c1d633fa2f41428e5891ae1c01873d Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Tue, 5 May 2026 13:45:05 -0400
Subject: [PATCH 16/17] events: polish error contexts, naming, and module
 boundary

- Add .context() on the signal-handler spawn and events-listener startup
  paths in daemon::run.
- Move the ttl-reaper thread::spawn's capture-clones (shells, events_bus)
  inside a block expression. The cloned Arcs shadow the outer names rather
  than introducing parallel shells_tab / reaper_bus bindings in the
  function scope.
- Reword the comment on the shells-remove + SessionRemoved publish to
  explain why the publish is gated on is_some(): a concurrent kill or
  reaper may have removed the entry (and published) while we were waiting
  for the lock.
- Fold three single-use SystemTime::now() locals back inline.
- Have start_listener take Arc<EventBus> directly instead of a closure
  that registers + spawns a writer. EventBus::register and spawn_writer
  drop their pub modifiers now that the only call site is within events.rs.
- Restore the let-binding type annotation on the collected sessions
  Result in handle_list.
- Document JSONL newline handling in EVENTS.md.
---
 EVENTS.md                      |  4 ++++
 libshpool/src/daemon/mod.rs    |  5 +++--
 libshpool/src/daemon/server.rs | 39 +++++++++++++++-------------------
 libshpool/src/events.rs        | 27 ++++++++++-------------
 4 files changed, 35 insertions(+), 40 deletions(-)

diff --git a/EVENTS.md b/EVENTS.md
index 4d122561..481ead59 100644
--- a/EVENTS.md
+++ b/EVENTS.md
@@ -40,6 +40,10 @@ The daemon writes one JSON object per line (JSONL). Each event looks like:
 There are no other fields. To learn what the event refers to (which session,
 when, etc.), call `shpool list` (or use `ConnectHeader::List`).
 
+The format is robust: literal newline characters only appear as delimiters
+between events. Any newlines within JSON string values are automatically
+escaped (as `\n`) by the daemon.
+
 ## Subscribing
 
diff --git a/libshpool/src/daemon/mod.rs b/libshpool/src/daemon/mod.rs
index d23e8357..82ce5b7c 100644
--- a/libshpool/src/daemon/mod.rs
+++ b/libshpool/src/daemon/mod.rs
@@ -88,9 +88,10 @@ pub fn run(
     // RAII guard can run.
     let mut socks_to_clean: Vec<PathBuf> = cleanup_socket.iter().cloned().collect();
     socks_to_clean.push(events_socket.clone());
-    signals::Handler::new(socks_to_clean).spawn()?;
+    signals::Handler::new(socks_to_clean).spawn().context("spawning signal handler")?;
 
-    let _events_guard = server.start_events_listener(events_socket)?;
+    let _events_guard =
+        server.start_events_listener(events_socket).context("starting events listener")?;
 
     server::Server::serve(server, listener)?;
 
diff --git a/libshpool/src/daemon/server.rs b/libshpool/src/daemon/server.rs
index 1739853c..b27465e2 100644
--- a/libshpool/src/daemon/server.rs
+++ b/libshpool/src/daemon/server.rs
@@ -98,11 +98,13 @@ impl Server {
         // buffered so that we are unlikely to block when setting up a
         // new session
         let (new_sess_tx, new_sess_rx) = crossbeam_channel::bounded(10);
-        let shells_tab = Arc::clone(&shells);
-        let reaper_bus = Arc::clone(&events_bus);
-        thread::spawn(move || {
-            if let Err(e) = ttl_reaper::run(new_sess_rx, shells_tab, reaper_bus) {
-                warn!("ttl reaper exited with error: {:?}", e);
+        thread::spawn({
+            let shells = Arc::clone(&shells);
+            let events_bus = Arc::clone(&events_bus);
+            move || {
+                if let Err(e) = ttl_reaper::run(new_sess_rx, shells, events_bus) {
+                    warn!("ttl reaper exited with error: {:?}", e);
+                }
             }
         });
 
@@ -125,13 +127,7 @@ impl Server {
         self: &Arc<Self>,
         socket_path: PathBuf,
     ) -> anyhow::Result<events::ListenerGuard> {
-        let server = Arc::clone(self);
-        events::start_listener(socket_path, move |stream| server.handle_events_subscriber(stream))
-    }
-
-    fn handle_events_subscriber(&self, stream: UnixStream) -> anyhow::Result<()> {
-        let receiver = self.events_bus.register();
-        events::spawn_writer(stream, receiver)
+        events::start_listener(socket_path, Arc::clone(&self.events_bus))
     }
 
     #[instrument(skip_all)]
@@ -329,8 +325,9 @@ impl Server {
             {
                 let _s = span!(Level::INFO, "2_lock(shells)").entered();
                 let mut shells = self.shells.lock().unwrap();
-                // Gated: a concurrent kill or reaper may have already
-                // removed the entry and published its own removal.
+                // The publish below is gated on `is_some()` because a
+                // concurrent kill or reaper may have already removed the
+                // entry (and published) while we were waiting for the lock.
                if shells.remove(&header.name).is_some() {
                    self.events_bus.publish(&events::Event::SessionRemoved);
                }
@@ -352,9 +349,8 @@ impl Server {
             let _s = span!(Level::INFO, "disconnect_lock(shells)").entered();
             let shells = self.shells.lock().unwrap();
             if let Some(session) = shells.get(&header.name) {
-                let now = time::SystemTime::now();
                 session.lifecycle_timestamps.lock().unwrap().last_disconnected_at =
-                    Some(now);
+                    Some(time::SystemTime::now());
                 self.events_bus.publish(&events::Event::SessionDetached);
             }
         }
@@ -417,9 +413,8 @@ impl Server {
                     // the channel is still open so the subshell is still running
                     info!("taking over existing session inner");
                     inner.client_stream = Some(stream.try_clone()?);
-                    let now = time::SystemTime::now();
                     session.lifecycle_timestamps.lock().unwrap().last_connected_at =
-                        Some(now);
+                        Some(time::SystemTime::now());
 
                     if inner
                         .shell_to_client_join_h
@@ -494,8 +489,8 @@ impl Server {
                 matches!(motd, MotdDisplayMode::Dump),
             )?;
 
-            let now = time::SystemTime::now();
-            session.lifecycle_timestamps.lock().unwrap().last_connected_at = Some(now);
+            session.lifecycle_timestamps.lock().unwrap().last_connected_at =
+                Some(time::SystemTime::now());
             {
                 // we unwrap to propagate the poison as an unwind
                 let _s = span!(Level::INFO, "select_shell_lock_2(shells)").entered();
@@ -688,7 +683,7 @@ impl Server {
     fn handle_list(&self, mut stream: UnixStream) -> anyhow::Result<()> {
         let _s = span!(Level::INFO, "lock(shells)").entered();
         let shells = self.shells.lock().unwrap();
-        let sessions = shells
+        let sessions: Vec<Session> = shells
             .iter()
             .map(|(k, v)| {
                 let status = match v.inner.try_lock() {
@@ -715,7 +710,7 @@ impl Server {
                     status,
                 })
             })
-            .collect::<anyhow::Result<Vec<Session>>>()
+            .collect::<anyhow::Result<_>>()
             .context("collecting running session metadata")?;
         write_reply(&mut stream, ListReply { sessions })?;
         Ok(())
     }
diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index 4d9a6077..b876916e 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -85,7 +85,7 @@ impl EventBus {
 
     /// Register a new subscriber. Returns the receiver to be handed to a
     /// writer thread.
-    pub fn register(&self) -> Receiver<Arc<str>> {
+    fn register(&self) -> Receiver<Arc<str>> {
         let (tx, rx) = mpsc::sync_channel(SUBSCRIBER_QUEUE_DEPTH);
         self.subscribers.lock().unwrap().push(tx);
         rx
@@ -144,13 +144,10 @@ impl Drop for ListenerGuard {
 }
 
 /// Bind the events socket and spawn the accept thread. For each accepted
-/// connection, `on_accept` is invoked with the stream; it is expected to
-/// register the subscriber with the bus and spawn a writer thread (see
-/// [`spawn_writer`]). The returned guard unlinks the socket file on drop.
-pub fn start_listener<F>(socket_path: PathBuf, on_accept: F) -> anyhow::Result<ListenerGuard>
-where
-    F: Fn(UnixStream) -> anyhow::Result<()> + Send + 'static,
-{
+/// connection, the subscriber is registered with `bus` and a writer
+/// thread is spawned to drain its queue. The returned guard unlinks the
+/// socket file on drop.
+pub fn start_listener(socket_path: PathBuf, bus: Arc<EventBus>) -> anyhow::Result<ListenerGuard> {
     if socket_path.exists() {
         std::fs::remove_file(&socket_path)
             .with_context(|| format!("removing stale events socket {:?}", socket_path))?;
@@ -160,19 +157,17 @@ where
     info!("events socket listening at {:?}", socket_path);
     thread::Builder::new()
         .name("events-accept".into())
-        .spawn(move || run_accept_loop(listener, on_accept))
+        .spawn(move || run_accept_loop(listener, bus))
         .context("spawning events accept thread")?;
     Ok(ListenerGuard { path: socket_path })
 }
 
-fn run_accept_loop<F>(listener: UnixListener, on_accept: F)
-where
-    F: Fn(UnixStream) -> anyhow::Result<()>,
-{
+fn run_accept_loop(listener: UnixListener, bus: Arc<EventBus>) {
     for stream in listener.incoming() {
         match stream {
             Ok(stream) => {
-                if let Err(e) = on_accept(stream) {
+                let receiver = bus.register();
+                if let Err(e) = spawn_writer(stream, receiver) {
                     warn!("accepting events subscriber: {:?}", e);
                 }
             }
@@ -186,7 +181,7 @@ where
 
 /// Set the write timeout and spawn a thread that drains `receiver` to
 /// `stream` until either side closes or a write times out.
-pub fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
+fn spawn_writer(stream: UnixStream, receiver: Receiver<Arc<str>>) -> anyhow::Result<()> {
     stream.set_write_timeout(Some(WRITE_TIMEOUT)).context("setting write timeout")?;
     thread::Builder::new()
         .name("events-writer".into())
@@ -308,7 +303,7 @@ mod tests {
     fn listener_guard_unlinks_socket_on_drop() {
         let dir = tempfile::tempdir().unwrap();
         let path = dir.path().join("events.socket");
-        let guard = start_listener(path.clone(), |_| Ok(())).unwrap();
+        let guard = start_listener(path.clone(), EventBus::new()).unwrap();
         assert!(path.exists(), "socket file should exist while guard is alive");
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");

From ca34b35b5bfee35b61ab4a4a7c57c4244c2d426b Mon Sep 17 00:00:00 2001
From: Geoffrey Churchill
Date: Tue, 5 May 2026 14:45:09 -0400
Subject: [PATCH 17/17] events: add tests for bus isolation, latency, and lock
 ordering

- bus_publish_with_many_subscribers_is_not_quadratic: bound a single
  publish to 10K subscribers under 50 ms (~4 ms on a 2.1 GHz CPU; 50 ms
  absorbs CI tail latency while still catching quadratic regressions,
  which would be on the order of seconds at N=10K).
- bus_drops_slow_subscriber_on_overflow_without_affecting_fast: a fast
  subscriber keeps receiving every event while a slow one's queue fills
  and is eventually dropped on overflow. Asserts SUBSCRIBER_QUEUE_DEPTH
  stays small since the test scales with it.
- accept_loop_registers_concurrent_subscribers: dial 20 connections in
  parallel and confirm all are registered and each receives a published
  event.
- bus_concurrent_publish_under_outer_lock_delivers_all_events: publish
  from multiple threads while each holds an outer mutex; all events
  arrive in order.
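The accept-loop test below dials the events socket the same way an external
consumer would. For reference, a minimal std-only subscriber in the spirit of
EVENTS.md might look like this (the socket path and environment variable are
illustrative assumptions, not part of shpool):

```rust
use std::io::{BufRead, BufReader};
use std::os::unix::net::UnixStream;

fn main() -> std::io::Result<()> {
    // Hypothetical lookup; in practice the path is a sibling of shpool.socket.
    let sock = std::env::var("SHPOOL_EVENTS_SOCKET")
        .unwrap_or_else(|_| "/tmp/shpool/events.socket".to_string());
    let stream = UnixStream::connect(sock)?;
    for line in BufReader::new(stream).lines() {
        // Each line is one JSON object such as {"type":"session.created"}.
        println!("event: {}", line?);
        // A real consumer would re-run `shpool list` here to refresh its model.
    }
    // Stream closed: the daemon exited or dropped us as a slow subscriber.
    Ok(())
}
```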
---
 libshpool/src/events.rs | 120 ++++++++++++++++++++++++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/libshpool/src/events.rs b/libshpool/src/events.rs
index b876916e..04ff0ae0 100644
--- a/libshpool/src/events.rs
+++ b/libshpool/src/events.rs
@@ -308,4 +308,124 @@ mod tests {
         drop(guard);
         assert!(!path.exists(), "socket file should be unlinked on guard drop");
     }
+
+    #[test]
+    fn bus_publish_with_many_subscribers_is_not_quadratic() {
+        let bus = EventBus::new();
+        let n = 10_000;
+        let mut rxs = Vec::with_capacity(n);
+        for _ in 0..n {
+            rxs.push(bus.register());
+        }
+        let start = std::time::Instant::now();
+        bus.publish(&Event::SessionCreated);
+        let elapsed = start.elapsed();
+        // Measured ~4 ms on a 2.1 GHz CPU; the 50 ms bound absorbs CI
+        // tail latency while still catching gross regressions (e.g. an
+        // accidental O(N^2) would be on the order of seconds at N=10K).
+        assert!(elapsed < Duration::from_millis(50), "publish to {n} subscribers took {elapsed:?}");
+    }
+
+    #[test]
+    fn bus_drops_slow_subscriber_on_overflow_without_affecting_fast() {
+        // This test is O(SUBSCRIBER_QUEUE_DEPTH); cap the constant so a
+        // future bump (e.g. after moving to a dynamically-grown queue)
+        // doesn't make this test unboundedly slow.
+        assert!(SUBSCRIBER_QUEUE_DEPTH < 1024);
+
+        let bus = EventBus::new();
+        let slow_rx = bus.register();
+        let fast_rx = bus.register();
+
+        // Fill slow's queue while draining fast each iteration so only slow
+        // accumulates a backlog.
+        for _ in 0..SUBSCRIBER_QUEUE_DEPTH {
+            bus.publish(&Event::SessionCreated);
+            assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
+        }
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 2);
+
+        // The next publish overflows slow's queue and drops it. Fast's queue
+        // is empty so it keeps receiving.
+        bus.publish(&Event::SessionCreated);
+        assert_eq!(&*fast_rx.recv().unwrap(), "{\"type\":\"session.created\"}\n");
+        assert_eq!(bus.subscribers.lock().unwrap().len(), 1);
+
+        drop(slow_rx);
+    }
+
+    #[test]
+    fn accept_loop_registers_concurrent_subscribers() {
+        use std::io::Read;
+
+        let dir = tempfile::tempdir().unwrap();
+        let path = dir.path().join("events.socket");
+        let bus = EventBus::new();
+        let _guard = start_listener(path.clone(), Arc::clone(&bus)).unwrap();
+
+        let n = 20;
+        let dial_handles: Vec<_> = (0..n)
+            .map(|_| {
+                let path = path.clone();
+                thread::spawn(move || UnixStream::connect(&path).unwrap())
+            })
+            .collect();
+        let streams: Vec<UnixStream> =
+            dial_handles.into_iter().map(|h| h.join().unwrap()).collect();
+
+        // Wait for the accept thread to register all subscribers.
+        let deadline = std::time::Instant::now() + Duration::from_secs(2);
+        loop {
+            let count = bus.subscribers.lock().unwrap().len();
+            if count >= n {
+                break;
+            }
+            assert!(
+                std::time::Instant::now() < deadline,
+                "only {count}/{n} subscribers registered before timeout"
+            );
+            thread::sleep(Duration::from_millis(10));
+        }
+
+        bus.publish(&Event::SessionCreated);
+        let expected = b"{\"type\":\"session.created\"}\n";
+        for mut stream in streams {
+            stream.set_read_timeout(Some(Duration::from_secs(2))).unwrap();
+            let mut buf = vec![0u8; expected.len()];
+            stream.read_exact(&mut buf).unwrap();
+            assert_eq!(buf.as_slice(), expected);
+        }
+    }
+
+    #[test]
+    fn bus_concurrent_publish_under_outer_lock_delivers_all_events() {
+        let bus = EventBus::new();
+        let rx = bus.register();
+        let outer: Arc<Mutex<()>> = Arc::new(Mutex::new(()));
+
+        let n_threads = 4;
+        let n_per_thread = 8;
+        let total = n_threads * n_per_thread;
+
+        let handles: Vec<_> = (0..n_threads)
+            .map(|_| {
+                let bus = Arc::clone(&bus);
+                let outer = Arc::clone(&outer);
+                thread::spawn(move || {
+                    for _ in 0..n_per_thread {
+                        let _g = outer.lock().unwrap();
+                        bus.publish(&Event::SessionCreated);
+                    }
+                })
+            })
+            .collect();
+        for h in handles {
+            h.join().unwrap();
+        }
+
+        for _ in 0..total {
+            rx.recv().unwrap();
+        }
+        assert!(rx.try_recv().is_err());
+    }
 }
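
For reference, a minimal stand-alone subscriber sketch. It assumes only
what the series establishes (a Unix socket carrying one JSON event per
line, snapshot first) and takes the events socket path as an argument
rather than guessing the daemon's runtime directory:

    use std::io::{BufRead, BufReader};
    use std::os::unix::net::UnixStream;

    fn main() -> std::io::Result<()> {
        let path = std::env::args()
            .nth(1)
            .expect("usage: subscribe <path-to-events.socket>");
        let stream = UnixStream::connect(path)?;
        // The first line is the snapshot; every later line is a delta.
        for line in BufReader::new(stream).lines() {
            println!("{}", line?);
        }
        Ok(())
    }

Run against a live daemon this prints the snapshot, then each delta as
the session table changes; reconnecting re-syncs from a fresh snapshot.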