summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorD. Scott Boggs <scott@tams.tech>2022-12-18 17:30:23 -0500
committerD. Scott Boggs <scott@tams.tech>2022-12-18 17:30:23 -0500
commitc5141972e405e30ec333a7a8768a6a33f4e242a1 (patch)
tree590ae24596829465b113362394f3e8ebc67a5125
parent610d51c59332f61ce81b52d16899c1885890334e (diff)
Add macro for streaming routes
-rw-r--r--src/macros.rs44
-rw-r--r--src/mastodon.rs70
2 files changed, 50 insertions, 64 deletions
diff --git a/src/macros.rs b/src/macros.rs
index ddcc555..2446143 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -438,3 +438,47 @@ macro_rules! paged_routes_with_id {
() => {}
}
+
+macro_rules! streaming {
+ {$($stream:ident@$fn_name:ident ($desc:tt),)*} => {
+ $(
+ doc_comment! {
+ concat!(
+ $desc,
+ "\n\nExample:\n\n",
+ "
+use elefren::prelude::*;
+use elefren::entities::event::Event;
+use futures_util::{pin_mut, StreamExt, TryStreamExt};
+
+tokio_test::block_on(async {
+ let data = Data::default();
+ let client = Mastodon::from(data);
+ let stream = client.",
+ stringify!($fn_name),
+ "().await.unwrap();
+ stream.try_for_each(|event| async move {
+ match event {
+ Event::Update(ref status) => { /* .. */ },
+ Event::Notification(ref notification) => { /* .. */ },
+ Event::Delete(ref id) => { /* .. */ },
+ Event::FiltersChanged => { /* .. */ },
+ }
+ Ok(())
+ }).await.unwrap();
+});"
+ ),
+ pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=Event, Error=Error>> {
+ let url = self.route(concat!("/api/v1/streaming/", stringify!($stream)));
+ let response = self.authenticated(self.client.get(&url)).send().await?;
+ debug!(
+ status = log_serde!(response Status), url = &url,
+ headers = log_serde!(response Headers);
+ "received API response"
+ );
+ Ok(event_stream(response.error_for_status()?, url))
+ }
+ }
+ )*
+ };
+}
diff --git a/src/mastodon.rs b/src/mastodon.rs
index b7fb9c7..83152f1 100644
--- a/src/mastodon.rs
+++ b/src/mastodon.rs
@@ -21,8 +21,8 @@ use crate::{
UpdateCredsRequest,
UpdatePushRequest,
};
-use futures::{stream::try_unfold, TryStream};
-use log::{as_debug, as_serde, debug, error, info, trace};
+use futures::TryStream;
+use log::{as_debug, as_serde, debug, error, trace};
use reqwest::{Client, RequestBuilder};
use url::Url;
use uuid::Uuid;
@@ -129,6 +129,10 @@ impl Mastodon {
(post) unendorse_user: "accounts/{}/unpin" => Relationship,
}
+ streaming! {
+ user@stream_user ("returns events that are relevant to the authorized user, i.e. home timeline & notifications"),
+ }
+
/// Create a new Mastodon Client
pub fn new(client: Client, data: Data) -> Self {
Mastodon(Arc::new(MastodonClient {
@@ -368,68 +372,6 @@ impl Mastodon {
self.following(&me.id).await
}
- /// returns events that are relevant to the authorized user, i.e. home
- /// timeline & notifications
- ///
- /// // Example
- ///
- /// ```no_run
- /// use elefren::prelude::*;
- /// use elefren::entities::event::Event;
- /// use futures_util::{pin_mut, StreamExt, TryStreamExt};
- ///
- /// tokio_test::block_on(async {
- /// let data = Data::default();
- /// let client = Mastodon::from(data);
- /// let stream = client.streaming_user().await.unwrap();
- /// stream.try_for_each(|event| async move {
- /// match event {
- /// Event::Update(ref status) => { /* .. */ },
- /// Event::Notification(ref notification) => { /* .. */ },
- /// Event::Delete(ref id) => { /* .. */ },
- /// Event::FiltersChanged => { /* .. */ },
- /// }
- /// Ok(())
- /// }).await.unwrap();
- /// });
- /// ```
- pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error = Error>> {
- let url = self.route("/api/v1/streaming/user");
- let response = self.authenticated(self.client.get(&url)).send().await?;
- debug!(
- status = log_serde!(response Status), url = &url,
- headers = log_serde!(response Headers);
- "received API response"
- );
- Ok(event_stream(response.error_for_status()?, url))
- }
-
- // pub async fn streaming_user(&self) -> Result<impl TryStream<Ok = Event, Error
- // = Error>> { let call_id = Uuid::new_v4();
- // let mut url: Url = self.route("/api/v1/streaming").parse()?;
- // url.query_pairs_mut()
- // .append_pair("access_token", &self.data.token)
- // .append_pair("stream", "user");
- // debug!(
- // url = url.as_str(), call_id = as_debug!(call_id);
- // "making user streaming API request"
- // );
- // let response = reqwest::get(url.as_str()).await?;
- // let mut url: Url = response.url().as_str().parse()?;
- // info!(
- // url = url.as_str(), call_id = as_debug!(call_id),
- // status = response.status().as_str();
- // "received url from streaming API request"
- // );
- // let new_scheme = match url.scheme() {
- // "http" => "ws",
- // "https" => "wss",
- // x => return Err(Error::Other(format!("Bad URL scheme: {}", x))),
- // };
- // url.set_scheme(new_scheme)
- // .map_err(|_| Error::Other("Bad URL scheme!".to_string()))?;
- // }
-
/// Set the bearer authentication token
fn authenticated(&self, request: RequestBuilder) -> RequestBuilder {
request.bearer_auth(&self.data.token)