summaryrefslogtreecommitdiffstats
path: root/examples/chat.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2018-03-08 13:38:52 -0800
committerGitHub <noreply@github.com>2018-03-08 13:38:52 -0800
commitfed4d72effbf5187a0e62c1307798daa24c8f7fc (patch)
tree3c508233a42d0c8327c01482d58ec3d189017cce /examples/chat.rs
parent142bd3b2a90aaf33c14248ffcfcb1955f6b1fe66 (diff)
Improve the chat example, making it more robust (#199)
This handles cases where clients send large amounts of data while on localhost. Closes #192
Diffstat (limited to 'examples/chat.rs')
-rw-r--r--examples/chat.rs26
1 files changed, 25 insertions, 1 deletions
diff --git a/examples/chat.rs b/examples/chat.rs
index 8d18ded1..f325c2d2 100644
--- a/examples/chat.rs
+++ b/examples/chat.rs
@@ -172,8 +172,20 @@ impl Future for Peer {
type Error = io::Error;
fn poll(&mut self) -> Poll<(), io::Error> {
+ // Tokio (and futures) use cooperative scheduling without any
+ // preemption. If a task never yields execution back to the executor,
+ // then other tasks may be starved.
+ //
+ // To deal with this, robust applications should not have any unbounded
+ // loops. In this example, we will read at most `LINES_PER_TICK` lines
+ // from the client on each tick.
+ //
+ // If the limit is hit, the current task is notified, informing the
+ // executor to schedule the task again asap.
+ const LINES_PER_TICK: usize = 10;
+
// Receive all messages from peers.
- loop {
+ for i in 0..LINES_PER_TICK {
// Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is
// safe.
match self.rx.poll().unwrap() {
@@ -181,6 +193,14 @@ impl Future for Peer {
// Buffer the line. Once all lines are buffered, they will
// be flushed to the socket (right below).
self.lines.buffer(&v);
+
+ // If this is the last iteration, the loop will break even
+ // though there could still be lines to read. Because we did
+ // not reach `Async::NotReady`, we have to notify ourselves
+ // in order to tell the executor to schedule the task again.
+ if i+1 == LINES_PER_TICK {
+ task::current().notify();
+ }
}
_ => break,
}
@@ -256,6 +276,10 @@ impl Lines {
/// This writes the line to an internal buffer. Calls to `poll_flush` will
/// attempt to flush this buffer to the socket.
fn buffer(&mut self, line: &[u8]) {
+ // Ensure the buffer has capacity. Ideally this would not be unbounded,
+ // but to keep the example simple, we will not limit this.
+ self.wr.reserve(line.len());
+
// Push the line onto the end of the write buffer.
//
// The `put` function is from the `BufMut` trait.