diff --git a/src/ack.rs b/src/ack.rs index b4bc5e1..d4911b9 100644 --- a/src/ack.rs +++ b/src/ack.rs @@ -32,29 +32,26 @@ impl AckBatcher { } pub async fn run(mut self) -> lapin::Result<()> { - let mut not_before = Instant::now(); let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN); + let mut ack_sizes = vec![]; - let mut time_good = true; loop { let mut data_good = false; - while !(time_good && data_good) { - select! { - num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN) => { - if num_received == 0 { println!("ack done"); return Ok(()); } - for (tag, result) in buffer.drain(..) { - //println!("rm.insert({}..{}+1, {:?}); rm.check_invariants();", tag, tag, &result); - self.enqueue(tag, result); - data_good = data_good || tag == self.acked + 1; - } - } - _ = sleep_until(not_before), if !time_good => time_good = true + while !data_good { + let num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN).await; + if num_received == 0 { + println!("ack done {} {} {}", ack_sizes.iter().sum::(), ack_sizes.len(), (ack_sizes.iter().sum::() as f64) / (ack_sizes.len() as f64)); + return Ok(()); + } + for (tag, result) in buffer.drain(..) { + //println!("rm.insert({}..{}+1, {:?}); rm.check_invariants();", tag, tag, &result); + self.enqueue(tag, result); + data_good = data_good || tag == self.acked + 1; } } //println!("ack status {:?}", self.pending); let mut ack_count = 0; - let mut max_ack_size = 0; loop { ack_count += 1; let (ack_range, res) = self.pending.peek_first().unwrap(); @@ -69,13 +66,11 @@ impl AckBatcher { Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await, }?; - max_ack_size = max_ack_size.max(ack_to - self.acked); + ack_sizes.push(ack_to - self.acked); self.acked = ack_to; if !self.ready() { break; } } - println!("ack sleep {} {}", ack_count, max_ack_size); - not_before = Instant::now() + self.max_wait; - time_good = false; + println!("ack sleep {}", ack_count); } } } \ No newline at end of file diff --git a/src/main.rs b/src/main.rs index 7785d5c..7445775 100644 --- a/src/main.rs +++ b/src/main.rs @@ -43,18 +43,18 @@ async fn main() -> Result<()> { }).await?; tokio::try_join!( - async { - let mut proc = Processor::new(broker_connect().await?, 10_000).await?; + tokio::spawn(async { + let mut proc = Processor::new(broker_connect().await?, 65_535).await?; for i in 0..QUEUE_COUNT { proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?; } quit_signal.await.unwrap(); proc.shutdown().await - }, - async { + }), + tokio::spawn(async { let chan = conn.create_channel().await?; sender(conn, chan).await - }, + }), )?; Ok(())