1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::*;
use serde::Serialize;
use axum::{
body::Full,
extract::{
ws::{Message, WebSocketUpgrade},
Extension, Path, TypedHeader,
},
http::StatusCode,
response::IntoResponse,
Json,
};
use eiffelvis_core::domain::{
types::LeanEvent,
user_queries::{Query, TrackedQuery},
};
use hyper::Response;
pub(crate) fn make_service<T: EiffelVisHttpApp>(app: App<T>) -> Router {
Router::new()
.route("/", get(event_dump::<T>))
.route("/get_event/:id", get(get_event::<T>))
.route("/events_with_root/:id", get(events_with_root::<T>))
.route("/ws", get(establish_websocket::<T>))
.layer(CorsLayer::new().allow_origin(Any).allow_methods(Any))
.layer(Extension(app))
}
pub async fn event_dump<T: EiffelVisHttpApp>(
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
let dump = lk.dump::<&BaseEvent>();
Json(&dump).into_response()
}
pub async fn get_event<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
if let Some(event) = lk.get_event(find_id) {
Json(event).into_response()
} else {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Full::default())
.unwrap()
.into_response()
}
}
pub async fn events_with_root<T: EiffelVisHttpApp>(
Path(find_id): Path<Uuid>,
Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
let lk = app.read().await;
Json(lk.get_subgraph_with_roots::<&BaseEvent>(&[find_id])).into_response()
}
#[derive(Debug, Clone, Serialize)]
struct QueryRes {
repr: String,
error: Option<String>,
}
pub async fn establish_websocket<T: EiffelVisHttpApp>(
Extension(app): Extension<App<T>>,
ws: WebSocketUpgrade,
user_agent: Option<TypedHeader<headers::UserAgent>>,
) -> impl IntoResponse {
if let Some(TypedHeader(user_agent)) = user_agent {
println!("`{}` connected to websocket", user_agent.as_str());
}
ws.on_upgrade(move |mut socket| async move {
let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));
let mut req_handler: Option<TrackedQuery<_>> = None;
while let Ok(()) = tokio::select! {
usr = socket.recv() => {
match usr {
Some(Ok(Message::Text(ref msg))) => {
let res = match serde_json::from_str::<Query>(msg) {
Ok(rq) => {
req_handler = Some(TrackedQuery::new(rq));
None
},
Err(err) => Some(format!("{}", err))
};
let res = QueryRes { repr: msg.clone(), error: res };
println!("Request {:?}", res);
socket.send(Message::Text(serde_json::to_string(&res).unwrap())).await.map_err(|_| ())
},
_ => Err(())
}
},
_ = interval.tick() => {
if let Some(handler) = req_handler.as_mut() {
let events: Vec<LeanEvent> = handler.handle(&*app.read().await);
if !events.is_empty() {
socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
} else {
Ok(())
}
} else {
Ok(())
}
}
} {}
println!("Client disconnected");
})
}