diff options
Diffstat (limited to 'tokio/src')
-rw-r--r-- | tokio/src/async_await.rs | 17 | ||||
-rw-r--r-- | tokio/src/executor.rs (renamed from tokio/src/executor/mod.rs) | 40 | ||||
-rw-r--r-- | tokio/src/executor/current_thread/mod.rs | 166 | ||||
-rw-r--r-- | tokio/src/io.rs | 21 | ||||
-rw-r--r-- | tokio/src/lib.rs | 17 | ||||
-rw-r--r-- | tokio/src/net.rs | 20 | ||||
-rw-r--r-- | tokio/src/prelude.rs | 13 | ||||
-rw-r--r-- | tokio/src/reactor.rs (renamed from tokio/src/reactor/mod.rs) | 6 | ||||
-rw-r--r-- | tokio/src/reactor/poll_evented.rs | 545 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/async_await.rs | 17 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/builder.rs | 24 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/mod.rs | 13 | ||||
-rw-r--r-- | tokio/src/runtime/current_thread/runtime.rs | 69 | ||||
-rw-r--r-- | tokio/src/runtime/mod.rs | 9 | ||||
-rw-r--r-- | tokio/src/runtime/threadpool/async_await.rs | 18 | ||||
-rw-r--r-- | tokio/src/runtime/threadpool/mod.rs | 3 | ||||
-rw-r--r-- | tokio/src/sync.rs | 2 |
17 files changed, 64 insertions, 936 deletions
diff --git a/tokio/src/async_await.rs b/tokio/src/async_await.rs deleted file mode 100644 index 29ebce88..00000000 --- a/tokio/src/async_await.rs +++ /dev/null @@ -1,17 +0,0 @@ -use tokio_futures::compat; - -/// Like `tokio::run`, but takes an `async` block -pub fn run_async<F>(future: F) -where - F: std::future::Future<Output = ()> + Send + 'static, -{ - crate::run(compat::infallible_into_01(future)); -} - -/// Like `tokio::spawn`, but takes an `async` block -pub fn spawn_async<F>(future: F) -where - F: std::future::Future<Output = ()> + Send + 'static, -{ - crate::spawn(compat::infallible_into_01(future)); -} diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor.rs index 41fb65a7..266da320 100644 --- a/tokio/src/executor/mod.rs +++ b/tokio/src/executor.rs @@ -39,41 +39,15 @@ //! [`Executor`]: trait.Executor.html //! [`spawn`]: fn.spawn.html -#[deprecated( - since = "0.1.8", - note = "use tokio-current-thread crate or functions in tokio::runtime::current_thread instead", -)] -#[doc(hidden)] -pub mod current_thread; - -#[deprecated(since = "0.1.8", note = "use tokio-threadpool crate instead")] -#[doc(hidden)] -/// Re-exports of [`tokio-threadpool`], deprecated in favor of the crate. -/// -/// [`tokio-threadpool`]: https://docs.rs/tokio-threadpool/0.1 -pub mod thread_pool { - pub use tokio_threadpool::{ - Builder, - Sender, - Shutdown, - ThreadPool, - }; -} - +use std::future::Future; pub use tokio_executor::{Executor, TypedExecutor, DefaultExecutor, SpawnError}; -use futures::{Future, IntoFuture}; -use futures::future::{self, FutureResult}; - /// Return value from the `spawn` function. /// /// Currently this value doesn't actually provide any functionality. However, it /// provides a way to add functionality later without breaking backwards /// compatibility. /// -/// This also implements `IntoFuture` so that it can be used as the return value -/// in a `for_each` loop. -/// /// See [`spawn`] for more details. /// /// [`spawn`]: fn.spawn.html @@ -126,18 +100,8 @@ pub struct Spawn(()); /// /// [`DefaultExecutor`]: struct.DefaultExecutor.html pub fn spawn<F>(f: F) -> Spawn -where F: Future<Item = (), Error = ()> + 'static + Send +where F: Future<Output = ()> + 'static + Send { ::tokio_executor::spawn(f); Spawn(()) } - -impl IntoFuture for Spawn { - type Future = FutureResult<(), ()>; - type Item = (); - type Error = (); - - fn into_future(self) -> Self::Future { - future::ok(()) - } -} diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs deleted file mode 100644 index aa5efb94..00000000 --- a/tokio/src/executor/current_thread/mod.rs +++ /dev/null @@ -1,166 +0,0 @@ -#![allow(deprecated)] - -//! Execute many tasks concurrently on the current thread. -//! -//! [`CurrentThread`] is an executor that keeps tasks on the same thread that -//! they were spawned from. This allows it to execute futures that are not -//! `Send`. -//! -//! A single [`CurrentThread`] instance is able to efficiently manage a large -//! number of tasks and will attempt to schedule all tasks fairly. -//! -//! All tasks that are being managed by a [`CurrentThread`] executor are able to -//! spawn additional tasks by calling [`spawn`]. This function only works from -//! within the context of a running [`CurrentThread`] instance. -//! -//! The easiest way to start a new [`CurrentThread`] executor is to call -//! [`block_on_all`] with an initial task to seed the executor. -//! -//! For example: -//! -//! ``` -//! # use tokio::executor::current_thread; -//! use futures::future::lazy; -//! -//! // Calling execute here results in a panic -//! // current_thread::spawn(my_future); -//! -//! # pub fn main() { -//! current_thread::block_on_all(lazy(|| { -//! // The execution context is setup, futures may be executed. -//! current_thread::spawn(lazy(|| { -//! println!("called from the current thread executor"); -//! Ok(()) -//! })); -//! -//! Ok::<_, ()>(()) -//! })); -//! # } -//! ``` -//! -//! The `block_on_all` function will block the current thread until **all** -//! tasks that have been spawned onto the [`CurrentThread`] instance have -//! completed. -//! -//! More fine-grain control can be achieved by using [`CurrentThread`] directly. -//! -//! ``` -//! # use tokio::executor::current_thread::CurrentThread; -//! use futures::future::{lazy, empty}; -//! use std::time::Duration; -//! -//! // Calling execute here results in a panic -//! // current_thread::spawn(my_future); -//! -//! # pub fn main() { -//! let mut current_thread = CurrentThread::new(); -//! -//! // Spawn a task, the task is not executed yet. -//! current_thread.spawn(lazy(|| { -//! println!("Spawning a task"); -//! Ok(()) -//! })); -//! -//! // Spawn a task that never completes -//! current_thread.spawn(empty()); -//! -//! // Run the executor, but only until the provided future completes. This -//! // provides the opportunity to start executing previously spawned tasks. -//! let res = current_thread.block_on(lazy(|| { -//! Ok::<_, ()>("Hello") -//! })).unwrap(); -//! -//! // Now, run the executor for *at most* 1 second. Since a task was spawned -//! // that never completes, this function will return with an error. -//! current_thread.run_timeout(Duration::from_secs(1)).unwrap_err(); -//! # } -//! ``` -//! -//! # Execution model -//! -//! Internally, [`CurrentThread`] maintains a queue. When one of its tasks is -//! notified, the task gets added to the queue. The executor will pop tasks from -//! the queue and call [`Future::poll`]. If the task gets notified while it is -//! being executed, it won't get re-executed until all other tasks currently in -//! the queue get polled. -//! -//! Before the task is polled, a thread-local variable referencing the current -//! [`CurrentThread`] instance is set. This enables [`spawn`] to spawn new tasks -//! onto the same executor without having to thread through a handle value. -//! -//! If the [`CurrentThread`] instance still has uncompleted tasks, but none of -//! these tasks are ready to be polled, the current thread is put to sleep. When -//! a task is notified, the thread is woken up and processing resumes. -//! -//! All tasks managed by [`CurrentThread`] remain on the current thread. When a -//! task completes, it is dropped. -//! -//! [`spawn`]: fn.spawn.html -//! [`block_on_all`]: fn.block_on_all.html -//! [`CurrentThread`]: struct.CurrentThread.html -//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll - -pub use tokio_current_thread::{ - BlockError, - CurrentThread, - Entered, - Handle, - RunError, - RunTimeoutError, - TaskExecutor, - Turn, - TurnError, - block_on_all, - spawn, -}; - -use std::cell::Cell; -use std::marker::PhantomData; - -use futures::future::{self}; - -#[deprecated(since = "0.1.2", note = "use block_on_all instead")] -#[doc(hidden)] -#[derive(Debug)] -pub struct Context<'a> { - cancel: Cell<bool>, - _p: PhantomData<&'a ()>, -} - -impl<'a> Context<'a> { - /// Cancels *all* executing futures. - pub fn cancel_all_spawned(&self) { - self.cancel.set(true); - } -} - -#[deprecated(since = "0.1.2", note = "use block_on_all instead")] -#[doc(hidden)] -pub fn run<F, R>(f: F) -> R - where F: FnOnce(&mut Context<'_>) -> R -{ - let mut context = Context { - cancel: Cell::new(false), - _p: PhantomData, - }; - - let mut current_thread = CurrentThread::new(); - - let ret = current_thread - .block_on(future::lazy(|| Ok::<_, ()>(f(&mut context)))) - .unwrap(); - - if context.cancel.get() { - return ret; - } - - current_thread.run().unwrap(); - ret -} - -#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")] -#[doc(hidden)] -pub fn task_executor() -> TaskExecutor { - TaskExecutor::current() -} - diff --git a/tokio/src/io.rs b/tokio/src/io.rs index feb1f6b2..4d2b5e35 100644 --- a/tokio/src/io.rs +++ b/tokio/src/io.rs @@ -25,22 +25,13 @@ //! [output]: fn.stdout.html //! [error]: fn.stderr.html //! -//! # Utility functions -//! -//! Utilities functions are provided for working with [`AsyncRead`] / -//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all -//! data from a source to a destination. -//! //! # `std` re-exports //! -//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and -//! [`Result`] are re-exported from `std::io` for ease of use. +//! Additionally, [`Error`], [`ErrorKind`], and [`Result`] are re-exported +//! from `std::io` for ease of use. //! //! [`AsyncRead`]: trait.AsyncRead.html //! [`AsyncWrite`]: trait.AsyncWrite.html -//! [`copy`]: fn.copy.html -//! [`Read`]: trait.Read.html -//! [`Write`]: trait.Write.html //! [`Error`]: struct.Error.html //! [`ErrorKind`]: enum.ErrorKind.html //! [`Result`]: type.Result.html @@ -51,12 +42,6 @@ pub use tokio_io::{AsyncRead, AsyncWrite}; #[cfg(feature = "fs")] pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout}; -// Utils -pub use tokio_io::io::{ - copy, flush, lines, read, read_exact, read_to_end, read_until, shutdown, write_all, Copy, - Flush, Lines, ReadExact, ReadHalf, ReadToEnd, ReadUntil, Shutdown, WriteAll, WriteHalf, -}; - // Re-export io::Error so that users don't have to deal // with conflicts when `use`ing `futures::io` and `std::io`. -pub use std::io::{Error, ErrorKind, Read, Result, Write}; +pub use std::io::{Error, ErrorKind, Result}; diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs index 99fdae85..080e0463 100644 --- a/tokio/src/lib.rs +++ b/tokio/src/lib.rs @@ -1,7 +1,6 @@ #![doc(html_root_url = "https://docs.rs/tokio/0.1.20")] #![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)] #![cfg_attr(test, deny(warnings))] -#![cfg_attr(feature = "async-await-preview", feature(async_await, await_macro))] #![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))] //! A runtime for writing reliable, asynchronous, and slim applications. @@ -92,7 +91,7 @@ pub mod reactor; pub mod sync; #[cfg(feature = "timer")] pub mod timer; -pub mod util; +//pub mod util; if_runtime! { pub mod executor; @@ -101,17 +100,3 @@ if_runtime! { pub use crate::executor::spawn; pub use crate::runtime::run; } - -// ===== Experimental async/await support ===== - -#[cfg(feature = "async-await-preview")] -mod async_await; - -#[cfg(feature = "async-await-preview")] -pub use async_await::{run_async, spawn_async}; - -#[cfg(feature = "async-await-preview")] -pub use tokio_futures::async_wait; - -#[cfg(feature = "async-await-preview")] -pub use tokio_macros::{main, test}; diff --git a/tokio/src/net.rs b/tokio/src/net.rs index a6b425da..6e7efb37 100644 --- a/tokio/src/net.rs +++ b/tokio/src/net.rs @@ -41,20 +41,11 @@ pub mod tcp { //! [`TcpListener`]: struct.TcpListener.html //! [incoming_method]: struct.TcpListener.html#method.incoming //! [`Incoming`]: struct.Incoming.html - pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream}; + pub use tokio_tcp::{TcpListener, TcpStream}; } #[cfg(feature = "tcp")] pub use self::tcp::{TcpListener, TcpStream}; -#[cfg(feature = "tcp")] -#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")] -#[doc(hidden)] -pub type ConnectFuture = self::tcp::ConnectFuture; -#[cfg(feature = "tcp")] -#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")] -#[doc(hidden)] -pub type Incoming = self::tcp::Incoming; - #[cfg(feature = "udp")] pub mod udp { //! UDP bindings for `tokio`. @@ -76,15 +67,6 @@ pub mod udp { #[cfg(feature = "udp")] pub use self::udp::{UdpFramed, UdpSocket}; -#[cfg(feature = "udp")] -#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")] -#[doc(hidden)] -pub type RecvDgram<T> = self::udp::RecvDgram<T>; -#[cfg(feature = "udp")] -#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")] -#[doc(hidden)] -pub type SendDgram<T> = self::udp::SendDgram<T>; - #[cfg(all(unix, feature = "uds"))] pub mod unix { //! Unix domain socket bindings for `tokio` (only available on unix systems). diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs index e364e8bf..000a680c 100644 --- a/tokio/src/prelude.rs +++ b/tokio/src/prelude.rs @@ -10,15 +10,8 @@ //! //! The prelude may grow over time as additional items see ubiquitous use. -pub use crate::util::{FutureExt, StreamExt}; -pub use futures::{future, stream, task, Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream}; -pub use std::io::{Read, Write}; -#[cfg(feature = "async-await-preview")] -#[doc(inline)] -pub use tokio_futures::{ - io::{AsyncReadExt, AsyncWriteExt}, - sink::SinkExt, - stream::StreamExt as StreamAsyncExt, -}; +pub use std::future::Future; +pub use std::task::{self, Poll}; +//pub use crate::util::{FutureExt, StreamExt}; #[cfg(feature = "io")] pub use tokio_io::{AsyncRead, AsyncWrite}; diff --git a/tokio/src/reactor/mod.rs b/tokio/src/reactor.rs index eccb341f..e343bcf4 100644 --- a/tokio/src/reactor/mod.rs +++ b/tokio/src/reactor.rs @@ -134,10 +134,6 @@ //! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html //! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html -mod poll_evented; - -#[allow(deprecated)] -pub use self::poll_evented::PollEvented; pub use tokio_reactor::{ - Background, Handle, PollEvented as PollEvented2, Reactor, Registration, Turn, + Handle, PollEvented, Reactor, Registration, Turn, }; diff --git a/tokio/src/reactor/poll_evented.rs b/tokio/src/reactor/poll_evented.rs deleted file mode 100644 index 2b055116..00000000 --- a/tokio/src/reactor/poll_evented.rs +++ /dev/null @@ -1,545 +0,0 @@ -//! Readiness tracking streams, backing I/O objects. -//! -//! This module contains the core type which is used to back all I/O on object -//! in `tokio-core`. The `PollEvented` type is the implementation detail of -//! all I/O. Each `PollEvented` manages registration with a reactor, -//! acquisition of a token, and tracking of the readiness state on the -//! underlying I/O primitive. - -#![allow(deprecated, warnings)] - -use crate::reactor::{Handle, Registration}; -use futures::{task, Async, Poll}; -use mio::event::Evented; -use mio::Ready; -use std::fmt; -use std::io::{self, Read, Write}; -use std::sync::atomic::AtomicUsize; -use std::sync::atomic::Ordering::Relaxed; -use std::sync::Mutex; -use tokio_io::{AsyncRead, AsyncWrite}; - -#[deprecated(since = "0.1.2", note = "PollEvented2 instead")] -#[doc(hidden)] -pub struct PollEvented<E> { - io: E, - inner: Inner, - handle: Handle, -} - -struct Inner { - registration: Mutex<Registration>, - - /// Currently visible read readiness - read_readiness: AtomicUsize, - - /// Currently visible write readiness - write_readiness: AtomicUsize, -} - -impl<E: fmt::Debug> fmt::Debug for PollEvented<E> { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.debug_struct("PollEvented").field("io", &self.io).finish() - } -} - -impl<E> PollEvented<E> { - /// Creates a new readiness stream associated with the provided - /// `loop_handle` and for the given `source`. - pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>> - where - E: Evented, - { - let registration = Registration::new(); - registration.register(&io)?; - - Ok(PollEvented { - io: io, - inner: Inner { - registration: Mutex::new(registration), - read_readiness: AtomicUsize::new(0), - write_readiness: AtomicUsize::new(0), - }, - handle: handle.clone(), - }) - } - - /// Tests to see if this source is ready to be read from or not. - /// - /// If this stream is not ready for a read then `Async::NotReady` will be - /// returned and the current task will be scheduled to receive a - /// notification when the stream is readable again. In other words, this - /// method is only safe to call from within the context of a future's task, - /// typically done in a `Future::poll` method. - /// - /// This is mostly equivalent to `self.poll_ready(Ready::readable())`. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn poll_read(&mut self) -> Async<()> { - if self.poll_read2().is_ready() { - return ().into(); - } - - Async::NotReady - } - - fn poll_read2(&self) -> Async<Ready> { - let r = self.inner.registration.lock().unwrap(); - - // Load the cached readiness - match self.inner.read_readiness.load(Relaxed) { - 0 => {} - mut n => { - // Check what's new with the reactor. - if let Some(ready) = r.take_read_ready().unwrap() { - n |= ready2usize(ready); - self.inner.read_readiness.store(n, Relaxed); - } - - return usize2ready(n).into(); - } - } - - let ready = match r.poll_read_ready().unwrap() { - Async::Ready(r) => r, - _ => return Async::NotReady, - }; - - // Cache the value - self.inner.read_readiness.store(ready2usize(ready), Relaxed); - - ready.into() - } - - /// Tests to see if this source is ready to be written to or not. - /// - /// If this stream is not ready for a write then `Async::NotReady` will be returned - /// and the current task will be scheduled to receive a notification when - /// the stream is writable again. In other words, this method is only safe - /// to call from within the context of a future's task, typically done in a - /// `Future::poll` method. - /// - /// This is mostly equivalent to `self.poll_ready(Ready::writable())`. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn poll_write(&mut self) -> Async<()> { - let r = self.inner.registration.lock().unwrap(); - - match self.inner.write_readiness.load(Relaxed) { - 0 => {} - mut n => { - // Check what's new with the reactor. - if let Some(ready) = r.take_write_ready().unwrap() { - n |= ready2usize(ready); - self.inner.write_readiness.store(n, Relaxed); - } - - return ().into(); - } - } - - let ready = match r.poll_write_ready().unwrap() { - Async::Ready(r) => r, - _ => return Async::NotReady, - }; - - // Cache the value - self.inner - .write_readiness - .store(ready2usize(ready), Relaxed); - - ().into() - } - - /// Test to see whether this source fulfills any condition listed in `mask` - /// provided. - /// - /// The `mask` given here is a mio `Ready` set of possible events. This can - /// contain any events like read/write but also platform-specific events - /// such as hup and error. The `mask` indicates events that are interested - /// in being ready. - /// - /// If any event in `mask` is ready then it is returned through - /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty - /// and contains all events that are currently ready in the `mask` provided. - /// - /// If no events are ready in the `mask` provided then the current task is - /// scheduled to receive a notification when any of them become ready. If - /// the `writable` event is contained within `mask` then this - /// `PollEvented`'s `write` task will be blocked and otherwise the `read` - /// task will be blocked. This is generally only relevant if you're working - /// with this `PollEvented` object on multiple tasks. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> { - let mut ret = Ready::empty(); - - if mask.is_empty() { - return ret.into(); - } - - if mask.is_writable() { - if self.poll_write().is_ready() { - ret = Ready::writable(); - } - } - - let mask = mask - Ready::writable(); - - if !mask.is_empty() { - if let Async::Ready(v) = self.poll_read2() { - ret |= v & mask; - } - } - - if ret.is_empty() { - if mask.is_writable() { - let _ = self.need_write(); - } - - if mask.is_readable() { - let _ = self.need_read(); - } - - Async::NotReady - } else { - ret.into() - } - } - - /// Indicates to this source of events that the corresponding I/O object is - /// no longer readable, but it needs to be. - /// - /// This function, like `poll_read`, is only safe to call from the context - /// of a future's task (typically in a `Future::poll` implementation). It - /// informs this readiness stream that the underlying object is no longer - /// readable, typically because a "would block" error was seen. - /// - /// *All* readiness bits associated with this stream except the writable bit - /// will be reset when this method is called. The current task is then - /// scheduled to receive a notification whenever anything changes other than - /// the writable bit. Note that this typically just means the readable bit - /// is used here, but if you're using a custom I/O object for events like - /// hup/error this may also be relevant. - /// - /// Note that it is also only valid to call this method if `poll_read` - /// previously indicated that the object is readable. That is, this function - /// must always be paired with calls to `poll_read` previously. - /// - /// # Errors - /// - /// This function will return an error if the `Reactor` that this `PollEvented` - /// is associated with has gone away (been destroyed). The error means that - /// the ambient futures task could not be scheduled to receive a - /// notification and typically means that the error should be propagated - /// outwards. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_read(&mut self) -> io::Result<()> { - self.inner.read_readiness.store(0, Relaxed); - - if self.poll_read().is_ready() { - // Notify the current task - task::current().notify(); - } - - Ok(()) - } - - /// Indicates to this source of events that the corresponding I/O object is - /// no longer writable, but it needs to be. - /// - /// This function, like `poll_write`, is only safe to call from the context - /// of a future's task (typically in a `Future::poll` implementation). It - /// informs this readiness stream that the underlying object is no longer - /// writable, typically because a "would block" error was seen. - /// - /// The flag indicating that this stream is writable is unset and the - /// current task is scheduled to receive a notification when the stream is - /// then again writable. - /// - /// Note that it is also only valid to call this method if `poll_write` - /// previously indicated that the object is writable. That is, this function - /// must always be paired with calls to `poll_write` previously. - /// - /// # Errors - /// - /// This function will return an error if the `Reactor` that this `PollEvented` - /// is associated with has gone away (been destroyed). The error means that - /// the ambient futures task could not be scheduled to receive a - /// notification and typically means that the error should be propagated - /// outwards. - /// - /// # Panics - /// - /// This function will panic if called outside the context of a future's - /// task. - pub fn need_write(&mut self) -> io::Result<()> { - self.inner.write_readiness.store(0, Relaxed); - - if self.poll_write().is_ready() { - // Notify the current task - task::current().notify(); - } - - Ok(()) - } - - /// Returns a reference to the event loop handle that this readiness stream - /// is associated with. - pub fn handle(&self) -> &Handle { - &self.handle - } - - /// Returns a shared reference to the underlying I/O object this readiness - /// stream is wrapping. - pub fn get_ref(&self) -> &E { - &self.io - } - - /// Returns a mutable reference to the underlying I/O object this readiness - /// stream is wrapping. - pub fn get_mut(&mut self) -> &mut E { - &mut self.io - } - - /// Consumes the `PollEvented` and returns the underlying I/O object - pub fn into_inner(self) -> E { - self.io - } - - /// Deregisters this source of events from the reactor core specified. - /// - /// This method can optionally be called to unregister the underlying I/O - /// object with the event loop that the `handle` provided points to. - /// Typically this method is not required as this automatically happens when - /// `E` is dropped, but for some use cases the `E` object doesn't represent - /// an owned reference, so dropping it won't automatically unregister with - /// the event loop. - /// - /// This consumes `self` as it will no longer provide events after the - /// method is called, and will likely return an error if this `PollEvented` - /// was created on a separate event loop from the `handle` specified. - pub fn deregister(&self) -> io::Result<()> - where - E: Evented, - { - self.inner.registration.lock().unwrap().deregister(&self.io) - } -} - -impl<E: Read> Read for PollEvented<E> { - fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_read() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().read(buf); - - if is_wouldblock(&r) { - self.need_read()?; - } - - return r; - } -} - -impl<E: Write> Write for PollEvented<E> { - fn write(&mut self, buf: &[u8]) -> io::Result<usize> { - if let Async::NotReady = self.poll_write() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().write(buf); - - if is_wouldblock(&r) { - self.need_write()?; - } - - return r; - } - - fn flush(&mut self) -> io::Result<()> { - if let Async::NotReady = self.poll_write() { - return Err(io::ErrorKind::WouldBlock.into()); - } - - let r = self.get_mut().flush(); - - if is_wouldblock(&r) { - self.need_write()?; - } - - return r; - } -} - -impl<E: Read> AsyncRead for PollEvented<E> {} - -impl<E: Write> AsyncWrite for PollEvented<E> { - fn shutdown(&mut self) -> Poll<(), io::Error> { - Ok(().into()) - } -} - -fn is_wouldblock<T>(r: &io::Result<T>) -> bool { - match *r { - Ok(_) => false, - Err(ref e) => e.kind() == io::ErrorKind::WouldBlock, - } -} - -const READ: usize = 1 << 0; -const WRITE: usize = 1 << 1; - -fn ready2usize(ready: Ready) -> usize { - let mut bits = 0; - if ready.is_readable() { - bits |= READ; - } - if ready.is_writable() { - bits |= WRITE; - } - bits | platform::ready2usize(ready) -} - -fn usize2ready(bits: usize) -> Ready { - let mut ready = Ready::empty(); - if bits & READ != 0 { - ready.insert(Ready::readable()); - } - if bits & WRITE != 0 { - ready.insert(Ready::writable()); - } - ready | platform::usize2ready(bits) -} - -#[cfg(unix)] -mod platform { - use mio::unix::UnixReady; - use mio::Ready; - - const HUP: usize = 1 << 2; - const ERROR: usize = 1 << 3; - const AIO: usize = 1 << 4; - const LIO: usize = 1 << 5; - - #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))] - fn is_aio(ready: &Ready) -> bool { - UnixReady::from(*ready).is_aio() - } - - #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))] - fn is_aio(_ready: &Ready) -> bool { - false - } - - #[cfg(target_os = "freebsd")] - fn is_lio(ready: &Ready) -> bool { - UnixReady::from(*ready).is_lio() - } - - #[cfg(not(target_os = "freebsd"))] - fn is_lio(_ready: &Ready) -> bool { - false - } - - pub fn ready2usize(ready: Ready) -> usize { - let ready = UnixReady::from(ready); - let mut bits = 0; - if is_aio(&ready) { - bits |= AIO; - } - if is_lio(&ready) { - bits |= LIO; - } - if ready.is_error() { - bits |= ERROR; - } - if ready.is_hup() { - bits |= HUP; - } - bits - } - - #[cfg(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos" - ))] - fn usize2ready_aio(ready: &mut UnixReady) { - ready.insert(UnixReady::aio()); - } - - #[cfg(not(any( - target_os = "dragonfly", - target_os = "freebsd", - target_os = "ios", - target_os = "macos" - )))] - fn usize2ready_aio(_ready: &mut UnixReady) { - // aio not available here → empty - } - - #[cfg(target_os = "freebsd")] - fn usize2ready_lio(ready: &mut UnixReady) { - ready.insert(UnixReady::lio()); - } - - #[cfg(not(target_os = "freebsd"))] - fn usize2ready_lio(_ready: &mut UnixReady) { - // lio not available here → empty - } - - pub fn usize2ready(bits: usize) -> Ready { - let mut ready = UnixReady::from(Ready::empty()); - if bits & AIO != 0 { - usize2ready_aio(&mut ready); - } - if bits & LIO != 0 { - usize2ready_lio(&mut ready); - } - if bits & HUP != 0 { - ready.insert(UnixReady::hup()); - } - if bits & ERROR != 0 { - ready.insert(UnixReady::error()); - } - ready.into() - } -} - -#[cfg(windows)] -mod platform { - use mio::Ready; - - pub fn all() -> Ready { - // No platform-specific Readinesses for Windows - Ready::empty() - } - - pub fn hup() -> Ready { - Ready::empty() - } - - pub fn ready2usize(_r: Ready) -> usize { - 0 - } |