Refine ack batching
This commit is contained in:
parent
9bcdb99a53
commit
28d55aa64a
15
src/ack.rs
15
src/ack.rs
@ -15,7 +15,7 @@ pub struct AckBatcher {
|
|||||||
acked: DeliveryTag,
|
acked: DeliveryTag,
|
||||||
}
|
}
|
||||||
|
|
||||||
static ACK_BUFFER_LEN: usize = 1_000;
|
static ACK_BUFFER_LEN: usize = 100;
|
||||||
|
|
||||||
impl AckBatcher {
|
impl AckBatcher {
|
||||||
pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) {
|
pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) {
|
||||||
@ -35,9 +35,9 @@ impl AckBatcher {
|
|||||||
let mut not_before = Instant::now();
|
let mut not_before = Instant::now();
|
||||||
let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN);
|
let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN);
|
||||||
|
|
||||||
|
let mut time_good = true;
|
||||||
loop {
|
loop {
|
||||||
let mut data_good = false;
|
let mut data_good = false;
|
||||||
let mut time_good = false;
|
|
||||||
while !(time_good && data_good) {
|
while !(time_good && data_good) {
|
||||||
select! {
|
select! {
|
||||||
num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN) => {
|
num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN) => {
|
||||||
@ -52,25 +52,30 @@ impl AckBatcher {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
println!("ack status {:?}", self.pending);
|
//println!("ack status {:?}", self.pending);
|
||||||
|
let mut ack_count = 0;
|
||||||
|
let mut max_ack_size = 0;
|
||||||
loop {
|
loop {
|
||||||
|
ack_count += 1;
|
||||||
let (ack_range, res) = self.pending.peek_first().unwrap();
|
let (ack_range, res) = self.pending.peek_first().unwrap();
|
||||||
let ack_to = ack_range.end - 1;
|
let ack_to = ack_range.end - 1;
|
||||||
let res = *res;
|
let res = *res;
|
||||||
self.pending.drop_first();
|
self.pending.drop_first();
|
||||||
|
|
||||||
println!("ack sending {} {:?}", ack_to, res);
|
//println!("ack send {} {:?}", ack_to, res);
|
||||||
//println!("rm.drop_first(); rm.check_invariants();");
|
//println!("rm.drop_first(); rm.check_invariants();");
|
||||||
match res {
|
match res {
|
||||||
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
||||||
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
||||||
}?;
|
}?;
|
||||||
|
|
||||||
|
max_ack_size = max_ack_size.max(ack_to - self.acked);
|
||||||
self.acked = ack_to;
|
self.acked = ack_to;
|
||||||
if !self.ready() { break; }
|
if !self.ready() { break; }
|
||||||
}
|
}
|
||||||
println!("ack sleep");
|
println!("ack sleep {} {}", ack_count, max_ack_size);
|
||||||
not_before = Instant::now() + self.max_wait;
|
not_before = Instant::now() + self.max_wait;
|
||||||
|
time_good = false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -17,7 +17,7 @@ use tokio::{signal, time::sleep};
|
|||||||
use crate::processor::Processor;
|
use crate::processor::Processor;
|
||||||
|
|
||||||
static QUEUE_COUNT: usize = 10;
|
static QUEUE_COUNT: usize = 10;
|
||||||
static SEND_COUNT: usize = 10;
|
static SEND_COUNT: usize = 100_000;
|
||||||
|
|
||||||
#[tokio::main]
|
#[tokio::main]
|
||||||
async fn main() -> Result<()> {
|
async fn main() -> Result<()> {
|
||||||
|
|||||||
@ -24,7 +24,7 @@ impl Processor {
|
|||||||
|
|
||||||
let mut join_set = JoinSet::new();
|
let mut join_set = JoinSet::new();
|
||||||
|
|
||||||
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone());
|
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_millis(100), chan.clone());
|
||||||
join_set.spawn(ack_batcher.run());
|
join_set.spawn(ack_batcher.run());
|
||||||
|
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
|
|||||||
Loading…
Reference in New Issue
Block a user