From 81ca1d927c7e21fc2ca6618796df3131f5db377c Mon Sep 17 00:00:00 2001 From: Robin Appelman <robin@icewind.nl> Date: Fri, 29 Oct 2021 16:46:12 +0200 Subject: [PATCH] more advanced debouce logic instead of ignoring any messages send within N secconds of the last it now sends up to one message every N secconds while they are comming in and will send a final message after N secconds if the last message was debounced Signed-off-by: Robin Appelman <robin@icewind.nl> --- src/connection.rs | 22 ++++++++++++++++----- src/message.rs | 49 ++++++++++++++++++++++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 10 deletions(-) diff --git a/src/connection.rs b/src/connection.rs index 6d6a2cb..64c8133 100644 --- a/src/connection.rs +++ b/src/connection.rs @@ -10,17 +10,17 @@ use std::num::NonZeroUsize; use std::sync::atomic::{AtomicUsize, Ordering}; use std::sync::Arc; use std::time::{Duration, Instant}; -use tokio::sync::broadcast::{channel, Receiver, Sender}; +use tokio::sync::broadcast; use tokio::time::timeout; use warp::filters::ws::{Message, WebSocket}; const USER_CONNECTION_LIMIT: usize = 64; #[derive(Default)] -pub struct ActiveConnections(DashMap<UserId, Sender<MessageType>, RandomState>); +pub struct ActiveConnections(DashMap<UserId, broadcast::Sender<MessageType>, RandomState>); impl ActiveConnections { - pub async fn add(&self, user: UserId) -> Result<Receiver<MessageType>> { + pub async fn add(&self, user: UserId) -> Result<broadcast::Receiver<MessageType>> { if let Some(sender) = self.0.get(&user) { // stop a single user from trying to eat all the resources if sender.receiver_count() > USER_CONNECTION_LIMIT { @@ -29,7 +29,7 @@ impl ActiveConnections { Ok(sender.subscribe()) } } else { - let (tx, rx) = channel(4); + let (tx, rx) = broadcast::channel(4); self.0.insert(user, tx); Ok(rx) } @@ -95,10 +95,22 @@ pub async fn handle_user_socket(mut ws: WebSocket, app: Arc<App>, forwarded_for: msg = timeout(Duration::from_secs(30), rx.recv()) => { match msg { Ok(Ok(msg)) => { - log::debug!(target: "notify_push::send", "Sending {} to {}", msg, user_id); if debounce.should_send(&msg) { + log::debug!(target: "notify_push::send", "Sending {} to {}", msg, user_id); METRICS.add_message(); user_ws_tx.send(msg.into()).await.ok(); + } else { + log::debug!(target: "notify_push::send", "Debouncing {} to {}", msg, user_id); + } + } + Err(_timout) if debounce.has_held_message() => { + // if any message got held back for debounce, we try sending them now + for msg in debounce.get_held_messages() { + if debounce.should_send(&msg) { + log::debug!(target: "notify_push::send", "Sending debounced {} to {}", msg, user_id); + METRICS.add_message(); + user_ws_tx.send(msg.into()).await.ok(); + } } } Err(_timout) => { diff --git a/src/message.rs b/src/message.rs index 641fef8..f4c4aec 100644 --- a/src/message.rs +++ b/src/message.rs @@ -40,6 +40,9 @@ pub struct DebounceMap { file: Instant, activity: Instant, notification: Instant, + file_held: bool, + activity_held: bool, + notification_held: bool, } impl Default for DebounceMap { @@ -49,6 +52,9 @@ impl Default for DebounceMap { file: past, activity: past, notification: past, + file_held: false, + activity_held: false, + notification_held: false, } } } @@ -58,9 +64,13 @@ impl DebounceMap { pub fn should_send(&mut self, ty: &MessageType) -> bool { if DEBOUNCE_ENABLE.load(Ordering::Relaxed) { let last_send = self.get_last_send(ty); - if Instant::now().duration_since(last_send) > Self::get_debounce_time(ty) { + if Instant::now().duration_since(last_send) > Self::debounce_time(ty) { self.set_last_send(ty); + self.set_held(ty, false); true + } else if Instant::now().duration_since(last_send) > Duration::from_millis(100) { + self.set_held(ty, true); + false } else { false } @@ -69,6 +79,26 @@ impl DebounceMap { } } + pub fn has_held_message(&self) -> bool { + self.file_held || self.activity_held || self.notification_held + } + + pub fn get_held_messages(&self) -> impl Iterator<Item = MessageType> { + self.file_held + .then(|| MessageType::File) + .into_iter() + .chain( + self.activity_held + .then(|| MessageType::Activity) + .into_iter(), + ) + .chain( + self.notification_held + .then(|| MessageType::Notification) + .into_iter(), + ) + } + fn get_last_send(&self, ty: &MessageType) -> Instant { match ty { MessageType::File => self.file, @@ -87,11 +117,20 @@ impl DebounceMap { } } - fn get_debounce_time(ty: &MessageType) -> Duration { + fn set_held(&mut self, ty: &MessageType, held: bool) { + match ty { + MessageType::File => self.file_held = held, + MessageType::Activity => self.activity_held = held, + MessageType::Notification => self.notification_held = held, + MessageType::Custom(..) => {} // no debouncing for custom messages + } + } + + fn debounce_time(ty: &MessageType) -> Duration { match ty { - MessageType::File => Duration::from_secs(5), - MessageType::Activity => Duration::from_secs(15), - MessageType::Notification => Duration::from_secs(1), + MessageType::File => Duration::from_secs(60), + MessageType::Activity => Duration::from_secs(120), + MessageType::Notification => Duration::from_secs(30), MessageType::Custom(..) => Duration::from_millis(1), // no debouncing for custom messages } } -- GitLab