diff options
author | Carl Lerche <me@carllerche.com> | 2018-03-08 13:38:52 -0800 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-08 13:38:52 -0800 |
commit | fed4d72effbf5187a0e62c1307798daa24c8f7fc (patch) | |
tree | 3c508233a42d0c8327c01482d58ec3d189017cce /examples/chat.rs | |
parent | 142bd3b2a90aaf33c14248ffcfcb1955f6b1fe66 (diff) |
Improve the chat example, making it more robust (#199)
This handles cases where clients send large amounts of data while on
localhost.
Closes #192
Diffstat (limited to 'examples/chat.rs')
-rw-r--r-- | examples/chat.rs | 26 |
1 files changed, 25 insertions, 1 deletions
diff --git a/examples/chat.rs b/examples/chat.rs index 8d18ded1..f325c2d2 100644 --- a/examples/chat.rs +++ b/examples/chat.rs @@ -172,8 +172,20 @@ impl Future for Peer { type Error = io::Error; fn poll(&mut self) -> Poll<(), io::Error> { + // Tokio (and futures) use cooperative scheduling without any + // preemption. If a task never yields execution back to the executor, + // then other tasks may be starved. + // + // To deal with this, robust applications should not have any unbounded + // loops. In this example, we will read at most `LINES_PER_TICK` lines + // from the client on each tick. + // + // If the limit is hit, the current task is notified, informing the + // executor to schedule the task again asap. + const LINES_PER_TICK: usize = 10; + // Receive all messages from peers. - loop { + for i in 0..LINES_PER_TICK { // Polling an `UnboundedReceiver` cannot fail, so `unwrap` here is // safe. match self.rx.poll().unwrap() { @@ -181,6 +193,14 @@ impl Future for Peer { // Buffer the line. Once all lines are buffered, they will // be flushed to the socket (right below). self.lines.buffer(&v); + + // If this is the last iteration, the loop will break even + // though there could still be lines to read. Because we did + // not reach `Async::NotReady`, we have to notify ourselves + // in order to tell the executor to schedule the task again. + if i+1 == LINES_PER_TICK { + task::current().notify(); + } } _ => break, } @@ -256,6 +276,10 @@ impl Lines { /// This writes the line to an internal buffer. Calls to `poll_flush` will /// attempt to flush this buffer to the socket. fn buffer(&mut self, line: &[u8]) { + // Ensure the buffer has capacity. Ideally this would not be unbounded, + // but to keep the example simple, we will not limit this. + self.wr.reserve(line.len()); + // Push the line onto the end of the write buffer. // // The `put` function is from the `BufMut` trait. |