1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
use crate::*;
use serde::Serialize;

use axum::{
    body::Full,
    extract::{
        ws::{Message, WebSocketUpgrade},
        Extension, Path, TypedHeader,
    },
    http::StatusCode,
    response::IntoResponse,
    Json,
};

use eiffelvis_core::domain::{
    types::LeanEvent,
    user_queries::{Query, TrackedQuery},
};
use hyper::Response;

/// Assembles the HTTP [Router] for the given shared application state.
///
/// Registers the event dump, single-event lookup, sub-graph and websocket
/// endpoints, then applies a CORS layer allowing any origin and any method,
/// followed by an [Extension] layer that hands `app` to every handler.
pub(crate) fn make_service<T: EiffelVisHttpApp>(app: App<T>) -> Router {
    // Permissive CORS: any origin, any method.
    let cors = CorsLayer::new().allow_origin(Any).allow_methods(Any);

    let routes = Router::new()
        .route("/", get(event_dump::<T>))
        .route("/get_event/:id", get(get_event::<T>))
        .route("/events_with_root/:id", get(events_with_root::<T>))
        .route("/ws", get(establish_websocket::<T>));

    // Layer order is kept as-is: CORS first, then the shared state.
    routes.layer(cors).layer(Extension(app))
}

/// Responds with every event in the store, serialized as a single json array.
pub async fn event_dump<T: EiffelVisHttpApp>(
    Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
    let store = app.read().await;

    // Bound to a local so the borrowed events are serialized while `store` is still held.
    let all_events = store.dump::<&BaseEvent>();

    Json(&all_events).into_response()
}

/// Looks up the full event with the uuid given in the request path.
///
/// Responds with the event as json when it exists, otherwise with
/// `404 Not Found` and an empty body.
pub async fn get_event<T: EiffelVisHttpApp>(
    Path(find_id): Path<Uuid>,
    Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
    let store = app.read().await;
    match store.get_event(find_id) {
        Some(found) => Json(found).into_response(),
        None => Response::builder()
            .status(StatusCode::NOT_FOUND)
            .body(Full::default())
            .unwrap()
            .into_response(),
    }
}

/// Responds with the sub-graph rooted at the uuid given in the request path, as json.
pub async fn events_with_root<T: EiffelVisHttpApp>(
    Path(find_id): Path<Uuid>,
    Extension(app): Extension<App<T>>,
) -> impl IntoResponse {
    let store = app.read().await;

    // The store takes a list of roots; this endpoint always supplies exactly one.
    let roots = [find_id];
    let subgraph = store.get_subgraph_with_roots::<&BaseEvent>(&roots);

    Json(subgraph).into_response()
}

/// Acknowledgement sent back over the websocket for each text message received
/// from the client.
#[derive(Debug, Clone, Serialize)]
struct QueryRes {
    /// The client's message text, echoed back verbatim.
    repr: String,
    /// The parse error, as text, when the message could not be deserialized
    /// into a [Query]; `None` when the query was accepted.
    error: Option<String>,
}

/// Establishes a websocket with the client.
///
/// The client is expected to send a [Query] in json format as a text message.
/// Every text message is answered with a json-encoded [QueryRes] that echoes
/// the message and carries the parse error, if any. A successfully parsed
/// query replaces the previously active one and is wrapped in a
/// [TrackedQuery], which is then polled every 500 ms; each non-empty batch of
/// [LeanEvent]s it yields is sent to the client as a json array.
///
/// Ping and pong frames are ignored so that keep-alive traffic does not end
/// the session. The session ends when the client closes the connection, a
/// receive or send fails, or any other kind of frame arrives.
///
/// If the request carries a `User-Agent` header, it is logged on connect.
pub async fn establish_websocket<T: EiffelVisHttpApp>(
    Extension(app): Extension<App<T>>,
    ws: WebSocketUpgrade,
    user_agent: Option<TypedHeader<headers::UserAgent>>,
) -> impl IntoResponse {
    if let Some(TypedHeader(user_agent)) = user_agent {
        println!("`{}` connected to websocket", user_agent.as_str());
    }

    ws.on_upgrade(move |mut socket| async move {
        let mut interval = tokio::time::interval(std::time::Duration::from_millis(500));

        // The currently active query; `None` until the client sends a valid one.
        let mut req_handler: Option<TrackedQuery<_>> = None;

        loop {
            // `Ok(())` keeps the session alive, `Err(())` ends it.
            let step: Result<(), ()> = tokio::select! {
                usr = socket.recv() => {
                    match usr {
                        Some(Ok(Message::Text(msg))) => {
                            let error = match serde_json::from_str::<Query>(&msg) {
                                Ok(rq) => {
                                    req_handler = Some(TrackedQuery::new(rq));
                                    None
                                },
                                Err(err) => Some(err.to_string()),
                            };
                            // The message text is moved into the reply; it is not needed afterwards.
                            let res = QueryRes { repr: msg, error };
                            println!("Request {:?}", res);
                            socket.send(Message::Text(serde_json::to_string(&res).unwrap())).await.map_err(|_| ())
                        },
                        // Keep-alive frames carry no query; ignoring them keeps the session open.
                        Some(Ok(Message::Ping(_))) | Some(Ok(Message::Pong(_))) => Ok(()),
                        // Stream end, receive errors and all remaining frame kinds end the session.
                        _ => Err(()),
                    }
                },
                _ = interval.tick() => {
                    match req_handler.as_mut() {
                        Some(handler) => {
                            // The read guard is a temporary of this statement, so it is
                            // released before the (possibly slow) send below.
                            let events: Vec<LeanEvent> = handler.handle(&*app.read().await);
                            if events.is_empty() {
                                Ok(())
                            } else {
                                socket.send(Message::Text(serde_json::to_string(&events).unwrap())).await.map_err(|_| ())
                            }
                        },
                        None => Ok(()),
                    }
                }
            };

            if step.is_err() {
                break;
            }
        }

        println!("Client disconnected");
    })
}