diff options
-rw-r--r-- | Cargo.toml | 1 | ||||
-rw-r--r-- | examples/chat.rs | 4 | ||||
-rw-r--r-- | examples/compress.rs | 4 | ||||
-rw-r--r-- | examples/connect.rs | 8 | ||||
-rw-r--r-- | examples/echo-threads.rs | 6 | ||||
-rw-r--r-- | examples/echo-udp.rs | 6 | ||||
-rw-r--r-- | examples/echo.rs | 4 | ||||
-rw-r--r-- | examples/hello.rs | 3 | ||||
-rw-r--r-- | examples/proxy.rs | 4 | ||||
-rw-r--r-- | examples/sink.rs | 4 | ||||
-rw-r--r-- | examples/tinydb.rs | 4 | ||||
-rw-r--r-- | examples/tinyhttp.rs | 5 | ||||
-rw-r--r-- | examples/udp-codec.rs | 4 | ||||
-rw-r--r-- | tests/buffered.rs | 3 | ||||
-rw-r--r-- | tests/chain.rs | 3 | ||||
-rw-r--r-- | tests/drop-core.rs | 8 | ||||
-rw-r--r-- | tests/echo.rs | 3 | ||||
-rw-r--r-- | tests/global.rs | 3 | ||||
-rw-r--r-- | tests/limit.rs | 3 | ||||
-rw-r--r-- | tests/line-frames.rs | 16 | ||||
-rw-r--r-- | tests/pipe-hup.rs | 4 | ||||
-rw-r--r-- | tests/stream-buffered.rs | 3 | ||||
-rw-r--r-- | tests/tcp.rs | 7 | ||||
-rw-r--r-- | tests/udp.rs | 13 |
24 files changed, 66 insertions, 57 deletions
@@ -52,4 +52,5 @@ serde_json = "1.0" time = "0.1" [patch.crates-io] +futures = { git = "https://github.com/rust-lang-nursery/futures-rs", branch = "tokio-reform" } mio = { git = "https://github.com/carllerche/mio" } diff --git a/examples/chat.rs b/examples/chat.rs index 76e689b9..667f0e9a 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -29,7 +29,7 @@ use std::io::{Error, ErrorKind, BufReader}; use std::sync::{Arc, Mutex}; use futures::Future; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; @@ -134,5 +134,5 @@ fn main() { }); // execute server - srv.wait().unwrap(); + future::blocking(srv).wait().unwrap(); } diff --git a/examples/compress.rs b/examples/compress.rs index 3098abf7..501548ef 100644 --- a/examples/compress.rs +++ b/examples/compress.rs @@ -29,7 +29,7 @@ use std::env; use std::net::SocketAddr; use futures::{Future, Stream, Poll}; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -62,7 +62,7 @@ fn main() { Ok(()) }); - server.wait().unwrap(); + future::blocking(server).wait().unwrap(); } /// The main workhorse of this example. This'll compress all data read from diff --git a/examples/connect.rs b/examples/connect.rs index 1bdc1af4..f0619fbd 100644 --- a/examples/connect.rs +++ b/examples/connect.rs @@ -26,7 +26,7 @@ use std::net::SocketAddr; use std::thread; use futures::sync::mpsc; -use futures::{Sink, Future, Stream}; +use futures::{future, Sink, Stream}; use futures_cpupool::CpuPool; fn main() { @@ -71,9 +71,9 @@ fn main() { // loop. In this case, though, we know it's ok as the event loop isn't // otherwise running anything useful. let mut out = io::stdout(); - stdout.for_each(|chunk| { + future::blocking(stdout.for_each(|chunk| { out.write_all(&chunk) - }).wait().unwrap(); + })).wait().unwrap(); } mod tcp { @@ -244,7 +244,7 @@ fn read_stdin(mut tx: mpsc::Sender<Vec<u8>>) { Ok(n) => n, }; buf.truncate(n); - tx = match tx.send(buf).wait() { + tx = match future::blocking(tx.send(buf)).wait() { Ok(tx) => tx, Err(_) => break, }; diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs index 6ce8b156..e2525c80 100644 --- a/examples/echo-threads.rs +++ b/examples/echo-threads.rs @@ -24,7 +24,7 @@ use std::net::SocketAddr; use std::thread; use futures::prelude::*; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures::sync::mpsc; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -61,7 +61,7 @@ fn main() { next = (next + 1) % channels.len(); Ok(()) }); - srv.wait().unwrap(); + future::blocking(srv).wait().unwrap(); } fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { @@ -88,5 +88,5 @@ fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) { Ok(()) }); - done.wait().unwrap(); + future::blocking(done).wait().unwrap(); } diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs index f7e2bf09..2ce43bc0 100644 --- a/examples/echo-udp.rs +++ b/examples/echo-udp.rs @@ -18,7 +18,7 @@ extern crate tokio_io; use std::{env, io}; use std::net::SocketAddr; -use futures::{Future, Poll}; +use futures::{future, Future, Poll}; use tokio::net::UdpSocket; struct Server { @@ -58,9 +58,9 @@ fn main() { // Next we'll create a future to spawn (the one we defined above) and then // we'll block our current thread waiting on the result of the future - Server { + future::blocking(Server { socket: socket, buf: vec![0; 1024], to_send: None, - }.wait().unwrap(); + }).wait().unwrap(); } diff --git a/examples/echo.rs b/examples/echo.rs index 558f3a68..54a28ff7 100644 --- a/examples/echo.rs +++ b/examples/echo.rs @@ -26,7 +26,7 @@ use std::env; use std::net::SocketAddr; use futures::Future; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures::stream::Stream; use futures_cpupool::CpuPool; use tokio_io::AsyncRead; @@ -114,5 +114,5 @@ fn main() { // And finally now that we've define what our server is, we run it! Here we // just need to execute the future we've created and wait for it to complete // using the standard methods in the `futures` crate. - done.wait().unwrap(); + future::blocking(done).wait().unwrap(); } diff --git a/examples/hello.rs b/examples/hello.rs index 5ceb431b..d9e46d17 100644 --- a/examples/hello.rs +++ b/examples/hello.rs @@ -19,6 +19,7 @@ extern crate tokio_io; use std::env; use std::net::SocketAddr; +use futures::future; use futures::prelude::*; use tokio::net::TcpListener; @@ -40,5 +41,5 @@ fn main() { Ok(()) }); - server.wait().unwrap(); + future::blocking(server).wait().unwrap(); } diff --git a/examples/proxy.rs b/examples/proxy.rs index 51735ba1..f73dd30d 100644 --- a/examples/proxy.rs +++ b/examples/proxy.rs @@ -28,7 +28,7 @@ use std::io::{self, Read, Write}; use futures::stream::Stream; use futures::{Future, Poll}; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::{AsyncRead, AsyncWrite}; @@ -92,7 +92,7 @@ fn main() { Ok(()) }); - done.wait().unwrap(); + future::blocking(done).wait().unwrap(); } // This is a custom type used to have a custom implementation of the diff --git a/examples/sink.rs b/examples/sink.rs index 21456ada..3fa5f5ed 100644 --- a/examples/sink.rs +++ b/examples/sink.rs @@ -26,7 +26,7 @@ use std::iter; use std::net::SocketAddr; use futures::Future; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures::stream::{self, Stream}; use futures_cpupool::CpuPool; use tokio_io::IoFuture; @@ -46,7 +46,7 @@ fn main() { pool.execute(write(socket).or_else(|_| Ok(()))).unwrap(); Ok(()) }); - server.wait().unwrap(); + future::blocking(server).wait().unwrap(); } fn write(socket: TcpStream) -> IoFuture<()> { diff --git a/examples/tinydb.rs b/examples/tinydb.rs index 0a68a314..de750404 100644 --- a/examples/tinydb.rs +++ b/examples/tinydb.rs @@ -51,7 +51,7 @@ use std::net::SocketAddr; use std::sync::{Arc, Mutex}; use futures::prelude::*; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures_cpupool::CpuPool; use tokio::net::TcpListener; use tokio_io::AsyncRead; @@ -160,7 +160,7 @@ fn main() { Ok(()) }); - done.wait().unwrap(); + future::blocking(done).wait().unwrap(); } impl Request { diff --git a/examples/tinyhttp.rs b/examples/tinyhttp.rs index 00c16fec..b0106d63 100644 --- a/examples/tinyhttp.rs +++ b/examples/tinyhttp.rs @@ -31,8 +31,7 @@ use std::net::{self, SocketAddr}; use std::thread; use bytes::BytesMut; -use futures::future::Executor; -use futures::future; +use futures::future::{self, Executor}; use futures::sync::mpsc; use futures::{Stream, Future, Sink}; use futures_cpupool::CpuPool; @@ -91,7 +90,7 @@ fn worker(rx: mpsc::UnboundedReceiver<net::TcpStream>) { })).unwrap(); Ok(()) }); - done.wait().unwrap(); + future::blocking(done).wait().unwrap(); } /// "Server logic" is implemented in this function. diff --git a/examples/udp-codec.rs b/examples/udp-codec.rs index c874ebd7..5c11e9f3 100644 --- a/examples/udp-codec.rs +++ b/examples/udp-codec.rs @@ -15,7 +15,7 @@ use std::io; use std::net::SocketAddr; use futures::{Future, Stream, Sink}; -use futures::future::Executor; +use futures::future::{self, Executor}; use futures_cpupool::CpuPool; use tokio::net::{UdpSocket, UdpCodec}; @@ -76,5 +76,5 @@ fn main() { // Spawn the sender of pongs and then wait for our pinger to finish. pool.execute(b.then(|_| Ok(()))).unwrap(); - drop(a.wait()); + drop(future::blocking(a).wait()); } diff --git a/tests/buffered.rs b/tests/buffered.rs index 2ba16b04..60b574f6 100644 --- a/tests/buffered.rs +++ b/tests/buffered.rs @@ -8,6 +8,7 @@ use std::thread; use std::io::{Read, Write, BufReader, BufWriter}; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::copy; use tokio::net::TcpListener; @@ -54,7 +55,7 @@ fn echo_server() { copy(a, b) }); - let (amt, _, _) = t!(copied.wait()); + let (amt, _, _) = t!(blocking(copied).wait()); let (expected, t2) = t.join().unwrap(); let actual = t2.join().unwrap(); diff --git a/tests/chain.rs b/tests/chain.rs index b9ac4818..cd271258 100644 --- a/tests/chain.rs +++ b/tests/chain.rs @@ -7,6 +7,7 @@ use std::thread; use std::io::{Write, Read}; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; @@ -42,7 +43,7 @@ fn chain_clients() { read_to_end(a.chain(b).chain(c), Vec::new()) }); - let (_, data) = t!(copied.wait()); + let (_, data) = t!(blocking(copied).wait()); t.join().unwrap(); assert_eq!(data, b"foo bar baz"); diff --git a/tests/drop-core.rs b/tests/drop-core.rs index 75ac9b7e..503e81ef 100644 --- a/tests/drop-core.rs +++ b/tests/drop-core.rs @@ -4,7 +4,7 @@ extern crate futures; use std::thread; use std::net; -use futures::future; +use futures::{future, stream}; use futures::prelude::*; use futures::sync::oneshot; use tokio::net::TcpListener; @@ -17,7 +17,7 @@ fn tcp_doesnt_block() { let listener = net::TcpListener::bind("127.0.0.1:0").unwrap(); let listener = TcpListener::from_std(listener, &handle).unwrap(); drop(core); - assert!(listener.incoming().wait().next().unwrap().is_err()); + assert!(stream::blocking(listener.incoming()).next().unwrap().is_err()); } #[test] @@ -34,9 +34,9 @@ fn drop_wakes() { drop(tx); future::ok(()) }); - assert!(new_socket.join(drop_tx).wait().is_err()); + assert!(future::blocking(new_socket.join(drop_tx)).wait().is_err()); }); - drop(rx.wait()); + drop(future::blocking(rx).wait()); drop(core); t.join().unwrap(); } diff --git a/tests/echo.rs b/tests/echo.rs index d5bdae81..778bdeb9 100644 --- a/tests/echo.rs +++ b/tests/echo.rs @@ -8,6 +8,7 @@ use std::net::TcpStream; use std::thread; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio::net::TcpListener; use tokio_io::AsyncRead; @@ -44,7 +45,7 @@ fn echo_server() { let halves = client.map(|s| s.split()); let copied = halves.and_then(|(a, b)| copy(a, b)); - let (amt, _, _) = t!(copied.wait()); + let (amt, _, _) = t!(blocking(copied).wait()); t.join().unwrap(); assert_eq!(amt, msg.len() as u64 * 1024); diff --git a/tests/global.rs b/tests/global.rs index bf5682fa..4702fc11 100644 --- a/tests/global.rs +++ b/tests/global.rs @@ -3,6 +3,7 @@ extern crate tokio; use std::thread; +use futures::future::blocking; use futures::prelude::*; use tokio::net::{TcpStream, TcpListener}; @@ -23,7 +24,7 @@ fn hammer() { let theirs = srv.incoming().into_future() .map(|(s, _)| s.unwrap()) .map_err(|(s, _)| s); - let (mine, theirs) = t!(mine.join(theirs).wait()); + let (mine, theirs) = t!(blocking(mine.join(theirs)).wait()); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); diff --git a/tests/limit.rs b/tests/limit.rs index 7055ce9b..053f4385 100644 --- a/tests/limit.rs +++ b/tests/limit.rs @@ -7,6 +7,7 @@ use std::thread; use std::io::{Write, Read}; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::read_to_end; use tokio::net::TcpListener; @@ -36,7 +37,7 @@ fn limit() { read_to_end(a.take(4), Vec::new()) }); - let (_, data) = t!(copied.wait()); + let (_, data) = t!(blocking(copied).wait()); t.join().unwrap(); assert_eq!(data, b"foo "); diff --git a/tests/line-frames.rs b/tests/line-frames.rs index 3785dfef..27f1d195 100644 --- a/tests/line-frames.rs +++ b/tests/line-frames.rs @@ -10,7 +10,7 @@ use std::net::Shutdown; use bytes::{BytesMut, BufMut}; use futures::{Future, Stream, Sink}; -use futures::future::Executor; +use futures::future::{blocking, Executor}; use futures_cpupool::CpuPool; use tokio::net::{TcpListener, TcpStream}; use tokio_io::codec::{Encoder, Decoder}; @@ -68,20 +68,20 @@ fn echo() { pool.execute(srv.map_err(|e| panic!("srv error: {}", e))).unwrap(); let client = TcpStream::connect(&addr); - let client = client.wait().unwrap(); - let (client, _) = write_all(client, b"a\n").wait().unwrap(); - let (client, buf, amt) = read(client, vec![0; 1024]).wait().unwrap(); + let client = blocking(client).wait().unwrap(); + let (client, _) = blocking(write_all(client, b"a\n")).wait().unwrap(); + let (client, buf, amt) = blocking(read(client, vec![0; 1024])).wait().unwrap(); assert_eq!(amt, 2); assert_eq!(&buf[..2], b"a\n"); - let (client, _) = write_all(client, b"\n").wait().unwrap(); - let (client, buf, amt) = read(client, buf).wait().unwrap(); + let (client, _) = blocking(write_all(client, b"\n")).wait().unwrap(); + let (client, buf, amt) = blocking(read(client, buf)).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"\n"); - let (client, _) = write_all(client, b"b").wait().unwrap(); + let (client, _) = blocking(write_all(client, b"b")).wait().unwrap(); client.shutdown(Shutdown::Write).unwrap(); - let (_client, buf, amt) = read(client, buf).wait().unwrap(); + let (_client, buf, amt) = blocking(read(client, buf)).wait().unwrap(); assert_eq!(amt, 1); assert_eq!(&buf[..1], b"b"); } diff --git a/tests/pipe-hup.rs b/tests/pipe-hup.rs index a9bf9671..c0406487 100644 --- a/tests/pipe-hup.rs +++ b/tests/pipe-hup.rs @@ -13,7 +13,7 @@ use std::os::unix::io::{AsRawFd, FromRawFd}; use std::thread; use std::time::Duration; -use futures::prelude::*; +use futures::future::blocking; use mio::event::Evented; use mio::unix::{UnixReady, EventedFd}; use mio::{PollOpt, Ready, Token}; @@ -81,7 +81,7 @@ fn hup() { let source = PollEvented::new(MyFile::new(read), &handle).unwrap(); let reader = read_to_end(source, Vec::new()); - let (_, content) = t!(reader.wait()); + let (_, content) = t!(blocking(reader).wait()); assert_eq!(&b"Hello!\nGood bye!\n"[..], &content[..]); t.join().unwrap(); } diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs index 78fe1c37..64786902 100644 --- a/tests/stream-buffered.rs +++ b/tests/stream-buffered.rs @@ -8,6 +8,7 @@ use std::net::TcpStream; use std::thread; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio_io::io::copy; use tokio_io::AsyncRead; @@ -48,7 +49,7 @@ fn echo_server() { .take(2) .collect(); - t!(future.wait()); + t!(blocking(future).wait()); t.join().unwrap(); } diff --git a/tests/tcp.rs b/tests/tcp.rs index 83cc425c..a1cefacd 100644 --- a/tests/tcp.rs +++ b/tests/tcp.rs @@ -7,6 +7,7 @@ use std::sync::mpsc::channel; use std::thread; use futures::Future; +use futures::future::blocking; use futures::stream::Stream; use tokio::net::{TcpListener, TcpStream}; @@ -27,7 +28,7 @@ fn connect() { }); let stream = TcpStream::connect(&addr); - let mine = t!(stream.wait()); + let mine = t!(blocking(stream).wait()); let theirs = t.join().unwrap(); assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); @@ -50,7 +51,7 @@ fn accept() { net::TcpStream::connect(&addr).unwrap() }); - let (mine, _remaining) = t!(client.wait()); + let (mine, _remaining) = t!(blocking(client).wait()); let mine = mine.unwrap(); let theirs = t.join().unwrap(); @@ -75,7 +76,7 @@ fn accept2() { }).into_future().map_err(|e| e.0); assert!(rx.try_recv().is_err()); - let (mine, _remaining) = t!(client.wait()); + let (mine, _remaining) = t!(blocking(client).wait()); mine.unwrap(); t.join().unwrap(); } diff --git a/tests/udp.rs b/tests/udp.rs index f0a47d37..42b8906d 100644 --- a/tests/udp.rs +++ b/tests/udp.rs @@ -7,6 +7,7 @@ use std::io; use std::net::SocketAddr; use futures::{Future, Poll, Stream, Sink}; +use futures::future::blocking; use tokio::net::{UdpSocket, UdpCodec}; macro_rules! t { @@ -25,7 +26,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) { { let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); - let (sendt, received) = t!(send.join(recv).wait()); + let (sendt, received) = t!(blocking(send.join(recv)).wait()); a = sendt; b = received; } @@ -33,7 +34,7 @@ fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) { { let send = SendMessage::new(a, send, b_addr, b""); let recv = RecvMessage::new(b, recv, a_addr, b""); - t!(send.join(recv).wait()); + t!(blocking(send.join(recv)).wait()); } } @@ -172,7 +173,7 @@ fn send_dgrams() { { let send = a.send_dgram(&b"4321"[..], &b_addr); let recv = b.recv_dgram(&mut buf[..]); - let (sendt, received) = t!(send.join(recv).wait()); + let (sendt, received) = t!(blocking(send.join(recv)).wait()); assert_eq!(received.2, 4); assert_eq!(&received.1[..4], b"4321"); a = sendt.0; @@ -182,7 +183,7 @@ fn send_dgrams() { { let send = a.send_dgram(&b""[..], &b_addr); let recv = b.recv_dgram(&mut buf[..]); - let received = t!(send.join(recv).wait()).1; + let received = t!(blocking(send.join(recv)).wait()).1; assert_eq!(received.2, 0); } } @@ -225,7 +226,7 @@ fn send_framed() { let send = a.send(&b"4567"[..]); let recv = b.into_future().map_err(|e| e.0); - let (sendt, received) = t!(send.join(recv).wait()); + let (sendt, received) = t!(blocking(send.join(recv)).wait()); assert_eq!(received.0, Some(())); a_soc = sendt.into_inner(); @@ -238,7 +239,7 @@ fn send_framed() { let send = a.send(&b""[..]); let recv = b.into_future().map_err(|e| e.0); - let received = t!(send.join(recv).wait()).1; + let received = t!(blocking(send.join(recv)).wait()).1; assert_eq!(received.0, Some(())); } } |