summaryrefslogtreecommitdiffstats
path: root/src/helpers/read_response.rs
blob: 663bb3cc52181253faab301f6c2bd1b10ce5bb7d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
use std::time::Duration;

use crate::{errors::Result, Error};
use futures::pin_mut;
use futures_util::StreamExt;
use log::{debug, trace, warn};
use reqwest::Response;
use serde::{Deserialize, Serialize};
use tokio::time::timeout;

/// Adapter for reading JSON data from a response with better logging and a
/// fail-safe timeout.
///
/// The reason for this is largely because there was an issue with responses
/// being received, but not closed, we add a timeout on each read and try
/// to parse whatever we got before the timeout.
pub async fn read_response<T>(response: Response) -> Result<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    let mut bytes = vec![];
    let url = response.url().clone();
    let status = response.status();
    trace!(status:serde = crate::helpers::log::Status::from(&response), headers:serde = crate::helpers::log::Headers::from(&response); "attempting to stream response");
    let stream = response.bytes_stream();
    pin_mut!(stream);
    loop {
        if let Ok(data) = timeout(Duration::from_secs(10), stream.next()).await {
            // as of here, we did not time out
            let Some(data) = data else {
                break;
            };
            // as of here, we have not hit the end of the stream yet
            let data = data?;
            // as of here, we did not hit an error while reading the body
            bytes.extend_from_slice(&data);
            debug!(
                data = String::from_utf8_lossy(&data), url = url.as_str(),
                bytes_received_so_far = bytes.len();
                "data chunk received"
            );
        } else {
            warn!(
                url = url.as_str(),
                data_received = bytes.len();
                "API response timed out"
            );
            break;
        }
    }
    // done growing the vec, let's just do this once.
    let bytes = bytes.as_slice();
    trace!(
        url = url.as_str(),
        data = String::from_utf8_lossy(bytes);
        "parsing response"
    );
    if status.is_success() {
        // the the response should deserialize to T
        let result = serde_json::from_slice(bytes)?;
        debug!(
                url = url.as_str(),
            result:serde = result;
            "result parsed successfully"
        );
        Ok(result)
    } else {
        // we've received an error message, let's deserialize that instead.
        let response = serde_json::from_slice(bytes)?;
        debug!(status:? = status, response:serde = response; "error received from API");
        Err(Error::Api { status, response })
    }
}