Compare commits
3 Commits
1cbee7e47b
...
f0c76b9af2
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f0c76b9af2 | ||
|
|
28d55aa64a | ||
|
|
9bcdb99a53 |
76
src/ack.rs
Normal file
76
src/ack.rs
Normal file
@ -0,0 +1,76 @@
|
||||
use std::time::Duration;
|
||||
|
||||
use lapin::{options::{BasicAckOptions, BasicNackOptions}, types::DeliveryTag, Channel};
|
||||
use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, time::{sleep_until, Instant}};
|
||||
|
||||
use crate::range_map::RangeMap;
|
||||
|
||||
pub type AckResult = Result<(), bool>;
|
||||
|
||||
pub struct AckBatcher {
|
||||
max_wait: Duration,
|
||||
chan: Channel,
|
||||
recv: UnboundedReceiver<(DeliveryTag, AckResult)>,
|
||||
pending: RangeMap<DeliveryTag, AckResult>,
|
||||
acked: DeliveryTag,
|
||||
}
|
||||
|
||||
static ACK_BUFFER_LEN: usize = 100;
|
||||
|
||||
impl AckBatcher {
|
||||
pub fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, AckResult)>, Self) {
|
||||
let (send, recv) = unbounded_channel();
|
||||
(send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 })
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, tag: DeliveryTag, res: AckResult) {
|
||||
self.pending.insert(tag..tag+1, res);
|
||||
}
|
||||
|
||||
fn ready(&self) -> bool {
|
||||
self.pending.peek_first().is_some_and(|(range, _)| range.start == &(self.acked + 1))
|
||||
}
|
||||
|
||||
pub async fn run(mut self) -> lapin::Result<()> {
|
||||
let mut buffer = Vec::with_capacity(ACK_BUFFER_LEN);
|
||||
let mut ack_sizes = vec![];
|
||||
|
||||
loop {
|
||||
let mut data_good = false;
|
||||
while !data_good {
|
||||
let num_received = self.recv.recv_many(&mut buffer, ACK_BUFFER_LEN).await;
|
||||
if num_received == 0 {
|
||||
println!("ack done {} {} {}", ack_sizes.iter().sum::<u64>(), ack_sizes.len(), (ack_sizes.iter().sum::<u64>() as f64) / (ack_sizes.len() as f64));
|
||||
return Ok(());
|
||||
}
|
||||
for (tag, result) in buffer.drain(..) {
|
||||
//println!("rm.insert({}..{}+1, {:?}); rm.check_invariants();", tag, tag, &result);
|
||||
self.enqueue(tag, result);
|
||||
data_good = data_good || tag == self.acked + 1;
|
||||
}
|
||||
}
|
||||
|
||||
//println!("ack status {:?}", self.pending);
|
||||
let mut ack_count = 0;
|
||||
loop {
|
||||
ack_count += 1;
|
||||
let (ack_range, res) = self.pending.peek_first().unwrap();
|
||||
let ack_to = ack_range.end - 1;
|
||||
let res = *res;
|
||||
self.pending.drop_first();
|
||||
|
||||
//println!("ack send {} {:?}", ack_to, res);
|
||||
//println!("rm.drop_first(); rm.check_invariants();");
|
||||
match res {
|
||||
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
||||
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
||||
}?;
|
||||
|
||||
ack_sizes.push(ack_to - self.acked);
|
||||
self.acked = ack_to;
|
||||
if !self.ready() { break; }
|
||||
}
|
||||
println!("ack sleep {}", ack_count);
|
||||
}
|
||||
}
|
||||
}
|
||||
21
src/main.rs
21
src/main.rs
@ -1,6 +1,10 @@
|
||||
#![feature(async_closure)]
|
||||
#![feature(btree_cursors)]
|
||||
#![feature(let_chains)]
|
||||
|
||||
mod ack;
|
||||
mod processor;
|
||||
pub mod range_map;
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
@ -13,6 +17,7 @@ use tokio::{signal, time::sleep};
|
||||
use crate::processor::Processor;
|
||||
|
||||
static QUEUE_COUNT: usize = 10;
|
||||
static SEND_COUNT: usize = 100_000;
|
||||
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<()> {
|
||||
@ -38,18 +43,18 @@ async fn main() -> Result<()> {
|
||||
}).await?;
|
||||
|
||||
tokio::try_join!(
|
||||
async {
|
||||
let mut proc = Processor::new(broker_connect().await?, 10_000).await?;
|
||||
tokio::spawn(async {
|
||||
let mut proc = Processor::new(broker_connect().await?, 65_535).await?;
|
||||
for i in 0..QUEUE_COUNT {
|
||||
proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?;
|
||||
}
|
||||
quit_signal.await.unwrap();
|
||||
proc.shutdown().await
|
||||
},
|
||||
async {
|
||||
}),
|
||||
tokio::spawn(async {
|
||||
let chan = conn.create_channel().await?;
|
||||
sender(conn, chan).await
|
||||
},
|
||||
}),
|
||||
)?;
|
||||
|
||||
Ok(())
|
||||
@ -68,7 +73,7 @@ async fn broker_connect() -> lapin::Result<Connection> {
|
||||
struct Test { int: usize }
|
||||
|
||||
async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
||||
for int in 0..50_000 {
|
||||
for int in 0..SEND_COUNT {
|
||||
if int % 1000 == 0 { println!("Sending {}", int) };
|
||||
let payload = Test { int };
|
||||
let confirm = chan.basic_publish(
|
||||
@ -85,12 +90,12 @@ async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn process(test: Test, i: usize) -> Result<()> {
|
||||
async fn process(_test: Test, _i: usize) -> Result<()> {
|
||||
let (dur, fail) = {
|
||||
let mut rng = thread_rng();
|
||||
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.01))
|
||||
};
|
||||
sleep(dur).await;
|
||||
//if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
||||
//if fail { println!("{} Fail {} {:?}", _i, _test.int, dur); }
|
||||
if fail { Err(anyhow!("oops")) } else { Ok(()) }
|
||||
}
|
||||
|
||||
@ -1,18 +1,20 @@
|
||||
use std::{collections::BTreeMap, future::Future, time::Duration};
|
||||
use std::{future::Future, time::Duration};
|
||||
|
||||
use anyhow::Result;
|
||||
use futures_lite::prelude::*;
|
||||
use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection};
|
||||
use lapin::{options::{BasicCancelOptions, BasicConsumeOptions, BasicQosOptions}, types::{DeliveryTag, FieldTable, ShortString, ShortUInt}, Channel, Connection};
|
||||
use serde::de::DeserializeOwned;
|
||||
use tokio::{select, sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender}, task::JoinSet, time::{sleep_until, Instant}};
|
||||
use tokio::{sync::mpsc::UnboundedSender, task::JoinSet};
|
||||
use tokio_util::task::TaskTracker;
|
||||
|
||||
use crate::ack::{AckBatcher, AckResult};
|
||||
|
||||
pub struct Processor {
|
||||
conn: Connection,
|
||||
chan: Channel,
|
||||
cons_tags: Vec<ShortString>,
|
||||
join_set: JoinSet<lapin::Result<()>>,
|
||||
ack_sink: UnboundedSender<(DeliveryTag, Result<(), bool>)>,
|
||||
ack_sink: UnboundedSender<(DeliveryTag, AckResult)>,
|
||||
}
|
||||
|
||||
impl Processor {
|
||||
@ -22,7 +24,7 @@ impl Processor {
|
||||
|
||||
let mut join_set = JoinSet::new();
|
||||
|
||||
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_secs(1), chan.clone());
|
||||
let (ack_sink, ack_batcher) = AckBatcher::new(Duration::from_millis(100), chan.clone());
|
||||
join_set.spawn(ack_batcher.run());
|
||||
|
||||
Ok(Self {
|
||||
@ -86,71 +88,3 @@ impl Processor {
|
||||
conn.close(0, "").await
|
||||
}
|
||||
}
|
||||
|
||||
struct AckBatcher {
|
||||
max_wait: Duration,
|
||||
chan: Channel,
|
||||
recv: UnboundedReceiver<(DeliveryTag, Result<(), bool>)>,
|
||||
pending: BTreeMap<DeliveryTag, Result<(), bool>>,
|
||||
acked: DeliveryTag,
|
||||
}
|
||||
|
||||
impl AckBatcher {
|
||||
fn new(max_wait: Duration, chan: Channel) -> (UnboundedSender<(DeliveryTag, Result<(), bool>)>, Self) {
|
||||
let (send, recv) = unbounded_channel();
|
||||
(send, Self { max_wait, chan, recv, pending: Default::default(), acked: 0 })
|
||||
}
|
||||
|
||||
fn enqueue(&mut self, tag: DeliveryTag, res: Result<(), bool>) {
|
||||
let inserted = self.pending.insert(tag, res);
|
||||
debug_assert_eq!(inserted, None);
|
||||
}
|
||||
|
||||
fn ready(&self) -> bool {
|
||||
self.pending.keys().next() == Some(&(self.acked + 1))
|
||||
}
|
||||
|
||||
async fn run(mut self) -> lapin::Result<()> {
|
||||
let mut not_before = Instant::now();
|
||||
|
||||
loop {
|
||||
let mut data_good = false;
|
||||
let mut time_good = false;
|
||||
while !(time_good && data_good) {
|
||||
select! {
|
||||
next = self.recv.recv() => {
|
||||
match next {
|
||||
Some((tag, result)) => {
|
||||
self.enqueue(tag, result);
|
||||
data_good = data_good || tag == self.acked + 1;
|
||||
},
|
||||
//None => return Ok(()),
|
||||
None => { println!("ack done"); return Ok(()) },
|
||||
}
|
||||
}
|
||||
_ = sleep_until(not_before), if !time_good => time_good = true
|
||||
}
|
||||
}
|
||||
|
||||
println!("ack status {:?}", self.pending.len());
|
||||
loop {
|
||||
let (mut ack_to, res) = self.pending.pop_first().unwrap();
|
||||
while self.pending.first_key_value() == Some((&(ack_to + 1), &res)) {
|
||||
self.pending.pop_first();
|
||||
ack_to += 1;
|
||||
}
|
||||
|
||||
println!("ack sending {} {:?}", ack_to, res);
|
||||
match res {
|
||||
Ok(()) => self.chan.basic_ack(ack_to, BasicAckOptions { multiple: true }).await,
|
||||
Err(requeue) => self.chan.basic_nack(ack_to, BasicNackOptions { multiple: true, requeue }).await,
|
||||
}?;
|
||||
|
||||
self.acked = ack_to;
|
||||
if !self.ready() { break; }
|
||||
}
|
||||
println!("ack sleep");
|
||||
not_before = Instant::now() + self.max_wait;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
247
src/range_map.rs
Normal file
247
src/range_map.rs
Normal file
@ -0,0 +1,247 @@
|
||||
use std::{cmp::Ordering, collections::BTreeMap, fmt::Debug, ops::{Bound, Range}};
|
||||
|
||||
#[derive(Clone, Debug)]
|
||||
pub struct RangeMap<K, V> {
|
||||
ranges: BTreeMap<K, Option<V>>,
|
||||
}
|
||||
|
||||
impl<K: Debug + Ord, V: Debug + PartialEq> RangeMap<K, V> {
|
||||
pub fn insert(&mut self, range: Range<K>, val: V) {
|
||||
if range.is_empty() { return; }
|
||||
let Range { start, end } = range;
|
||||
|
||||
let mut curs = self.ranges.upper_bound_mut(Bound::Included(&start));
|
||||
/*
|
||||
print!("{:?} ", curs.peek_prev());
|
||||
print!("{:?} ", curs.key_value());
|
||||
println!("{:?}", curs.peek_next());
|
||||
*/
|
||||
if let Some((key, old_val)) = curs.key_value_mut() && key == &start {
|
||||
*old_val = Some(val);
|
||||
} else {
|
||||
curs.insert_after(start, Some(val));
|
||||
curs.move_next();
|
||||
}
|
||||
while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() {
|
||||
curs.remove_current_and_move_back();
|
||||
}
|
||||
curs.move_next();
|
||||
|
||||
/*
|
||||
print!("{:?} ", curs.peek_prev());
|
||||
print!("{:?} ", curs.key_value());
|
||||
println!("{:?}", curs.peek_next());
|
||||
*/
|
||||
|
||||
let mut next_val = None;
|
||||
loop {
|
||||
match curs.key().map(|key| key.cmp(&end)) {
|
||||
Some(Ordering::Less) => { // Delete entry but remember value as potential next value
|
||||
next_val = curs.remove_current().unwrap().1;
|
||||
},
|
||||
Some(Ordering::Equal) => break, // Already correct
|
||||
_ => { // Next entry is missing or beyond end
|
||||
curs.insert_before(end, next_val);
|
||||
break;
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
while curs.as_cursor().peek_prev().map(|(_, v)| v) == curs.value() {
|
||||
curs.remove_current();
|
||||
}
|
||||
}
|
||||
|
||||
pub fn drop_first(&mut self) {
|
||||
if self.ranges.pop_first().is_some() { // Remove first
|
||||
if self.ranges.iter().next().is_some_and(|(_, val)| val.is_none()) {
|
||||
// Potentially clean up a single empty range after
|
||||
self.ranges.pop_first();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
fn check_invariants(&self) where V: Debug {
|
||||
use std::iter::zip;
|
||||
|
||||
if let Some((_, val)) = self.ranges.first_key_value() {
|
||||
assert!(val.is_some());
|
||||
assert!(self.ranges.len() >= 2);
|
||||
|
||||
let (_, last_val) = self.ranges.last_key_value().unwrap();
|
||||
assert!(last_val.is_none());
|
||||
}
|
||||
|
||||
for (a, b) in zip(self.ranges.values(), self.ranges.values().skip(1)) {
|
||||
assert_ne!(a, b);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
impl<K, V> RangeMap<K, V> {
|
||||
pub fn peek_first(&self) -> Option<(Range<&K>, &V)> {
|
||||
let mut iter = self.ranges.iter();
|
||||
let (start, val) = iter.next()?;
|
||||
let (end, _) = iter.next().unwrap();
|
||||
Some((start..end, val.as_ref().unwrap()))
|
||||
}
|
||||
|
||||
pub fn clear(&mut self) { self.ranges.clear(); }
|
||||
|
||||
pub fn is_empty(&self) -> bool { self.ranges.is_empty() }
|
||||
}
|
||||
|
||||
impl<K, V> Default for RangeMap<K, V> {
|
||||
fn default() -> Self {
|
||||
Self { ranges: Default::default() }
|
||||
}
|
||||
}
|
||||
|
||||
#[cfg(test)]
|
||||
mod tests {
|
||||
use super::*;
|
||||
|
||||
#[test]
|
||||
fn basic() {
|
||||
let mut rm = RangeMap::default();
|
||||
|
||||
for (i, (insert, expected)) in [
|
||||
(None, None ),
|
||||
(Some((5..10, 50)), Some((&5..&10, &50))),
|
||||
(None, None ),
|
||||
(Some((5..10, 50)), Some((&5..&10, &50))),
|
||||
(Some((1.. 5, 10)), Some((&1..& 5, &10))),
|
||||
(None , Some((&5..&10, &50))),
|
||||
(Some((1.. 3, 10)), Some((&1..& 3, &10))),
|
||||
(None , Some((&5..&10, &50))),
|
||||
(Some((1.. 7, 10)), Some((&1..& 7, &10))),
|
||||
(None , Some((&7..&10, &50))),
|
||||
(Some((1..10, 10)), Some((&1..&10, &10))),
|
||||
(None , None ),
|
||||
(Some((5..10, 50)), Some((&5..&10, &50))),
|
||||
(Some((1..12, 10)), Some((&1..&12, &10))),
|
||||
(None , None ),
|
||||
(Some((5..10, 50)), Some((&5..&10, &50))),
|
||||
].into_iter().enumerate() {
|
||||
match insert {
|
||||
Some((k, v)) => rm.insert(k, v),
|
||||
None => rm.drop_first(),
|
||||
}
|
||||
rm.check_invariants();
|
||||
assert_eq!(rm.peek_first(), expected, "Failed at operation {}", i);
|
||||
}
|
||||
}
|
||||
|
||||
#[test]
|
||||
fn generated() {
|
||||
let mut rm = RangeMap::<usize, Result<(), bool>>::default();
|
||||
rm.insert(59..59+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(80..80+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(100..100+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(86..86+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(98..98+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(70..70+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(39..39+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(35..35+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(89..89+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(94..94+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(77..77+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(82..82+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(6..6+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(15..15+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(5..5+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(8..8+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(10..10+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(17..17+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(25..25+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(44..44+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(43..43+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(55..55+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(63..63+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(24..24+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(36..36+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(51..51+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(11..11+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(1..1+1, Ok(())); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.insert(71..71+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(75..75+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(74..74+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(26..26+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(97..97+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(66..66+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(49..49+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(69..69+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(64..64+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(38..38+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(48..48+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(62..62+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(46..46+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(57..57+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(13..13+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(30..30+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(99..99+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(18..18+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(40..40+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(84..84+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(4..4+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(9..9+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(14..14+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(16..16+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(12..12+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(53..53+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(21..21+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(54..54+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(28..28+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(68..68+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(95..95+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(90..90+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(73..73+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(83..83+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(32..32+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(27..27+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(34..34+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(61..61+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(2..2+1, Ok(())); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.insert(31..31+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(20..20+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(47..47+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(52..52+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(87..87+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(37..37+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(22..22+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(42..42+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(33..33+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(23..23+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(60..60+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(65..65+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(19..19+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(3..3+1, Ok(())); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.insert(85..85+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(67..67+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(76..76+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(72..72+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(78..78+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(50..50+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(96..96+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(45..45+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(7..7+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(93..93+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(92..92+1, Ok(())); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.insert(56..56+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(88..88+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(29..29+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(58..58+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(91..91+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(41..41+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(81..81+1, Ok(())); rm.check_invariants();
|
||||
rm.insert(79..79+1, Ok(())); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
rm.drop_first(); rm.check_invariants();
|
||||
}
|
||||
}
|
||||
Loading…
Reference in New Issue
Block a user