summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorD. Scott Boggs <scott@tams.tech>2022-12-18 16:07:31 -0500
committerD. Scott Boggs <scott@tams.tech>2022-12-18 16:07:31 -0500
commitc8cefab6bcafa35973bb4f269ffd2f20870889ed (patch)
tree87cf3e6bc896d460b010701c403fe248f62149eb
parentc811f42054fdf322bc0baefeb0af335176a89af0 (diff)
Switch to tokio-tungstenite for async websocketsfeature/async-websocket
-rw-r--r--Cargo.toml5
-rw-r--r--src/errors.rs2
-rw-r--r--src/event_stream.rs82
-rw-r--r--src/mastodon.rs33
4 files changed, 62 insertions, 60 deletions
diff --git a/Cargo.toml b/Cargo.toml
index 138f53f..467f646 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -20,12 +20,15 @@ serde_json = "1"
serde_qs = "0.4.5"
serde_urlencoded = "0.6.1"
tap-reader = "1"
-tungstenite = "0.18"
url = "1"
# Provides parsing for the link header in get_links() in page.rs
hyper-old-types = "0.11.0"
futures-util = "0.3.25"
+[dependencies.tokio-tungstenite]
+version = "0.18.0"
+features = ["native-tls"]
+
[dependencies.uuid]
version = "1.2.2"
features = ["v4"]
diff --git a/src/errors.rs b/src/errors.rs
index 754d274..bb5bd21 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -8,11 +8,11 @@ use serde::Deserialize;
use serde_json::Error as SerdeError;
use serde_qs::Error as SerdeQsError;
use serde_urlencoded::ser::Error as UrlEncodedError;
+use tokio_tungstenite::tungstenite::{error::Error as WebSocketError, Message as WebSocketMessage};
#[cfg(feature = "toml")]
use tomlcrate::de::Error as TomlDeError;
#[cfg(feature = "toml")]
use tomlcrate::ser::Error as TomlSerError;
-use tungstenite::{error::Error as WebSocketError, Message as WebSocketMessage};
use url::ParseError as UrlError;
/// Convience type over `std::result::Result` with `Error` as the error type.
diff --git a/src/event_stream.rs b/src/event_stream.rs
index 611b194..b3fb86c 100644
--- a/src/event_stream.rs
+++ b/src/event_stream.rs
@@ -3,59 +3,73 @@ use crate::{
errors::Result,
Error,
};
-use futures::{stream::try_unfold, TryStream};
+use futures::{stream::try_unfold, SinkExt, StreamExt, TryStream};
use log::{as_debug, as_serde, debug, error, info, trace};
-use tungstenite::Message;
+use tokio::io::{AsyncRead, AsyncWrite};
+use tokio_tungstenite::tungstenite::Message;
+// use tokio_tungstenite::;
/// Returns a stream of events at the given url location.
-pub fn event_stream(
+pub async fn connect_to_event_stream(
location: String,
) -> Result<impl TryStream<Ok = Event, Error = Error, Item = Result<Event>>> {
trace!(location = location; "connecting to websocket for events");
- let (client, response) = tungstenite::connect(&location)?;
+ let (client, response) = tokio_tungstenite::connect_async(&location).await?;
let status = response.status();
- if !status.is_success() {
+ if status != 101 {
error!(
- status = as_debug!(status),
- body = response.body().as_ref().map(|it| String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()),
- location = &location;
- "error connecting to websocket"
- );
+ status = as_debug!(status),
+ body = response.body().as_ref().map(|it|
+ String::from_utf8_lossy(it.as_slice())).unwrap_or("(empty body)".into()),
+ location = &location;
+ "error connecting to websocket"
+ );
return Err(Error::Api(crate::ApiError {
error: status.canonical_reason().map(String::from),
error_description: None,
}));
}
debug!(location = &location, status = as_debug!(status); "successfully connected to websocket");
- Ok(try_unfold((client, location), |mut this| async move {
- let (ref mut client, ref location) = this;
+ Ok(event_stream(client))
+}
+
+/// Stream mastodon events from the given websocket connection
+pub fn event_stream<S: AsyncRead + AsyncWrite + Unpin>(
+ client: tokio_tungstenite::WebSocketStream<S>,
+) -> impl TryStream<Ok = Event, Error = Error, Item = Result<Event>> {
+ try_unfold(client, |mut this| async move {
let mut lines = vec![];
loop {
- match client.read_message() {
- Ok(Message::Text(line)) => {
- debug!(message = line, location = &location; "received websocket message");
- let line = line.trim().to_string();
- if line.starts_with(":") || line.is_empty() {
+ if let Some(message) = this.next().await {
+ match message {
+ Ok(Message::Text(line)) => {
+ debug!(message = line; "received websocket message");
+ let line = line.trim().to_string();
+ if line.starts_with(":") || line.is_empty() {
+ continue;
+ }
+ lines.push(line);
+ if let Ok(event) = make_event(&lines) {
+ info!(event = as_serde!(event); "received websocket event");
+ lines.clear();
+ return Ok(Some((event, this)));
+ } else {
+ continue;
+ }
+ },
+ Ok(Message::Ping(data)) => {
+ debug!(data = String::from_utf8_lossy(data.as_slice()); "received ping, ponging");
+ this.send(Message::Pong(data)).await?;
continue;
- }
- lines.push(line);
- if let Ok(event) = make_event(&lines) {
- info!(event = as_serde!(event), location = location; "received websocket event");
- lines.clear();
- return Ok(Some((event, this)));
- } else {
- continue;
- }
- },
- Ok(Message::Ping(data)) => {
- debug!(metadata = as_serde!(data); "received ping, ponging");
- client.write_message(Message::Pong(data))?;
- },
- Ok(message) => return Err(message.into()),
- Err(err) => return Err(err.into()),
+ },
+ Ok(message) => return Err(message.into()),
+ Err(err) => return Err(err.into()),
+ }
+ } else {
+ return Ok(None);
}
}
- }))
+ })
}
fn make_event(lines: &[String]) -> Result<Event> {
diff --git a/src/mastodon.rs b/src/mastodon.rs
index 1b3ee0a..d05074f 100644
--- a/src/mastodon.rs
+++ b/src/mastodon.rs
@@ -9,7 +9,8 @@ use crate::{
Empty,
},
errors::{Error, Result},
- event_stream::event_stream,
+ event_stream::connect_to_event_stream,
+ format_err,
helpers::read_response::read_response,
log_serde,
AddFilterRequest,
@@ -22,7 +23,7 @@ use crate::{
UpdatePushRequest,
};
use futures::TryStream;
-use log::{as_debug, as_serde, debug, error, info, trace};
+use log::{as_debug, as_serde, debug, error, trace};
use reqwest::{Client, RequestBuilder};
use url::Url;
use uuid::Uuid;
@@ -394,29 +395,13 @@ impl Mastodon {
/// });
/// ```
pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error = Error>> {
- let call_id = Uuid::new_v4();
- let mut url: Url = self.route("/api/v1/streaming").parse()?;
+ let mut url: Url = self.route("/api/v1/streaming/user").parse()?;
+ url.set_scheme("wss")
+ .map_err(|_| format_err!("error setting URL scheme"))?;
url.query_pairs_mut()
- .append_pair("access_token", &self.data.token)
- .append_pair("stream", "user");
- debug!(
- url = url.as_str(), call_id = as_debug!(call_id);
- "making user streaming API request"
- );
- let response = reqwest::get(url.as_str()).await?;
- let mut url: Url = response.url().as_str().parse()?;
- info!(
- url = url.as_str(), call_id = as_debug!(call_id),
- status = response.status().as_str();
- "received url from streaming API request"
- );
- let new_scheme = match url.scheme() {
- "http" => "ws",
- "https" => "wss",
- x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
- };
- url.set_scheme(new_scheme)
- .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+ .append_pair("access_token", &self.data.token);
+ connect_to_event_stream(url.into_string()).await
+ }
/// Set the bearer authentication token
fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {