Skip to content
Snippets Groups Projects
Commit 81ca1d92 authored by Robin Appelman's avatar Robin Appelman
Browse files

more advanced debouce logic


instead of ignoring any messages send within N secconds of the last
it now sends up to one message every N secconds while they are comming in
and will send a final message after N secconds if the last message was debounced

Signed-off-by: default avatarRobin Appelman <robin@icewind.nl>
parent 48945058
No related branches found
No related tags found
No related merge requests found
......@@ -10,17 +10,17 @@ use std::num::NonZeroUsize;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::sync::broadcast::{channel, Receiver, Sender};
use tokio::sync::broadcast;
use tokio::time::timeout;
use warp::filters::ws::{Message, WebSocket};
const USER_CONNECTION_LIMIT: usize = 64;
#[derive(Default)]
pub struct ActiveConnections(DashMap<UserId, Sender<MessageType>, RandomState>);
pub struct ActiveConnections(DashMap<UserId, broadcast::Sender<MessageType>, RandomState>);
impl ActiveConnections {
pub async fn add(&self, user: UserId) -> Result<Receiver<MessageType>> {
pub async fn add(&self, user: UserId) -> Result<broadcast::Receiver<MessageType>> {
if let Some(sender) = self.0.get(&user) {
// stop a single user from trying to eat all the resources
if sender.receiver_count() > USER_CONNECTION_LIMIT {
......@@ -29,7 +29,7 @@ impl ActiveConnections {
Ok(sender.subscribe())
}
} else {
let (tx, rx) = channel(4);
let (tx, rx) = broadcast::channel(4);
self.0.insert(user, tx);
Ok(rx)
}
......@@ -95,10 +95,22 @@ pub async fn handle_user_socket(mut ws: WebSocket, app: Arc<App>, forwarded_for:
msg = timeout(Duration::from_secs(30), rx.recv()) => {
match msg {
Ok(Ok(msg)) => {
log::debug!(target: "notify_push::send", "Sending {} to {}", msg, user_id);
if debounce.should_send(&msg) {
log::debug!(target: "notify_push::send", "Sending {} to {}", msg, user_id);
METRICS.add_message();
user_ws_tx.send(msg.into()).await.ok();
} else {
log::debug!(target: "notify_push::send", "Debouncing {} to {}", msg, user_id);
}
}
Err(_timout) if debounce.has_held_message() => {
// if any message got held back for debounce, we try sending them now
for msg in debounce.get_held_messages() {
if debounce.should_send(&msg) {
log::debug!(target: "notify_push::send", "Sending debounced {} to {}", msg, user_id);
METRICS.add_message();
user_ws_tx.send(msg.into()).await.ok();
}
}
}
Err(_timout) => {
......
......@@ -40,6 +40,9 @@ pub struct DebounceMap {
file: Instant,
activity: Instant,
notification: Instant,
file_held: bool,
activity_held: bool,
notification_held: bool,
}
impl Default for DebounceMap {
......@@ -49,6 +52,9 @@ impl Default for DebounceMap {
file: past,
activity: past,
notification: past,
file_held: false,
activity_held: false,
notification_held: false,
}
}
}
......@@ -58,9 +64,13 @@ impl DebounceMap {
pub fn should_send(&mut self, ty: &MessageType) -> bool {
if DEBOUNCE_ENABLE.load(Ordering::Relaxed) {
let last_send = self.get_last_send(ty);
if Instant::now().duration_since(last_send) > Self::get_debounce_time(ty) {
if Instant::now().duration_since(last_send) > Self::debounce_time(ty) {
self.set_last_send(ty);
self.set_held(ty, false);
true
} else if Instant::now().duration_since(last_send) > Duration::from_millis(100) {
self.set_held(ty, true);
false
} else {
false
}
......@@ -69,6 +79,26 @@ impl DebounceMap {
}
}
pub fn has_held_message(&self) -> bool {
self.file_held || self.activity_held || self.notification_held
}
pub fn get_held_messages(&self) -> impl Iterator<Item = MessageType> {
self.file_held
.then(|| MessageType::File)
.into_iter()
.chain(
self.activity_held
.then(|| MessageType::Activity)
.into_iter(),
)
.chain(
self.notification_held
.then(|| MessageType::Notification)
.into_iter(),
)
}
fn get_last_send(&self, ty: &MessageType) -> Instant {
match ty {
MessageType::File => self.file,
......@@ -87,11 +117,20 @@ impl DebounceMap {
}
}
fn get_debounce_time(ty: &MessageType) -> Duration {
fn set_held(&mut self, ty: &MessageType, held: bool) {
match ty {
MessageType::File => self.file_held = held,
MessageType::Activity => self.activity_held = held,
MessageType::Notification => self.notification_held = held,
MessageType::Custom(..) => {} // no debouncing for custom messages
}
}
fn debounce_time(ty: &MessageType) -> Duration {
match ty {
MessageType::File => Duration::from_secs(5),
MessageType::Activity => Duration::from_secs(15),
MessageType::Notification => Duration::from_secs(1),
MessageType::File => Duration::from_secs(60),
MessageType::Activity => Duration::from_secs(120),
MessageType::Notification => Duration::from_secs(30),
MessageType::Custom(..) => Duration::from_millis(1), // no debouncing for custom messages
}
}
......
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment