summaryrefslogtreecommitdiffstats
path: root/src/lib.rs
diff options
context:
space:
mode:
Diffstat (limited to 'src/lib.rs')
-rw-r--r--src/lib.rs230
1 files changed, 169 insertions, 61 deletions
diff --git a/src/lib.rs b/src/lib.rs
index 4661784..b82099c 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -89,6 +89,8 @@ extern crate serde_urlencoded;
extern crate tap_reader;
extern crate try_from;
extern crate url;
+extern crate url1x;
+extern crate tungstenite;
#[cfg(feature = "env")]
extern crate envy;
@@ -105,12 +107,13 @@ extern crate indoc;
use std::{
borrow::Cow,
- io::{BufRead, BufReader},
+ io::BufRead,
ops,
};
use reqwest::{Client, RequestBuilder, Response};
use tap_reader::Tap;
+use tungstenite::client::AutoStream;
use entities::prelude::*;
use http_send::{HttpSend, HttpSender};
@@ -202,7 +205,7 @@ impl From<Data> for Mastodon<HttpSender> {
}
impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
- type Stream = EventReader<BufReader<Response>>;
+ type Stream = EventReader<WebSocket>;
paged_routes! {
(get) favourites: "favourites" => Status,
@@ -491,124 +494,229 @@ impl<H: HttpSend> MastodonClient<H> for Mastodon<H> {
/// # }
/// ```
fn streaming_user(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "user");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// returns all public statuses
fn streaming_public(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "public");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses
fn streaming_local(&self) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route("/api/v1/streaming/public/local")),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "public:local");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all public statuses for a particular hashtag
fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "hashtag")
+ .append_pair("tag", hashtag);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all local statuses for a particular hashtag
fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "hashtag:local")
+ .append_pair("tag", hashtag);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns statuses for a list
fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> {
- let response = self.send(
- self.client
- .get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))),
- )?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "list")
+ .append_pair("list", list_id);
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
}
/// Returns all direct messages
fn streaming_direct(&self) -> Result<Self::Stream> {
- let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?;
- let reader = BufReader::new(response);
- Ok(EventReader(reader))
+ let mut url: url::Url = self.route("/api/v1/streaming").parse()?;
+ url.query_pairs_mut()
+ .append_pair("access_token", &self.token)
+ .append_pair("stream", "direct");
+ let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?;
+ let new_scheme = match url.scheme() {
+ "http" => "ws",
+ "https" => "wss",
+ x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
+ };
+ url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
+
+ let client = tungstenite::connect(url.as_str())?.0;
+
+ Ok(EventReader(WebSocket(client)))
+ }
+}
+
+#[derive(Debug)]
+/// WebSocket newtype so that EventStream can be implemented without coherency issues
+pub struct WebSocket(tungstenite::protocol::WebSocket<AutoStream>);
+
+/// A type that streaming events can be read from
+pub trait EventStream {
+ /// Read a message from this stream
+ fn read_message(&mut self) -> Result<String>;
+}
+
+impl<R: BufRead> EventStream for R {
+ fn read_message(&mut self) -> Result<String> {
+ let mut buf = String::new();
+ self.read_line(&mut buf)?;
+ Ok(buf)
+ }
+}
+
+impl EventStream for WebSocket {
+ fn read_message(&mut self) -> Result<String> {
+ Ok(self.0.read_message()?.into_text()?)
}
}
#[derive(Debug)]
/// Iterator that produces events from a mastodon streaming API event stream
-pub struct EventReader<R: BufRead>(R);
-impl<R: BufRead> Iterator for EventReader<R> {
+pub struct EventReader<R: EventStream>(R);
+impl<R: EventStream> Iterator for EventReader<R> {
type Item = Event;
fn next(&mut self) -> Option<Self::Item> {
let mut lines = Vec::new();
- let mut tmp = String::new();
loop {
- if let Ok(..) = self.0.read_line(&mut tmp) {
- let line = tmp.trim().to_string();
- tmp.clear();
- if line.starts_with(":") {
+ if let Ok(line) = self.0.read_message() {
+ let line = line.trim().to_string();
+ if line.starts_with(":") || line.is_empty() {
continue;
}
- if line.is_empty() && !lines.is_empty() {
- if let Ok(event) = self.make_event(&lines) {
- lines.clear();
- return Some(event);
- } else {
- continue;
- }
- }
lines.push(line);
+ if let Ok(event) = self.make_event(&lines) {
+ lines.clear();
+ return Some(event);
+ } else {
+ continue;
+ }
}
}
}
}
-impl<R: BufRead> EventReader<R> {
+impl<R: EventStream> EventReader<R> {
fn make_event(&self, lines: &[String]) -> Result<Event> {
- let event = lines
+ let event;
+ let data;
+ if let Some(event_line) = lines
.iter()
.find(|line| line.starts_with("event:"))
- .ok_or_else(|| Error::Other("No `event:` line".to_string()))?;
- let event = event[6..].trim();
- let data = lines.iter().find(|line| line.starts_with("data:"));
+ {
+ event = event_line[6..].trim().to_string();
+ data = lines.iter().find(|line| line.starts_with("data:")).map(|x| x[5..].trim().to_string());
+ } else {
+ #[derive(Deserialize)]
+ struct Message {
+ pub event: String,
+ pub payload: Option<String>,
+ }
+ let message = serde_json::from_str::<Message>(&lines[0])?;
+ event = message.event;
+ data = message.payload;
+ }
+ let event: &str = &event;
Ok(match event {
"notification" => {
let data = data.ok_or_else(|| {
Error::Other("Missing `data` line for notification".to_string())
})?;
- let data = data[5..].trim();
let notification = serde_json::from_str::<Notification>(&data)?;
Event::Notification(notification)
},
"update" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?;
- let data = data[5..].trim();
let status = serde_json::from_str::<Status>(&data)?;
Event::Update(status)
},
"delete" => {
let data =
data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?;
- let data = data[5..].trim().to_string();
Event::Delete(data)
},
"filters_changed" => Event::FiltersChanged,
@@ -668,7 +776,7 @@ impl<H: HttpSend> MastodonBuilder<H> {
pub struct MastodonUnauth<H: HttpSend = HttpSender> {
client: Client,
http_sender: H,
- base: url::Url,
+ base: reqwest::Url,
}
impl MastodonUnauth<HttpSender> {
@@ -682,13 +790,13 @@ impl MastodonUnauth<HttpSender> {
Ok(MastodonUnauth {
client: Client::new(),
http_sender: HttpSender,
- base: url::Url::parse(&base)?,
+ base: reqwest::Url::parse(&base)?,
})
}
}
impl<H: HttpSend> MastodonUnauth<H> {
- fn route(&self, url: &str) -> Result<url::Url> {
+ fn route(&self, url: &str) -> Result<reqwest::Url> {
Ok(self.base.join(url)?)
}