diff --git a/src/main.rs b/src/main.rs index 77061f6..12fa051 100644 --- a/src/main.rs +++ b/src/main.rs @@ -12,6 +12,8 @@ use tokio::{signal, time::sleep}; use crate::processor::Processor; +static QUEUE_COUNT: usize = 10; + #[tokio::main] async fn main() -> Result<()> { let quit_signal = signal::ctrl_c(); @@ -21,14 +23,24 @@ async fn main() -> Result<()> { exchanges: vec![ ExchangeDefinition { name: "hello".into(), kind: None, options: None, arguments: None, bindings: vec![] }, ], - queues: (0..10).map(|i| QueueDefinition { name: format!("hello-{}", i).into(), options: None, arguments: None, bindings: vec![BindingDefinition { source: "hello".into(), routing_key: "".into(), arguments: FieldTable::default() }] }).collect(), + queues: (0..QUEUE_COUNT).map(|i| + QueueDefinition { name: format!("hello-{}", i).into(), options: None, arguments: None, + bindings: vec![ + BindingDefinition { + source: "hello".into(), + routing_key: "".into(), + arguments: FieldTable::default(), + }, + ] + } + ).collect(), channels: vec![], }).await?; tokio::try_join!( async { let mut proc = Processor::new(broker_connect().await?, 10_000).await?; - for i in 0..10 { + for i in 0..QUEUE_COUNT { proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?; } quit_signal.await.unwrap(); @@ -56,8 +68,8 @@ async fn broker_connect() -> lapin::Result { struct Test { int: usize } async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> { - for int in 0..100 { - println!("Sending {}", int); + for int in 0..50_000 { + if int % 1000 == 0 { println!("Sending {}", int) }; let payload = Test { int }; let confirm = chan.basic_publish( "hello", @@ -79,6 +91,6 @@ async fn process(test: Test, i: usize) -> Result<()> { (Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1)) }; sleep(dur).await; - if fail { println!("{} Fail {} {:?}", i, test.int, dur); } + //if fail { println!("{} Fail {} {:?}", i, test.int, dur); } if fail { Err(anyhow!("oops")) } else { Ok(()) } } diff --git a/src/processor.rs b/src/processor.rs index 20c5e03..a092e8c 100644 --- a/src/processor.rs +++ b/src/processor.rs @@ -1,29 +1,33 @@ -use std::future::Future; +use std::{collections::BTreeMap, future::Future, time::Duration}; use anyhow::Result; use futures_lite::prelude::*; -use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{FieldTable, ShortString, ShortUInt}, Channel, Connection}; +use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection}; use serde::de::DeserializeOwned; -use tokio::task::JoinSet; +use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinSet, time::{sleep_until, Instant}}; use tokio_util::task::TaskTracker; - pub struct Processor { conn: Connection, chan: Channel, cons_tags: Vec, join_set: JoinSet>, + ack_batcher: UnboundedSender<(DeliveryTag, Result<(), bool>)>, } impl Processor { pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result { let chan = conn.create_channel().await?; chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?; + let mut join_set = JoinSet::new(); + let (ack_batcher, ack_recv) = unbounded_channel(); + join_set.spawn(AckBatcher { chan: chan.clone(), max_wait: Duration::from_secs(1), recv: ack_recv }.run()); Ok(Self { conn, chan, cons_tags: vec![], - join_set: JoinSet::new(), + ack_batcher, + join_set, }) } @@ -41,19 +45,18 @@ impl Processor { ).await?; self.cons_tags.push(consumer.tag()); + let ack_batcher = self.ack_batcher.clone(); self.join_set.spawn(async move { let tracker = TaskTracker::new(); while let Some(delivery) = consumer.try_next().await? { if let Ok(data) = serde_json::from_slice(&delivery.data) { + let ack_batcher = ack_batcher.clone(); let fut = f(data); tracker.spawn(async move { - match fut.await { - Ok(()) => delivery.ack(BasicAckOptions::default()).await, - Err(_) => delivery.nack(BasicNackOptions { multiple: false, requeue: true }).await, - } + ack_batcher.send((delivery.delivery_tag, fut.await.map_err(|_| true))).unwrap(); }); } else { - tracker.spawn(async move { delivery.nack(BasicNackOptions::default()).await }); + ack_batcher.send((delivery.delivery_tag, Err(false))).unwrap(); } } tracker.close(); @@ -63,17 +66,70 @@ impl Processor { Ok(()) } - pub async fn shutdown(mut self) -> lapin::Result<()> { - for cons_tag in self.cons_tags { - let chan = self.chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially - self.join_set.spawn(async move { + pub async fn shutdown(self) -> lapin::Result<()> { + let Self { cons_tags, conn, chan, mut join_set, ack_batcher } = self; + + for cons_tag in cons_tags { + let chan = chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially + join_set.spawn(async move { chan.basic_cancel(cons_tag.as_str(), BasicCancelOptions::default()).await }); } - while let Some(res) = self.join_set.join_next().await { + drop(ack_batcher); + while let Some(res) = join_set.join_next().await { res.unwrap()?; } - self.conn.close(0, "").await + conn.close(0, "").await + } +} + +struct AckBatcher { + max_wait: Duration, + chan: Channel, + recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>, +} + +impl AckBatcher { + async fn run(mut self) -> lapin::Result<()> { + let mut next_ack = Instant::now() + self.max_wait; + let mut pending = BTreeMap::new(); + let mut acked = 0; + + loop { + select! { + next = self.recv.recv() => { + match next { + Some((tag, result)) => { + let inserted = pending.insert(tag, result); + debug_assert_eq!(inserted, None); + }, + //None => break Ok(()), + None => { println!("ack done"); break Ok(()) }, + } + } + _ = sleep_until(next_ack) => { + println!("ack status {:?}", pending.len()); + loop { + if pending.keys().next() != Some(&(acked + 1)) { break; } + let (mut ack_to, res) = pending.pop_first().unwrap(); + while pending.first_key_value() == Some((&(ack_to + 1), &res)) { + pending.pop_first(); + ack_to += 1; + } + + println!("ack sending {} {:?}", ack_to, res); + match res { + Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await, + Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, + }?; + + acked = ack_to; + } + println!("ack sleep"); + next_ack += self.max_wait; + } + } + } } }