summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-06 09:59:04 -0800
committerGitHub <noreply@github.com>2018-03-06 09:59:04 -0800
commitf1cb12e14fb047f3f86c852c253962c60ce471e8 (patch)
tree59aab45a28961b00f7c71c5eb083c6242b51ff01 /examples/chat.rs
parent56c579787260abcb9786aa22cfca1ee4b7c3b5ba (diff)
Update examples to track latest Tokio changes (#180)
The exampes included in the repository have lagged behind the changes made. Specifically, they do not use the new runtime construct. This patch updates examples to use the latest features of Tokio.
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--examples/chat.rs37
1 files changed, 17 insertions, 20 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 334a9b87..f7f04fa9 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -26,26 +26,21 @@
#![deny(warnings)]
-#[macro_use]
-extern crate futures;
extern crate tokio;
#[macro_use]
-extern crate tokio_io;
+extern crate futures;
extern crate bytes;
-use tokio::executor::current_thread;
+use tokio::io;
use tokio::net::{TcpListener, TcpStream};
-use tokio_io::{AsyncRead};
-use futures::prelude::*;
+use tokio::prelude::*;
use futures::sync::mpsc;
use futures::future::{self, Either};
use bytes::{BytesMut, Bytes, BufMut};
-use std::io::{self, Write};
-use std::cell::RefCell;
use std::collections::HashMap;
use std::net::SocketAddr;
-use std::rc::Rc;
+use std::sync::{Arc, Mutex};
/// Shorthand for the transmit half of the message channel.
type Tx = mpsc::UnboundedSender<Bytes>;
@@ -88,7 +83,7 @@ struct Peer {
///
/// This is used to broadcast messages read off the socket to all connected
/// peers.
- state: Rc<RefCell<Shared>>,
+ state: Arc<Mutex<Shared>>,
/// Receive half of the message channel.
///
@@ -137,7 +132,7 @@ impl Shared {
impl Peer {
/// Create a new instance of `Peer`.
fn new(name: BytesMut,
- state: Rc<RefCell<Shared>>,
+ state: Arc<Mutex<Shared>>,
lines: Lines) -> Peer
{
// Get the client socket address
@@ -147,7 +142,8 @@ impl Peer {
let (tx, rx) = mpsc::unbounded();
// Add an entry for this `Peer` in the shared state map.
- state.borrow_mut().peers.insert(addr, tx);
+ state.lock().unwrap()
+ .peers.insert(addr, tx);
Peer {
name,
@@ -213,7 +209,7 @@ impl Future for Peer {
let line = line.freeze();
// Now, send the line to all other peers
- for (addr, tx) in &self.state.borrow().peers {
+ for (addr, tx) in &self.state.lock().unwrap().peers {
// Don't send the message to ourselves
if *addr != self.addr {
// The send only fails if the rx half has been dropped,
@@ -240,7 +236,7 @@ impl Future for Peer {
impl Drop for Peer {
fn drop(&mut self) {
- self.state.borrow_mut().peers
+ self.state.lock().unwrap().peers
.remove(&self.addr);
}
}
@@ -275,7 +271,7 @@ impl Lines {
//
// In the case of `io::Result`, an error of `WouldBlock` is
// equivalent to `Async::NotReady.
- let n = try_nb!(self.socket.write(&self.wr));
+ let n = try_ready!(self.socket.poll_write(&self.wr));
// As long as the wr is not empty, a successful write should
// never write 0 bytes.
@@ -344,7 +340,7 @@ impl Stream for Lines {
///
/// This will read the first line from the socket to identify the client, then
/// add the client to the set of connected peers in the chat service.
-fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
+fn process(socket: TcpStream, state: Arc<Mutex<Shared>>) {
// Wrap the socket with the `Lines` codec that we wrote above.
//
// By doing this, we can operate at the line level instead of doing raw byte
@@ -406,8 +402,8 @@ fn process(socket: TcpStream, state: Rc<RefCell<Shared>>) {
println!("connection error = {:?}", e);
});
- // Spawn a new task that processes the socket:
- current_thread::spawn(connection);
+ // Return the connection processing task
+ tokio::spawn(connection);
}
pub fn main() {
@@ -416,7 +412,7 @@ pub fn main() {
// The server task will hold a handle to this. For every new client, the
// `state` handle is cloned and passed into the task that processes the
// client connection.
- let state = Rc::new(RefCell::new(Shared::new()));
+ let state = Arc::new(Mutex::new(Shared::new()));
let addr = "127.0.0.1:6142".parse().unwrap();
@@ -428,6 +424,7 @@ pub fn main() {
// The server task asynchronously iterates over and processes each
// incoming connection.
let server = listener.incoming().for_each(move |socket| {
+ // Spawn a task to process the connection
process(socket, state.clone());
Ok(())
})
@@ -460,5 +457,5 @@ pub fn main() {
//
// In our example, we have not defined a shutdown strategy, so this will
// block until `ctrl-c` is pressed at the terminal.
- current_thread::block_on_all(server).unwrap();
+ tokio::run(server);
}