summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.toml1
-rw-r--r--examples/chat.rs4
-rw-r--r--examples/compress.rs4
-rw-r--r--examples/connect.rs8
-rw-r--r--examples/echo-threads.rs6
-rw-r--r--examples/echo-udp.rs6
-rw-r--r--examples/echo.rs4
-rw-r--r--examples/hello.rs3
-rw-r--r--examples/proxy.rs4
-rw-r--r--examples/sink.rs4
-rw-r--r--examples/tinydb.rs4
-rw-r--r--examples/tinyhttp.rs5
-rw-r--r--examples/udp-codec.rs4
-rw-r--r--tests/buffered.rs3
-rw-r--r--tests/chain.rs3
-rw-r--r--tests/drop-core.rs8
-rw-r--r--tests/echo.rs3
-rw-r--r--tests/global.rs3
-rw-r--r--tests/limit.rs3
-rw-r--r--tests/line-frames.rs16
-rw-r--r--tests/pipe-hup.rs4
-rw-r--r--tests/stream-buffered.rs3
-rw-r--r--tests/tcp.rs7
-rw-r--r--tests/udp.rs13
24 files changed, 66 insertions, 57 deletions
diff --git a/Cargo.toml b/Cargo.toml
index b4fbe13e..4c0293ea 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -52,4 +52,5 @@ serde_json = "1.0"
time = "0.1"
[patch.crates-io]
+futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" }
mio = { git = "https://github.com/carllerche/mio" }
diff --git a/examples/chat.rs b/examples/chat.rs
index 76e689b9..667f0e9a 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader};
use std::sync::{Arc, Mutex};
use futures::Future;
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
@@ -134,5 +134,5 @@ fn main() {
});
// execute server
- srv.wait().unwrap();
+ future::blocking(srv).wait().unwrap();
}
diff --git a/examples/compress.rs b/examples/compress.rs
index 3098abf7..501548ef 100644
--- a/examples/compress.rs
+++ b/examples/compress.rs
@@ -29,7 +29,7 @@ use std::env;
use std::net::SocketAddr;
use futures::{Future, Stream, Poll};
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -62,7 +62,7 @@ fn main() {
Ok(())
});
- server.wait().unwrap();
+ future::blocking(server).wait().unwrap();
}
/// The main workhorse of this example. This'll compress all data read from
diff --git a/examples/connect.rs b/examples/connect.rs
index 1bdc1af4..f0619fbd 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -26,7 +26,7 @@ use std::net::SocketAddr;
use std::thread;
use futures::sync::mpsc;
-use futures::{Sink, Future, Stream};
+use futures::{future, Sink, Stream};
use futures_cpupool::CpuPool;
fn main() {
@@ -71,9 +71,9 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- stdout.for_each(|chunk| {
+ future::blocking(stdout.for_each(|chunk| {
out.write_all(&chunk)
- }).wait().unwrap();
+ })).wait().unwrap();
}
mod tcp {
@@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) {
Ok(n) => n,
};
buf.truncate(n);
- tx = match tx.send(buf).wait() {
+ tx = match future::blocking(tx.send(buf)).wait() {
Ok(tx) => tx,
Err(_) => break,
};
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
index 6ce8b156..e2525c80 100644
--- a/examples/echo-threads.rs
+++ b/examples/echo-threads.rs
@@ -24,7 +24,7 @@ use std::net::SocketAddr;
use std::thread;
use futures::prelude::*;
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures::sync::mpsc;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
@@ -61,7 +61,7 @@ fn main() {
next = (next + 1) % channels.len();
Ok(())
});
- srv.wait().unwrap();
+ future::blocking(srv).wait().unwrap();
}
fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
@@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
Ok(())
});
- done.wait().unwrap();
+ future::blocking(done).wait().unwrap();
}
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index f7e2bf09..2ce43bc0 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -18,7 +18,7 @@ extern crate tokio_io;
use std::{env, io};
use std::net::SocketAddr;
-use futures::{Future, Poll};
+use futures::{future, Future, Poll};
use tokio::net::UdpSocket;
struct Server {
@@ -58,9 +58,9 @@ fn main() {
// Next we'll create a future to spawn (the one we defined above) and then
// we'll block our current thread waiting on the result of the future
- Server {
+ future::blocking(Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
- }.wait().unwrap();
+ }).wait().unwrap();
}
diff --git a/examples/echo.rs b/examples/echo.rs
index 558f3a68..54a28ff7 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -26,7 +26,7 @@ use std::env;
use std::net::SocketAddr;
use futures::Future;
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures::stream::Stream;
use futures_cpupool::CpuPool;
use tokio_io::AsyncRead;
@@ -114,5 +114,5 @@ fn main() {
// And finally now that we've define what our server is, we run it! Here we
// just need to execute the future we've created and wait for it to complete
// using the standard methods in the `futures` crate.
- done.wait().unwrap();
+ future::blocking(done).wait().unwrap();
}
diff --git a/examples/hello.rs b/examples/hello.rs
index 5ceb431b..d9e46d17 100644
--- a/examples/hello.rs
+++ b/examples/hello.rs
@@ -19,6 +19,7 @@ extern crate tokio_io;
use std::env;
use std::net::SocketAddr;
+use futures::future;
use futures::prelude::*;
use tokio::net::TcpListener;
@@ -40,5 +41,5 @@ fn main() {
Ok(())
});
- server.wait().unwrap();
+ future::blocking(server).wait().unwrap();
}
diff --git a/examples/proxy.rs b/examples/proxy.rs
index 51735ba1..f73dd30d 100644
--- a/examples/proxy.rs
+++ b/examples/proxy.rs
@@ -28,7 +28,7 @@ use std::io::{self, Read, Write};
use futures::stream::Stream;
use futures::{Future, Poll};
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::{AsyncRead, AsyncWrite};
@@ -92,7 +92,7 @@ fn main() {
Ok(())
});
- done.wait().unwrap();
+ future::blocking(done).wait().unwrap();
}
// This is a custom type used to have a custom implementation of the
diff --git a/examples/sink.rs b/examples/sink.rs
index 21456ada..3fa5f5ed 100644
--- a/examples/sink.rs
+++ b/examples/sink.rs
@@ -26,7 +26,7 @@ use std::iter;
use std::net::SocketAddr;
use futures::Future;
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures::stream::{self, Stream};
use futures_cpupool::CpuPool;
use tokio_io::IoFuture;
@@ -46,7 +46,7 @@ fn main() {
pool.execute(write(socket).or_else(|_| Ok(()))).unwrap();
Ok(())
});
- server.wait().unwrap();
+ future::blocking(server).wait().unwrap();
}
fn write(socket: TcpStream) -> IoFuture<()> {
diff --git a/examples/tinydb.rs b/examples/tinydb.rs
index 0a68a314..de750404 100644
--- a/examples/tinydb.rs
+++ b/examples/tinydb.rs
@@ -51,7 +51,7 @@ use std::net::SocketAddr;
use std::sync::{Arc, Mutex};
use futures::prelude::*;
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures_cpupool::CpuPool;
use tokio::net::TcpListener;
use tokio_io::AsyncRead;
@@ -160,7 +160,7 @@ fn main() {
Ok(())
});
- done.wait().unwrap();
+ future::blocking(done).wait().unwrap();
}
impl Request {
diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs
index 00c16fec..b0106d63 100644
--- a/examples/tinyhttp.rs
+++ b/examples/tinyhttp.rs
@@ -31,8 +31,7 @@ use std::net::{self, SocketAddr};
use std::thread;
use bytes::BytesMut;
-use futures::future::Executor;
-use futures::future;
+use futures::future::{self, Executor};
use futures::sync::mpsc;
use futures::{Stream, Future, Sink};
use futures_cpupool::CpuPool;
@@ -91,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) {
})).unwrap();
Ok(())
});
- done.wait().unwrap();
+ future::blocking(done).wait().unwrap();
}
/// "Server logic" is implemented in this function.
diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs
index c874ebd7..5c11e9f3 100644
--- a/examples/udp-codec.rs
+++ b/examples/udp-codec.rs
@@ -15,7 +15,7 @@ use std::io;
use std::net::SocketAddr;
use futures::{Future, Stream, Sink};
-use futures::future::Executor;
+use futures::future::{self, Executor};
use futures_cpupool::CpuPool;
use tokio::net::{UdpSocket, UdpCodec};
@@ -76,5 +76,5 @@ fn main() {
// Spawn the sender of pongs and then wait for our pinger to finish.
pool.execute(b.then(|_| Ok(()))).unwrap();
- drop(a.wait());
+ drop(future::blocking(a).wait());
}
diff --git a/tests/buffered.rs b/tests/buffered.rs
index 2ba16b04..60b574f6 100644
--- a/tests/buffered.rs
+++ b/tests/buffered.rs
@@ -8,6 +8,7 @@ use std::thread;
use std::io::{Read, Write, BufReader, BufWriter};
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio_io::io::copy;
use tokio::net::TcpListener;
@@ -54,7 +55,7 @@ fn echo_server() {
copy(a, b)
});
- let (amt, _, _) = t!(copied.wait());
+ let (amt, _, _) = t!(blocking(copied).wait());
let (expected, t2) = t.join().unwrap();
let actual = t2.join().unwrap();
diff --git a/tests/chain.rs b/tests/chain.rs
index b9ac4818..cd271258 100644
--- a/tests/chain.rs
+++ b/tests/chain.rs
@@ -7,6 +7,7 @@ use std::thread;
use std::io::{Write, Read};
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio_io::io::read_to_end;
use tokio::net::TcpListener;
@@ -42,7 +43,7 @@ fn chain_clients() {
read_to_end(a.chain(b).chain(c), Vec::new())
});
- let (_, data) = t!(copied.wait());
+ let (_, data) = t!(blocking(copied).wait());
t.join().unwrap();
assert_eq!(data, b"foo bar baz");
diff --git a/tests/drop-core.rs b/tests/drop-core.rs
index 75ac9b7e..503e81ef 100644
--- a/tests/drop-core.rs
+++ b/tests/drop-core.rs
@@ -4,7 +4,7 @@ extern crate futures;
use std::thread;
use std::net;
-use futures::future;
+use futures::{future, stream};
use futures::prelude::*;
use futures::sync::oneshot;
use tokio::net::TcpListener;
@@ -17,7 +17,7 @@ fn tcp_doesnt_block() {
let listener = net::TcpListener::bind("127.0.0.1:0").unwrap();
let listener = TcpListener::from_std(listener, &handle).unwrap();
drop(core);
- assert!(listener.incoming().wait().next().unwrap().is_err());
+ assert!(stream::blocking(listener.incoming()).next().unwrap().is_err());
}
#[test]
@@ -34,9 +34,9 @@ fn drop_wakes() {
drop(tx);
future::ok(())
});
- assert!(new_socket.join(drop_tx).wait().is_err());
+ assert!(future::blocking(new_socket.join(drop_tx)).wait().is_err());
});
- drop(rx.wait());
+ drop(future::blocking(rx).wait());
drop(core);
t.join().unwrap();
}
diff --git a/tests/echo.rs b/tests/echo.rs
index d5bdae81..778bdeb9 100644
--- a/tests/echo.rs
+++ b/tests/echo.rs
@@ -8,6 +8,7 @@ use std::net::TcpStream;
use std::thread;
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio::net::TcpListener;
use tokio_io::AsyncRead;
@@ -44,7 +45,7 @@ fn echo_server() {
let halves = client.map(|s| s.split());
let copied = halves.and_then(|(a, b)| copy(a, b));
- let (amt, _, _) = t!(copied.wait());
+ let (amt, _, _) = t!(blocking(copied).wait());
t.join().unwrap();
assert_eq!(amt, msg.len() as u64 * 1024);
diff --git a/tests/global.rs b/tests/global.rs
index bf5682fa..4702fc11 100644
--- a/tests/global.rs
+++ b/tests/global.rs
@@ -3,6 +3,7 @@ extern crate tokio;
use std::thread;
+use futures::future::blocking;
use futures::prelude::*;
use tokio::net::{TcpStream, TcpListener};
@@ -23,7 +24,7 @@ fn hammer() {
let theirs = srv.incoming().into_future()
.map(|(s, _)| s.unwrap())
.map_err(|(s, _)| s);
- let (mine, theirs) = t!(mine.join(theirs).wait());
+ let (mine, theirs) = t!(blocking(mine.join(theirs)).wait());
assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr()));
diff --git a/tests/limit.rs b/tests/limit.rs
index 7055ce9b..053f4385 100644
--- a/tests/limit.rs
+++ b/tests/limit.rs
@@ -7,6 +7,7 @@ use std::thread;
use std::io::{Write, Read};
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio_io::io::read_to_end;
use tokio::net::TcpListener;
@@ -36,7 +37,7 @@ fn limit() {
read_to_end(a.take(4), Vec::new())
});
- let (_, data) = t!(copied.wait());
+ let (_, data) = t!(blocking(copied).wait());
t.join().unwrap();
assert_eq!(data, b"foo ");
diff --git a/tests/line-frames.rs b/tests/line-frames.rs
index 3785dfef..27f1d195 100644
--- a/tests/line-frames.rs
+++ b/tests/line-frames.rs
@@ -10,7 +10,7 @@ use std::net::Shutdown;
use bytes::{BytesMut, BufMut};
use futures::{Future, Stream, Sink};
-use futures::future::Executor;
+use futures::future::{blocking, Executor};
use futures_cpupool::CpuPool;
use tokio::net::{TcpListener, TcpStream};
use tokio_io::codec::{Encoder, Decoder};
@@ -68,20 +68,20 @@ fn echo() {
pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap();
let client = TcpStream::connect(&addr);
- let client = client.wait().unwrap();
- let (client, _) = write_all(client, b"a\n").wait().unwrap();
- let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap();
+ let client = blocking(client).wait().unwrap();
+ let (client, _) = blocking(write_all(client, b"a\n")).wait().unwrap();
+ let (client, buf, amt) = blocking(read(client, vec![0; 1024])).wait().unwrap();
assert_eq!(amt, 2);
assert_eq!(&buf[..2], b"a\n");
- let (client, _) = write_all(client, b"\n").wait().unwrap();
- let (client, buf, amt) = read(client, buf).wait().unwrap();
+ let (client, _) = blocking(write_all(client, b"\n")).wait().unwrap();
+ let (client, buf, amt) = blocking(read(client, buf)).wait().unwrap();
assert_eq!(amt, 1);
assert_eq!(&buf[..1], b"\n");
- let (client, _) = write_all(client, b"b").wait().unwrap();
+ let (client, _) = blocking(write_all(client, b"b")).wait().unwrap();
client.shutdown(Shutdown::Write).unwrap();
- let (_client, buf, amt) = read(client, buf).wait().unwrap();
+ let (_client, buf, amt) = blocking(read(client, buf)).wait().unwrap();
assert_eq!(amt, 1);
assert_eq!(&buf[..1], b"b");
}
diff --git a/tests/pipe-hup.rs b/tests/pipe-hup.rs
index a9bf9671..c0406487 100644
--- a/tests/pipe-hup.rs
+++ b/tests/pipe-hup.rs
@@ -13,7 +13,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd};
use std::thread;
use std::time::Duration;
-use futures::prelude::*;
+use futures::future::blocking;
use mio::event::Evented;
use mio::unix::{UnixReady, EventedFd};
use mio::{PollOpt, Ready, Token};
@@ -81,7 +81,7 @@ fn hup() {
let source = PollEvented::new(MyFile::new(read), &handle).unwrap();
let reader = read_to_end(source, Vec::new());
- let (_, content) = t!(reader.wait());
+ let (_, content) = t!(blocking(reader).wait());
assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]);
t.join().unwrap();
}
diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs
index 78fe1c37..64786902 100644
--- a/tests/stream-buffered.rs
+++ b/tests/stream-buffered.rs
@@ -8,6 +8,7 @@ use std::net::TcpStream;
use std::thread;
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio_io::io::copy;
use tokio_io::AsyncRead;
@@ -48,7 +49,7 @@ fn echo_server() {
.take(2)
.collect();
- t!(future.wait());
+ t!(blocking(future).wait());
t.join().unwrap();
}
diff --git a/tests/tcp.rs b/tests/tcp.rs
index 83cc425c..a1cefacd 100644
--- a/tests/tcp.rs
+++ b/tests/tcp.rs
@@ -7,6 +7,7 @@ use std::sync::mpsc::channel;
use std::thread;
use futures::Future;
+use futures::future::blocking;
use futures::stream::Stream;
use tokio::net::{TcpListener, TcpStream};
@@ -27,7 +28,7 @@ fn connect() {
});
let stream = TcpStream::connect(&addr);
- let mine = t!(stream.wait());
+ let mine = t!(blocking(stream).wait());
let theirs = t.join().unwrap();
assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr()));
@@ -50,7 +51,7 @@ fn accept() {
net::TcpStream::connect(&addr).unwrap()
});
- let (mine, _remaining) = t!(client.wait());
+ let (mine, _remaining) = t!(blocking(client).wait());
let mine = mine.unwrap();
let theirs = t.join().unwrap();
@@ -75,7 +76,7 @@ fn accept2() {
}).into_future().map_err(|e| e.0);
assert!(rx.try_recv().is_err());
- let (mine, _remaining) = t!(client.wait());
+ let (mine, _remaining) = t!(blocking(client).wait());
mine.unwrap();
t.join().unwrap();
}
diff --git a/tests/udp.rs b/tests/udp.rs
index f0a47d37..42b8906d 100644
--- a/tests/udp.rs
+++ b/tests/udp.rs
@@ -7,6 +7,7 @@ use std::io;
use std::net::SocketAddr;
use futures::{Future, Poll, Stream, Sink};
+use futures::future::blocking;
use tokio::net::{UdpSocket, UdpCodec};
macro_rules! t {
@@ -25,7 +26,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
{
let send = SendMessage::new(a, send.clone(), b_addr, b"1234");
let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234");
- let (sendt, received) = t!(send.join(recv).wait());
+ let (sendt, received) = t!(blocking(send.join(recv)).wait());
a = sendt;
b = received;
}
@@ -33,7 +34,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) {
{
let send = SendMessage::new(a, send, b_addr, b"");
let recv = RecvMessage::new(b, recv, a_addr, b"");
- t!(send.join(recv).wait());
+ t!(blocking(send.join(recv)).wait());
}
}
@@ -172,7 +173,7 @@ fn send_dgrams() {
{
let send = a.send_dgram(&b"4321"[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]);
- let (sendt, received) = t!(send.join(recv).wait());
+ let (sendt, received) = t!(blocking(send.join(recv)).wait());
assert_eq!(received.2, 4);
assert_eq!(&received.1[..4], b"4321");
a = sendt.0;
@@ -182,7 +183,7 @@ fn send_dgrams() {
{
let send = a.send_dgram(&b""[..], &b_addr);
let recv = b.recv_dgram(&mut buf[..]);
- let received = t!(send.join(recv).wait()).1;
+ let received = t!(blocking(send.join(recv)).wait()).1;
assert_eq!(received.2, 0);
}
}
@@ -225,7 +226,7 @@ fn send_framed() {
let send = a.send(&b"4567"[..]);
let recv = b.into_future().map_err(|e| e.0);
- let (sendt, received) = t!(send.join(recv).wait());
+ let (sendt, received) = t!(blocking(send.join(recv)).wait());
assert_eq!(received.0, Some(()));
a_soc = sendt.into_inner();
@@ -238,7 +239,7 @@ fn send_framed() {
let send = a.send(&b""[..]);
let recv = b.into_future().map_err(|e| e.0);
- let received = t!(send.join(recv).wait()).1;
+ let received = t!(blocking(send.join(recv)).wait()).1;
assert_eq!(received.0, Some(()));
}
}