From b796a61c2a17eb9dfdd5df1571801b1edef2da14 Mon Sep 17 00:00:00 2001 From: Eduardo Trujillo <ed@chromabits.com> Date: Tue, 15 Nov 2022 13:14:58 -0800 Subject: [PATCH] refactor(cli): Improve separation of library and binary modules --- src/cli/args.rs | 53 +++++++++++++ src/cli/bundle.rs | 13 ++++ src/cli/mod.rs | 3 + src/cli/serve.rs | 126 ++++++++++++++++++++++++++++++ src/main.rs | 195 ++++------------------------------------------ 5 files changed, 210 insertions(+), 180 deletions(-) create mode 100644 src/cli/args.rs create mode 100644 src/cli/bundle.rs create mode 100644 src/cli/mod.rs create mode 100644 src/cli/serve.rs diff --git a/src/cli/args.rs b/src/cli/args.rs new file mode 100644 index 0000000..4d1dba9 --- /dev/null +++ b/src/cli/args.rs @@ -0,0 +1,53 @@ +use std::path::PathBuf; + +use clap::{Parser, Subcommand}; +use collective::{cli::{AppOpts, ConfigurableAppOpts}, config::ConfigFileFormat}; +use espresso::config::Config; + +#[derive(Parser)] +#[clap(version = "1.0", author = "Eduardo T. <ed@trujillo.io>")] +pub struct Opts { + /// Sets a custom config file. + #[clap(short, long)] + config: Option<PathBuf>, + /// A level of verbosity, and can be used multiple times + #[clap(short, long, parse(from_occurrences))] + verbose: i32, + #[clap(subcommand)] + pub subcmd: SubCommand, +} + +#[derive(Subcommand)] +pub enum SubCommand { + /// Start the server + Serve, + Bundle(BundleOpts), +} + +/// Create a bundle using the configured bundler +#[derive(Parser)] +pub struct BundleOpts { + /// The path to use a source for the bundle. + pub source_path: PathBuf, +} + +impl AppOpts for Opts { + fn get_log_level_filter(&self) -> Option<log::LevelFilter> { + match self.verbose { + 3 => Some(log::LevelFilter::Trace), + 2 => Some(log::LevelFilter::Debug), + 1 => Some(log::LevelFilter::Info), + _ => None, + } + } +} + +impl ConfigurableAppOpts<Config> for Opts { + fn get_additional_config_paths(&self) -> Vec<(PathBuf, Option<ConfigFileFormat>)> { + if let Some(config_path) = &self.config { + vec![(config_path.clone(), None)] + } else { + vec![] + } + } +} \ No newline at end of file diff --git a/src/cli/bundle.rs b/src/cli/bundle.rs new file mode 100644 index 0000000..e7773f1 --- /dev/null +++ b/src/cli/bundle.rs @@ -0,0 +1,13 @@ +use std::sync::Arc; + +use espresso::{config::Config, bundle::{Bundler, packager}}; + +use super::args::BundleOpts; + +pub async fn bundle(config: Arc<Config>, opts: BundleOpts) -> Result<(), packager::Error> { + let bundler = Bundler::new(config); + + bundler.package(opts.source_path).await.unwrap(); + + Ok(()) +} \ No newline at end of file diff --git a/src/cli/mod.rs b/src/cli/mod.rs new file mode 100644 index 0000000..f00b6dc --- /dev/null +++ b/src/cli/mod.rs @@ -0,0 +1,3 @@ +pub mod args; +pub mod bundle; +pub mod serve; \ No newline at end of file diff --git a/src/cli/serve.rs b/src/cli/serve.rs new file mode 100644 index 0000000..47ab754 --- /dev/null +++ b/src/cli/serve.rs @@ -0,0 +1,126 @@ +use std::{sync::{Arc, mpsc}, collections::HashSet}; +use actix::System; +use collective::thread::{monitor::{ThreadMonitor, self}, self}; +use espresso::{config::Config, bundle::{Unbundler, self}, server::{Server, self}, stats::{StatsServer, self}}; +use lazy_static::lazy_static; +use snafu::{Snafu, ResultExt}; +use tokio::sync::RwLock; + +lazy_static! { + static ref MONITOR: ThreadMonitor = ThreadMonitor::new(); +} + +#[derive(Snafu, Debug)] +pub enum Error { + Unbundle { + source: bundle::Error, + }, + ServeError { + source: Box<server::Error>, + }, + ServeStats { + source: stats::Error, + }, + MonitorError { + source: monitor::Error, + }, + RecvNotify, +} + +type Result<T, E = Error> = std::result::Result<T, E>; + +pub async fn serve(config: Arc<Config>) -> Result<()> { + console_subscriber::init(); + + // Set up a channel for receiving thread notifications. + let (monitor_tx, monitor_rx) = mpsc::channel(); + + // Keep track of what threads have been started. + let mut server_thread_ids = HashSet::new(); + + // Set up unbundler. + let serve_dir = Arc::new(RwLock::new(None)); + let unbundler = Arc::new(Unbundler::new(config.clone(), serve_dir.clone())); + + // Set up main server. + let server = Server::new(config.server.clone(), serve_dir); + + let (server_handle, server_thread_handle) = + server + .spawn(monitor_tx.clone()) + .await + .map_err(|err| Error::ServeError { + source: Box::new(err), + })?; + + server_thread_ids.insert(server_thread_handle.thread().id()); + + // Set up optional stats server. + let mut maybe_stats_server_handle = None; + + match &config.stats { + Some(stats_config) => { + let stats_server = StatsServer::new(stats_config.clone(), unbundler.clone()); + + let (stats_server_handle, stats_thread_handle) = stats_server + .spawn(monitor_tx.clone()) + .await + .context(ServeStats)?; + + maybe_stats_server_handle = Some(stats_server_handle); + server_thread_ids.insert(stats_thread_handle.thread().id()); + } + None => {} + } + + let unbundler_thread_handle = thread::handle::spawn(monitor_tx.clone(), move || { + let sys = System::new(); + + let result = sys + .block_on(async move { unbundler.enter().await }) + .context(Unbundle); + + if let Err(e) = result { + error!("Unbundler failed: {:?}", e); + } + }); + let unbundler_thread_id = unbundler_thread_handle.thread().id(); + + let monitor_thread_handle = thread::handle::spawn(monitor_tx.clone(), move || { + let mut watched_thread_ids = HashSet::new(); + + watched_thread_ids.insert(unbundler_thread_id); + + for server_thread_id in server_thread_ids { + watched_thread_ids.insert(server_thread_id); + } + + if MONITOR.watch(Some(&watched_thread_ids)).is_err() { + error!("Failed to watch threads for panics."); + } + }); + + // Wait for a thread to finish. + loop { + monitor_rx.recv().map_err(|_| Error::RecvNotify)?; + + if Ok(true) == monitor_thread_handle.get_end_handle().has_ended() { + info!("Stopping servers due to a panic."); + + break; + } else if Ok(true) == unbundler_thread_handle.get_end_handle().has_ended() { + info!("Stopping servers due to unbundler shutdown."); + + break; + } + } + + // Stop server threads. + server_handle.stop(true).await; + + if let Some(stats_server_handle) = maybe_stats_server_handle { + stats_server_handle.stop(true).await; + } + + Ok(()) +} diff --git a/src/main.rs b/src/main.rs index c4c87d5..b430f5a 100644 --- a/src/main.rs +++ b/src/main.rs @@ -6,33 +6,20 @@ #[macro_use] extern crate log; -use actix::prelude::*; -use clap::{Parser, Subcommand}; +use cli::{ + args::{Opts, SubCommand}, + bundle, serve, +}; use collective::{ - cli::{AppOpts, ConfigurableAppOpts}, - config::ConfigFileFormat, - thread::{ - self, - monitor::{self, ThreadMonitor}, - }, + cli::ConfigurableAppOpts, + thread::monitor::{self, ThreadMonitor}, }; +use espresso::{bundle::packager, config, stats}; use lazy_static::lazy_static; -use server::Server; use snafu::{ResultExt, Snafu}; -use stats::StatsServer; -use std::{ - collections::HashSet, - net::SocketAddr, - path::PathBuf, - sync::{mpsc, Arc}, -}; -use tokio::sync::RwLock; +use std::{net::SocketAddr, sync::Arc}; -pub mod bundle; -pub mod config; -pub mod files; -pub mod server; -pub mod stats; +pub mod cli; lazy_static! { static ref MONITOR: ThreadMonitor = ThreadMonitor::new(); @@ -47,11 +34,11 @@ pub enum Error { address: SocketAddr, source: std::io::Error, }, - Unbundle { - source: bundle::Error, - }, ServeError { - source: Box<server::Error>, + source: serve::Error, + }, + BundleError { + source: packager::Error, }, ServeStats { source: stats::Error, @@ -67,54 +54,6 @@ pub enum Error { type Result<T, E = Error> = std::result::Result<T, E>; -#[derive(Parser)] -#[clap(version = "1.0", author = "Eduardo T. <ed@trujillo.io>")] -struct Opts { - /// Sets a custom config file. - #[clap(short, long)] - config: Option<PathBuf>, - /// A level of verbosity, and can be used multiple times - #[clap(short, long, parse(from_occurrences))] - verbose: i32, - #[clap(subcommand)] - subcmd: SubCommand, -} - -#[derive(Subcommand)] -enum SubCommand { - /// Start the server - Serve, - Bundle(BundleOpts), -} - -/// Create a bundle using the configured bundler -#[derive(Parser)] -struct BundleOpts { - /// The path to use a source for the bundle. - source_path: PathBuf, -} - -impl AppOpts for Opts { - fn get_log_level_filter(&self) -> Option<log::LevelFilter> { - match self.verbose { - 3 => Some(log::LevelFilter::Trace), - 2 => Some(log::LevelFilter::Debug), - 1 => Some(log::LevelFilter::Info), - _ => None, - } - } -} - -impl ConfigurableAppOpts<config::Config> for Opts { - fn get_additional_config_paths(&self) -> Vec<(PathBuf, Option<ConfigFileFormat>)> { - if let Some(config_path) = &self.config { - vec![(config_path.clone(), None)] - } else { - vec![] - } - } -} - #[actix_rt::main] async fn main() -> Result<()> { let result = inner_main().await; @@ -137,111 +76,7 @@ async fn inner_main() -> Result<()> { let config = Arc::new(config); match opts.subcmd { - SubCommand::Serve => serve(config).await, - SubCommand::Bundle(opts) => bundle(config, opts).await, - } -} - -async fn bundle(config: Arc<config::Config>, opts: BundleOpts) -> Result<()> { - let bundler = bundle::Bundler::new(config); - - bundler.package(opts.source_path).await.unwrap(); - - Ok(()) -} - -async fn serve(config: Arc<config::Config>) -> Result<()> { - console_subscriber::init(); - - // Set up a channel for receiving thread notifications. - let (monitor_tx, monitor_rx) = mpsc::channel(); - - // Keep track of what threads have been started. - let mut server_thread_ids = HashSet::new(); - - // Set up unbundler. - let serve_dir = Arc::new(RwLock::new(None)); - let unbundler = Arc::new(bundle::Unbundler::new(config.clone(), serve_dir.clone())); - - // Set up main server. - let server = Server::new(config.server.clone(), serve_dir); - - let (server_handle, server_thread_handle) = - server - .spawn(monitor_tx.clone()) - .await - .map_err(|err| Error::ServeError { - source: Box::new(err), - })?; - - server_thread_ids.insert(server_thread_handle.thread().id()); - - // Set up optional stats server. - let mut maybe_stats_server_handle = None; - - match &config.stats { - Some(stats_config) => { - let stats_server = StatsServer::new(stats_config.clone(), unbundler.clone()); - - let (stats_server_handle, stats_thread_handle) = stats_server - .spawn(monitor_tx.clone()) - .await - .context(ServeStats)?; - - maybe_stats_server_handle = Some(stats_server_handle); - server_thread_ids.insert(stats_thread_handle.thread().id()); - } - None => {} + SubCommand::Serve => serve::serve(config).await.context(ServeError), + SubCommand::Bundle(opts) => bundle::bundle(config, opts).await.context(BundleError), } - - let unbundler_thread_handle = thread::handle::spawn(monitor_tx.clone(), move || { - let sys = System::new(); - - let result = sys - .block_on(async move { unbundler.enter().await }) - .context(Unbundle); - - if let Err(e) = result { - error!("Unbundler failed: {:?}", e); - } - }); - let unbundler_thread_id = unbundler_thread_handle.thread().id(); - - let monitor_thread_handle = thread::handle::spawn(monitor_tx.clone(), move || { - let mut watched_thread_ids = HashSet::new(); - - watched_thread_ids.insert(unbundler_thread_id); - - for server_thread_id in server_thread_ids { - watched_thread_ids.insert(server_thread_id); - } - - if MONITOR.watch(Some(&watched_thread_ids)).is_err() { - error!("Failed to watch threads for panics."); - } - }); - - // Wait for a thread to finish. - loop { - monitor_rx.recv().map_err(|_| Error::RecvNotify)?; - - if Ok(true) == monitor_thread_handle.get_end_handle().has_ended() { - info!("Stopping servers due to a panic."); - - break; - } else if Ok(true) == unbundler_thread_handle.get_end_handle().has_ended() { - info!("Stopping servers due to unbundler shutdown."); - - break; - } - } - - // Stop server threads. - server_handle.stop(true).await; - - if let Some(stats_server_handle) = maybe_stats_server_handle { - stats_server_handle.stop(true).await; - } - - Ok(()) } -- GitLab