diff options
author | Sam Rijs <srijs@airpost.net> | 2018-03-15 03:38:59 +1100 |
---|---|---|
committer | Carl Lerche <me@carllerche.com> | 2018-03-14 09:38:59 -0700 |
commit | 923a80e098d4d8355c65d3c80e5789ea0cbded95 (patch) | |
tree | d3e19b20d4f040ef7b83398a91370c25c245a3b0 /tests | |
parent | 64435f5b35efa761a8c3bf67f599e01b27f9d0a6 (diff) |
Move tokio::net module into tokio tcp/udp crates (#224)
Diffstat (limited to 'tests')
-rw-r--r-- | tests/chain.rs | 49 | ||||
-rw-r--r-- | tests/echo.rs | 51 | ||||
-rw-r--r-- | tests/limit.rs | 43 | ||||
-rw-r--r-- | tests/stream-buffered.rs | 54 | ||||
-rw-r--r-- | tests/tcp.rs | 130 | ||||
-rw-r--r-- | tests/udp.rs | 261 |
6 files changed, 0 insertions, 588 deletions
diff --git a/tests/chain.rs b/tests/chain.rs deleted file mode 100644 index b9ac4818..00000000 --- a/tests/chain.rs +++ /dev/null @@ -1,49 +0,0 @@ -extern crate futures; -extern crate tokio; -extern crate tokio_io; - -use std::net::TcpStream; -use std::thread; -use std::io::{Write, Read}; - -use futures::Future; -use futures::stream::Stream; -use tokio_io::io::read_to_end; -use tokio::net::TcpListener; - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -#[test] -fn chain_clients() { - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let t = thread::spawn(move || { - let mut s1 = TcpStream::connect(&addr).unwrap(); - s1.write_all(b"foo ").unwrap(); - let mut s2 = TcpStream::connect(&addr).unwrap(); - s2.write_all(b"bar ").unwrap(); - let mut s3 = TcpStream::connect(&addr).unwrap(); - s3.write_all(b"baz").unwrap(); - }); - - let clients = srv.incoming().take(3); - let copied = clients.collect().and_then(|clients| { - let mut clients = clients.into_iter(); - let a = clients.next().unwrap(); - let b = clients.next().unwrap(); - let c = clients.next().unwrap(); - - read_to_end(a.chain(b).chain(c), Vec::new()) - }); - - let (_, data) = t!(copied.wait()); - t.join().unwrap(); - - assert_eq!(data, b"foo bar baz"); -} diff --git a/tests/echo.rs b/tests/echo.rs deleted file mode 100644 index d5bdae81..00000000 --- a/tests/echo.rs +++ /dev/null @@ -1,51 +0,0 @@ -extern crate env_logger; -extern crate futures; -extern crate tokio; -extern crate tokio_io; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::thread; - -use futures::Future; -use futures::stream::Stream; -use tokio::net::TcpListener; -use tokio_io::AsyncRead; -use tokio_io::io::copy; - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -#[test] -fn echo_server() { - drop(env_logger::init()); - - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let msg = "foo bar baz"; - let t = thread::spawn(move || { - let mut s = TcpStream::connect(&addr).unwrap(); - - for _i in 0..1024 { - assert_eq!(t!(s.write(msg.as_bytes())), msg.len()); - let mut buf = [0; 1024]; - assert_eq!(t!(s.read(&mut buf)), msg.len()); - assert_eq!(&buf[..msg.len()], msg.as_bytes()); - } - }); - - let clients = srv.incoming(); - let client = clients.into_future().map(|e| e.0.unwrap()).map_err(|e| e.0); - let halves = client.map(|s| s.split()); - let copied = halves.and_then(|(a, b)| copy(a, b)); - - let (amt, _, _) = t!(copied.wait()); - t.join().unwrap(); - - assert_eq!(amt, msg.len() as u64 * 1024); -} diff --git a/tests/limit.rs b/tests/limit.rs deleted file mode 100644 index 7055ce9b..00000000 --- a/tests/limit.rs +++ /dev/null @@ -1,43 +0,0 @@ -extern crate futures; -extern crate tokio; -extern crate tokio_io; - -use std::net::TcpStream; -use std::thread; -use std::io::{Write, Read}; - -use futures::Future; -use futures::stream::Stream; -use tokio_io::io::read_to_end; -use tokio::net::TcpListener; - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -#[test] -fn limit() { - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let t = thread::spawn(move || { - let mut s1 = TcpStream::connect(&addr).unwrap(); - s1.write_all(b"foo bar baz").unwrap(); - }); - - let clients = srv.incoming().take(1); - let copied = clients.collect().and_then(|clients| { - let mut clients = clients.into_iter(); - let a = clients.next().unwrap(); - - read_to_end(a.take(4), Vec::new()) - }); - - let (_, data) = t!(copied.wait()); - t.join().unwrap(); - - assert_eq!(data, b"foo "); -} diff --git a/tests/stream-buffered.rs b/tests/stream-buffered.rs deleted file mode 100644 index 78fe1c37..00000000 --- a/tests/stream-buffered.rs +++ /dev/null @@ -1,54 +0,0 @@ -extern crate env_logger; -extern crate futures; -extern crate tokio; -extern crate tokio_io; - -use std::io::{Read, Write}; -use std::net::TcpStream; -use std::thread; - -use futures::Future; -use futures::stream::Stream; -use tokio_io::io::copy; -use tokio_io::AsyncRead; -use tokio::net::TcpListener; - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -#[test] -fn echo_server() { - drop(env_logger::init()); - - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let t = thread::spawn(move || { - let mut s1 = t!(TcpStream::connect(&addr)); - let mut s2 = t!(TcpStream::connect(&addr)); - - let msg = b"foo"; - assert_eq!(t!(s1.write(msg)), msg.len()); - assert_eq!(t!(s2.write(msg)), msg.len()); - let mut buf = [0; 1024]; - assert_eq!(t!(s1.read(&mut buf)), msg.len()); - assert_eq!(&buf[..msg.len()], msg); - assert_eq!(t!(s2.read(&mut buf)), msg.len()); - assert_eq!(&buf[..msg.len()], msg); - }); - - let future = srv.incoming() - .map(|s| s.split()) - .map(|(a, b)| copy(a, b).map(|_| ())) - .buffered(10) - .take(2) - .collect(); - - t!(future.wait()); - - t.join().unwrap(); -} diff --git a/tests/tcp.rs b/tests/tcp.rs deleted file mode 100644 index 5694e8a6..00000000 --- a/tests/tcp.rs +++ /dev/null @@ -1,130 +0,0 @@ -extern crate env_logger; -extern crate tokio; -extern crate mio; -extern crate futures; - -use std::{net, thread}; -use std::sync::mpsc::channel; - -use tokio::net::{TcpListener, TcpStream}; -use tokio::prelude::*; - - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -#[test] -fn connect() { - drop(env_logger::init()); - let srv = t!(net::TcpListener::bind("127.0.0.1:0")); - let addr = t!(srv.local_addr()); - let t = thread::spawn(move || { - t!(srv.accept()).0 - }); - - let stream = TcpStream::connect(&addr); - let mine = t!(stream.wait()); - let theirs = t.join().unwrap(); - - assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); - assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); -} - -#[test] -fn accept() { - drop(env_logger::init()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let (tx, rx) = channel(); - let client = srv.incoming().map(move |t| { - tx.send(()).unwrap(); - t - }).into_future().map_err(|e| e.0); - assert!(rx.try_recv().is_err()); - let t = thread::spawn(move || { - net::TcpStream::connect(&addr).unwrap() - }); - - let (mine, _remaining) = t!(client.wait()); - let mine = mine.unwrap(); - let theirs = t.join().unwrap(); - - assert_eq!(t!(mine.local_addr()), t!(theirs.peer_addr())); - assert_eq!(t!(theirs.local_addr()), t!(mine.peer_addr())); -} - -#[test] -fn accept2() { - drop(env_logger::init()); - let srv = t!(TcpListener::bind(&t!("127.0.0.1:0".parse()))); - let addr = t!(srv.local_addr()); - - let t = thread::spawn(move || { - net::TcpStream::connect(&addr).unwrap() - }); - - let (tx, rx) = channel(); - let client = srv.incoming().map(move |t| { - tx.send(()).unwrap(); - t - }).into_future().map_err(|e| e.0); - assert!(rx.try_recv().is_err()); - - let (mine, _remaining) = t!(client.wait()); - mine.unwrap(); - t.join().unwrap(); -} - -#[cfg(unix)] -mod unix { - use tokio::net::TcpStream; - use tokio::prelude::*; - - use env_logger; - use futures::future; - use mio::unix::UnixReady; - - use std::{net, thread}; - use std::time::Duration; - - #[test] - fn poll_hup() { - drop(env_logger::init()); - - let srv = t!(net::TcpListener::bind("127.0.0.1:0")); - let addr = t!(srv.local_addr()); - let t = thread::spawn(move || { - let mut client = t!(srv.accept()).0; - client.write(b"hello world").unwrap(); - thread::sleep(Duration::from_millis(200)); - }); - - let mut stream = t!(TcpStream::connect(&addr).wait()); - - // Poll for HUP before reading. - future::poll_fn(|| { - stream.poll_read_ready(UnixReady::hup().into()) - }).wait().unwrap(); - - // Same for write half - future::poll_fn(|| { - stream.poll_write_ready() - }).wait().unwrap(); - - let mut buf = vec![0; 11]; - - // Read the data - future::poll_fn(|| { - stream.poll_read(&mut buf) - }).wait().unwrap(); - - assert_eq!(b"hello world", &buf[..]); - - t.join().unwrap(); - } -} diff --git a/tests/udp.rs b/tests/udp.rs deleted file mode 100644 index 8faebee6..00000000 --- a/tests/udp.rs +++ /dev/null @@ -1,261 +0,0 @@ -#![allow(deprecated)] - -extern crate futures; -extern crate tokio; -#[macro_use] -extern crate tokio_io; -extern crate bytes; -extern crate env_logger; - -use std::io; -use std::net::SocketAddr; - -use futures::{Future, Poll, Stream, Sink}; - -use tokio::net::{UdpSocket, UdpFramed}; -use tokio_io::codec::{Encoder, Decoder}; -use bytes::{BytesMut, BufMut}; - -macro_rules! t { - ($e:expr) => (match $e { - Ok(e) => e, - Err(e) => panic!("{} failed with {:?}", stringify!($e), e), - }) -} - -fn send_messages<S: SendFn + Clone, R: RecvFn + Clone>(send: S, recv: R) { - let mut a = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); - let mut b = t!(UdpSocket::bind(&([127, 0, 0, 1], 0).into())); - let a_addr = t!(a.local_addr()); - let b_addr = t!(b.local_addr()); - - { - let send = SendMessage::new(a, send.clone(), b_addr, b"1234"); - let recv = RecvMessage::new(b, recv.clone(), a_addr, b"1234"); - let (sendt, received) = t!(send.join(recv).wait()); - a = sendt; - b = received; - } - - { - let send = SendMessage::new(a, send, b_addr, b""); - let recv = RecvMessage::new(b, recv, a_addr, b""); - t!(send.join(recv).wait()); - } -} - -#[test] -fn send_to_and_recv_from() { - send_messages(SendTo {}, RecvFrom {}); -} - -#[test] -fn send_and_recv() { - send_messages(Send {}, Recv {}); -} - -trait SendFn { - fn send(&self, &mut UdpSocket, &[u8], &SocketAddr) -> Result<usize, io::Error>; -} - -#[derive(Debug, Clone)] -struct SendTo {} - -impl SendFn for SendTo { - fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { - socket.send_to(buf, addr) - } -} - -#[derive(Debug, Clone)] -struct Send {} - -impl SendFn for Send { - fn send(&self, socket: &mut UdpSocket, buf: &[u8], addr: &SocketAddr) -> Result<usize, io::Error> { - socket.connect(addr).expect("could not connect"); - socket.send(buf) - } -} - -struct SendMessage<S> { - socket: Option<UdpSocket>, - send: S, - addr: SocketAddr, - data: &'static [u8], -} - -impl<S: SendFn> SendMessage<S> { - fn new(socket: UdpSocket, send: S, addr: SocketAddr, data: &'static [u8]) -> SendMessage<S> { - SendMessage { - socket: Some(socket), - send: send, - addr: addr, - data: data, - } - } -} - -impl<S: SendFn> Future for SendMessage<S> { - type Item = UdpSocket; - type Error = io::Error; - - fn poll(&mut self) -> Poll<UdpSocket, io::Error> { - let n = try_nb!(self.send.send(self.socket.as_mut().unwrap(), &self.data[..], &self.addr)); - - assert_eq!(n, self.data.len()); - - Ok(self.socket.take().unwrap().into()) - } -} - -trait RecvFn { - fn recv(&self, &mut UdpSocket, &mut [u8], &SocketAddr) -> Result<usize, io::Error>; -} - -#[derive(Debug, Clone)] -struct RecvFrom {} - -impl RecvFn for RecvFrom { - fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8], - expected_addr: &SocketAddr) -> Result<usize, io::Error> { - socket.recv_from(buf).map(|(s, addr)| { - assert_eq!(addr, *expected_addr); - s - }) - } -} - -#[derive(Debug, Clone)] -struct Recv {} - -impl RecvFn for Recv { - fn recv(&self, socket: &mut UdpSocket, buf: &mut [u8], _: &SocketAddr) -> Result<usize, io::Error> { - socket.recv(buf) - } -} - -struct RecvMessage<R> { - socket: Option<UdpSocket>, - recv: R, - expected_addr: SocketAddr, - expected_data: &'static [u8], -} - -impl<R: RecvFn> RecvMessage<R> { - fn new(socket: UdpSocket, recv: R, expected_addr: SocketAddr, - expected_data: &'static [u8]) -> RecvMessage<R> { - RecvMessage { - socket: Some(socket), - recv: recv, - expected_addr: expected_addr, - expected_data: expected_data, - } - } -} - -impl<R: RecvFn> Future for RecvMessage<R> { - type Item = UdpSocket; - type Error = io::Error; - - fn poll(&mut self) -> Poll<UdpSocket, io::Error> { - let mut buf = vec![0u8; 10 + self.expected_data.len() * 10]; - let n = try_nb!(self.recv.recv(&mut self.socket.as_mut().unwrap(), &mut buf[..], - &self.expected_addr)); - - assert_eq!(n, self.expected_data.len()); - assert_eq!(&buf[..self.expected_data.len()], &self.expected_data[..]); - - Ok(self.socket.take().unwrap().into()) - } -} - -#[test] -fn send_dgrams() { - let mut a = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut b = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut buf = [0u8; 50]; - let b_addr = t!(b.local_addr()); - - { - let send = a.send_dgram(&b"4321"[..], &b_addr); - let recv = b.recv_dgram(&mut buf[..]); - let (sendt, received) = t!(send.join(recv).wait()); - assert_eq!(received.2, 4); - assert_eq!(&received.1[..4], b"4321"); - a = sendt.0; - b = received.0; - } - - { - let send = a.send_dgram(&b""[..], &b_addr); - let recv = b.recv_dgram(&mut buf[..]); - let received = t!(send.join(recv).wait()).1; - assert_eq!(received.2, 0); - } -} - -pub struct ByteCodec; - -impl Decoder for ByteCodec { - type Item = Vec<u8>; - type Error = io::Error; - - fn decode(&mut self, buf: &mut BytesMut) -> Result<Option<Vec<u8>>, io::Error> { - let len = buf.len(); - Ok(Some(buf.split_to(len).to_vec())) - } -} - -impl Encoder for ByteCodec { - type Item = Vec<u8>; - type Error = io::Error; - - fn encode(&mut self, data: Vec<u8>, buf: &mut BytesMut) -> Result<(), io::Error> { - buf.reserve(data.len()); - buf.put(data); - Ok(()) - } -} - -#[test] -fn send_framed() { - drop(env_logger::init()); - - let mut a_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let mut b_soc = t!(UdpSocket::bind(&t!("127.0.0.1:0".parse()))); - let a_addr = t!(a_soc.local_addr()); - let b_addr = t!(b_soc.local_addr()); - - { - let a = UdpFramed::new(a_soc, ByteCodec); - let b = UdpFramed::new(b_soc, ByteCodec); - - let msg = b"4567".to_vec(); - - let send = a.send((msg.clone(), b_addr)); - let recv = b.into_future().map_err(|e| e.0); - let (sendt, received) = t!(send.join(recv).wait()); - - let (data, addr) = received.0.unwrap(); - assert_eq!(msg, data); - assert_eq!(a_addr, addr); - - a_soc = sendt.into_inner(); - b_soc = received.1.into_inner(); - } - - { - let a = UdpFramed::new(a_soc, ByteCodec); - let b = UdpFramed::new(b_soc, ByteCodec); - - let msg = b"".to_vec(); - - let send = a.send((msg.clone(), b_addr)); - let recv = b.into_future().map_err(|e| e.0); - let received = t!(send.join(recv).wait()).1; - - let (data, addr) = received.0.unwrap(); - assert_eq!(msg, data); - assert_eq!(a_addr, addr); - } -} |