From f7ea64f7651a2925303337f23358b7727e09e987 Mon Sep 17 00:00:00 2001 From: Eduardo Trujillo <ed@chromabits.com> Date: Tue, 15 Nov 2022 12:50:45 -0800 Subject: [PATCH] refactor(monitor): Use monitor module from collective crate --- src/lib.rs | 1 - src/main.rs | 9 +- src/monitor.rs | 227 ------------------------------------------------- 3 files changed, 5 insertions(+), 232 deletions(-) delete mode 100644 src/monitor.rs diff --git a/src/lib.rs b/src/lib.rs index c9ae179..65b8c4b 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,6 +9,5 @@ extern crate log; pub mod bundle; pub mod config; pub mod files; -pub mod monitor; pub mod server; pub mod stats; diff --git a/src/main.rs b/src/main.rs index d47c14f..c4c87d5 100644 --- a/src/main.rs +++ b/src/main.rs @@ -11,10 +11,12 @@ use clap::{Parser, Subcommand}; use collective::{ cli::{AppOpts, ConfigurableAppOpts}, config::ConfigFileFormat, - thread, + thread::{ + self, + monitor::{self, ThreadMonitor}, + }, }; use lazy_static::lazy_static; -use monitor::Monitor; use server::Server; use snafu::{ResultExt, Snafu}; use stats::StatsServer; @@ -29,12 +31,11 @@ use tokio::sync::RwLock; pub mod bundle; pub mod config; pub mod files; -pub mod monitor; pub mod server; pub mod stats; lazy_static! { - static ref MONITOR: Monitor = Monitor::new(); + static ref MONITOR: ThreadMonitor = ThreadMonitor::new(); } #[derive(Snafu, Debug)] diff --git a/src/monitor.rs b/src/monitor.rs deleted file mode 100644 index d69aafc..0000000 --- a/src/monitor.rs +++ /dev/null @@ -1,227 +0,0 @@ -//! Thread Monitoring -//! -//! Utilities for monitoring when a thread has panicked. - -use snafu::Snafu; -use std::{ - collections::{HashMap, HashSet}, - panic, - sync::{Condvar, Mutex}, - thread::{self, Thread, ThreadId}, -}; - -#[derive(Debug, Snafu)] -pub enum Error { - WatchStateLock, - #[snafu(display("There should only be one active call to watch()."))] - MultipleWatches, - NoWatches, - Uninitialized, -} - -pub type Result<T, E = Error> = std::result::Result<T, E>; - -struct State { - panicked: HashMap<ThreadId, Thread>, - watched: Option<HashSet<ThreadId>>, - initialized: bool, -} - -/// A thread panic monitor. -pub struct Monitor { - condvar: Condvar, - state: Mutex<State>, -} - -impl Monitor { - /// Instantiates a new thread monitor. - pub fn new() -> Self { - Monitor { - condvar: Condvar::new(), - state: Mutex::new(State { - panicked: HashMap::new(), - watched: None, - initialized: false, - }), - } - } - - /// Initializes the thread monitor. - /// - /// This should be done before watching any threads. - /// - /// Internally, the monitor will set up a new panic hook, which will be used - /// for detecting panics on the threads being watched. - /// - /// The monitor will initially ignore all panics. Use [`set_watched`] or - /// [`watch`] to specify which threads to monitor. - /// - /// [`set_watched`]: #method.set_watched - /// [`watch`]: #method.watch - pub fn init(&'static self) -> Result<()> { - let hook = panic::take_hook(); - - panic::set_hook(Box::new(move |panic_info| { - match self.state.lock() { - Ok(mut state) => { - match &state.watched { - Some(watched) => { - let current_thread = thread::current(); - - // Only notify if the thread ID is being watched. - if watched.contains(¤t_thread.id()) { - state.panicked.insert(current_thread.id(), current_thread); - - self.condvar.notify_all(); - } - } - None => {} - } - } - Err(_) => error!("Unable to update map of panicked threads."), - } - - hook(panic_info) - })); - - let mut state = self.state.lock().map_err(|_| Error::WatchStateLock)?; - - state.initialized = true; - - Ok(()) - } - - /// Set the threads to be watched by this monitor. - /// - /// If [init] has been called, the monitor will begin recording panics for - /// the specified threads. - /// - /// [`init`]: #method.init - pub fn set_watched(&self, thread_ids: HashSet<ThreadId>) -> Result<()> { - let mut state = self.state.lock().map_err(|_| Error::WatchStateLock)?; - - state.watched = Some(thread_ids); - - Ok(()) - } - - /// Watches the provided thread IDs. - /// - /// [`init`] has to be called before this method. An [`Uninitialized`] error - /// will be returned if it's not. - /// - /// - If an empty set is passed, this function returns immediately. - /// - If a thread set is not passed and one hasn't been set with - /// [`set_watched`], a [`NoWatches`] error will be returned. - /// - If [`set_watched`] was previously called and one of the watched threads - /// already panicked, this function will return immediately. - /// - Otherwise, the monitor will block the current thread until one of the - /// watched threads has a panic. - /// - /// [`set_watched`]: #method.set_watched - /// [`NoWatches`]: ./enum.Error.html#variant.NoWatches - /// [`Uninitialized`]: ./enum.Error.html#variant.Uninitialized - pub fn watch(&self, thread_ids: Option<&HashSet<ThreadId>>) -> Result<Vec<Thread>> { - let mut state = self.state.lock().map_err(|_| Error::WatchStateLock)?; - - if !state.initialized { - return Err(Error::Uninitialized); - } - - let thread_ids = match thread_ids { - Some(thread_ids) => Ok(thread_ids.clone()), - None => match &(state.watched) { - Some(thread_ids) => Ok(thread_ids.clone()), - None => Err(Error::NoWatches), - }, - }?; - - if thread_ids.is_empty() { - return Ok(vec![]); - } - - state.panicked = HashMap::new(); - state.watched = Some(thread_ids); - - let mut watched_panicked = vec![]; - - loop { - // Since `state` may have changed, we need to reload the list of thread - // ids, otherwise we would be stuck checking for thread ids that may not - // be watched anymore. - let thread_ids = match &state.watched { - Some(thread_ids) => thread_ids, - None => return Err(Error::NoWatches), - }; - - if thread_ids.is_empty() { - return Ok(vec![]); - } - - for thread_id in thread_ids { - if let Some(thread) = state.panicked.get(thread_id) { - watched_panicked.push(thread.clone().clone()); - } - } - - if !watched_panicked.is_empty() { - return Ok(watched_panicked); - } - - state = self - .condvar - .wait(state) - .map_err(|_| Error::WatchStateLock)?; - } - } -} - -impl Default for Monitor { - fn default() -> Self { - Self::new() - } -} - -#[cfg(test)] -mod tests { - use super::Monitor; - use lazy_static::lazy_static; - use std::{collections::HashSet, sync::mpsc, thread, time::Duration}; - - lazy_static! { - static ref MONITOR: Monitor = Monitor::new(); - } - - #[test] - pub fn test_watch() { - MONITOR.init().unwrap(); - - MONITOR.watch(Some(&HashSet::new())).unwrap(); - - let (tx, rx) = mpsc::channel(); - - let handle = thread::spawn(move || { - rx.recv().unwrap(); - - panic!("Oh no"); - }); - - let mut thread_ids = HashSet::new(); - thread_ids.insert(handle.thread().id()); - - thread::sleep(Duration::from_millis(10)); - - let test_handle = thread::spawn(move || { - let watch_result = MONITOR.watch(Some(&thread_ids)).unwrap(); - - assert_eq!(watch_result.is_empty(), false); - assert_eq!(watch_result[0].id(), handle.thread().id()); - }); - - thread::sleep(Duration::from_millis(10)); - - tx.send(true).unwrap(); - - test_handle.join().unwrap(); - } -} -- GitLab