1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
use std::{borrow::Cow, str::FromStr};
use futures::StreamExt;
use lapin::{tcp::OwnedTLSConfig, uri::AMQPUri, Connection, ConnectionProperties, Consumer};
pub struct AmpqStream {
addr: Cow<'static, str>,
queue: Cow<'static, str>,
consumer_tag: Cow<'static, str>,
consumer: Option<Consumer>,
}
impl AmpqStream {
pub async fn new(
addr: Cow<'static, str>,
queue: Cow<'static, str>,
consumer_tag: Cow<'static, str>,
) -> Option<Self> {
Some(Self {
consumer: Some(make_ampq_consumer(&addr, &queue, &consumer_tag).await?),
addr,
queue,
consumer_tag,
})
}
pub async fn next(&mut self) -> Option<Vec<u8>> {
if self.consumer.is_none() {
self.consumer = make_ampq_consumer(&self.addr, &self.queue, &self.consumer_tag).await;
}
if let Some(consumer) = &mut self.consumer {
if let Some(Ok(delivery)) = consumer.next().await {
Some(delivery.data)
} else {
None
}
} else {
self.consumer = None;
None
}
}
}
async fn make_ampq_consumer(addr: &str, queue: &str, consumer_tag: &str) -> Option<Consumer> {
let connection = Connection::connect_uri_with_config(
AMQPUri::from_str(addr).ok()?,
#[cfg(unix)]
ConnectionProperties::default()
.with_executor(tokio_executor_trait::Tokio::current())
.with_reactor(tokio_reactor_trait::Tokio),
#[cfg(not(unix))]
ConnectionProperties::default().with_executor(tokio_executor_trait::Tokio::current()),
OwnedTLSConfig::default(),
)
.await
.ok()?;
let consumer = connection
.create_channel()
.await
.ok()?
.basic_consume(
queue,
consumer_tag,
lapin::options::BasicConsumeOptions {
no_ack: true, ..Default::default()
},
lapin::types::FieldTable::default(),
)
.await
.ok()?;
Some(consumer)
}