Wake the ack batcher up intelligently
This commit is contained in:
parent
6705bd69d9
commit
1cbee7e47b
@ -88,7 +88,7 @@ async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
|||||||
async fn process(test: Test, i: usize) -> Result<()> {
|
async fn process(test: Test, i: usize) -> Result<()> {
|
||||||
let (dur, fail) = {
|
let (dur, fail) = {
|
||||||
let mut rng = thread_rng();
|
let mut rng = thread_rng();
|
||||||
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1))
|
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.01))
|
||||||
};
|
};
|
||||||
sleep(dur).await;
|
sleep(dur).await;
|
||||||
//if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
//if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
||||||
|
|||||||
@ -12,21 +12,24 @@ pub struct Processor {
|
|||||||
chan: Channel,
|
chan: Channel,
|
||||||
cons_tags: Vec<ShortString>,
|
cons_tags: Vec<ShortString>,
|
||||||
join_set: JoinSet<lapin::Result<()>>,
|
join_set: JoinSet<lapin::Result<()>>,
|
||||||
ack_batcher: UnboundedSender<(DeliveryTag, Result<(), bool>)>,
|
ack_sink: UnboundedSender<(DeliveryTag, Result<(), bool>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Processor {
|
impl Processor {
|
||||||
pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result<Self> {
|
pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result<Self> {
|
||||||
let chan = conn.create_channel().await?;
|
let chan = conn.create_channel().await?;
|
||||||
chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?;
|
chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?;
|
||||||
|
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
let (ack_batcher, ack_recv) = unbounded_channel();
|
|
||||||
join_set.spawn(AckBatcher { chan: chan.clone(), max_wait: Duration::from_secs(1), recv: ack_recv }.run());
|
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone());
|
||||||
|
join_set.spawn(ack_batcher.run());
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
conn,
|
conn,
|
||||||
chan,
|
chan,
|
||||||
cons_tags: vec![],
|
cons_tags: vec![],
|
||||||
ack_batcher,
|
ack_sink,
|
||||||
join_set,
|
join_set,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
@ -45,7 +48,7 @@ impl Processor {
|
|||||||
).await?;
|
).await?;
|
||||||
self.cons_tags.push(consumer.tag());
|
self.cons_tags.push(consumer.tag());
|
||||||
|
|
||||||
let ack_batcher = self.ack_batcher.clone();
|
let ack_batcher = self.ack_sink.clone();
|
||||||
self.join_set.spawn(async move {
|
self.join_set.spawn(async move {
|
||||||
let tracker = TaskTracker::new();
|
let tracker = TaskTracker::new();
|
||||||
while let Some(delivery) = consumer.try_next().await? {
|
while let Some(delivery) = consumer.try_next().await? {
|
||||||
@ -67,7 +70,7 @@ impl Processor {
|
|||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(self) -> lapin::Result<()> {
|
pub async fn shutdown(self) -> lapin::Result<()> {
|
||||||
let Self { cons_tags, conn, chan, mut join_set, ack_batcher } = self;
|
let Self { cons_tags, conn, chan, mut join_set, ack_sink: ack_batcher } = self;
|
||||||
|
|
||||||
for cons_tag in cons_tags {
|
for cons_tag in cons_tags {
|
||||||
let chan = chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially
|
let chan = chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially
|
||||||
@ -88,48 +91,66 @@ struct AckBatcher {
|
|||||||
max_wait: Duration,
|
max_wait: Duration,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>,
|
recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>,
|
||||||
|
pending: BTreeMap<DeliveryTag, Result<(), bool>>,
|
||||||
|
acked: DeliveryTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl AckBatcher {
|
impl AckBatcher {
|
||||||
|
fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, Result<(), bool>)>, Self) {
|
||||||
|
let (send, recv) = unbounded_channel();
|
||||||
|
(send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 })
|
||||||
|
}
|
||||||
|
|
||||||
|
fn enqueue(&mut self, tag: DeliveryTag, res: Result<(), bool>) {
|
||||||
|
let inserted = self.pending.insert(tag, res);
|
||||||
|
debug_assert_eq!(inserted, None);
|
||||||
|
}
|
||||||
|
|
||||||
|
fn ready(&self) -> bool {
|
||||||
|
self.pending.keys().next() == Some(&(self.acked + 1))
|
||||||
|
}
|
||||||
|
|
||||||
async fn run(mut self) -> lapin::Result<()> {
|
async fn run(mut self) -> lapin::Result<()> {
|
||||||
let mut next_ack = Instant::now() + self.max_wait;
|
let mut not_before = Instant::now();
|
||||||
let mut pending = BTreeMap::new();
|
|
||||||
let mut acked = 0;
|
|
||||||
|
|
||||||
loop {
|
loop {
|
||||||
select! {
|
let mut data_good = false;
|
||||||
next = self.recv.recv() => {
|
let mut time_good = false;
|
||||||
match next {
|
while !(time_good && data_good) {
|
||||||
Some((tag, result)) => {
|
select! {
|
||||||
let inserted = pending.insert(tag, result);
|
next = self.recv.recv() => {
|
||||||
debug_assert_eq!(inserted, None);
|
match next {
|
||||||
},
|
Some((tag, result)) => {
|
||||||
//None => break Ok(()),
|
self.enqueue(tag, result);
|
||||||
None => { println!("ack done"); break Ok(()) },
|
data_good = data_good || tag == self.acked + 1;
|
||||||
}
|
},
|
||||||
}
|
//None => return Ok(()),
|
||||||
_ = sleep_until(next_ack) => {
|
None => { println!("ack done"); return Ok(()) },
|
||||||
println!("ack status {:?}", pending.len());
|
|
||||||
loop {
|
|
||||||
if pending.keys().next() != Some(&(acked + 1)) { break; }
|
|
||||||
let (mut ack_to, res) = pending.pop_first().unwrap();
|
|
||||||
while pending.first_key_value() == Some((&(ack_to + 1), &res)) {
|
|
||||||
pending.pop_first();
|
|
||||||
ack_to += 1;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("ack sending {} {:?}", ack_to, res);
|
|
||||||
match res {
|
|
||||||
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
|
||||||
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
|
||||||
}?;
|
|
||||||
|
|
||||||
acked = ack_to;
|
|
||||||
}
|
}
|
||||||
println!("ack sleep");
|
_ = sleep_until(not_before), if !time_good => time_good = true
|
||||||
next_ack += self.max_wait;
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
println!("ack status {:?}", self.pending.len());
|
||||||
|
loop {
|
||||||
|
let (mut ack_to, res) = self.pending.pop_first().unwrap();
|
||||||
|
while self.pending.first_key_value() == Some((&(ack_to + 1), &res)) {
|
||||||
|
self.pending.pop_first();
|
||||||
|
ack_to += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("ack sending {} {:?}", ack_to, res);
|
||||||
|
match res {
|
||||||
|
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
||||||
|
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
||||||
|
}?;
|
||||||
|
|
||||||
|
self.acked = ack_to;
|
||||||
|
if !self.ready() { break; }
|
||||||
|
}
|
||||||
|
println!("ack sleep");
|
||||||
|
not_before = Instant::now() + self.max_wait;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user