1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
use std::{sync::Arc, time::Duration};
use clap::Parser;
use eiffelvis_core::{domain::app::EiffelVisApp, graph_storage::ChunkedGraph};
use tracing::info;
#[derive(Parser, Debug)]
struct Cli {
#[clap(short, long, default_value = "127.0.0.1")]
address: String,
#[clap(short, long, default_value = "3001")]
port: u16,
#[clap(short = 'r', long, default_value = "amqp://localhost:5672/%2f")]
rmq_uri: String,
#[clap(short = 'q', long, default_value = "hello")]
rmq_queue: String,
#[clap(short = 't', long, default_value = "3001")]
timeout: u64,
#[clap(long, default_value = "8")]
max_chunks: usize,
#[clap(long, default_value = "128")]
chunk_size: u32,
#[clap(long)]
tls_cert: Option<String>,
#[clap(long)]
tls_key: Option<String>,
}
#[tokio::main]
async fn main() {
if std::env::var("RUST_LOG").is_err() {
std::env::set_var("RUST_LOG", "info");
}
tracing_subscriber::fmt::init();
let cli = Cli::from_args();
let graph = Arc::new(tokio::sync::RwLock::new(ChunkedGraph::new(
cli.max_chunks,
cli.chunk_size,
)));
let http_server_handle = eiffelvis_http::Handle::new();
let http_server = tokio::spawn(eiffelvis_http::app(
graph.clone(),
cli.address.parse().unwrap(),
cli.port,
http_server_handle.clone(),
cli.tls_cert.zip(cli.tls_key),
));
let mut event_parser = eiffelvis_stream::ampq::AmpqStream::new(
cli.rmq_uri.into(),
cli.rmq_queue.into(),
"eiffelvis".into(),
)
.await
.expect("Failed to connect to ampq server");
let timeout = cli.timeout;
let event_parser = tokio::spawn(async move {
loop {
if let Some(bytes) = event_parser.next().await {
if let Ok(des) = serde_json::from_slice(&bytes) {
EiffelVisApp::push(&mut *graph.write().await, des);
} else {
info!("Received new message but failed to deserialize");
}
} else {
info!("Event stream failed, sleeping for 5 seconds to retry");
tokio::time::sleep(Duration::from_secs(timeout)).await;
}
}
});
tokio::spawn(async move {
shutdown_signal().await;
http_server_handle.graceful_shutdown(None);
});
tokio::select! {
res = event_parser => res.unwrap(),
res = http_server => res.unwrap().unwrap(),
};
}
#[doc(hidden)]
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to install CTRL+C signal handler");
}