summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--examples/README.md34
-rw-r--r--examples/chat-combinator.rs206
-rw-r--r--examples/chat.rs37
-rw-r--r--examples/compress.rs122
-rw-r--r--examples/connect.rs64
-rw-r--r--examples/echo-threads.rs92
-rw-r--r--examples/echo-udp.rs19
-rw-r--r--examples/echo.rs132
-rw-r--r--examples/hello.rs44
-rw-r--r--examples/hello_world.rs11
-rw-r--r--examples/proxy.rs110
-rw-r--r--examples/sink.rs58
-rw-r--r--examples/tinydb.rs131
-rw-r--r--examples/tinyhttp.rs92
-rw-r--r--examples/udp-codec.rs23
15 files changed, 423 insertions, 752 deletions
diff --git a/examples/README.md b/examples/README.md
index 3f4734c6..916c2cc0 100644
--- a/examples/README.md
+++ b/examples/README.md
@@ -1,9 +1,7 @@
-## Examples of `tokio-core`
+## Examples of how to use Tokio
This directory contains a number of examples showcasing various capabilities of
-the `tokio` crate. Most of these examples also leverage the `futures` and
-`tokio_io` crates, along with a number of other miscellaneous dependencies for
-various tasks.
+the `tokio` crate.
All examples can be executed with:
@@ -13,37 +11,37 @@ cargo run --example $name
A high level description of each example is:
-* `hello` - a tiny server that simply writes "Hello!" to all connected clients
- and then terminates the connection, should help see how to create and
+* `hello_world` - a tiny server that writes "hello world" to all connected
+ clients and then terminates the connection, should help see how to create and
initialize `tokio`.
-* `echo` - this is your standard TCP "echo server" which simply accepts
- connections and then echos back any contents that are read from each connected
- client.
+
+* `echo` - this is your standard TCP "echo server" which accepts connections and
+ then echos back any contents that are read from each connected client.
+
* `echo-udp` - again your standard "echo server", except for UDP instead of TCP.
This will echo back any packets received to the original sender.
-* `echo-threads` - servers the same purpose as the `echo` example, except this
- shows off using multiple cores on a machine for doing I/O processing.
+
* `connect` - this is a `nc`-like clone which can be used to interact with most
other examples. The program creates a TCP connection or UDP socket to sends
all information read on stdin to the remote peer, displaying any data received
on stdout. Often quite useful when interacting with the various other servers
here!
+
* `chat` - this spins up a local TCP server which will broadcast from any
connected client to all other connected clients. You can connect to this in
multiple terminals and use it to chat between the terminals.
+
+* `chat-combinator` - Similar to `chat`, but this uses a much more functional
+ programming approch using combinators.
+
* `proxy` - an example proxy server that will forward all connected TCP clients
to the remote address specified when starting the program.
-* `sink` - a benchmark-like example which shows writing 0s infinitely to any
- connected client.
+
* `tinyhttp` - a tiny HTTP/1.1 server which doesn't support HTTP request bodies
showcasing running on multiple cores, working with futures and spawning
tasks, and finally framing a TCP connection to discrete request/response
objects.
-* `udp-codec` - an example of using the `Encoder`/`Decoder` traits for UDP
- along with a small ping-pong protocol happening locally.
-* `compress` - an echo-like server where instead of echoing back everything read
- it echos back a gzip-compressed version of everything read! All compression
- occurs on a CPU pool to offload work from the event loop.
+
* `tinydb` - an in-memory database which shows sharing state between all
connected clients, notably the key/value store of this database.
diff --git a/examples/chat-combinator.rs b/examples/chat-combinator.rs
index 76e689b9..11754183 100644
--- a/examples/chat-combinator.rs
+++ b/examples/chat-combinator.rs
@@ -1,8 +1,10 @@
//! A chat server that broadcasts a message to all connections.
//!
-//! This is a simple line-based server which accepts connections, reads lines
-//! from those connections, and broadcasts the lines to all other connected
-//! clients. In a sense this is a bit of a "poor man's chat server".
+//! This is a line-based server which accepts connections, reads lines from
+//! those connections, and broadcasts the lines to all other connected clients.
+//!
+//! This example is similar to chat.rs, but uses combinators and a much more
+//! functional style.
//!
//! You can test this out by running:
//!
@@ -17,122 +19,132 @@
//! connected clients they'll all join the same room and see everyone else's
//! messages.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
-extern crate tokio_io;
+extern crate futures;
+
+use tokio::io;
+use tokio::net::TcpListener;
+use tokio::prelude::*;
use std::collections::HashMap;
use std::iter;
use std::env;
-use std::io::{Error, ErrorKind, BufReader};
+use std::io::{BufReader};
use std::sync::{Arc, Mutex};
-use futures::Future;
-use futures::future::Executor;
-use futures::stream::{self, Stream};
-use futures_cpupool::CpuPool;
-use tokio::net::TcpListener;
-use tokio_io::io;
-use tokio_io::AsyncRead;
-
fn main() {
+ // Create the TCP listener we'll accept connections on.
let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
let addr = addr.parse().unwrap();
- // Create the TCP listener we'll accept connections on.
let socket = TcpListener::bind(&addr).unwrap();
println!("Listening on: {}", addr);
- // This is currently a multi threaded server.
- //
- // Once the same thread executor lands, transition to single threaded.
+ // This is running on the Tokio runtime, so it will be multi-threaded. The
+ // `Arc<Mutex<...>>` allows state to be shared across the threads.
let connections = Arc::new(Mutex::new(HashMap::new()));
- let srv = socket.incoming().for_each(move |stream| {
- let addr = stream.peer_addr().unwrap();
-
- println!("New Connection: {}", addr);
- let (reader, writer) = stream.split();
-
- // Create a channel for our stream, which other sockets will use to
- // send us messages. Then register our address with the stream to send
- // data to us.
- let (tx, rx) = futures::sync::mpsc::unbounded();
- connections.lock().unwrap().insert(addr, tx);
-
- // Define here what we do for the actual I/O. That is, read a bunch of
- // lines from the socket and dispatch them while we also write any lines
- // from other sockets.
- let connections_inner = connections.clone();
- let reader = BufReader::new(reader);
-
- // Model the read portion of this socket by mapping an infinite
- // iterator to each line off the socket. This "loop" is then
- // terminated with an error once we hit EOF on the socket.
- let iter = stream::iter_ok::<_, Error>(iter::repeat(()));
- let socket_reader = iter.fold(reader, move |reader, _| {
- // Read a line off the socket, failing if we're at EOF
- let line = io::read_until(reader, b'\n', Vec::new());
- let line = line.and_then(|(reader, vec)| {
- if vec.len() == 0 {
- Err(Error::new(ErrorKind::BrokenPipe, "broken pipe"))
- } else {
- Ok((reader, vec))
- }
+ // The server task asynchronously iterates over and processes each incoming
+ // connection.
+ let srv = socket.incoming()
+ .map_err(|e| println!("failed to accept socket; error = {:?}", e))
+ .for_each(move |stream| {
+ // The client's socket address
+ let addr = stream.peer_addr().unwrap();
+
+ println!("New Connection: {}", addr);
+
+ // Split the TcpStream into two separate handles. One handle for reading
+ // and one handle for writing. This lets us use separate tasks for
+ // reading and writing.
+ let (reader, writer) = stream.split();
+
+ // Create a channel for our stream, which other sockets will use to
+ // send us messages. Then register our address with the stream to send
+ // data to us.
+ let (tx, rx) = futures::sync::mpsc::unbounded();
+ connections.lock().unwrap().insert(addr, tx);
+
+ // Define here what we do for the actual I/O. That is, read a bunch of
+ // lines from the socket and dispatch them while we also write any lines
+ // from other sockets.
+ let connections_inner = connections.clone();
+ let reader = BufReader::new(reader);
+
+ // Model the read portion of this socket by mapping an infinite
+ // iterator to each line off the socket. This "loop" is then
+ // terminated with an error once we hit EOF on the socket.
+ let iter = stream::iter_ok::<_, io::Error>(iter::repeat(()));
+
+ let socket_reader = iter.fold(reader, move |reader, _| {
+ // Read a line off the socket, failing if we're at EOF
+ let line = io::read_until(reader, b'\n', Vec::new());
+ let line = line.and_then(|(reader, vec)| {
+ if vec.len() == 0 {
+ Err(io::Error::new(io::ErrorKind::BrokenPipe, "broken pipe"))
+ } else {
+ Ok((reader, vec))
+ }
+ });
+
+ // Convert the bytes we read into a string, and then send that
+ // string to all other connected clients.
+ let line = line.map(|(reader, vec)| {
+ (reader, String::from_utf8(vec))
+ });
+
+ // Move the connection state into the closure below.
+ let connections = connections_inner.clone();
+
+ line.map(move |(reader, message)| {
+ println!("{}: {:?}", addr, message);
+ let mut conns = connections.lock().unwrap();
+
+ if let Ok(msg) = message {
+ // For each open connection except the sender, send the
+ // string via the channel.
+ let iter = conns.iter_mut()
+ .filter(|&(&k, _)| k != addr)
+ .map(|(_, v)| v);
+ for tx in iter {
+ tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
+ }
+ } else {
+ let tx = conns.get_mut(&addr).unwrap();
+ tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
+ }
+
+ reader
+ })
});
- // Convert the bytes we read into a string, and then send that
- // string to all other connected clients.
- let line = line.map(|(reader, vec)| {
- (reader, String::from_utf8(vec))
+ // Whenever we receive a string on the Receiver, we write it to
+ // `WriteHalf<TcpStream>`.
+ let socket_writer = rx.fold(writer, |writer, msg| {
+ let amt = io::write_all(writer, msg.into_bytes());
+ let amt = amt.map(|(writer, _)| writer);
+ amt.map_err(|_| ())
});
- let connections = connections_inner.clone();
- line.map(move |(reader, message)| {
- println!("{}: {:?}", addr, message);
- let mut conns = connections.lock().unwrap();
- if let Ok(msg) = message {
- // For each open connection except the sender, send the
- // string via the channel.
- let iter = conns.iter_mut()
- .filter(|&(&k, _)| k != addr)
- .map(|(_, v)| v);
- for tx in iter {
- tx.unbounded_send(format!("{}: {}", addr, msg)).unwrap();
- }
- } else {
- let tx = conns.get_mut(&addr).unwrap();
- tx.unbounded_send("You didn't send valid UTF-8.".to_string()).unwrap();
- }
- reader
- })
- });
- // Whenever we receive a string on the Receiver, we write it to
- // `WriteHalf<TcpStream>`.
- let socket_writer = rx.fold(writer, |writer, msg| {
- let amt = io::write_all(writer, msg.into_bytes());
- let amt = amt.map(|(writer, _)| writer);
- amt.map_err(|_| ())
- });
+ // Now that we've got futures representing each half of the socket, we
+ // use the `select` combinator to wait for either half to be done to
+ // tear down the other. Then we spawn off the result.
+ let connections = connections.clone();
+ let socket_reader = socket_reader.map_err(|_| ());
+ let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
- let pool = CpuPool::new(1);
-
- // Now that we've got futures representing each half of the socket, we
- // use the `select` combinator to wait for either half to be done to
- // tear down the other. Then we spawn off the result.
- let connections = connections.clone();
- let socket_reader = socket_reader.map_err(|_| ());
- let connection = socket_reader.map(|_| ()).select(socket_writer.map(|_| ()));
- pool.execute(connection.then(move |_| {
- connections.lock().unwrap().remove(&addr);
- println!("Connection {} closed.", addr);
- Ok(())
- })).unwrap();
+ // Spawn a task to process the connection
+ tokio::spawn(connection.then(move |_| {
+ connections.lock().unwrap().remove(&addr);
+ println!("Connection {} closed.", addr);
+ Ok(())
+ }));
- Ok(())
- });
+ Ok(())
+ });
// execute server
- srv.wait().unwrap();
+ tokio::run(srv);
}
diff --git a/examples/chat.rs b/examples/chat.rs
index 334a9b87..f7f04fa9 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -26,26 +26,21 @@
#![deny(warnings)]
-#[macro_use]
-extern crate futures;
extern crate tokio;
#[macro_use]
-extern crate tokio_io;
+extern crate futures;
extern crate bytes;
-use tokio::executor::current_thread;
+use tokio::io;
use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead};
-use futures::prelude::*;
+use tokio::prelude::*;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};
-use std::io::{self, Write};
-use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
-use std::rc::Rc;
+use std::sync::{Arc, Mutex};
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<Bytes>;
@@ -88,7 +83,7 @@ struct Peer {
///
/// This is used to broadcast messages read off the socket to all connected
/// peers.
- state: Rc<RefCell<Shared>>,
+ state: Arc<Mutex<Shared>>,
/// Receive half of the message channel.
///
@@ -137,7 +132,7 @@ impl Shared {
impl Peer {
/// Create a new instance of `Peer`.
fn new(name: BytesMut,
- state: Rc<RefCell<Shared>>,
+ state: Arc<Mutex<Shared>>,
lines: Lines) -> Peer
{
// Get the client socket address
@@ -147,7 +142,8 @@ impl Peer {
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
- state.borrow_mut().peers.insert(addr, tx);
+ state.lock().unwrap()
+ .peers.insert(addr, tx);
Peer {
name,
@@ -213,7 +209,7 @@ impl Future for Peer {
let line = line.freeze();
// Now, send the line to all other peers
- for (addr, tx) in &self.state.borrow().peers {
+ for (addr, tx) in &self.state.lock().unwrap().peers {
// Don't send the message to ourselves
if *addr != self.addr {
// The send only fails if the rx half has been dropped,
@@ -240,7 +236,7 @@ impl Future for Peer {
impl Drop for Peer {
fn drop(&mut self) {
- self.state.borrow_mut().peers
+ self.state.lock().unwrap().peers
.remove(&self.addr);
}
}
@@ -275,7 +271,7 @@ impl Lines {
//
// In the case of `io::Result`, an error of `WouldBlock` is
// equivalent to `Async::NotReady.
- let n = try_nb!(self.socket.write(&self.wr));
+ let n = try_ready!(self.socket.poll_write(&self.wr));
// As long as the wr is not empty, a successful write should
// never write 0 bytes.
@@ -344,7 +340,7 @@ impl Stream for Lines {
///
/// This will read the first line from the socket to identify the client, then
/// add the client to the set of connected peers in the chat service.
-fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
+fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// Wrap the socket with the `Lines` codec that we wrote above.
//
// By doing this, we can operate at the line level instead of doing raw byte
@@ -406,8 +402,8 @@ fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
println!("connection error = {:?}", e);
});
- // Spawn a new task that processes the socket:
- current_thread::spawn(connection);
+ // Return the connection processing task
+ tokio::spawn(connection);
}
pub fn main() {
@@ -416,7 +412,7 @@ pub fn main() {
// The server task will hold a handle to this. For every new client, the
// `state` handle is cloned and passed into the task that processes the
// client connection.
- let state = Rc::new(RefCell::new(Shared::new()));
+ let state = Arc::new(Mutex::new(Shared::new()));
let addr = "127.0.0.1:6142".parse().unwrap();
@@ -428,6 +424,7 @@ pub fn main() {
// The server task asynchronously iterates over and processes each
// incoming connection.
let server = listener.incoming().for_each(move |socket| {
+ // Spawn a task to process the connection
process(socket, state.clone());
Ok(())
})
@@ -460,5 +457,5 @@ pub fn main() {
//
// In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal.
- current_thread::block_on_all(server).unwrap();
+ tokio::run(server);
}
diff --git a/examples/compress.rs b/examples/compress.rs
deleted file mode 100644
index 3098abf7..00000000
--- a/examples/compress.rs
+++ /dev/null
@@ -1,122 +0,0 @@
-//! An example of offloading work to a thread pool instead of doing work on the
-//! main event loop.
-//!
-//! In this example the server will act as a form of echo server except that
-//! it'll echo back gzip-compressed data. Each connected client will have the
-//! data written streamed back as the compressed version is available, and all
-//! compressing will occur on a thread pool rather than the main event loop.
-//!
-//! You can preview this example with in one terminal:
-//!
-//! cargo run --example compress
-//!
-//! and in another terminal;
-//!
-//! echo test | cargo run --example connect 127.0.0.1:8080 | gunzip
-//!
-//! The latter command will need to be tweaked for non-unix-like shells, but
-//! you can also redirect the stdout of the `connect` program to a file
-//! and then decompress that.
-
-extern crate futures;
-extern crate futures_cpupool;
-extern crate flate2;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::io;
-use std::env;
-use std::net::SocketAddr;
-
-use futures::{Future, Stream, Poll};
-use futures::future::Executor;
-use futures_cpupool::CpuPool;
-use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead, AsyncWrite};
-use flate2::write::GzEncoder;
-
-fn main() {
- // As with many other examples, parse our CLI arguments and prepare the
- // reactor.
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse::<SocketAddr>().unwrap();
- let socket = TcpListener::bind(&addr).unwrap();
- println!("Listening on: {}", addr);
-
- // This is where we're going to offload our computationally heavy work
- // (compressing) to. Here we just use a convenience constructor to create a
- // pool of threads equal to the number of CPUs we have.
- let pool = CpuPool::new_num_cpus();
-
- // The compress logic will happen in the function below, but everything's
- // still a future! Each client is spawned to concurrently get processed.
- let server = socket.incoming().for_each(move |socket| {
- let addr = socket.peer_addr().unwrap();
- pool.execute(compress(socket, &pool).then(move |result| {
- match result {
- Ok((r, w)) => println!("{}: compressed {} bytes to {}", addr, r, w),
- Err(e) => println!("{}: failed when compressing: {}", addr, e),
- }
- Ok(())
- })).unwrap();
- Ok(())
- });
-
- server.wait().unwrap();
-}
-
-/// The main workhorse of this example. This'll compress all data read from
-/// `socket` on the `pool` provided, writing it back out to `socket` as it's
-/// available.
-fn compress(socket: TcpStream, pool: &CpuPool)
- -> Box<Future<Item = (u64, u64), Error = io::Error> + Send>
-{
- use tokio_io::io;
-
- // The general interface that `CpuPool` provides is that we'll *spawn a
- // future* onto it. All execution of the future will occur on the `CpuPool`
- // and we'll get back a handle representing the completed value of the
- // future. In essence it's our job here to create a future that represents
- // compressing `socket`, and then we'll simply spawn it at the very end.
- //
- // Here we exploit the fact that `TcpStream` itself is `Send` in this
- // function as well. That is, we can read/write the TCP stream on any
- // thread, and we'll get notifications about it being ready from the reactor
- // thread.
- //
- // Otherwise this is the same as the echo server except that after splitting
- // we apply some encoding to one side, followed by a `shutdown` when we're
- // done to ensure that all gz footers are written.
- let (read, write) = socket.split();
- let write = Count { io: write, amt: 0 };
- let write = GzEncoder::new(write, flate2::Compression::best());
- let process = io::copy(read, write).and_then(|(amt, _read, write)| {
- io::shutdown(write).map(move |io| (amt, io.get_ref().amt))
- });
-
- // Spawn the future so is executes entirely on the thread pool here
- Box::new(pool.spawn(process))
-}
-
-struct Count<T> {
- io: T,
- amt: u64,
-}
-
-impl<T: io::Write> io::Write for Count<T> {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- let n = self.io.write(buf)?;
- self.amt += n as u64;
- Ok(n)
- }
-
- fn flush(&mut self) -> io::Result<()> {
- self.io.flush()
- }
-}
-
-impl<T: AsyncWrite> AsyncWrite for Count<T> {
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- self.io.shutdown()
- }
-}
diff --git a/examples/connect.rs b/examples/connect.rs
index 26614b96..f3ea6970 100644
--- a/examples/connect.rs
+++ b/examples/connect.rs
@@ -14,10 +14,11 @@
//! this repository! Many of them recommend running this as a simple "hook up
//! stdin/stdout to a server" to get up and running.
-extern crate futures;
-extern crate futures_cpupool;
+#![deny(warnings)]
+
extern crate tokio;
extern crate tokio_io;
+extern crate futures;
extern crate bytes;
use std::env;
@@ -25,9 +26,8 @@ use std::io::{self, Read, Write};
use std::net::SocketAddr;
use std::thread;
+use tokio::prelude::*;
use futures::sync::mpsc;
-use futures::{Future, Sink, Stream};
-use futures_cpupool::CpuPool;
fn main() {
// Determine if we're going to run in TCP or UDP mode
@@ -46,8 +46,6 @@ fn main() {
});
let addr = addr.parse::<SocketAddr>().unwrap();
- let pool = CpuPool::new(1);
-
// Right now Tokio doesn't support a handle to stdin running on the event
// loop, so we farm out that work to a separate thread. This thread will
// read data (with blocking I/O) from stdin and then send it to the event
@@ -60,9 +58,9 @@ fn main() {
// our UDP connection to get a stream of bytes we're going to emit to
// stdout.
let stdout = if tcp {
- tcp::connect(&addr, &pool, Box::new(stdin_rx))
+ tcp::connect(&addr, Box::new(stdin_rx))
} else {
- udp::connect(&addr, &pool, Box::new(stdin_rx))
+ udp::connect(&addr, Box::new(stdin_rx))
};
// And now with our stream of bytes to write to stdout, we execute that in
@@ -71,15 +69,21 @@ fn main() {
// loop. In this case, though, we know it's ok as the event loop isn't
// otherwise running anything useful.
let mut out = io::stdout();
- stdout.for_each(|chunk| {
- out.write_all(&chunk)
- }).wait().unwrap();
+
+ tokio::run({
+ stdout
+ .for_each(move |chunk| {
+ out.write_all(&chunk)
+ })
+ .map_err(|e| println!("error reading stdout; error = {:?}", e))
+ });
}
mod codec {
use std::io;
use bytes::{BufMut, BytesMut};
use tokio_io::codec::{Encoder, Decoder};
+
/// A simple `Codec` implementation that just ships bytes around.
///
/// This type is used for "framing" a TCP/UDP stream of bytes but it's really
@@ -115,24 +119,21 @@ mod codec {
}
mod tcp {
- use std::io;
- use std::net::SocketAddr;
+ use tokio;
+ use tokio::net::TcpStream;
+ use tokio::prelude::*;
use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
- use tokio::net::TcpStream;
- use tokio_io::AsyncRead;
use codec::Bytes;
+ use std::io;
+ use std::net::SocketAddr;
+
pub fn connect(addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
let tcp = TcpStream::connect(addr);
- let pool = pool.clone();
// After the TCP connection has been established, we set up our client
// to start forwarding data.
@@ -151,12 +152,14 @@ mod tcp {
// with us reading data from the stream.
Box::new(tcp.map(move |stream| {
let (sink, stream) = stream.framed(Bytes).split();
- pool.execute(stdin.forward(sink).then(|result| {
+
+ tokio::spawn(stdin.forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
+
stream
}).flatten_stream())
}
@@ -166,17 +169,16 @@ mod udp {
use std::io;
use std::net::SocketAddr;
- use bytes::BytesMut;
- use futures::{Future, Stream};
- use futures::future::Executor;
- use futures_cpupool::CpuPool;
+ use tokio;
use tokio::net::{UdpSocket, UdpFramed};
+ use tokio::prelude::*;
+ use bytes::BytesMut;
+
use codec::Bytes;
pub fn connect(&addr: &SocketAddr,
- pool: &CpuPool,
stdin: Box<Stream<Item = Vec<u8>, Error = io::Error> + Send>)
- -> Box<Stream<Item = BytesMut, Error = io::Error>>
+ -> Box<Stream<Item = BytesMut, Error = io::Error> + Send>
{
// We'll bind our UDP socket to a local IP/port, but for now we
// basically let the OS pick both of those.
@@ -196,14 +198,14 @@ mod udp {
// All bytes from `stdin` will go to the `addr` specified in our
// argument list. Like with TCP this is spawned concurrently
- pool.execute(stdin.map(move |chunk| {
+ tokio::spawn(stdin.map(move |chunk| {
(chunk, addr)
}).forward(sink).then(|result| {
if let Err(e) = result {
panic!("failed to write to socket: {}", e)
}
Ok(())
- })).unwrap();
+ }));
// With UDP we could receive data from any source, so filter out
// anything coming from a different address
diff --git a/examples/echo-threads.rs b/examples/echo-threads.rs
deleted file mode 100644
index 6ce8b156..00000000
--- a/examples/echo-threads.rs
+++ /dev/null
@@ -1,92 +0,0 @@
-//! A multithreaded version of an echo server
-//!
-//! This server implements the same functionality as the `echo` example, except
-//! that this example will use all cores of the machine to do I/O instead of
-//! just one. This examples works by having the main thread using blocking I/O
-//! and shipping accepted sockets to worker threads in a round-robin fashion.
-//!
-//! To see this server in action, you can run this in one terminal:
-//!
-//! cargo run --example echo-threads
-//!
-//! and in another terminal you can run:
-//!
-//! cargo run --example connect 127.0.0.1:8080
-
-extern crate futures;
-extern crate futures_cpupool;
-extern crate num_cpus;
-extern crate tokio;
-extern crate tokio_io;
-
-use std::env;
-use std::net::SocketAddr;
-use std::thread;
-
-use futures::prelude::*;
-use futures::future::Executor;
-use futures::sync::mpsc;
-use futures_cpupool::CpuPool;
-use tokio_io::AsyncRead;
-use tokio_io::io::copy;
-use tokio::net::{TcpStream, TcpListener};
-
-fn main() {
- // First argument, the address to bind
- let addr = env::args().nth(1).unwrap_or("127.0.0.1:8080".to_string());
- let addr = addr.parse::<SocketAddr>().unwrap();
-
- // Second argument, the number of threads we'll be using
- let num_threads = env::args().nth(2).and_then(|s| s.parse().ok())
- .unwrap_or(num_cpus::get());
-
- let listener = TcpListener::bind(&addr).expect("failed to bind");
- println!("Listening on: {}", addr);
-
- // Spin up our worker threads, creating a channel routing to each worker
- // thread that we'll use below.
- let mut channels = Vec::new();
- for _ in 0..num_threads {
- let (tx, rx) = mpsc::unbounded();
- channels.push(tx);
- thread::spawn(|| worker(rx));
- }
-
- // Infinitely accept sockets from our `TcpListener`. Each socket is then
- // shipped round-robin to a particular thread which will associate the
- // socket with the corresponding event loop and process the connection.
- let mut next = 0;
- let srv = listener.incoming().for_each(|socket| {
- channels[next].unbounded_send(socket).expect("worker thread died");
- next = (next + 1) % channels.len();
- Ok(())
- });
- srv.wait().unwrap();
-}
-
-fn worker(rx: mpsc::UnboundedReceiver<TcpStream>) {
- let pool = CpuPool::new(1);
-
- let done = rx.for_each(move |socket| {
- let addr = socket.peer_addr().expect("failed to get remote address");
-
- // Like the single-threaded `echo` example we split the socket halves
- // and use the `copy` helper to ship bytes back and forth. Afterwards we
- // spawn the task to run concurrently on this thread, and then print out
- // what happened afterwards
- let (reader, writer) = socket.split();
- let amt = copy(reader, writer);
- let msg = amt.then(move |result| {
- match result {
- Ok((amt, _, _)) => println!("wrote {} bytes to {}", amt, addr),
- Err(e) => println!("error on {}: {}", addr, e),
- }
-
- Ok(())
- });
- pool.execute(msg).unwrap();
-
- Ok(())
- });
- done.wait().unwrap();
-}
diff --git a/examples/echo-udp.rs b/examples/echo-udp.rs
index 18f2d393..e25f0936 100644
--- a/examples/echo-udp.rs
+++ b/examples/echo-udp.rs
@@ -10,15 +10,16 @@
//!
//! Each line you type in to the `nc` terminal should be echo'd back to you!
+#![deny(warnings)]
+
#[macro_use]
extern crate futures;
extern crate tokio;
-extern crate tokio_io;
use std::{env, io};
use std::net::SocketAddr;
-use futures::{Future, Poll};
+use tokio::prelude::*;
use tokio::net::UdpSocket;
struct Server {
@@ -56,11 +57,17 @@ fn main() {
let socket = UdpSocket::bind(&addr).unwrap();
println!("Listening on: {}", socket.local_addr().unwrap());
- // Next we'll create a future to spawn (the one we defined above) and then
- // we'll block our current thread waiting on the result of the future
- Server {
+ let server = Server {
socket: socket,
buf: vec![0; 1024],
to_send: None,
- }.wait().unwrap();
+ };
+
+ // This starts the server task.
+ //
+ // `map_err` handles the error by logging it and maps the future to a type
+ // that can be spawned.
+ //
+ // `tokio::run` spanws the task on the Tokio runtime and starts running.
+ tokio::run(server.map_err(|e| println!("server error = {:?}", e)));
}
diff --git a/examples/echo.rs b/examples/echo.rs
index 558f3a68..e4fe65cc 100644
--- a/examples/echo.rs
+++ b/examples/echo.rs
@@ -1,9 +1,11 @@
-//! A "hello world" echo server with tokio-core
+//! A "hello world" echo server with Tokio