diff options
author | D. Scott Boggs <scott@tams.tech> | 2022-12-18 16:07:31 -0500 |
---|---|---|
committer | D. Scott Boggs <scott@tams.tech> | 2022-12-18 16:07:31 -0500 |
commit | c8cefab6bcafa35973bb4f269ffd2f20870889ed (patch) | |
tree | 87cf3e6bc896d460b010701c403fe248f62149eb | |
parent | c811f42054fdf322bc0baefeb0af335176a89af0 (diff) |
Switch to tokio-tungstenite for async websocketsfeature/async-websocket
-rw-r--r-- | Cargo.toml | 5 | ||||
-rw-r--r-- | src/errors.rs | 2 | ||||
-rw-r--r-- | src/event_stream.rs | 82 | ||||
-rw-r--r-- | src/mastodon.rs | 33 |
4 files changed, 62 insertions, 60 deletions
@@ -20,12 +20,15 @@ serde_json = "1" serde_qs = "0.4.5" serde_urlencoded = "0.6.1" tap-reader = "1" -tungstenite = "0.18" url = "1" # Provides parsing for the link header in get_links() in page.rs hyper-old-types = "0.11.0" futures-util = "0.3.25" +[dependencies.tokio-tungstenite] +version = "0.18.0" +features = ["native-tls"] + [dependencies.uuid] version = "1.2.2" features = ["v4"] diff --git a/src/errors.rs b/src/errors.rs index 754d274..bb5bd21 100644 --- a/src/errors.rs +++ b/src/errors.rs @@ -8,11 +8,11 @@ use serde::Deserialize; use serde_json::Error as SerdeError; use serde_qs::Error as SerdeQsError; use serde_urlencoded::ser::Error as UrlEncodedError; +use tokio_tungstenite::tungstenite::{error::Error as WebSocketError, Message as WebSocketMessage}; #[cfg(feature = "toml")] use tomlcrate::de::Error as TomlDeError; #[cfg(feature = "toml")] use tomlcrate::ser::Error as TomlSerError; -use tungstenite::{error::Error as WebSocketError, Message as WebSocketMessage}; use url::ParseError as UrlError; /// Convience type over `std::result::Result` with `Error` as the error type. diff --git a/src/event_stream.rs b/src/event_stream.rs index 611b194..b3fb86c 100644 --- a/src/event_stream.rs +++ b/src/event_stream.rs @@ -3,59 +3,73 @@ use crate::{ errors::Result, Error, }; -use futures::{stream::try_unfold, TryStream}; +use futures::{stream::try_unfold, SinkExt, StreamExt, TryStream}; use log::{as_debug, as_serde, debug, error, info, trace}; -use tungstenite::Message; +use tokio::io::{AsyncRead, AsyncWrite}; +use tokio_tungstenite::tungstenite::Message; +// use tokio_tungstenite::; /// Returns a stream of events at the given url location. -pub fn event_stream( +pub async fn connect_to_event_stream( location: String, ) -> Result<impl TryStream<Ok = Event, Error = Error, Item = Result<Event>>> { trace!(location = location; "connecting to websocket for events"); - let (client, response) = tungstenite::connect(&location)?; + let (client, response) = tokio_tungstenite::connect_async(&location).await?; let status = response.status(); - if !status.is_success() { + if status != 101 { error!( - status = as_debug!(status), - body = response.body().as_ref().map(|it| String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()), - location = &location; - "error connecting to websocket" - ); + status = as_debug!(status), + body = response.body().as_ref().map(|it| + String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()), + location = &location; + "error connecting to websocket" + ); return Err(Error::Api(crate::ApiError { error: status.canonical_reason().map(String::from), error_description: None, })); } debug!(location = &location, status = as_debug!(status); "successfully connected to websocket"); - Ok(try_unfold((client, location), |mut this| async move { - let (ref mut client, ref location) = this; + Ok(event_stream(client)) +} + +/// Stream mastodon events from the given websocket connection +pub fn event_stream<S: AsyncRead + AsyncWrite + Unpin>( + client: tokio_tungstenite::WebSocketStream<S>, +) -> impl TryStream<Ok = Event, Error = Error, Item = Result<Event>> { + try_unfold(client, |mut this| async move { let mut lines = vec![]; loop { - match client.read_message() { - Ok(Message::Text(line)) => { - debug!(message = line, location = &location; "received websocket message"); - let line = line.trim().to_string(); - if line.starts_with(":") || line.is_empty() { + if let Some(message) = this.next().await { + match message { + Ok(Message::Text(line)) => { + debug!(message = line; "received websocket message"); + let line = line.trim().to_string(); + if line.starts_with(":") || line.is_empty() { + continue; + } + lines.push(line); + if let Ok(event) = make_event(&lines) { + info!(event = as_serde!(event); "received websocket event"); + lines.clear(); + return Ok(Some((event, this))); + } else { + continue; + } + }, + Ok(Message::Ping(data)) => { + debug!(data = String::from_utf8_lossy(data.as_slice()); "received ping, ponging"); + this.send(Message::Pong(data)).await?; continue; - } - lines.push(line); - if let Ok(event) = make_event(&lines) { - info!(event = as_serde!(event), location = location; "received websocket event"); - lines.clear(); - return Ok(Some((event, this))); - } else { - continue; - } - }, - Ok(Message::Ping(data)) => { - debug!(metadata = as_serde!(data); "received ping, ponging"); - client.write_message(Message::Pong(data))?; - }, - Ok(message) => return Err(message.into()), - Err(err) => return Err(err.into()), + }, + Ok(message) => return Err(message.into()), + Err(err) => return Err(err.into()), + } + } else { + return Ok(None); } } - })) + }) } fn make_event(lines: &[String]) -> Result<Event> { diff --git a/src/mastodon.rs b/src/mastodon.rs index 1b3ee0a..d05074f 100644 --- a/src/mastodon.rs +++ b/src/mastodon.rs @@ -9,7 +9,8 @@ use crate::{ Empty, }, errors::{Error, Result}, - event_stream::event_stream, + event_stream::connect_to_event_stream, + format_err, helpers::read_response::read_response, log_serde, AddFilterRequest, @@ -22,7 +23,7 @@ use crate::{ UpdatePushRequest, }; use futures::TryStream; -use log::{as_debug, as_serde, debug, error, info, trace}; +use log::{as_debug, as_serde, debug, error, trace}; use reqwest::{Client, RequestBuilder}; use url::Url; use uuid::Uuid; @@ -394,29 +395,13 @@ impl Mastodon { /// }); /// ``` pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error = Error>> { - let call_id = Uuid::new_v4(); - let mut url: Url = self.route("/api/v1/streaming").parse()?; + let mut url: Url = self.route("/api/v1/streaming/user").parse()?; + url.set_scheme("wss") + .map_err(|_| format_err!("error setting URL scheme"))?; url.query_pairs_mut() - .append_pair("access_token", &self.data.token) - .append_pair("stream", "user"); - debug!( - url = url.as_str(), call_id = as_debug!(call_id); - "making user streaming API request" - ); - let response = reqwest::get(url.as_str()).await?; - let mut url: Url = response.url().as_str().parse()?; - info!( - url = url.as_str(), call_id = as_debug!(call_id), - status = response.status().as_str(); - "received url from streaming API request" - ); - let new_scheme = match url.scheme() { - "http" => "ws", - "https" => "wss", - x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), - }; - url.set_scheme(new_scheme) - .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + .append_pair("access_token", &self.data.token); + connect_to_event_stream(url.into_string()).await + } /// Set the bearer authentication token fn authenticated(&self, request: RequestBuilder) -> RequestBuilder { |