Complete PoC
This commit is contained in:
commit
95f1b8622e
1
.gitignore
vendored
Normal file
1
.gitignore
vendored
Normal file
@ -0,0 +1 @@
|
|||||||
|
/target
|
||||||
1710
Cargo.lock
generated
Normal file
1710
Cargo.lock
generated
Normal file
File diff suppressed because it is too large
Load Diff
19
Cargo.toml
Normal file
19
Cargo.toml
Normal file
@ -0,0 +1,19 @@
|
|||||||
|
[package]
|
||||||
|
name = "lapin-test"
|
||||||
|
version = "0.1.0"
|
||||||
|
edition = "2021"
|
||||||
|
|
||||||
|
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
|
||||||
|
|
||||||
|
[dependencies]
|
||||||
|
anyhow = "1.0.79"
|
||||||
|
futures-lite = "2.2.0"
|
||||||
|
lapin = "2.3.1"
|
||||||
|
libc = "0.2.152"
|
||||||
|
rand = "0.8.5"
|
||||||
|
serde = "1.0.195"
|
||||||
|
serde_json = "1.0.111"
|
||||||
|
tokio = { version = "1.35.1", features = ["full"] }
|
||||||
|
tokio-executor-trait = "2.1.1"
|
||||||
|
tokio-reactor-trait = "1.1.0"
|
||||||
|
tokio-util = { version = "0.7.10", features = ["full"] }
|
||||||
84
src/main.rs
Normal file
84
src/main.rs
Normal file
@ -0,0 +1,84 @@
|
|||||||
|
#![feature(async_closure)]
|
||||||
|
|
||||||
|
mod processor;
|
||||||
|
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use anyhow::{anyhow, Result};
|
||||||
|
use lapin::{options::BasicPublishOptions, publisher_confirm::Confirmation, topology::{BindingDefinition, ExchangeDefinition, QueueDefinition, TopologyDefinition}, types::FieldTable, BasicProperties, Channel, Connection, ConnectionProperties};
|
||||||
|
use rand::{Rng, thread_rng};
|
||||||
|
use serde::{Deserialize, Serialize};
|
||||||
|
use tokio::{signal, time::sleep};
|
||||||
|
|
||||||
|
use crate::processor::Processor;
|
||||||
|
|
||||||
|
#[tokio::main]
|
||||||
|
async fn main() -> Result<()> {
|
||||||
|
let quit_signal = signal::ctrl_c();
|
||||||
|
|
||||||
|
let conn = broker_connect().await?;
|
||||||
|
let _ = conn.restore(TopologyDefinition {
|
||||||
|
exchanges: vec![
|
||||||
|
ExchangeDefinition { name: "hello".into(), kind: None, options: None, arguments: None, bindings: vec![] },
|
||||||
|
],
|
||||||
|
queues: (0..10).map(|i| QueueDefinition { name: format!("hello-{}", i).into(), options: None, arguments: None, bindings: vec![BindingDefinition { source: "hello".into(), routing_key: "".into(), arguments: FieldTable::default() }] }).collect(),
|
||||||
|
channels: vec![],
|
||||||
|
}).await?;
|
||||||
|
|
||||||
|
tokio::try_join!(
|
||||||
|
async {
|
||||||
|
let mut proc = Processor::new(broker_connect().await?, 10_000).await?;
|
||||||
|
for i in 0..10 {
|
||||||
|
proc.listen(&format!("hello-{}", i), &format!("processor-{}", i), move |data| process(data, i)).await?;
|
||||||
|
}
|
||||||
|
quit_signal.await.unwrap();
|
||||||
|
proc.shutdown().await
|
||||||
|
},
|
||||||
|
async {
|
||||||
|
let chan = conn.create_channel().await?;
|
||||||
|
sender(conn, chan).await
|
||||||
|
},
|
||||||
|
)?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn broker_connect() -> lapin::Result<Connection> {
|
||||||
|
Connection::connect(
|
||||||
|
"amqp://localhost",
|
||||||
|
ConnectionProperties::default()
|
||||||
|
.with_executor(tokio_executor_trait::Tokio::current())
|
||||||
|
.with_reactor(tokio_reactor_trait::Tokio),
|
||||||
|
).await
|
||||||
|
}
|
||||||
|
|
||||||
|
#[derive(Serialize, Deserialize)]
|
||||||
|
struct Test { int: usize }
|
||||||
|
|
||||||
|
async fn sender(conn: Connection, chan: Channel) -> lapin::Result<()> {
|
||||||
|
for int in 0..100 {
|
||||||
|
println!("Sending {}", int);
|
||||||
|
let payload = Test { int };
|
||||||
|
let confirm = chan.basic_publish(
|
||||||
|
"hello",
|
||||||
|
"",
|
||||||
|
BasicPublishOptions::default(),
|
||||||
|
&serde_json::to_vec(&payload).unwrap(),
|
||||||
|
BasicProperties::default(),
|
||||||
|
).await?.await?;
|
||||||
|
assert_eq!(confirm, Confirmation::NotRequested);
|
||||||
|
}
|
||||||
|
conn.close(0, "").await?;
|
||||||
|
println!("Done sending");
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
async fn process(test: Test, i: usize) -> Result<()> {
|
||||||
|
let (dur, fail) = {
|
||||||
|
let mut rng = thread_rng();
|
||||||
|
(Duration::from_millis(rng.gen_range(1000..10000)), rng.gen_bool(0.1))
|
||||||
|
};
|
||||||
|
sleep(dur).await;
|
||||||
|
if fail { println!("{} Fail {} {:?}", i, test.int, dur); }
|
||||||
|
if fail { Err(anyhow!("oops")) } else { Ok(()) }
|
||||||
|
}
|
||||||
79
src/processor.rs
Normal file
79
src/processor.rs
Normal file
@ -0,0 +1,79 @@
|
|||||||
|
use std::future::Future;
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use futures_lite::prelude::*;
|
||||||
|
use lapin::{options::{BasicAckOptions, BasicCancelOptions, BasicConsumeOptions, BasicNackOptions, BasicQosOptions}, types::{FieldTable, ShortString, ShortUInt}, Channel, Connection};
|
||||||
|
use serde::de::DeserializeOwned;
|
||||||
|
use tokio::task::JoinSet;
|
||||||
|
use tokio_util::task::TaskTracker;
|
||||||
|
|
||||||
|
|
||||||
|
pub struct Processor {
|
||||||
|
conn: Connection,
|
||||||
|
chan: Channel,
|
||||||
|
cons_tags: Vec<ShortString>,
|
||||||
|
join_set: JoinSet<lapin::Result<()>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Processor {
|
||||||
|
pub async fn new(conn: Connection, prefetch_count: ShortUInt) -> lapin::Result<Self> {
|
||||||
|
let chan = conn.create_channel().await?;
|
||||||
|
chan.basic_qos(prefetch_count, BasicQosOptions::default()).await?;
|
||||||
|
Ok(Self {
|
||||||
|
conn,
|
||||||
|
chan,
|
||||||
|
cons_tags: vec![],
|
||||||
|
join_set: JoinSet::new(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn listen<T: DeserializeOwned, F: 'static + Send + Future<Output=Result<()>>>(
|
||||||
|
&mut self,
|
||||||
|
queue: &str,
|
||||||
|
consumer_tag: &str,
|
||||||
|
f: impl 'static + Send + Fn(T) -> F,
|
||||||
|
) -> lapin::Result<()> {
|
||||||
|
let mut consumer = self.chan.basic_consume(
|
||||||
|
queue,
|
||||||
|
consumer_tag,
|
||||||
|
BasicConsumeOptions::default(),
|
||||||
|
FieldTable::default()
|
||||||
|
).await?;
|
||||||
|
self.cons_tags.push(consumer.tag());
|
||||||
|
|
||||||
|
self.join_set.spawn(async move {
|
||||||
|
let tracker = TaskTracker::new();
|
||||||
|
while let Some(delivery) = consumer.try_next().await? {
|
||||||
|
if let Ok(data) = serde_json::from_slice(&delivery.data) {
|
||||||
|
let fut = f(data);
|
||||||
|
tracker.spawn(async move {
|
||||||
|
match fut.await {
|
||||||
|
Ok(()) => delivery.ack(BasicAckOptions::default()).await,
|
||||||
|
Err(_) => delivery.nack(BasicNackOptions { multiple: false, requeue: true }).await,
|
||||||
|
}
|
||||||
|
});
|
||||||
|
} else {
|
||||||
|
tracker.spawn(async move { delivery.nack(BasicNackOptions::default()).await });
|
||||||
|
}
|
||||||
|
}
|
||||||
|
tracker.close();
|
||||||
|
tracker.wait().await;
|
||||||
|
lapin::Result::Ok(())
|
||||||
|
});
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn shutdown(mut self) -> lapin::Result<()> {
|
||||||
|
for cons_tag in self.cons_tags {
|
||||||
|
let chan = self.chan.clone(); // Required because of 'static on tokio::spawn. Might want to switch to join_all or just do serially
|
||||||
|
self.join_set.spawn(async move {
|
||||||
|
chan.basic_cancel(cons_tag.as_str(), BasicCancelOptions::default()).await
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
while let Some(res) = self.join_set.join_next().await {
|
||||||
|
res.unwrap()?;
|
||||||
|
}
|
||||||
|
self.conn.close(0, "").await
|
||||||
|
}
|
||||||
|
}
|
||||||
Loading…
Reference in New Issue
Block a user