diff --git a/src/ack.rs b/src/ack.rs new file mode 100644 index 0000000..17f27bb --- /dev/null +++ b/src/ack.rs @@ -0,0 +1,76 @@ +use std::time::Duration; + +use lapin::{options::{BasicAckOptions, BasicNackOptions}, types::DeliveryTag, Channel}; +use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, time::{sleep_until, Instant}}; + +use crate::range_map::RangeMap; + +pub type AckResult = Result<(), bool>; + +pub struct AckBatcher { + max_wait: Duration, + chan: Channel, + recv: UnboundedReceiver<(DeliveryTag, AckResult)>, + pending: RangeMap, + acked: DeliveryTag, +} + +static ACK_BUFFER_LEN: usize = 1_000; + +impl AckBatcher { + pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) { + let (send, recv) = unbounded_channel(); + (send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 }) + } + + fn enqueue(&mut self, tag: DeliveryTag, res: AckResult) { + self.pending.insert(tag..tag+1, res); + } + + fn ready(&self) -> bool { + self.pending.peek_first().is_some_and(|(range, _)| range.start == &(self.acked + 1)) + } + + pub async fn run(mut self) -> lapin::Result<()> { + let mut not_before = Instant::now(); + let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN); + + loop { + let mut data_good = false; + let mut time_good = false; + while !(time_good && data_good) { + select! { + num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN) => { + if num_received == 0 { println!("ack done"); return Ok(()); } + for (tag, result) in buffer.drain(..) { + //println!("rm.insert({}..{}+1, {:?}); rm.check_invariants();", tag, tag, &result); + self.enqueue(tag, result); + data_good = data_good || tag == self.acked + 1; + } + } + _ = sleep_until(not_before), if !time_good => time_good = true + } + } + + println!("ack status {:?}", self.pending); + loop { + let (ack_range, res) = self.pending.peek_first().unwrap(); + let ack_to = ack_range.end - 1; + let res = *res; + self.pending.drop_first(); + + println!("ack sending {} {:?}", ack_to, res); + //println!("rm.drop_first(); rm.check_invariants();"); + match res { + Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, + Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, + }?; + + self.acked = ack_to; + if !self.ready() { break; } + } + println!("ack sleep"); + not_before = Instant::now() + self.max_wait; + } + } +} \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index ab76217..0d3929c 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,6 +1,10 @@ #![feature(async_closure)] +#![feature(btree_cursors)] +#![feature(let_chains)] +mod ack; mod processor; +pub mod range_map; use std::time::Duration; @@ -13,6 +17,7 @@ use tokio::{signal, time::sleep}; use crate::processor::Processor; static QUEUE_COUNT: usize = 10; +static SEND_COUNT: usize = 10; #[tokio::main] async fn main() -> Result<()> { @@ -68,7 +73,7 @@ async fn broker_connect() -> lapin::Result { struct Test { int: usize } async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> { - for int in 0..50_000 { + for int in 0..SEND_COUNT { if int % 1000 == 0 { println!("Sending {}", int) }; let payload = Test { int }; let confirm = chan.basic_publish( @@ -85,12 +90,12 @@ async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> { Ok(()) } -async fn process(test: Test, i: usize) -> Result<()> { +async fn process(_test: Test, _i: usize) -> Result<()> { let (dur, fail) = { let mut rng = thread_rng(); (Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.01)) }; sleep(dur).await; - //if fail { println!("{} Fail {} {:?}", i, test.int, dur); } + //if fail { println!("{} Fail {} {:?}", _i, _test.int, dur); } if fail { Err(anyhow!("oops")) } else { Ok(()) } } diff --git a/src/processor.rs b/src/processor.rs index 1dd21dc..548b251 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,18 +1,20 @@ -use std::{collections::BTreeMap, future::Future, time::Duration}; +use std::{future::Future, time::Duration}; use anyhow::Result; use futures_lite::prelude::*; -use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection}; +use lapin::{options::{BasicCancelOptions, BasicConsumeOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection}; use serde::de::DeserializeOwned; -use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinSet, time::{sleep_until, Instant}}; +use tokio::{sync::mpsc::UnboundedSender, task::JoinSet}; use tokio_util::task::TaskTracker; +use crate::ack::{AckBatcher, AckResult}; + pub struct Processor { conn: Connection, chan: Channel, cons_tags: Vec, join_set: JoinSet>, - ack_sink: UnboundedSender<(DeliveryTag, Result<(), bool>)>, + ack_sink: UnboundedSender<(DeliveryTag, AckResult)>, } impl Processor { @@ -86,71 +88,3 @@ impl Processor { conn.close(0, "").await } } - -struct AckBatcher { - max_wait: Duration, - chan: Channel, - recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>, - pending: BTreeMap>, - acked: DeliveryTag, -} - -impl AckBatcher { - fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, Result<(), bool>)>, Self) { - let (send, recv) = unbounded_channel(); - (send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 }) - } - - fn enqueue(&mut self, tag: DeliveryTag, res: Result<(), bool>) { - let inserted = self.pending.insert(tag, res); - debug_assert_eq!(inserted, None); - } - - fn ready(&self) -> bool { - self.pending.keys().next() == Some(&(self.acked + 1)) - } - - async fn run(mut self) -> lapin::Result<()> { - let mut not_before = Instant::now(); - - loop { - let mut data_good = false; - let mut time_good = false; - while !(time_good && data_good) { - select! { - next = self.recv.recv() => { - match next { - Some((tag, result)) => { - self.enqueue(tag, result); - data_good = data_good || tag == self.acked + 1; - }, - //None => return Ok(()), - None => { println!("ack done"); return Ok(()) }, - } - } - _ = sleep_until(not_before), if !time_good => time_good = true - } - } - - println!("ack status {:?}", self.pending.len()); - loop { - let (mut ack_to, res) = self.pending.pop_first().unwrap(); - while self.pending.first_key_value() == Some((&(ack_to + 1), &res)) { - self.pending.pop_first(); - ack_to += 1; - } - - println!("ack sending {} {:?}", ack_to, res); - match res { - Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, - Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, - }?; - - self.acked = ack_to; - if !self.ready() { break; } - } - println!("ack sleep"); - not_before = Instant::now() + self.max_wait; - } - } -} diff --git a/src/range_map.rs b/src/range_map.rs new file mode 100644 index 0000000..ce5ad33 --- /dev/null +++ b/src/range_map.rs @@ -0,0 +1,247 @@ +use std::{cmp::Ordering, collections::BTreeMap, fmt::Debug, ops::{Bound, Range}}; + +#[derive(Clone, Debug)] +pub struct RangeMap { + ranges: BTreeMap>, +} + +impl RangeMap { + pub fn insert(&mut self, range: Range, val: V) { + if range.is_empty() { return; } + let Range { start, end } = range; + + let mut curs = self.ranges.upper_bound_mut(Bound::Included(&start)); + /* + print!("{:?} ", curs.peek_prev()); + print!("{:?} ", curs.key_value()); + println!("{:?}", curs.peek_next()); + */ + if let Some((key, old_val)) = curs.key_value_mut() && key == &start { + *old_val = Some(val); + } else { + curs.insert_after(start, Some(val)); + curs.move_next(); + } + while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() { + curs.remove_current_and_move_back(); + } + curs.move_next(); + + /* + print!("{:?} ", curs.peek_prev()); + print!("{:?} ", curs.key_value()); + println!("{:?}", curs.peek_next()); + */ + + let mut next_val = None; + loop { + match curs.key().map(|key| key.cmp(&end)) { + Some(Ordering::Less) => { // Delete entry but remember value as potential next value + next_val = curs.remove_current().unwrap().1; + }, + Some(Ordering::Equal) => break, // Already correct + _ => { // Next entry is missing or beyond end + curs.insert_before(end, next_val); + break; + }, + } + } + + while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() { + curs.remove_current(); + } + } + + pub fn drop_first(&mut self) { + if self.ranges.pop_first().is_some() { // Remove first + if self.ranges.iter().next().is_some_and(|(_, val)| val.is_none()) { + // Potentially clean up a single empty range after + self.ranges.pop_first(); + } + } + } + + #[cfg(test)] + fn check_invariants(&self) where V: Debug { + use std::iter::zip; + + if let Some((_, val)) = self.ranges.first_key_value() { + assert!(val.is_some()); + assert!(self.ranges.len() >= 2); + + let (_, last_val) = self.ranges.last_key_value().unwrap(); + assert!(last_val.is_none()); + } + + for (a, b) in zip(self.ranges.values(), self.ranges.values().skip(1)) { + assert_ne!(a, b); + } + } +} + +impl RangeMap { + pub fn peek_first(&self) -> Option<(Range<&K>, &V)> { + let mut iter = self.ranges.iter(); + let (start, val) = iter.next()?; + let (end, _) = iter.next().unwrap(); + Some((start..end, val.as_ref().unwrap())) + } + + pub fn clear(&mut self) { self.ranges.clear(); } + + pub fn is_empty(&self) -> bool { self.ranges.is_empty() } +} + +impl Default for RangeMap { + fn default() -> Self { + Self { ranges: Default::default() } + } +} + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn basic() { + let mut rm = RangeMap::default(); + + for (i, (insert, expected)) in [ + (None, None ), + (Some((5..10, 50)), Some((&5..&10, &50))), + (None, None ), + (Some((5..10, 50)), Some((&5..&10, &50))), + (Some((1.. 5, 10)), Some((&1..& 5, &10))), + (None , Some((&5..&10, &50))), + (Some((1.. 3, 10)), Some((&1..& 3, &10))), + (None , Some((&5..&10, &50))), + (Some((1.. 7, 10)), Some((&1..& 7, &10))), + (None , Some((&7..&10, &50))), + (Some((1..10, 10)), Some((&1..&10, &10))), + (None , None ), + (Some((5..10, 50)), Some((&5..&10, &50))), + (Some((1..12, 10)), Some((&1..&12, &10))), + (None , None ), + (Some((5..10, 50)), Some((&5..&10, &50))), + ].into_iter().enumerate() { + match insert { + Some((k, v)) => rm.insert(k, v), + None => rm.drop_first(), + } + rm.check_invariants(); + assert_eq!(rm.peek_first(), expected, "Failed at operation {}", i); + } + } + + #[test] + fn generated() { + let mut rm = RangeMap::>::default(); + rm.insert(59..59+1, Ok(())); rm.check_invariants(); + rm.insert(80..80+1, Ok(())); rm.check_invariants(); + rm.insert(100..100+1, Ok(())); rm.check_invariants(); + rm.insert(86..86+1, Ok(())); rm.check_invariants(); + rm.insert(98..98+1, Ok(())); rm.check_invariants(); + rm.insert(70..70+1, Ok(())); rm.check_invariants(); + rm.insert(39..39+1, Ok(())); rm.check_invariants(); + rm.insert(35..35+1, Ok(())); rm.check_invariants(); + rm.insert(89..89+1, Ok(())); rm.check_invariants(); + rm.insert(94..94+1, Ok(())); rm.check_invariants(); + rm.insert(77..77+1, Ok(())); rm.check_invariants(); + rm.insert(82..82+1, Ok(())); rm.check_invariants(); + rm.insert(6..6+1, Ok(())); rm.check_invariants(); + rm.insert(15..15+1, Ok(())); rm.check_invariants(); + rm.insert(5..5+1, Ok(())); rm.check_invariants(); + rm.insert(8..8+1, Ok(())); rm.check_invariants(); + rm.insert(10..10+1, Ok(())); rm.check_invariants(); + rm.insert(17..17+1, Ok(())); rm.check_invariants(); + rm.insert(25..25+1, Ok(())); rm.check_invariants(); + rm.insert(44..44+1, Ok(())); rm.check_invariants(); + rm.insert(43..43+1, Ok(())); rm.check_invariants(); + rm.insert(55..55+1, Ok(())); rm.check_invariants(); + rm.insert(63..63+1, Ok(())); rm.check_invariants(); + rm.insert(24..24+1, Ok(())); rm.check_invariants(); + rm.insert(36..36+1, Ok(())); rm.check_invariants(); + rm.insert(51..51+1, Ok(())); rm.check_invariants(); + rm.insert(11..11+1, Ok(())); rm.check_invariants(); + rm.insert(1..1+1, Ok(())); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.insert(71..71+1, Ok(())); rm.check_invariants(); + rm.insert(75..75+1, Ok(())); rm.check_invariants(); + rm.insert(74..74+1, Ok(())); rm.check_invariants(); + rm.insert(26..26+1, Ok(())); rm.check_invariants(); + rm.insert(97..97+1, Ok(())); rm.check_invariants(); + rm.insert(66..66+1, Ok(())); rm.check_invariants(); + rm.insert(49..49+1, Ok(())); rm.check_invariants(); + rm.insert(69..69+1, Ok(())); rm.check_invariants(); + rm.insert(64..64+1, Ok(())); rm.check_invariants(); + rm.insert(38..38+1, Ok(())); rm.check_invariants(); + rm.insert(48..48+1, Ok(())); rm.check_invariants(); + rm.insert(62..62+1, Ok(())); rm.check_invariants(); + rm.insert(46..46+1, Ok(())); rm.check_invariants(); + rm.insert(57..57+1, Ok(())); rm.check_invariants(); + rm.insert(13..13+1, Ok(())); rm.check_invariants(); + rm.insert(30..30+1, Ok(())); rm.check_invariants(); + rm.insert(99..99+1, Ok(())); rm.check_invariants(); + rm.insert(18..18+1, Ok(())); rm.check_invariants(); + rm.insert(40..40+1, Ok(())); rm.check_invariants(); + rm.insert(84..84+1, Ok(())); rm.check_invariants(); + rm.insert(4..4+1, Ok(())); rm.check_invariants(); + rm.insert(9..9+1, Ok(())); rm.check_invariants(); + rm.insert(14..14+1, Ok(())); rm.check_invariants(); + rm.insert(16..16+1, Ok(())); rm.check_invariants(); + rm.insert(12..12+1, Ok(())); rm.check_invariants(); + rm.insert(53..53+1, Ok(())); rm.check_invariants(); + rm.insert(21..21+1, Ok(())); rm.check_invariants(); + rm.insert(54..54+1, Ok(())); rm.check_invariants(); + rm.insert(28..28+1, Ok(())); rm.check_invariants(); + rm.insert(68..68+1, Ok(())); rm.check_invariants(); + rm.insert(95..95+1, Ok(())); rm.check_invariants(); + rm.insert(90..90+1, Ok(())); rm.check_invariants(); + rm.insert(73..73+1, Ok(())); rm.check_invariants(); + rm.insert(83..83+1, Ok(())); rm.check_invariants(); + rm.insert(32..32+1, Ok(())); rm.check_invariants(); + rm.insert(27..27+1, Ok(())); rm.check_invariants(); + rm.insert(34..34+1, Ok(())); rm.check_invariants(); + rm.insert(61..61+1, Ok(())); rm.check_invariants(); + rm.insert(2..2+1, Ok(())); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.insert(31..31+1, Ok(())); rm.check_invariants(); + rm.insert(20..20+1, Ok(())); rm.check_invariants(); + rm.insert(47..47+1, Ok(())); rm.check_invariants(); + rm.insert(52..52+1, Ok(())); rm.check_invariants(); + rm.insert(87..87+1, Ok(())); rm.check_invariants(); + rm.insert(37..37+1, Ok(())); rm.check_invariants(); + rm.insert(22..22+1, Ok(())); rm.check_invariants(); + rm.insert(42..42+1, Ok(())); rm.check_invariants(); + rm.insert(33..33+1, Ok(())); rm.check_invariants(); + rm.insert(23..23+1, Ok(())); rm.check_invariants(); + rm.insert(60..60+1, Ok(())); rm.check_invariants(); + rm.insert(65..65+1, Ok(())); rm.check_invariants(); + rm.insert(19..19+1, Ok(())); rm.check_invariants(); + rm.insert(3..3+1, Ok(())); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.insert(85..85+1, Ok(())); rm.check_invariants(); + rm.insert(67..67+1, Ok(())); rm.check_invariants(); + rm.insert(76..76+1, Ok(())); rm.check_invariants(); + rm.insert(72..72+1, Ok(())); rm.check_invariants(); + rm.insert(78..78+1, Ok(())); rm.check_invariants(); + rm.insert(50..50+1, Ok(())); rm.check_invariants(); + rm.insert(96..96+1, Ok(())); rm.check_invariants(); + rm.insert(45..45+1, Ok(())); rm.check_invariants(); + rm.insert(7..7+1, Ok(())); rm.check_invariants(); + rm.insert(93..93+1, Ok(())); rm.check_invariants(); + rm.insert(92..92+1, Ok(())); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.insert(56..56+1, Ok(())); rm.check_invariants(); + rm.insert(88..88+1, Ok(())); rm.check_invariants(); + rm.insert(29..29+1, Ok(())); rm.check_invariants(); + rm.insert(58..58+1, Ok(())); rm.check_invariants(); + rm.insert(91..91+1, Ok(())); rm.check_invariants(); + rm.insert(41..41+1, Ok(())); rm.check_invariants(); + rm.insert(81..81+1, Ok(())); rm.check_invariants(); + rm.insert(79..79+1, Ok(())); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + rm.drop_first(); rm.check_invariants(); + } +}