diff options
Diffstat (limited to 'src/lib.rs')
-rw-r--r-- | src/lib.rs | 230 |
1 files changed, 169 insertions, 61 deletions
@@ -89,6 +89,8 @@ extern crate serde_urlencoded; extern crate tap_reader; extern crate try_from; extern crate url; +extern crate url1x; +extern crate tungstenite; #[cfg(feature = "env")] extern crate envy; @@ -105,12 +107,13 @@ extern crate indoc; use std::{ borrow::Cow, - io::{BufRead, BufReader}, + io::BufRead, ops, }; use reqwest::{Client, RequestBuilder, Response}; use tap_reader::Tap; +use tungstenite::client::AutoStream; use entities::prelude::*; use http_send::{HttpSend, HttpSender}; @@ -202,7 +205,7 @@ impl From<Data> for Mastodon<HttpSender> { } impl<H: HttpSend> MastodonClient<H> for Mastodon<H> { - type Stream = EventReader<BufReader<Response>>; + type Stream = EventReader<WebSocket>; paged_routes! { (get) favourites: "favourites" => Status, @@ -491,124 +494,229 @@ impl<H: HttpSend> MastodonClient<H> for Mastodon<H> { /// # } /// ``` fn streaming_user(&self) -> Result<Self::Stream> { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/user")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "user"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// returns all public statuses fn streaming_public(&self) -> Result<Self::Stream> { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/public")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "public"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all local statuses fn streaming_local(&self) -> Result<Self::Stream> { - let response = self.send( - self.client - .get(&self.route("/api/v1/streaming/public/local")), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "public:local"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all public statuses for a particular hashtag fn streaming_public_hashtag(&self, hashtag: &str) -> Result<Self::Stream> { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/hashtag?tag={}", hashtag))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "hashtag") + .append_pair("tag", hashtag); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all local statuses for a particular hashtag fn streaming_local_hashtag(&self, hashtag: &str) -> Result<Self::Stream> { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/hashtag/local?tag={}", hashtag))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "hashtag:local") + .append_pair("tag", hashtag); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns statuses for a list fn streaming_list(&self, list_id: &str) -> Result<Self::Stream> { - let response = self.send( - self.client - .get(&self.route(&format!("/api/v1/streaming/list?list={}", list_id))), - )?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "list") + .append_pair("list", list_id); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) } /// Returns all direct messages fn streaming_direct(&self) -> Result<Self::Stream> { - let response = self.send(self.client.get(&self.route("/api/v1/streaming/direct")))?; - let reader = BufReader::new(response); - Ok(EventReader(reader)) + let mut url: url::Url = self.route("/api/v1/streaming").parse()?; + url.query_pairs_mut() + .append_pair("access_token", &self.token) + .append_pair("stream", "direct"); + let mut url: url::Url = reqwest::get(url.as_str())?.url().as_str().parse()?; + let new_scheme = match url.scheme() { + "http" => "ws", + "https" => "wss", + x => return Err(Error::Other(format!("Bad URL scheme: {}", x))), + }; + url.set_scheme(new_scheme).map_err(|_| Error::Other("Bad URL scheme!".to_string()))?; + + let client = tungstenite::connect(url.as_str())?.0; + + Ok(EventReader(WebSocket(client))) + } +} + +#[derive(Debug)] +/// WebSocket newtype so that EventStream can be implemented without coherency issues +pub struct WebSocket(tungstenite::protocol::WebSocket<AutoStream>); + +/// A type that streaming events can be read from +pub trait EventStream { + /// Read a message from this stream + fn read_message(&mut self) -> Result<String>; +} + +impl<R: BufRead> EventStream for R { + fn read_message(&mut self) -> Result<String> { + let mut buf = String::new(); + self.read_line(&mut buf)?; + Ok(buf) + } +} + +impl EventStream for WebSocket { + fn read_message(&mut self) -> Result<String> { + Ok(self.0.read_message()?.into_text()?) } } #[derive(Debug)] /// Iterator that produces events from a mastodon streaming API event stream -pub struct EventReader<R: BufRead>(R); -impl<R: BufRead> Iterator for EventReader<R> { +pub struct EventReader<R: EventStream>(R); +impl<R: EventStream> Iterator for EventReader<R> { type Item = Event; fn next(&mut self) -> Option<Self::Item> { let mut lines = Vec::new(); - let mut tmp = String::new(); loop { - if let Ok(..) = self.0.read_line(&mut tmp) { - let line = tmp.trim().to_string(); - tmp.clear(); - if line.starts_with(":") { + if let Ok(line) = self.0.read_message() { + let line = line.trim().to_string(); + if line.starts_with(":") || line.is_empty() { continue; } - if line.is_empty() && !lines.is_empty() { - if let Ok(event) = self.make_event(&lines) { - lines.clear(); - return Some(event); - } else { - continue; - } - } lines.push(line); + if let Ok(event) = self.make_event(&lines) { + lines.clear(); + return Some(event); + } else { + continue; + } } } } } -impl<R: BufRead> EventReader<R> { +impl<R: EventStream> EventReader<R> { fn make_event(&self, lines: &[String]) -> Result<Event> { - let event = lines + let event; + let data; + if let Some(event_line) = lines .iter() .find(|line| line.starts_with("event:")) - .ok_or_else(|| Error::Other("No `event:` line".to_string()))?; - let event = event[6..].trim(); - let data = lines.iter().find(|line| line.starts_with("data:")); + { + event = event_line[6..].trim().to_string(); + data = lines.iter().find(|line| line.starts_with("data:")).map(|x| x[5..].trim().to_string()); + } else { + #[derive(Deserialize)] + struct Message { + pub event: String, + pub payload: Option<String>, + } + let message = serde_json::from_str::<Message>(&lines[0])?; + event = message.event; + data = message.payload; + } + let event: &str = &event; Ok(match event { "notification" => { let data = data.ok_or_else(|| { Error::Other("Missing `data` line for notification".to_string()) })?; - let data = data[5..].trim(); let notification = serde_json::from_str::<Notification>(&data)?; Event::Notification(notification) }, "update" => { let data = data.ok_or_else(|| Error::Other("Missing `data` line for update".to_string()))?; - let data = data[5..].trim(); let status = serde_json::from_str::<Status>(&data)?; Event::Update(status) }, "delete" => { let data = data.ok_or_else(|| Error::Other("Missing `data` line for delete".to_string()))?; - let data = data[5..].trim().to_string(); Event::Delete(data) }, "filters_changed" => Event::FiltersChanged, @@ -668,7 +776,7 @@ impl<H: HttpSend> MastodonBuilder<H> { pub struct MastodonUnauth<H: HttpSend = HttpSender> { client: Client, http_sender: H, - base: url::Url, + base: reqwest::Url, } impl MastodonUnauth<HttpSender> { @@ -682,13 +790,13 @@ impl MastodonUnauth<HttpSender> { Ok(MastodonUnauth { client: Client::new(), http_sender: HttpSender, - base: url::Url::parse(&base)?, + base: reqwest::Url::parse(&base)?, }) } } impl<H: HttpSend> MastodonUnauth<H> { - fn route(&self, url: &str) -> Result<url::Url> { + fn route(&self, url: &str) -> Result<reqwest::Url> { Ok(self.base.join(url)?) } |