summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorD. Scott Boggs <scott@tams.tech>2023-02-13 11:54:49 -0500
committerD. Scott Boggs <scott@tams.tech>2023-02-13 12:10:21 -0500
commit022cf0e7cd4df51956499ec4ac34d32e87181d2d (patch)
treecf06ab02bb9c60d7bc42064d690ee49f5ed3b2e2
parent6f5a2a5e9024fca4687efeb8b03c2650ec111a7d (diff)
Pass client instance to the event streamfix/pass-client-to-stream
-rw-r--r--examples/log_events.rs2
-rw-r--r--src/event_stream.rs17
-rw-r--r--src/lib.rs2
-rw-r--r--src/macros.rs10
-rw-r--r--src/mastodon.rs1
5 files changed, 15 insertions, 17 deletions
diff --git a/examples/log_events.rs b/examples/log_events.rs
index 3dd8a80..8858f4d 100644
--- a/examples/log_events.rs
+++ b/examples/log_events.rs
@@ -15,7 +15,7 @@ async fn run() -> Result<()> {
let stream = mastodon.stream_user().await?;
info!("watching mastodon for events. This will run forever, press Ctrl+C to kill the program.");
stream
- .try_for_each(|event| async move {
+ .try_for_each(|(event, _client)| async move {
match event {
// fill in how you want to handle events here.
_ => warn!(event = as_serde!(event); "unrecognized event received"),
diff --git a/src/event_stream.rs b/src/event_stream.rs
index c35ac9a..d6cb7c0 100644
--- a/src/event_stream.rs
+++ b/src/event_stream.rs
@@ -1,10 +1,6 @@
use std::io;
-use crate::{
- entities::{event::Event, prelude::Notification, status::Status},
- errors::Result,
- Error,
-};
+use crate::{errors::Result, prelude::*, Error};
use futures::{stream::try_unfold, TryStream, TryStreamExt};
use log::{as_debug, as_serde, debug, error, info, trace};
use reqwest::Response;
@@ -18,14 +14,15 @@ use tokio_util::io::StreamReader;
pub fn event_stream(
response: Response,
location: String,
-) -> impl TryStream<Ok = Event, Error = Error> {
+ client: &Mastodon,
+) -> impl TryStream<Ok = (Event, Mastodon), Error = Error> + '_ {
let stream = StreamReader::new(response.bytes_stream().map_err(|err| {
error!(err = as_debug!(err); "error reading stream");
io::Error::new(io::ErrorKind::BrokenPipe, format!("{err:?}"))
}));
let lines_iter = stream.lines();
- try_unfold((lines_iter, location), |mut this| async move {
- let (ref mut lines_iter, ref location) = this;
+ try_unfold((lines_iter, location, client), |mut this| async move {
+ let (ref mut lines_iter, ref location, client) = this;
let mut lines = vec![];
while let Some(line) = lines_iter.next_line().await? {
debug!(message = line, location = &location; "received message");
@@ -37,7 +34,7 @@ pub fn event_stream(
if let Ok(event) = make_event(&lines) {
info!(event = as_serde!(event), location = location; "received event");
lines.clear();
- return Ok(Some((event, this)));
+ return Ok(Some(((event, client.clone()), this)));
} else {
continue;
}
@@ -46,7 +43,7 @@ pub fn event_stream(
})
}
-fn make_event(lines: &[String]) -> Result<Event> {
+pub(crate) fn make_event(lines: &[String]) -> Result<Event> {
let event;
let data;
if let Some(event_line) = lines.iter().find(|line| line.starts_with("event:")) {
diff --git a/src/lib.rs b/src/lib.rs
index 60dc78e..1188636 100644
--- a/src/lib.rs
+++ b/src/lib.rs
@@ -40,7 +40,7 @@
//! let client = Mastodon::from(data);
//! tokio_test::block_on(async {
//! let stream = client.stream_user().await.unwrap();
-//! stream.try_for_each(|event| async move {
+//! stream.try_for_each(|(event, _client)| async move {
//! match event {
//! Event::Update(ref status) => { /* .. */ },
//! Event::Notification(ref notification) => { /* .. */ },
diff --git a/src/macros.rs b/src/macros.rs
index 1ebbfba..24373a1 100644
--- a/src/macros.rs
+++ b/src/macros.rs
@@ -527,7 +527,8 @@ tokio_test::block_on(async {
}).await.unwrap();
});"
),
- pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=Event, Error=Error>> {
+ pub async fn $fn_name(&self) -> Result<impl TryStream<Ok=(Event, Mastodon), Error=Error> + '_> {
+ use $crate::event_stream::event_stream;
let url = self.route(&format!("/api/v1/streaming/{}", $stream));
let response = self.authenticated(self.client.get(&url)).header("Accept", "application/json").send().await?;
debug!(
@@ -537,7 +538,7 @@ tokio_test::block_on(async {
);
let status = response.status();
if status.is_success() {
- Ok(event_stream(response, url))
+ Ok(event_stream(response, url, self))
} else {
let response = response.json().await?;
Err(Error::Api{ status, response })
@@ -575,7 +576,8 @@ tokio_test::block_on(async {
}).await.unwrap();
});"
),
- pub async fn $fn_name(&self, $param: $param_type) -> Result<impl TryStream<Ok=Event, Error=Error>> {
+ pub async fn $fn_name(&self, $param: $param_type) -> Result<impl TryStream<Ok=(Event, Mastodon), Error=Error> + '_> {
+ use $crate::event_stream::event_stream;
let mut url: Url = self.route(concat!("/api/v1/streaming/", stringify!($stream))).parse()?;
url.query_pairs_mut().append_pair(stringify!($param), $param.as_ref());
let url = url.to_string();
@@ -587,7 +589,7 @@ tokio_test::block_on(async {
);
let status = response.status();
if status.is_success() {
- Ok(event_stream(response, url))
+ Ok(event_stream(response, url, self))
} else {
let response = response.json().await?;
Err(Error::Api{ status, response })
diff --git a/src/mastodon.rs b/src/mastodon.rs
index 68f976d..d337f56 100644
--- a/src/mastodon.rs
+++ b/src/mastodon.rs
@@ -9,7 +9,6 @@ use crate::{
Empty,
},
errors::{Error, Result},
- event_stream::event_stream,
helpers::read_response::read_response,
log_serde,
polling_time::PollingTime,