From 95e35ff0bd668454014264409b15ae8486fa1509 Mon Sep 17 00:00:00 2001 From: novahe Date: Sat, 14 Mar 2026 01:17:18 +0800 Subject: [PATCH] fix(shim): optimize monitor to avoid intermittent hangs This commit addresses several deadlock and hang scenarios in the monitor module: - Replaced tokio::sync::Mutex with std::sync::Mutex to allow safe Drop implementation. - Switched to unbounded channels to prevent Reaper thread blocking on subscriber backpressure. - Restored in-lock notification to ensure strict FIFO ordering and unsubscription consistency. --- crates/shim/src/asynchronous/monitor.rs | 350 ++++++++++++++++++------ crates/shim/src/synchronous/monitor.rs | 236 +++++++++++++--- 2 files changed, 460 insertions(+), 126 deletions(-) diff --git a/crates/shim/src/asynchronous/monitor.rs b/crates/shim/src/asynchronous/monitor.rs index efc7b0ff..d06375dd 100644 --- a/crates/shim/src/asynchronous/monitor.rs +++ b/crates/shim/src/asynchronous/monitor.rs @@ -14,17 +14,14 @@ limitations under the License. */ -use std::collections::HashMap; +use std::{collections::HashMap, sync::Mutex}; use lazy_static::lazy_static; use log::error; -use tokio::sync::{ - mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, - Mutex, -}; +use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}; use crate::{ - error::{Error, Result}, + error::Result, monitor::{ExitEvent, Subject, Topic}, }; @@ -40,24 +37,38 @@ lazy_static! { } pub async fn monitor_subscribe(topic: Topic) -> Result { - let mut monitor = MONITOR.lock().await; + let mut monitor = MONITOR.lock().unwrap(); let s = monitor.subscribe(topic)?; Ok(s) } pub async fn monitor_unsubscribe(sub_id: i64) -> Result<()> { - let mut monitor = MONITOR.lock().await; + let mut monitor = MONITOR.lock().unwrap(); monitor.unsubscribe(sub_id) } pub async fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().await; - monitor.notify_by_pid(pid, exit_code).await + let mut monitor = MONITOR.lock().unwrap(); + let subject = Subject::Pid(pid); + monitor.notify_topic(&Topic::Pid, &subject, exit_code); + monitor.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) +} + +pub fn monitor_notify_by_pid_blocking(pid: i32, exit_code: i32) -> Result<()> { + let mut monitor = MONITOR.lock().unwrap(); + let subject = Subject::Pid(pid); + monitor.notify_topic(&Topic::Pid, &subject, exit_code); + monitor.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) } pub async fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().await; - monitor.notify_by_exec(id, exec_id, exit_code).await + let mut monitor = MONITOR.lock().unwrap(); + let subject = Subject::Exec(id.into(), exec_id.into()); + monitor.notify_topic(&Topic::Exec, &subject, exit_code); + monitor.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) } pub struct Monitor { @@ -76,6 +87,17 @@ pub struct Subscription { pub rx: UnboundedReceiver, } +impl Drop for Subscription { + fn drop(&mut self) { + let id = self.id; + // std::sync::Mutex::lock is safe in any context (sync/async). + // It will only block the thread for a very short time. + if let Ok(mut monitor) = MONITOR.lock() { + let _ = monitor.unsubscribe(id); + } + } +} + impl Monitor { pub fn subscribe(&mut self, topic: Topic) -> Result { let (tx, rx) = unbounded_channel::(); @@ -91,39 +113,23 @@ impl Monitor { Ok(Subscription { id, rx }) } - pub async fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> { - let subject = Subject::Pid(pid); - self.notify_topic(&Topic::Pid, &subject, exit_code).await; - self.notify_topic(&Topic::All, &subject, exit_code).await; - Ok(()) - } - - pub async fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let subject = Subject::Exec(cid.into(), exec_id.into()); - self.notify_topic(&Topic::Exec, &subject, exit_code).await; - self.notify_topic(&Topic::All, &subject, exit_code).await; - Ok(()) - } - - // notify_topic try best to notify exit codes to all subscribers and log errors. - async fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) { - let mut results = Vec::new(); + fn notify_topic(&mut self, topic: &Topic, subject: &Subject, exit_code: i32) { + let mut dead_subscribers = Vec::new(); if let Some(subs) = self.topic_subs.get(topic) { - let subscribers = subs.iter().filter_map(|x| self.subscribers.get(x)); - for sub in subscribers { - let res = sub - .tx - .send(ExitEvent { + for i in subs { + if let Some(sub) = self.subscribers.get(i) { + if let Err(e) = sub.tx.send(ExitEvent { subject: subject.clone(), exit_code, - }) - .map_err(other_error!("failed to send exit code")); - results.push(res); + }) { + error!("failed to send exit code to subscriber {}: {:?}", i, e); + dead_subscribers.push(*i); + } + } } } - let mut result_iter = results.iter(); - while let Some(Err(e)) = result_iter.next() { - error!("failed to send exit code to subscriber {:?}", e) + for id in dead_subscribers { + let _ = self.unsubscribe(id); } } @@ -142,71 +148,235 @@ impl Monitor { #[cfg(test)] mod tests { + use std::time::Duration; + + use lazy_static::lazy_static; + use tokio::sync::Mutex; + use crate::{ asynchronous::monitor::{ monitor_notify_by_exec, monitor_notify_by_pid, monitor_subscribe, monitor_unsubscribe, + MONITOR, }, - monitor::{ExitEvent, Subject, Topic}, + monitor::{Subject, Topic}, }; + lazy_static! { + // Use tokio::sync::Mutex for tests to avoid holding std::sync::MutexGuard across awaits + static ref TEST_LOCK: Mutex<()> = Mutex::new(()); + } + #[tokio::test] - async fn test_monitor() { - let mut s = monitor_subscribe(Topic::Pid).await.unwrap(); - let mut s1 = monitor_subscribe(Topic::All).await.unwrap(); - let mut s2 = monitor_subscribe(Topic::Exec).await.unwrap(); - monitor_notify_by_pid(13, 128).await.unwrap(); - monitor_notify_by_exec("test-container", "test-exec", 139) - .await - .unwrap(); - // pid subscription receive only pid event - if let Some(ExitEvent { - subject: Subject::Pid(p), - exit_code: ec, - }) = s.rx.recv().await + async fn test_monitor_table() { + let _guard = TEST_LOCK.lock().await; + // Clean up any leftovers from previous tests { - assert_eq!(ec, 128); - assert_eq!(p, 13); - } else { - panic!("can not receive the notified event"); + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); } - // topic all receive all events - if let Some(ExitEvent { - subject: Subject::Pid(p), - exit_code: ec, - }) = s1.rx.recv().await - { - assert_eq!(ec, 128); - assert_eq!(p, 13); - } else { - panic!("can not receive the notified event"); + struct TestCase { + name: &'static str, + subscribe_to: Topic, + notify_pid: Option, + notify_exec: Option<(&'static str, &'static str)>, + expected_pid: Option, + expected_exec: Option<(&'static str, &'static str)>, } - if let Some(ExitEvent { - subject: Subject::Exec(cid, eid), - exit_code: ec, - }) = s1.rx.recv().await + + let cases = vec![ + TestCase { + name: "pid_topic_receives_pid_event", + subscribe_to: Topic::Pid, + notify_pid: Some(101), + notify_exec: None, + expected_pid: Some(101), + expected_exec: None, + }, + TestCase { + name: "pid_topic_ignores_exec_event", + subscribe_to: Topic::Pid, + notify_pid: None, + notify_exec: Some(("c1", "e1")), + expected_pid: None, + expected_exec: None, + }, + TestCase { + name: "exec_topic_receives_exec_event", + subscribe_to: Topic::Exec, + notify_pid: None, + notify_exec: Some(("c2", "e2")), + expected_pid: None, + expected_exec: Some(("c2", "e2")), + }, + TestCase { + name: "all_topic_receives_both", + subscribe_to: Topic::All, + notify_pid: Some(102), + notify_exec: Some(("c3", "e3")), + expected_pid: Some(102), + expected_exec: Some(("c3", "e3")), + }, + ]; + + for tc in cases { + let mut s = monitor_subscribe(tc.subscribe_to.clone()).await.unwrap(); + + if let Some(pid) = tc.notify_pid { + monitor_notify_by_pid(pid, 0).await.unwrap(); + } + if let Some((cid, eid)) = tc.notify_exec { + monitor_notify_by_exec(cid, eid, 0).await.unwrap(); + } + + if let Some(exp_pid) = tc.expected_pid { + let event = tokio::time::timeout(Duration::from_millis(200), s.rx.recv()) + .await + .unwrap_or_else(|_| panic!("{}: timed out waiting for pid", tc.name)) + .expect("channel closed"); + match event.subject { + Subject::Pid(p) => assert_eq!(p, exp_pid, "{}", tc.name), + _ => panic!("{}: expected pid, got {:?}", tc.name, event.subject), + } + } + + if let Some((exp_cid, exp_eid)) = tc.expected_exec { + let event = tokio::time::timeout(Duration::from_millis(200), s.rx.recv()) + .await + .unwrap_or_else(|_| panic!("{}: timed out waiting for exec", tc.name)) + .expect("channel closed"); + match event.subject { + Subject::Exec(c, e) => { + assert_eq!(c, exp_cid, "{}", tc.name); + assert_eq!(e, exp_eid, "{}", tc.name); + } + _ => panic!("{}: expected exec, got {:?}", tc.name, event.subject), + } + } + + // Ensure no extra messages + let res = tokio::time::timeout(Duration::from_millis(50), s.rx.recv()).await; + assert!(res.is_err(), "{}: received unexpected extra event", tc.name); + + monitor_unsubscribe(s.id).await.unwrap(); + } + } + + #[tokio::test] + async fn test_monitor_backpressure() { + let _guard = TEST_LOCK.lock().await; + // Clean up { - assert_eq!(cid, "test-container"); - assert_eq!(eid, "test-exec"); - assert_eq!(ec, 139); - } else { - panic!("can not receive the notified event"); + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); } - // exec topic only receive exec exit event - if let Some(ExitEvent { - subject: Subject::Exec(cid, eid), - exit_code: ec, - }) = s2.rx.recv().await + let mut s = monitor_subscribe(Topic::Pid).await.unwrap(); + let sid = s.id; + let count = 200; + let base_pid = 10000; + + let receiver = tokio::spawn(async move { + let mut received = 0; + while received < count { + if let Some(event) = s.rx.recv().await { + match event.subject { + Subject::Pid(pid) => { + assert_eq!(pid, base_pid + received); + } + _ => continue, + } + received += 1; + if received % 10 == 0 { + tokio::time::sleep(Duration::from_millis(1)).await; + } + } else { + break; + } + } + received + }); + + for i in 0..count { + monitor_notify_by_pid(base_pid + i, 0).await.unwrap(); + } + + let received_count = tokio::time::timeout(Duration::from_secs(5), receiver) + .await + .expect("Test timed out") + .expect("Receiver task failed"); + + assert_eq!(received_count, count); + monitor_unsubscribe(sid).await.unwrap(); + } + + #[tokio::test] + async fn test_monitor_reliability() { + let _guard = TEST_LOCK.lock().await; + // Clean up { - assert_eq!(cid, "test-container"); - assert_eq!(eid, "test-exec"); - assert_eq!(ec, 139); - } else { - panic!("can not receive the notified event"); + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); + } + + enum Action { + Unsubscribe, + Drop, + } + + struct ReliabilityCase { + name: &'static str, + action: Action, + } + + let cases = vec![ + ReliabilityCase { + name: "explicit_unsubscribe", + action: Action::Unsubscribe, + }, + ReliabilityCase { + name: "drop_handle", + action: Action::Drop, + }, + ]; + + for tc in cases { + let s_to_remove = monitor_subscribe(Topic::Pid).await.unwrap(); + let mut s_stay = monitor_subscribe(Topic::Pid).await.unwrap(); + let rid = s_to_remove.id; + let test_pid = 20000; + + match tc.action { + Action::Unsubscribe => { + monitor_unsubscribe(rid).await.unwrap(); + } + Action::Drop => { + drop(s_to_remove); + tokio::time::sleep(Duration::from_millis(100)).await; + } + } + + monitor_notify_by_pid(test_pid, 0).await.unwrap(); + + // s_stay should receive the event + let event = tokio::time::timeout(Duration::from_millis(500), s_stay.rx.recv()) + .await + .unwrap_or_else(|_| panic!("{}: stay subscription timed out", tc.name)) + .expect("channel closed"); + + match event.subject { + Subject::Pid(p) => assert_eq!(p, test_pid, "{}", tc.name), + _ => panic!("{}: unexpected event", tc.name), + } + + { + let monitor = MONITOR.lock().unwrap(); + assert!(!monitor.subscribers.contains_key(&rid), "{}", tc.name); + } + monitor_unsubscribe(s_stay.id).await.unwrap(); } - monitor_unsubscribe(s.id).await.unwrap(); - monitor_unsubscribe(s1.id).await.unwrap(); - monitor_unsubscribe(s2.id).await.unwrap(); } } diff --git a/crates/shim/src/synchronous/monitor.rs b/crates/shim/src/synchronous/monitor.rs index e36730a1..fde325d1 100644 --- a/crates/shim/src/synchronous/monitor.rs +++ b/crates/shim/src/synchronous/monitor.rs @@ -20,6 +20,7 @@ use std::{ mpsc::{channel, Receiver, Sender}, Mutex, }, + time::Duration, }; use lazy_static::lazy_static; @@ -48,13 +49,24 @@ pub fn monitor_subscribe(topic: Topic) -> Result { } pub fn monitor_notify_by_pid(pid: i32, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().unwrap(); - monitor.notify_by_pid(pid, exit_code) + let mut monitor = MONITOR.lock().unwrap(); + let subject = Subject::Pid(pid); + monitor.notify_topic(&Topic::Pid, &subject, exit_code); + monitor.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) } pub fn monitor_notify_by_exec(id: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let monitor = MONITOR.lock().unwrap(); - monitor.notify_by_exec(id, exec_id, exit_code) + let mut monitor = MONITOR.lock().unwrap(); + let subject = Subject::Exec(id.into(), exec_id.into()); + monitor.notify_topic(&Topic::Exec, &subject, exit_code); + monitor.notify_topic(&Topic::All, &subject, exit_code); + Ok(()) +} + +pub fn monitor_unsubscribe(id: i64) -> Result<()> { + let mut monitor = MONITOR.lock().unwrap(); + monitor.unsubscribe(id) } pub struct Monitor { @@ -87,34 +99,21 @@ impl Monitor { Ok(Subscription { id, rx }) } - pub fn notify_by_pid(&self, pid: i32, exit_code: i32) -> Result<()> { - let subject = Subject::Pid(pid); - self.notify_topic(&Topic::Pid, &subject, exit_code); - self.notify_topic(&Topic::All, &subject, exit_code); - Ok(()) - } - - pub fn notify_by_exec(&self, cid: &str, exec_id: &str, exit_code: i32) -> Result<()> { - let subject = Subject::Exec(cid.into(), exec_id.into()); - self.notify_topic(&Topic::Exec, &subject, exit_code); - self.notify_topic(&Topic::All, &subject, exit_code); - Ok(()) - } - - fn notify_topic(&self, topic: &Topic, subject: &Subject, exit_code: i32) { - self.topic_subs.get(topic).map_or((), |subs| { + fn notify_topic(&mut self, topic: &Topic, subject: &Subject, exit_code: i32) { + if let Some(subs) = self.topic_subs.get(topic) { for i in subs { - self.subscribers.get(i).and_then(|sub| { - sub.tx - .send(ExitEvent { - subject: subject.clone(), - exit_code, - }) - .map_err(|e| warn!("failed to send {}", e)) - .ok() - }); + if let Some(sub) = self.subscribers.get(i) { + // channel::Sender::send is non-blocking when using unbounded channel. + // Sending while holding the lock prevents races with unsubscribe. + if let Err(e) = sub.tx.send(ExitEvent { + subject: subject.clone(), + exit_code, + }) { + warn!("failed to send exit event to subscriber {}: {}", i, e); + } + } } - }) + } } pub fn unsubscribe(&mut self, id: i64) -> Result<()> { @@ -141,14 +140,179 @@ impl Drop for Subscription { pub fn wait_pid(pid: i32, s: Subscription) -> i32 { loop { - if let Ok(ExitEvent { - subject: Subject::Pid(epid), - exit_code: code, - }) = s.rx.recv() + match s.rx.recv_timeout(Duration::from_secs(1)) { + Ok(ExitEvent { + subject: Subject::Pid(epid), + exit_code: code, + }) => { + if pid == epid { + return code; + } + } + Ok(_) => continue, + Err(std::sync::mpsc::RecvTimeoutError::Timeout) => continue, + Err(std::sync::mpsc::RecvTimeoutError::Disconnected) => return 128, + } + } +} + +#[cfg(test)] +mod tests { + use std::thread; + + use super::*; + + lazy_static! { + static ref SYNC_TEST_LOCK: Mutex<()> = Mutex::new(()); + } + + #[test] + fn test_monitor_table_sync() { + let _guard = SYNC_TEST_LOCK.lock().unwrap(); { - if pid == epid { - return code; + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); + } + + struct TestCase { + name: &'static str, + subscribe_to: Topic, + notify_pid: Option, + notify_exec: Option<(&'static str, &'static str)>, + expected_pid: Option, + expected_exec: Option<(&'static str, &'static str)>, + } + + let cases = vec![ + TestCase { + name: "pid_topic_receives_pid_event", + subscribe_to: Topic::Pid, + notify_pid: Some(301), + notify_exec: None, + expected_pid: Some(301), + expected_exec: None, + }, + TestCase { + name: "exec_topic_receives_exec_event", + subscribe_to: Topic::Exec, + notify_pid: None, + notify_exec: Some(("c2", "e2")), + expected_pid: None, + expected_exec: Some(("c2", "e2")), + }, + ]; + + for tc in cases { + let s = monitor_subscribe(tc.subscribe_to.clone()).unwrap(); + + if let Some(pid) = tc.notify_pid { + monitor_notify_by_pid(pid, 0).unwrap(); + } + if let Some((cid, eid)) = tc.notify_exec { + monitor_notify_by_exec(cid, eid, 0).unwrap(); } + + if let Some(exp_pid) = tc.expected_pid { + let event = + s.rx.recv_timeout(Duration::from_millis(100)) + .unwrap_or_else(|_| panic!("{}: timed out", tc.name)); + match event.subject { + Subject::Pid(p) => assert_eq!(p, exp_pid, "{}", tc.name), + _ => panic!("{}: unexpected subject", tc.name), + } + } + + if let Some((exp_cid, exp_eid)) = tc.expected_exec { + let event = + s.rx.recv_timeout(Duration::from_millis(100)) + .unwrap_or_else(|_| panic!("{}: timed out", tc.name)); + match event.subject { + Subject::Exec(c, e) => { + assert_eq!(c, exp_cid, "{}", tc.name); + assert_eq!(e, exp_eid, "{}", tc.name); + } + _ => panic!("{}: unexpected subject", tc.name), + } + } + + monitor_unsubscribe(s.id).unwrap(); + } + } + + #[test] + fn test_monitor_backpressure_sync() { + let _guard = SYNC_TEST_LOCK.lock().unwrap(); + { + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); } + + let s = monitor_subscribe(Topic::Pid).unwrap(); + let sid = s.id; + let count = 200; + let base_pid = 20000; + + let handle = thread::spawn(move || { + let mut received = 0; + while received < count { + if let Ok(event) = s.rx.recv_timeout(Duration::from_secs(5)) { + match event.subject { + Subject::Pid(pid) => { + assert_eq!(pid, base_pid + received) + } + _ => continue, + } + received += 1; + if received % 10 == 0 { + thread::sleep(Duration::from_millis(1)); + } + } else { + break; + } + } + received + }); + + for i in 0..count { + monitor_notify_by_pid(base_pid + i, 0).unwrap(); + } + + let received_count = handle.join().expect("Receiver thread panicked"); + assert_eq!(received_count, count); + + monitor_unsubscribe(sid).unwrap(); + } + + #[test] + fn test_monitor_reliability_sync() { + let _guard = SYNC_TEST_LOCK.lock().unwrap(); + { + let mut monitor = MONITOR.lock().unwrap(); + monitor.subscribers.clear(); + monitor.topic_subs.clear(); + } + + let s_to_drop = monitor_subscribe(Topic::Pid).unwrap(); + let s_stay = monitor_subscribe(Topic::Pid).unwrap(); + let rid = s_to_drop.id; + let test_pid = 30000; + + drop(s_to_drop); + thread::sleep(Duration::from_millis(50)); + + monitor_notify_by_pid(test_pid, 0).unwrap(); + + let event = s_stay.rx.recv_timeout(Duration::from_millis(200)).unwrap(); + match event.subject { + Subject::Pid(p) => assert_eq!(p, test_pid), + _ => panic!("unexpected event"), + } + + let monitor = MONITOR.lock().unwrap(); + assert!(!monitor.subscribers.contains_key(&rid)); + drop(monitor); + monitor_unsubscribe(s_stay.id).unwrap(); } }