Compare commits

..

3 Commits

Author SHA1 Message Date
Kenneth Allen
f0c76b9af2 Try removing delay between acks 2024-01-28 18:29:54 +11:00
Kenneth Allen
28d55aa64a Refine ack batching 2024-01-28 02:25:56 +11:00
Kenneth Allen
9bcdb99a53 Store pending acks in range map 2024-01-28 02:09:56 +11:00
4 changed files with 343 additions and 81 deletions

76
src/ack.rs Normal file
View File

@ -0,0 +1,76 @@
use std::time::Duration;
use lapin::{options::{BasicAckOptions, BasicNackOptions}, types::DeliveryTag, Channel};
use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, time::{sleep_until, Instant}};
use crate::range_map::RangeMap;
pub type AckResult = Result<(), bool>;
pub struct AckBatcher {
max_wait: Duration,
chan: Channel,
recv: UnboundedReceiver<(DeliveryTag, AckResult)>,
pending: RangeMap<DeliveryTag, AckResult>,
acked: DeliveryTag,
}
static ACK_BUFFER_LEN: usize = 100;
impl AckBatcher {
pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) {
let (send, recv) = unbounded_channel();
(send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 })
}
fn enqueue(&mut self, tag: DeliveryTag, res: AckResult) {
self.pending.insert(tag..tag+1, res);
}
fn ready(&self) -> bool {
self.pending.peek_first().is_some_and(|(range, _)| range.start == &(self.acked + 1))
}
pub async fn run(mut self) -> lapin::Result<()> {
let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN);
let mut ack_sizes = vec![];
loop {
let mut data_good = false;
while !data_good {
let num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN).await;
if num_received == 0 {
println!("ack done {} {} {}", ack_sizes.iter().sum::<u64>(), ack_sizes.len(), (ack_sizes.iter().sum::<u64>() as f64) / (ack_sizes.len() as f64));
return Ok(());
}
for (tag, result) in buffer.drain(..) {
//println!("rm.insert({}..{}+1, {:?}); rm.check_invariants();", tag, tag, &result);
self.enqueue(tag, result);
data_good = data_good || tag == self.acked + 1;
}
}
//println!("ack status {:?}", self.pending);
let mut ack_count = 0;
loop {
ack_count += 1;
let (ack_range, res) = self.pending.peek_first().unwrap();
let ack_to = ack_range.end - 1;
let res = *res;
self.pending.drop_first();
//println!("ack send {} {:?}", ack_to, res);
//println!("rm.drop_first(); rm.check_invariants();");
match res {
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
}?;
ack_sizes.push(ack_to - self.acked);
self.acked = ack_to;
if !self.ready() { break; }
}
println!("ack sleep {}", ack_count);
}
}
}

View File

@ -1,6 +1,10 @@
#![feature(async_closure)]
#![feature(btree_cursors)]
#![feature(let_chains)]
mod ack;
mod processor;
pub mod range_map;
use std::time::Duration;
@ -13,6 +17,7 @@ use tokio::{signal, time::sleep};
use crate::processor::Processor;
static QUEUE_COUNT: usize = 10;
static SEND_COUNT: usize = 100_000;
#[tokio::main]
async fn main() -> Result<()> {
@ -38,18 +43,18 @@ async fn main() -> Result<()> {
}).await?;
tokio::try_join!(
async {
let mut proc = Processor::new(broker_connect().await?, 10_000).await?;
tokio::spawn(async {
let mut proc = Processor::new(broker_connect().await?, 65_535).await?;
for i in 0..QUEUE_COUNT {
proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?;
}
quit_signal.await.unwrap();
proc.shutdown().await
},
async {
}),
tokio::spawn(async {
let chan = conn.create_channel().await?;
sender(conn, chan).await
},
}),
)?;
Ok(())
@ -68,7 +73,7 @@ async fn broker_connect() -> lapin::Result<Connection> {
struct Test { int: usize }
async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
for int in 0..50_000 {
for int in 0..SEND_COUNT {
if int % 1000 == 0 { println!("Sending {}", int) };
let payload = Test { int };
let confirm = chan.basic_publish(
@ -85,12 +90,12 @@ async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
Ok(())
}
async fn process(test: Test, i: usize) -> Result<()> {
async fn process(_test: Test, _i: usize) -> Result<()> {
let (dur, fail) = {
let mut rng = thread_rng();
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.01))
};
sleep(dur).await;
//if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
//if fail { println!("{} Fail {} {:?}", _i, _test.int, dur); }
if fail { Err(anyhow!("oops")) } else { Ok(()) }
}

View File

@ -1,18 +1,20 @@
use std::{collections::BTreeMap, future::Future, time::Duration};
use std::{future::Future, time::Duration};
use anyhow::Result;
use futures_lite::prelude::*;
use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection};
use lapin::{options::{BasicCancelOptions, BasicConsumeOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection};
use serde::de::DeserializeOwned;
use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinSet, time::{sleep_until, Instant}};
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
use tokio_util::task::TaskTracker;
use crate::ack::{AckBatcher, AckResult};
pub struct Processor {
conn: Connection,
chan: Channel,
cons_tags: Vec<ShortString>,
join_set: JoinSet<lapin::Result<()>>,
ack_sink: UnboundedSender<(DeliveryTag, Result<(), bool>)>,
ack_sink: UnboundedSender<(DeliveryTag, AckResult)>,
}
impl Processor {
@ -22,7 +24,7 @@ impl Processor {
let mut join_set = JoinSet::new();
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone());
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_millis(100), chan.clone());
join_set.spawn(ack_batcher.run());
Ok(Self {
@ -86,71 +88,3 @@ impl Processor {
conn.close(0, "").await
}
}
struct AckBatcher {
max_wait: Duration,
chan: Channel,
recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>,
pending: BTreeMap<DeliveryTag, Result<(), bool>>,
acked: DeliveryTag,
}
impl AckBatcher {
fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, Result<(), bool>)>, Self) {
let (send, recv) = unbounded_channel();
(send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 })
}
fn enqueue(&mut self, tag: DeliveryTag, res: Result<(), bool>) {
let inserted = self.pending.insert(tag, res);
debug_assert_eq!(inserted, None);
}
fn ready(&self) -> bool {
self.pending.keys().next() == Some(&(self.acked + 1))
}
async fn run(mut self) -> lapin::Result<()> {
let mut not_before = Instant::now();
loop {
let mut data_good = false;
let mut time_good = false;
while !(time_good && data_good) {
select! {
next = self.recv.recv() => {
match next {
Some((tag, result)) => {
self.enqueue(tag, result);
data_good = data_good || tag == self.acked + 1;
},
//None => return Ok(()),
None => { println!("ack done"); return Ok(()) },
}
}
_ = sleep_until(not_before), if !time_good => time_good = true
}
}
println!("ack status {:?}", self.pending.len());
loop {
let (mut ack_to, res) = self.pending.pop_first().unwrap();
while self.pending.first_key_value() == Some((&(ack_to + 1), &res)) {
self.pending.pop_first();
ack_to += 1;
}
println!("ack sending {} {:?}", ack_to, res);
match res {
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
}?;
self.acked = ack_to;
if !self.ready() { break; }
}
println!("ack sleep");
not_before = Instant::now() + self.max_wait;
}
}
}

247
src/range_map.rs Normal file
View File

@ -0,0 +1,247 @@
use std::{cmp::Ordering, collections::BTreeMap, fmt::Debug, ops::{Bound, Range}};
#[derive(Clone, Debug)]
pub struct RangeMap<K, V> {
ranges: BTreeMap<K, Option<V>>,
}
impl<K: Debug + Ord, V: Debug + PartialEq> RangeMap<K, V> {
pub fn insert(&mut self, range: Range<K>, val: V) {
if range.is_empty() { return; }
let Range { start, end } = range;
let mut curs = self.ranges.upper_bound_mut(Bound::Included(&start));
/*
print!("{:?} ", curs.peek_prev());
print!("{:?} ", curs.key_value());
println!("{:?}", curs.peek_next());
*/
if let Some((key, old_val)) = curs.key_value_mut() && key == &start {
*old_val = Some(val);
} else {
curs.insert_after(start, Some(val));
curs.move_next();
}
while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() {
curs.remove_current_and_move_back();
}
curs.move_next();
/*
print!("{:?} ", curs.peek_prev());
print!("{:?} ", curs.key_value());
println!("{:?}", curs.peek_next());
*/
let mut next_val = None;
loop {
match curs.key().map(|key| key.cmp(&end)) {
Some(Ordering::Less) => { // Delete entry but remember value as potential next value
next_val = curs.remove_current().unwrap().1;
},
Some(Ordering::Equal) => break, // Already correct
_ => { // Next entry is missing or beyond end
curs.insert_before(end, next_val);
break;
},
}
}
while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() {
curs.remove_current();
}
}
pub fn drop_first(&mut self) {
if self.ranges.pop_first().is_some() { // Remove first
if self.ranges.iter().next().is_some_and(|(_, val)| val.is_none()) {
// Potentially clean up a single empty range after
self.ranges.pop_first();
}
}
}
#[cfg(test)]
fn check_invariants(&self) where V: Debug {
use std::iter::zip;
if let Some((_, val)) = self.ranges.first_key_value() {
assert!(val.is_some());
assert!(self.ranges.len() >= 2);
let (_, last_val) = self.ranges.last_key_value().unwrap();
assert!(last_val.is_none());
}
for (a, b) in zip(self.ranges.values(), self.ranges.values().skip(1)) {
assert_ne!(a, b);
}
}
}
impl<K, V> RangeMap<K, V> {
pub fn peek_first(&self) -> Option<(Range<&K>, &V)> {
let mut iter = self.ranges.iter();
let (start, val) = iter.next()?;
let (end, _) = iter.next().unwrap();
Some((start..end, val.as_ref().unwrap()))
}
pub fn clear(&mut self) { self.ranges.clear(); }
pub fn is_empty(&self) -> bool { self.ranges.is_empty() }
}
impl<K, V> Default for RangeMap<K, V> {
fn default() -> Self {
Self { ranges: Default::default() }
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn basic() {
let mut rm = RangeMap::default();
for (i, (insert, expected)) in [
(None, None ),
(Some((5..10, 50)), Some((&5..&10, &50))),
(None, None ),
(Some((5..10, 50)), Some((&5..&10, &50))),
(Some((1.. 5, 10)), Some((&1..& 5, &10))),
(None , Some((&5..&10, &50))),
(Some((1.. 3, 10)), Some((&1..& 3, &10))),
(None , Some((&5..&10, &50))),
(Some((1.. 7, 10)), Some((&1..& 7, &10))),
(None , Some((&7..&10, &50))),
(Some((1..10, 10)), Some((&1..&10, &10))),
(None , None ),
(Some((5..10, 50)), Some((&5..&10, &50))),
(Some((1..12, 10)), Some((&1..&12, &10))),
(None , None ),
(Some((5..10, 50)), Some((&5..&10, &50))),
].into_iter().enumerate() {
match insert {
Some((k, v)) => rm.insert(k, v),
None => rm.drop_first(),
}
rm.check_invariants();
assert_eq!(rm.peek_first(), expected, "Failed at operation {}", i);
}
}
#[test]
fn generated() {
let mut rm = RangeMap::<usize, Result<(), bool>>::default();
rm.insert(59..59+1, Ok(())); rm.check_invariants();
rm.insert(80..80+1, Ok(())); rm.check_invariants();
rm.insert(100..100+1, Ok(())); rm.check_invariants();
rm.insert(86..86+1, Ok(())); rm.check_invariants();
rm.insert(98..98+1, Ok(())); rm.check_invariants();
rm.insert(70..70+1, Ok(())); rm.check_invariants();
rm.insert(39..39+1, Ok(())); rm.check_invariants();
rm.insert(35..35+1, Ok(())); rm.check_invariants();
rm.insert(89..89+1, Ok(())); rm.check_invariants();
rm.insert(94..94+1, Ok(())); rm.check_invariants();
rm.insert(77..77+1, Ok(())); rm.check_invariants();
rm.insert(82..82+1, Ok(())); rm.check_invariants();
rm.insert(6..6+1, Ok(())); rm.check_invariants();
rm.insert(15..15+1, Ok(())); rm.check_invariants();
rm.insert(5..5+1, Ok(())); rm.check_invariants();
rm.insert(8..8+1, Ok(())); rm.check_invariants();
rm.insert(10..10+1, Ok(())); rm.check_invariants();
rm.insert(17..17+1, Ok(())); rm.check_invariants();
rm.insert(25..25+1, Ok(())); rm.check_invariants();
rm.insert(44..44+1, Ok(())); rm.check_invariants();
rm.insert(43..43+1, Ok(())); rm.check_invariants();
rm.insert(55..55+1, Ok(())); rm.check_invariants();
rm.insert(63..63+1, Ok(())); rm.check_invariants();
rm.insert(24..24+1, Ok(())); rm.check_invariants();
rm.insert(36..36+1, Ok(())); rm.check_invariants();
rm.insert(51..51+1, Ok(())); rm.check_invariants();
rm.insert(11..11+1, Ok(())); rm.check_invariants();
rm.insert(1..1+1, Ok(())); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.insert(71..71+1, Ok(())); rm.check_invariants();
rm.insert(75..75+1, Ok(())); rm.check_invariants();
rm.insert(74..74+1, Ok(())); rm.check_invariants();
rm.insert(26..26+1, Ok(())); rm.check_invariants();
rm.insert(97..97+1, Ok(())); rm.check_invariants();
rm.insert(66..66+1, Ok(())); rm.check_invariants();
rm.insert(49..49+1, Ok(())); rm.check_invariants();
rm.insert(69..69+1, Ok(())); rm.check_invariants();
rm.insert(64..64+1, Ok(())); rm.check_invariants();
rm.insert(38..38+1, Ok(())); rm.check_invariants();
rm.insert(48..48+1, Ok(())); rm.check_invariants();
rm.insert(62..62+1, Ok(())); rm.check_invariants();
rm.insert(46..46+1, Ok(())); rm.check_invariants();
rm.insert(57..57+1, Ok(())); rm.check_invariants();
rm.insert(13..13+1, Ok(())); rm.check_invariants();
rm.insert(30..30+1, Ok(())); rm.check_invariants();
rm.insert(99..99+1, Ok(())); rm.check_invariants();
rm.insert(18..18+1, Ok(())); rm.check_invariants();
rm.insert(40..40+1, Ok(())); rm.check_invariants();
rm.insert(84..84+1, Ok(())); rm.check_invariants();
rm.insert(4..4+1, Ok(())); rm.check_invariants();
rm.insert(9..9+1, Ok(())); rm.check_invariants();
rm.insert(14..14+1, Ok(())); rm.check_invariants();
rm.insert(16..16+1, Ok(())); rm.check_invariants();
rm.insert(12..12+1, Ok(())); rm.check_invariants();
rm.insert(53..53+1, Ok(())); rm.check_invariants();
rm.insert(21..21+1, Ok(())); rm.check_invariants();
rm.insert(54..54+1, Ok(())); rm.check_invariants();
rm.insert(28..28+1, Ok(())); rm.check_invariants();
rm.insert(68..68+1, Ok(())); rm.check_invariants();
rm.insert(95..95+1, Ok(())); rm.check_invariants();
rm.insert(90..90+1, Ok(())); rm.check_invariants();
rm.insert(73..73+1, Ok(())); rm.check_invariants();
rm.insert(83..83+1, Ok(())); rm.check_invariants();
rm.insert(32..32+1, Ok(())); rm.check_invariants();
rm.insert(27..27+1, Ok(())); rm.check_invariants();
rm.insert(34..34+1, Ok(())); rm.check_invariants();
rm.insert(61..61+1, Ok(())); rm.check_invariants();
rm.insert(2..2+1, Ok(())); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.insert(31..31+1, Ok(())); rm.check_invariants();
rm.insert(20..20+1, Ok(())); rm.check_invariants();
rm.insert(47..47+1, Ok(())); rm.check_invariants();
rm.insert(52..52+1, Ok(())); rm.check_invariants();
rm.insert(87..87+1, Ok(())); rm.check_invariants();
rm.insert(37..37+1, Ok(())); rm.check_invariants();
rm.insert(22..22+1, Ok(())); rm.check_invariants();
rm.insert(42..42+1, Ok(())); rm.check_invariants();
rm.insert(33..33+1, Ok(())); rm.check_invariants();
rm.insert(23..23+1, Ok(())); rm.check_invariants();
rm.insert(60..60+1, Ok(())); rm.check_invariants();
rm.insert(65..65+1, Ok(())); rm.check_invariants();
rm.insert(19..19+1, Ok(())); rm.check_invariants();
rm.insert(3..3+1, Ok(())); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.insert(85..85+1, Ok(())); rm.check_invariants();
rm.insert(67..67+1, Ok(())); rm.check_invariants();
rm.insert(76..76+1, Ok(())); rm.check_invariants();
rm.insert(72..72+1, Ok(())); rm.check_invariants();
rm.insert(78..78+1, Ok(())); rm.check_invariants();
rm.insert(50..50+1, Ok(())); rm.check_invariants();
rm.insert(96..96+1, Ok(())); rm.check_invariants();
rm.insert(45..45+1, Ok(())); rm.check_invariants();
rm.insert(7..7+1, Ok(())); rm.check_invariants();
rm.insert(93..93+1, Ok(())); rm.check_invariants();
rm.insert(92..92+1, Ok(())); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.insert(56..56+1, Ok(())); rm.check_invariants();
rm.insert(88..88+1, Ok(())); rm.check_invariants();
rm.insert(29..29+1, Ok(())); rm.check_invariants();
rm.insert(58..58+1, Ok(())); rm.check_invariants();
rm.insert(91..91+1, Ok(())); rm.check_invariants();
rm.insert(41..41+1, Ok(())); rm.check_invariants();
rm.insert(81..81+1, Ok(())); rm.check_invariants();
rm.insert(79..79+1, Ok(())); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
rm.drop_first(); rm.check_invariants();
}
}