summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorleo60228 <leo@60228.dev>2020-05-25 09:03:28 -0400
committerPaul Woolcock <paul@woolcock.us>2020-06-08 14:46:16 -0400
commit791bc83387bbac4fb6ff46f4cf1153e37837b626 (patch)
treef95471d37512506d858ba72eb4929b76a540ba69
parentb877d7983116661a60de815aa0a22762847cbe78 (diff)
Use WebSockets for events
-rw-r--r--Cargo.toml4
-rw-r--r--src/errors.rs10
-rw-r--r--src/lib.rs230
-rw-r--r--src/page.rs2
-rw-r--r--src/registration.rs2
5 files changed, 184 insertions, 64 deletions
diff --git a/Cargo.toml b/Cargo.toml
index f18c2bf..4f03ffb 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -18,13 +18,15 @@ serde_derive = "1"
serde_json = "1"
serde_urlencoded = "0.6.1"
serde_qs = "0.4.5"
-url = "1"
tap-reader = "1"
try_from = "0.3.2"
toml = { version = "0.5.0", optional = true }
hyper-old-types = "0.11.0"
envy = { version = "0.4.0", optional = true }
log = "0.4.6"
+tungstenite = "0.10.1"
+url = "2.1.1"
+url1x = { version = "1", package = "url" }
[dependencies.chrono]
version = "0.4"
diff --git a/src/errors.rs b/src/errors.rs
index b48cb0c..65e5fcc 100644
--- a/src/errors.rs
+++ b/src/errors.rs
@@ -12,6 +12,8 @@ use tomlcrate::de::Error as TomlDeError;
#[cfg(feature = "toml")]
use tomlcrate::ser::Error as TomlSerError;
use url::ParseError as UrlError;
+use url1x::ParseError as ReqwestUrlError;
+use tungstenite::error::Error as WebSocketError;
/// Convience type over `std::result::Result` with `Error` as the error type.
pub type Result<T> = ::std::result::Result<T, Error>;
@@ -33,6 +35,8 @@ pub enum Error {
Io(IoError),
/// Wrapper around the `url::ParseError` struct.
Url(UrlError),
+ /// Wrapper around the `url::ParseError` struct.
+ ReqwestUrl(ReqwestUrlError),
/// Missing Client Id.
ClientIdRequired,
/// Missing Client Secret.
@@ -60,6 +64,8 @@ pub enum Error {
Envy(EnvyError),
/// Error serializing to a query string
SerdeQs(SerdeQsError),
+ /// WebSocket error
+ WebSocket(WebSocketError),
/// Other errors
Other(String),
}
@@ -79,6 +85,7 @@ impl error::Error for Error {
Error::Http(ref e) => e,
Error::Io(ref e) => e,
Error::Url(ref e) => e,
+ Error::ReqwestUrl(ref e) => e,
#[cfg(feature = "toml")]
Error::TomlSer(ref e) => e,
#[cfg(feature = "toml")]
@@ -88,6 +95,7 @@ impl error::Error for Error {
#[cfg(feature = "env")]
Error::Envy(ref e) => e,
Error::SerdeQs(ref e) => e,
+ Error::WebSocket(ref e) => e,
Error::Client(..) | Error::Server(..) => {
return None
@@ -138,6 +146,7 @@ from! {
SerdeError, Serde,
UrlEncodedError, UrlEncoded,
UrlError, Url,
+ ReqwestUrlError, ReqwestUrl,
ApiError, Api,
#[cfg(feature = "toml")] TomlSerError, TomlSer,
#[cfg(feature = "toml")] TomlDeError, TomlDe,
@@ -145,6 +154,7 @@ from! {
HeaderParseError, HeaderParseError,
#[cfg(feature = "env")] EnvyError, Envy,
SerdeQsError, SerdeQs,
+ WebSocketError, WebSocket,
String, Other,
}
diff --git a/src/lib.rs b/src/lib.rs
index 4661784..b82099c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -89,6 +89,8 @@ extern crate serde_urlencoded;
extern crate tap_reader;
extern crate try_from;
extern crate url;
+extern crate url1x;
+extern crate tungstenite;
#[cfg(feature = "env")]
extern crate envy;
@@ -105,12 +107,13 @@ extern crate indoc;
use std::{
borrow::Cow,
- io::{BufRead, BufReader},
+ io::BufRead,
ops,
};
use reqwest::{Client, RequestBuilder, Response};
use tap_reader::Tap;
+use tungstenite::client::AutoStream;
use entities::prelude::*;
use http_send::{HttpSend, HttpSender};
@@ -202,7 +205,7 @@ impl From<Data> for Mastodon<HttpSender> {
}
impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
- type Stream = EventReader<BufReader<Response>>;
+ type Stream = EventReader<WebSocket>;
paged_routes! {
(get) favourites: "favourites" => Status,
@@ -491,124 +494,229 @@ impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
/// # }
/// ```
fn streaming_user(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "user");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// returns all public statuses
fn streaming_public(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "public");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses
fn streaming_local(&self) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route("/api/v1/streaming/public/local")),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "public:local");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all public statuses for a particular hashtag
fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "hashtag")
+ .append_pair("tag", hashtag);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses for a particular hashtag
fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "hashtag:local")
+ .append_pair("tag", hashtag);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns statuses for a list
fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "list")
+ .append_pair("list", list_id);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all direct messages
fn streaming_direct(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "direct");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
+ }
+}
+
+#[derive(Debug)]
+/// WebSocket newtype so that EventStream can be implemented without coherency issues
+pub struct WebSocket(tungstenite::protocol::WebSocket<AutoStream>);
+
+/// A type that streaming events can be read from
+pub trait EventStream {
+ /// Read a message from this stream
+ fn read_message(&mut self) -> Result<String>;
+}
+
+impl<R: BufRead> EventStream for R {
+ fn read_message(&mut self) -> Result<String> {
+ let mut buf = String::new();
+ self.read_line(&mut buf)?;
+ Ok(buf)
+ }
+}
+
+impl EventStream for WebSocket {
+ fn read_message(&mut self) -> Result<String> {
+ Ok(self.0.read_message()?.into_text()?)
}
}
#[derive(Debug)]
/// Iterator that produces events from a mastodon streaming API event stream
-pub struct EventReader<R: BufRead>(R);
-impl<R: BufRead> Iterator for EventReader<R> {
+pub struct EventReader<R: EventStream>(R);
+impl<R: EventStream> Iterator for EventReader<R> {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
let mut lines = Vec::new();
- let mut tmp = String::new();
loop {
- if let Ok(..) = self.0.read_line(&mut tmp) {
- let line = tmp.trim().to_string();
- tmp.clear();
- if line.starts_with(":") {
+ if let Ok(line) = self.0.read_message() {
+ let line = line.trim().to_string();
+ if line.starts_with(":") || line.is_empty() {
continue;
}
- if line.is_empty() && !lines.is_empty() {
- if let Ok(event) = self.make_event(&lines) {
- lines.clear();
- return Some(event);
- } else {
- continue;
- }
- }
lines.push(line);
+ if let Ok(event) = self.make_event(&lines) {
+ lines.clear();
+ return Some(event);
+ } else {
+ continue;
+ }
}
}
}
}
-impl<R: BufRead> EventReader<R> {
+impl<R: EventStream> EventReader<R> {
fn make_event(&self, lines: &[String]) -> Result<Event> {
- let event = lines
+ let event;
+ let data;
+ if let Some(event_line) = lines
.iter()
.find(|line| line.starts_with("event:"))
- .ok_or_else(|| Error::Other("No `event:` line".to_string()))?;
- let event = event[6..].trim();
- let data = lines.iter().find(|line| line.starts_with("data:"));
+ {
+ event = event_line[6..].trim().to_string();
+ data = lines.iter().find(|line| line.starts_with("data:")).map(|x| x[5..].trim().to_string());
+ } else {
+ #[derive(Deserialize)]
+ struct Message {
+ pub event: String,
+ pub payload: Option<String>,
+ }
+ let message = serde_json::from_str::<Message>(&lines[0])?;
+ event = message.event;
+ data = message.payload;
+ }
+ let event: &str = &event;
Ok(match event {
"notification" => {
let data = data.ok_or_else(|| {
Error::Other("Missing `data` line for notification".to_string())
})?;
- let data = data[5..].trim();
let notification = serde_json::from_str::<Notification>(&data)?;
Event::Notification(notification)
},
"update" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?;
- let data = data[5..].trim();
let status = serde_json::from_str::<Status>(&data)?;
Event::Update(status)
},
"delete" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?;
- let data = data[5..].trim().to_string();
Event::Delete(data)
},
"filters_changed" => Event::FiltersChanged,
@@ -668,7 +776,7 @@ impl<H: HttpSend> MastodonBuilder<H> {
pub struct MastodonUnauth<H: HttpSend = HttpSender> {
client: Client,
http_sender: H,
- base: url::Url,
+ base: reqwest::Url,
}
impl MastodonUnauth<HttpSender> {
@@ -682,13 +790,13 @@ impl MastodonUnauth<HttpSender> {
Ok(MastodonUnauth {
client: Client::new(),
http_sender: HttpSender,
- base: url::Url::parse(&base)?,
+ base: reqwest::Url::parse(&base)?,
})
}
}
impl<H: HttpSend> MastodonUnauth<H> {
- fn route(&self, url: &str) -> Result<url::Url> {
+ fn route(&self, url: &str) -> Result<reqwest::Url> {
Ok(self.base.join(url)?)
}
diff --git a/src/page.rs b/src/page.rs
index 3a47ce6..09a2dca 100644
--- a/src/page.rs
+++ b/src/page.rs
@@ -3,7 +3,7 @@ use entities::itemsiter::ItemsIter;
use hyper_old_types::header::{parsing, Link, RelationType};
use reqwest::{header::LINK, Response};
use serde::Deserialize;
-use url::Url;
+use reqwest::Url;
use http_send::HttpSend;
diff --git a/src/registration.rs b/src/registration.rs
index 7aba7c5..8417b40 100644
--- a/src/registration.rs
+++ b/src/registration.rs
@@ -2,7 +2,7 @@ use std::borrow::Cow;
use reqwest::{Client, RequestBuilder, Response};
use try_from::TryInto;
-use url::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET};
+use url1x::percent_encoding::{utf8_percent_encode, DEFAULT_ENCODE_SET};
use apps::{App, AppBuilder};
use http_send::{HttpSend, HttpSender};