summaryrefslogtreecommitdiffstats
path: root/server/src/writer.rs
diff options
context:
space:
mode:
authorKornel <kornel@geekhood.net>2019-04-20 03:38:59 +0100
committerKornel <kornel@geekhood.net>2019-04-20 03:40:40 +0100
commit20547918c815f1836d019e22e9006d2bcf9dc380 (patch)
tree66d23f3b371a11d6f6913d9241d399ae036bbb51 /server/src/writer.rs
parent7960a06e2ed2d28d398e7f7ded6dc0e929ceb290 (diff)
Streaming io::Write
Diffstat (limited to 'server/src/writer.rs')
-rw-r--r--server/src/writer.rs43
1 files changed, 43 insertions, 0 deletions
diff --git a/server/src/writer.rs b/server/src/writer.rs
new file mode 100644
index 0000000..1f9c343
--- /dev/null
+++ b/server/src/writer.rs
@@ -0,0 +1,43 @@
+use futures::Stream;
+use futures::sink::{Sink, Wait};
+use futures::sync::mpsc;
+use std::io;
+
+pub struct Writer<T, E>(Wait<mpsc::Sender<Result<T, E>>>);
+
+impl<T, E> Writer<T, E> {
+ pub fn fail(&mut self, error: E) {
+ let _ = self.0.send(Err(error));
+ }
+}
+
+impl<T, E> io::Write for Writer<T, E>
+where
+ T: for<'a> From<&'a [u8]> + Send + Sync + 'static,
+ E: Send + Sync + 'static,
+{
+ fn write(&mut self, d: &[u8]) -> io::Result<usize> {
+ let len = d.len();
+ self.0
+ .send(Ok(d.into()))
+ .map(|()| len)
+ .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))
+ }
+
+ fn write_all(&mut self, d: &[u8]) -> io::Result<()> {
+ self.write(d).map(|_| ())
+ }
+
+ fn flush(&mut self) -> io::Result<()> {
+ self.0
+ .flush()
+ .map_err(|e| io::Error::new(io::ErrorKind::BrokenPipe, e))
+ }
+}
+
+pub fn writer<T, E>() -> (Writer<T, E>, impl Stream<Item = T, Error = E>) {
+ let (tx, rx) = mpsc::channel(3);
+ let w = Writer(tx.wait());
+ let r = rx.then(|r| r.unwrap());
+ (w, r)
+}