From 1cbee7e47bd8baa1b919afe24c61f0ad6ab52919 Mon Sep 17 00:00:00 2001 From: Kenneth Allen Date: Sat, 27 Jan 2024 19:57:06 +1100 Subject: [PATCH] Wake the ack batcher up intelligently --- src/main.rs | 2 +- src/processor.rs | 97 +++++++++++++++++++++++++++++------------------- 2 files changed, 60 insertions(+), 39 deletions(-) diff --git a/src/main.rs b/src/main.rs index 12fa051..ab76217 100644 --- a/src/main.rs +++ b/src/main.rs @@ -88,7 +88,7 @@ async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> { async fn process(test: Test, i: usize) -> Result<()> { let (dur, fail) = { let mut rng = thread_rng(); - (Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1)) + (Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.01)) }; sleep(dur).await; //if fail { println!("{} Fail {} {:?}", i, test.int, dur); } diff --git a/src/processor.rs b/src/processor.rs index a092e8c..1dd21dc 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -12,21 +12,24 @@ pub struct Processor { chan: Channel, cons_tags: Vec, join_set: JoinSet>, - ack_batcher: UnboundedSender<(DeliveryTag, Result<(), bool>)>, + ack_sink: UnboundedSender<(DeliveryTag, Result<(), bool>)>, } impl Processor { pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result { let chan = conn.create_channel().await?; chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?; + let mut join_set = JoinSet::new(); - let (ack_batcher, ack_recv) = unbounded_channel(); - join_set.spawn(AckBatcher { chan: chan.clone(), max_wait: Duration::from_secs(1), recv: ack_recv }.run()); + + let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone()); + join_set.spawn(ack_batcher.run()); + Ok(Self { conn, chan, cons_tags: vec![], - ack_batcher, + ack_sink, join_set, }) } @@ -45,7 +48,7 @@ impl Processor { ).await?; self.cons_tags.push(consumer.tag()); - let ack_batcher = self.ack_batcher.clone(); + let ack_batcher = self.ack_sink.clone(); self.join_set.spawn(async move { let tracker = TaskTracker::new(); while let Some(delivery) = consumer.try_next().await? { @@ -67,7 +70,7 @@ impl Processor { } pub async fn shutdown(self) -> lapin::Result<()> { - let Self { cons_tags, conn, chan, mut join_set, ack_batcher } = self; + let Self { cons_tags, conn, chan, mut join_set, ack_sink: ack_batcher } = self; for cons_tag in cons_tags { let chan = chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially @@ -88,48 +91,66 @@ struct AckBatcher { max_wait: Duration, chan: Channel, recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>, + pending: BTreeMap>, + acked: DeliveryTag, } impl AckBatcher { + fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, Result<(), bool>)>, Self) { + let (send, recv) = unbounded_channel(); + (send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 }) + } + + fn enqueue(&mut self, tag: DeliveryTag, res: Result<(), bool>) { + let inserted = self.pending.insert(tag, res); + debug_assert_eq!(inserted, None); + } + + fn ready(&self) -> bool { + self.pending.keys().next() == Some(&(self.acked + 1)) + } + async fn run(mut self) -> lapin::Result<()> { - let mut next_ack = Instant::now() + self.max_wait; - let mut pending = BTreeMap::new(); - let mut acked = 0; + let mut not_before = Instant::now(); loop { - select! { - next = self.recv.recv() => { - match next { - Some((tag, result)) => { - let inserted = pending.insert(tag, result); - debug_assert_eq!(inserted, None); - }, - //None => break Ok(()), - None => { println!("ack done"); break Ok(()) }, - } - } - _ = sleep_until(next_ack) => { - println!("ack status {:?}", pending.len()); - loop { - if pending.keys().next() != Some(&(acked + 1)) { break; } - let (mut ack_to, res) = pending.pop_first().unwrap(); - while pending.first_key_value() == Some((&(ack_to + 1), &res)) { - pending.pop_first(); - ack_to += 1; + let mut data_good = false; + let mut time_good = false; + while !(time_good && data_good) { + select! { + next = self.recv.recv() => { + match next { + Some((tag, result)) => { + self.enqueue(tag, result); + data_good = data_good || tag == self.acked + 1; + }, + //None => return Ok(()), + None => { println!("ack done"); return Ok(()) }, } - - println!("ack sending {} {:?}", ack_to, res); - match res { - Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, - Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, - }?; - - acked = ack_to; } - println!("ack sleep"); - next_ack += self.max_wait; + _ = sleep_until(not_before), if !time_good => time_good = true } } + + println!("ack status {:?}", self.pending.len()); + loop { + let (mut ack_to, res) = self.pending.pop_first().unwrap(); + while self.pending.first_key_value() == Some((&(ack_to + 1), &res)) { + self.pending.pop_first(); + ack_to += 1; + } + + println!("ack sending {} {:?}", ack_to, res); + match res { + Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, + Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, + }?; + + self.acked = ack_to; + if !self.ready() { break; } + } + println!("ack sleep"); + not_before = Instant::now() + self.max_wait; } } }