From 28d55aa64a27fa2e1607d9bee6707d10d64d05b3 Mon Sep 17 00:00:00 2001 From: Kenneth Allen Date: Sun, 28 Jan 2024 02:25:56 +1100 Subject: [PATCH] Refine ack batching --- src/ack.rs | 15 ++++++++++----- src/main.rs | 2 +- src/processor.rs | 2 +- 3 files changed, 12 insertions(+), 7 deletions(-) diff --git a/src/ack.rs b/src/ack.rs index 17f27bb..b4bc5e1 100644 --- a/src/ack.rs +++ b/src/ack.rs @@ -15,7 +15,7 @@ pub struct AckBatcher { acked: DeliveryTag, } -static ACK_BUFFER_LEN: usize = 1_000; +static ACK_BUFFER_LEN: usize = 100; impl AckBatcher { pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) { @@ -35,9 +35,9 @@ impl AckBatcher { let mut not_before = Instant::now(); let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN); + let mut time_good = true; loop { let mut data_good = false; - let mut time_good = false; while !(time_good && data_good) { select! { num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN) => { @@ -52,25 +52,30 @@ impl AckBatcher { } } - println!("ack status {:?}", self.pending); + //println!("ack status {:?}", self.pending); + let mut ack_count = 0; + let mut max_ack_size = 0; loop { + ack_count += 1; let (ack_range, res) = self.pending.peek_first().unwrap(); let ack_to = ack_range.end - 1; let res = *res; self.pending.drop_first(); - println!("ack sending {} {:?}", ack_to, res); + //println!("ack send {} {:?}", ack_to, res); //println!("rm.drop_first(); rm.check_invariants();"); match res { Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, }?; + max_ack_size = max_ack_size.max(ack_to - self.acked); self.acked = ack_to; if !self.ready() { break; } } - println!("ack sleep"); + println!("ack sleep {} {}", ack_count, max_ack_size); not_before = Instant::now() + self.max_wait; + time_good = false; } } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 0d3929c..7785d5c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -17,7 +17,7 @@ use tokio::{signal, time::sleep}; use crate::processor::Processor; static QUEUE_COUNT: usize = 10; -static SEND_COUNT: usize = 10; +static SEND_COUNT: usize = 100_000; #[tokio::main] async fn main() -> Result<()> { diff --git a/src/processor.rs b/src/processor.rs index 548b251..01df8c9 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -24,7 +24,7 @@ impl Processor { let mut join_set = JoinSet::new(); - let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone()); + let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_millis(100), chan.clone()); join_set.spawn(ack_batcher.run()); Ok(Self {