Implement basic ack batching
This commit is contained in:
parent
95f1b8622e
commit
6705bd69d9
22
src/main.rs
22
src/main.rs
@ -12,6 +12,8 @@ use tokio::{signal, time::sleep};
|
|||||||
|
|
||||||
use crate::processor::Processor;
|
use crate::processor::Processor;
|
||||||
|
|
||||||
|
static QUEUE_COUNT: usize = 10;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
let quit_signal = signal::ctrl_c();
|
let quit_signal = signal::ctrl_c();
|
||||||
@ -21,14 +23,24 @@ async fn main() -> Result<()> {
|
|||||||
exchanges: vec![
|
exchanges: vec![
|
||||||
ExchangeDefinition { name: "hello".into(), kind: None, options: None, arguments: None, bindings: vec![] },
|
ExchangeDefinition { name: "hello".into(), kind: None, options: None, arguments: None, bindings: vec![] },
|
||||||
],
|
],
|
||||||
queues: (0..10).map(|i| QueueDefinition { name: format!("hello-{}", i).into(), options: None, arguments: None, bindings: vec![BindingDefinition { source: "hello".into(), routing_key: "".into(), arguments: FieldTable::default() }] }).collect(),
|
queues: (0..QUEUE_COUNT).map(|i|
|
||||||
|
QueueDefinition { name: format!("hello-{}", i).into(), options: None, arguments: None,
|
||||||
|
bindings: vec![
|
||||||
|
BindingDefinition {
|
||||||
|
source: "hello".into(),
|
||||||
|
routing_key: "".into(),
|
||||||
|
arguments: FieldTable::default(),
|
||||||
|
},
|
||||||
|
]
|
||||||
|
}
|
||||||
|
).collect(),
|
||||||
channels: vec![],
|
channels: vec![],
|
||||||
}).await?;
|
}).await?;
|
||||||
|
|
||||||
tokio::try_join!(
|
tokio::try_join!(
|
||||||
async {
|
async {
|
||||||
let mut proc = Processor::new(broker_connect().await?, 10_000).await?;
|
let mut proc = Processor::new(broker_connect().await?, 10_000).await?;
|
||||||
for i in 0..10 {
|
for i in 0..QUEUE_COUNT {
|
||||||
proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?;
|
proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?;
|
||||||
}
|
}
|
||||||
quit_signal.await.unwrap();
|
quit_signal.await.unwrap();
|
||||||
@ -56,8 +68,8 @@ async fn broker_connect() -> lapin::Result<Connection> {
|
|||||||
struct Test { int: usize }
|
struct Test { int: usize }
|
||||||
|
|
||||||
async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
||||||
for int in 0..100 {
|
for int in 0..50_000 {
|
||||||
println!("Sending {}", int);
|
if int % 1000 == 0 { println!("Sending {}", int) };
|
||||||
let payload = Test { int };
|
let payload = Test { int };
|
||||||
let confirm = chan.basic_publish(
|
let confirm = chan.basic_publish(
|
||||||
"hello",
|
"hello",
|
||||||
@ -79,6 +91,6 @@ async fn process(test: Test, i: usize) -> Result<()> {
|
|||||||
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1))
|
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1))
|
||||||
};
|
};
|
||||||
sleep(dur).await;
|
sleep(dur).await;
|
||||||
if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
//if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
||||||
if fail { Err(anyhow!("oops")) } else { Ok(()) }
|
if fail { Err(anyhow!("oops")) } else { Ok(()) }
|
||||||
}
|
}
|
||||||
|
|||||||
@ -1,29 +1,33 @@
|
|||||||
use std::future::Future;
|
use std::{collections::BTreeMap, future::Future, time::Duration};
|
||||||
|
|
||||||
use anyhow::Result;
|
use anyhow::Result;
|
||||||
use futures_lite::prelude::*;
|
use futures_lite::prelude::*;
|
||||||
use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{FieldTable, ShortString, ShortUInt}, Channel, Connection};
|
use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection};
|
||||||
use serde::de::DeserializeOwned;
|
use serde::de::DeserializeOwned;
|
||||||
use tokio::task::JoinSet;
|
use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinSet, time::{sleep_until, Instant}};
|
||||||
use tokio_util::task::TaskTracker;
|
use tokio_util::task::TaskTracker;
|
||||||
|
|
||||||
|
|
||||||
pub struct Processor {
|
pub struct Processor {
|
||||||
conn: Connection,
|
conn: Connection,
|
||||||
chan: Channel,
|
chan: Channel,
|
||||||
cons_tags: Vec<ShortString>,
|
cons_tags: Vec<ShortString>,
|
||||||
join_set: JoinSet<lapin::Result<()>>,
|
join_set: JoinSet<lapin::Result<()>>,
|
||||||
|
ack_batcher: UnboundedSender<(DeliveryTag, Result<(), bool>)>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Processor {
|
impl Processor {
|
||||||
pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result<Self> {
|
pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result<Self> {
|
||||||
let chan = conn.create_channel().await?;
|
let chan = conn.create_channel().await?;
|
||||||
chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?;
|
chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?;
|
||||||
|
let mut join_set = JoinSet::new();
|
||||||
|
let (ack_batcher, ack_recv) = unbounded_channel();
|
||||||
|
join_set.spawn(AckBatcher { chan: chan.clone(), max_wait: Duration::from_secs(1), recv: ack_recv }.run());
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
conn,
|
conn,
|
||||||
chan,
|
chan,
|
||||||
cons_tags: vec![],
|
cons_tags: vec![],
|
||||||
join_set: JoinSet::new(),
|
ack_batcher,
|
||||||
|
join_set,
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -41,19 +45,18 @@ impl Processor {
|
|||||||
).await?;
|
).await?;
|
||||||
self.cons_tags.push(consumer.tag());
|
self.cons_tags.push(consumer.tag());
|
||||||
|
|
||||||
|
let ack_batcher = self.ack_batcher.clone();
|
||||||
self.join_set.spawn(async move {
|
self.join_set.spawn(async move {
|
||||||
let tracker = TaskTracker::new();
|
let tracker = TaskTracker::new();
|
||||||
while let Some(delivery) = consumer.try_next().await? {
|
while let Some(delivery) = consumer.try_next().await? {
|
||||||
if let Ok(data) = serde_json::from_slice(&delivery.data) {
|
if let Ok(data) = serde_json::from_slice(&delivery.data) {
|
||||||
|
let ack_batcher = ack_batcher.clone();
|
||||||
let fut = f(data);
|
let fut = f(data);
|
||||||
tracker.spawn(async move {
|
tracker.spawn(async move {
|
||||||
match fut.await {
|
ack_batcher.send((delivery.delivery_tag, fut.await.map_err(|_| true))).unwrap();
|
||||||
Ok(()) => delivery.ack(BasicAckOptions::default()).await,
|
|
||||||
Err(_) => delivery.nack(BasicNackOptions { multiple: false, requeue: true }).await,
|
|
||||||
}
|
|
||||||
});
|
});
|
||||||
} else {
|
} else {
|
||||||
tracker.spawn(async move { delivery.nack(BasicNackOptions::default()).await });
|
ack_batcher.send((delivery.delivery_tag, Err(false))).unwrap();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
tracker.close();
|
tracker.close();
|
||||||
@ -63,17 +66,70 @@ impl Processor {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub async fn shutdown(mut self) -> lapin::Result<()> {
|
pub async fn shutdown(self) -> lapin::Result<()> {
|
||||||
for cons_tag in self.cons_tags {
|
let Self { cons_tags, conn, chan, mut join_set, ack_batcher } = self;
|
||||||
let chan = self.chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially
|
|
||||||
self.join_set.spawn(async move {
|
for cons_tag in cons_tags {
|
||||||
|
let chan = chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially
|
||||||
|
join_set.spawn(async move {
|
||||||
chan.basic_cancel(cons_tag.as_str(), BasicCancelOptions::default()).await
|
chan.basic_cancel(cons_tag.as_str(), BasicCancelOptions::default()).await
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
while let Some(res) = self.join_set.join_next().await {
|
drop(ack_batcher);
|
||||||
|
while let Some(res) = join_set.join_next().await {
|
||||||
res.unwrap()?;
|
res.unwrap()?;
|
||||||
}
|
}
|
||||||
self.conn.close(0, "").await
|
conn.close(0, "").await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct AckBatcher {
|
||||||
|
max_wait: Duration,
|
||||||
|
chan: Channel,
|
||||||
|
recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AckBatcher {
|
||||||
|
async fn run(mut self) -> lapin::Result<()> {
|
||||||
|
let mut next_ack = Instant::now() + self.max_wait;
|
||||||
|
let mut pending = BTreeMap::new();
|
||||||
|
let mut acked = 0;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
next = self.recv.recv() => {
|
||||||
|
match next {
|
||||||
|
Some((tag, result)) => {
|
||||||
|
let inserted = pending.insert(tag, result);
|
||||||
|
debug_assert_eq!(inserted, None);
|
||||||
|
},
|
||||||
|
//None => break Ok(()),
|
||||||
|
None => { println!("ack done"); break Ok(()) },
|
||||||
|
}
|
||||||
|
}
|
||||||
|
_ = sleep_until(next_ack) => {
|
||||||
|
println!("ack status {:?}", pending.len());
|
||||||
|
loop {
|
||||||
|
if pending.keys().next() != Some(&(acked + 1)) { break; }
|
||||||
|
let (mut ack_to, res) = pending.pop_first().unwrap();
|
||||||
|
while pending.first_key_value() == Some((&(ack_to + 1), &res)) {
|
||||||
|
pending.pop_first();
|
||||||
|
ack_to += 1;
|
||||||
|
}
|
||||||
|
|
||||||
|
println!("ack sending {} {:?}", ack_to, res);
|
||||||
|
match res {
|
||||||
|
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
||||||
|
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
||||||
|
}?;
|
||||||
|
|
||||||
|
acked = ack_to;
|
||||||
|
}
|
||||||
|
println!("ack sleep");
|
||||||
|
next_ack += self.max_wait;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user