summaryrefslogtreecommitdiffstats
path: root/tokio/src
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src')
-rw-r--r--tokio/src/async_await.rs17
-rw-r--r--tokio/src/executor.rs (renamed from tokio/src/executor/mod.rs)40
-rw-r--r--tokio/src/executor/current_thread/mod.rs166
-rw-r--r--tokio/src/io.rs21
-rw-r--r--tokio/src/lib.rs17
-rw-r--r--tokio/src/net.rs20
-rw-r--r--tokio/src/prelude.rs13
-rw-r--r--tokio/src/reactor.rs (renamed from tokio/src/reactor/mod.rs)6
-rw-r--r--tokio/src/reactor/poll_evented.rs545
-rw-r--r--tokio/src/runtime/current_thread/async_await.rs17
-rw-r--r--tokio/src/runtime/current_thread/builder.rs24
-rw-r--r--tokio/src/runtime/current_thread/mod.rs13
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs69
-rw-r--r--tokio/src/runtime/mod.rs9
-rw-r--r--tokio/src/runtime/threadpool/async_await.rs18
-rw-r--r--tokio/src/runtime/threadpool/mod.rs3
-rw-r--r--tokio/src/sync.rs2
17 files changed, 64 insertions, 936 deletions
diff --git a/tokio/src/async_await.rs b/tokio/src/async_await.rs
deleted file mode 100644
index 29ebce88..00000000
--- a/tokio/src/async_await.rs
+++ /dev/null
@@ -1,17 +0,0 @@
-use tokio_futures::compat;
-
-/// Like `tokio::run`, but takes an `async` block
-pub fn run_async<F>(future: F)
-where
- F: std::future::Future<Output = ()> + Send + 'static,
-{
- crate::run(compat::infallible_into_01(future));
-}
-
-/// Like `tokio::spawn`, but takes an `async` block
-pub fn spawn_async<F>(future: F)
-where
- F: std::future::Future<Output = ()> + Send + 'static,
-{
- crate::spawn(compat::infallible_into_01(future));
-}
diff --git a/tokio/src/executor/mod.rs b/tokio/src/executor.rs
index 41fb65a7..266da320 100644
--- a/tokio/src/executor/mod.rs
+++ b/tokio/src/executor.rs
@@ -39,41 +39,15 @@
//! [`Executor`]: trait.Executor.html
//! [`spawn`]: fn.spawn.html
-#[deprecated(
- since = "0.1.8",
- note = "use tokio-current-thread crate or functions in tokio::runtime::current_thread instead",
-)]
-#[doc(hidden)]
-pub mod current_thread;
-
-#[deprecated(since = "0.1.8", note = "use tokio-threadpool crate instead")]
-#[doc(hidden)]
-/// Re-exports of [`tokio-threadpool`], deprecated in favor of the crate.
-///
-/// [`tokio-threadpool`]: https://docs.rs/tokio-threadpool/0.1
-pub mod thread_pool {
- pub use tokio_threadpool::{
- Builder,
- Sender,
- Shutdown,
- ThreadPool,
- };
-}
-
+use std::future::Future;
pub use tokio_executor::{Executor, TypedExecutor, DefaultExecutor, SpawnError};
-use futures::{Future, IntoFuture};
-use futures::future::{self, FutureResult};
-
/// Return value from the `spawn` function.
///
/// Currently this value doesn't actually provide any functionality. However, it
/// provides a way to add functionality later without breaking backwards
/// compatibility.
///
-/// This also implements `IntoFuture` so that it can be used as the return value
-/// in a `for_each` loop.
-///
/// See [`spawn`] for more details.
///
/// [`spawn`]: fn.spawn.html
@@ -126,18 +100,8 @@ pub struct Spawn(());
///
/// [`DefaultExecutor`]: struct.DefaultExecutor.html
pub fn spawn<F>(f: F) -> Spawn
-where F: Future<Item = (), Error = ()> + 'static + Send
+where F: Future<Output = ()> + 'static + Send
{
::tokio_executor::spawn(f);
Spawn(())
}
-
-impl IntoFuture for Spawn {
- type Future = FutureResult<(), ()>;
- type Item = ();
- type Error = ();
-
- fn into_future(self) -> Self::Future {
- future::ok(())
- }
-}
diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs
deleted file mode 100644
index aa5efb94..00000000
--- a/tokio/src/executor/current_thread/mod.rs
+++ /dev/null
@@ -1,166 +0,0 @@
-#![allow(deprecated)]
-
-//! Execute many tasks concurrently on the current thread.
-//!
-//! [`CurrentThread`] is an executor that keeps tasks on the same thread that
-//! they were spawned from. This allows it to execute futures that are not
-//! `Send`.
-//!
-//! A single [`CurrentThread`] instance is able to efficiently manage a large
-//! number of tasks and will attempt to schedule all tasks fairly.
-//!
-//! All tasks that are being managed by a [`CurrentThread`] executor are able to
-//! spawn additional tasks by calling [`spawn`]. This function only works from
-//! within the context of a running [`CurrentThread`] instance.
-//!
-//! The easiest way to start a new [`CurrentThread`] executor is to call
-//! [`block_on_all`] with an initial task to seed the executor.
-//!
-//! For example:
-//!
-//! ```
-//! # use tokio::executor::current_thread;
-//! use futures::future::lazy;
-//!
-//! // Calling execute here results in a panic
-//! // current_thread::spawn(my_future);
-//!
-//! # pub fn main() {
-//! current_thread::block_on_all(lazy(|| {
-//! // The execution context is setup, futures may be executed.
-//! current_thread::spawn(lazy(|| {
-//! println!("called from the current thread executor");
-//! Ok(())
-//! }));
-//!
-//! Ok::<_, ()>(())
-//! }));
-//! # }
-//! ```
-//!
-//! The `block_on_all` function will block the current thread until **all**
-//! tasks that have been spawned onto the [`CurrentThread`] instance have
-//! completed.
-//!
-//! More fine-grain control can be achieved by using [`CurrentThread`] directly.
-//!
-//! ```
-//! # use tokio::executor::current_thread::CurrentThread;
-//! use futures::future::{lazy, empty};
-//! use std::time::Duration;
-//!
-//! // Calling execute here results in a panic
-//! // current_thread::spawn(my_future);
-//!
-//! # pub fn main() {
-//! let mut current_thread = CurrentThread::new();
-//!
-//! // Spawn a task, the task is not executed yet.
-//! current_thread.spawn(lazy(|| {
-//! println!("Spawning a task");
-//! Ok(())
-//! }));
-//!
-//! // Spawn a task that never completes
-//! current_thread.spawn(empty());
-//!
-//! // Run the executor, but only until the provided future completes. This
-//! // provides the opportunity to start executing previously spawned tasks.
-//! let res = current_thread.block_on(lazy(|| {
-//! Ok::<_, ()>("Hello")
-//! })).unwrap();
-//!
-//! // Now, run the executor for *at most* 1 second. Since a task was spawned
-//! // that never completes, this function will return with an error.
-//! current_thread.run_timeout(Duration::from_secs(1)).unwrap_err();
-//! # }
-//! ```
-//!
-//! # Execution model
-//!
-//! Internally, [`CurrentThread`] maintains a queue. When one of its tasks is
-//! notified, the task gets added to the queue. The executor will pop tasks from
-//! the queue and call [`Future::poll`]. If the task gets notified while it is
-//! being executed, it won't get re-executed until all other tasks currently in
-//! the queue get polled.
-//!
-//! Before the task is polled, a thread-local variable referencing the current
-//! [`CurrentThread`] instance is set. This enables [`spawn`] to spawn new tasks
-//! onto the same executor without having to thread through a handle value.
-//!
-//! If the [`CurrentThread`] instance still has uncompleted tasks, but none of
-//! these tasks are ready to be polled, the current thread is put to sleep. When
-//! a task is notified, the thread is woken up and processing resumes.
-//!
-//! All tasks managed by [`CurrentThread`] remain on the current thread. When a
-//! task completes, it is dropped.
-//!
-//! [`spawn`]: fn.spawn.html
-//! [`block_on_all`]: fn.block_on_all.html
-//! [`CurrentThread`]: struct.CurrentThread.html
-//! [`Future::poll`]: https://docs.rs/futures/0.1/futures/future/trait.Future.html#tymethod.poll
-
-pub use tokio_current_thread::{
- BlockError,
- CurrentThread,
- Entered,
- Handle,
- RunError,
- RunTimeoutError,
- TaskExecutor,
- Turn,
- TurnError,
- block_on_all,
- spawn,
-};
-
-use std::cell::Cell;
-use std::marker::PhantomData;
-
-use futures::future::{self};
-
-#[deprecated(since = "0.1.2", note = "use block_on_all instead")]
-#[doc(hidden)]
-#[derive(Debug)]
-pub struct Context<'a> {
- cancel: Cell<bool>,
- _p: PhantomData<&'a ()>,
-}
-
-impl<'a> Context<'a> {
- /// Cancels *all* executing futures.
- pub fn cancel_all_spawned(&self) {
- self.cancel.set(true);
- }
-}
-
-#[deprecated(since = "0.1.2", note = "use block_on_all instead")]
-#[doc(hidden)]
-pub fn run<F, R>(f: F) -> R
- where F: FnOnce(&mut Context<'_>) -> R
-{
- let mut context = Context {
- cancel: Cell::new(false),
- _p: PhantomData,
- };
-
- let mut current_thread = CurrentThread::new();
-
- let ret = current_thread
- .block_on(future::lazy(|| Ok::<_, ()>(f(&mut context))))
- .unwrap();
-
- if context.cancel.get() {
- return ret;
- }
-
- current_thread.run().unwrap();
- ret
-}
-
-#[deprecated(since = "0.1.2", note = "use TaskExecutor::current instead")]
-#[doc(hidden)]
-pub fn task_executor() -> TaskExecutor {
- TaskExecutor::current()
-}
-
diff --git a/tokio/src/io.rs b/tokio/src/io.rs
index feb1f6b2..4d2b5e35 100644
--- a/tokio/src/io.rs
+++ b/tokio/src/io.rs
@@ -25,22 +25,13 @@
//! [output]: fn.stdout.html
//! [error]: fn.stderr.html
//!
-//! # Utility functions
-//!
-//! Utilities functions are provided for working with [`AsyncRead`] /
-//! [`AsyncWrite`] types. For example, [`copy`] asynchronously copies all
-//! data from a source to a destination.
-//!
//! # `std` re-exports
//!
-//! Additionally, [`Read`], [`Write`], [`Error`], [`ErrorKind`], and
-//! [`Result`] are re-exported from `std::io` for ease of use.
+//! Additionally, [`Error`], [`ErrorKind`], and [`Result`] are re-exported
+//! from `std::io` for ease of use.
//!
//! [`AsyncRead`]: trait.AsyncRead.html
//! [`AsyncWrite`]: trait.AsyncWrite.html
-//! [`copy`]: fn.copy.html
-//! [`Read`]: trait.Read.html
-//! [`Write`]: trait.Write.html
//! [`Error`]: struct.Error.html
//! [`ErrorKind`]: enum.ErrorKind.html
//! [`Result`]: type.Result.html
@@ -51,12 +42,6 @@ pub use tokio_io::{AsyncRead, AsyncWrite};
#[cfg(feature = "fs")]
pub use tokio_fs::{stderr, stdin, stdout, Stderr, Stdin, Stdout};
-// Utils
-pub use tokio_io::io::{
- copy, flush, lines, read, read_exact, read_to_end, read_until, shutdown, write_all, Copy,
- Flush, Lines, ReadExact, ReadHalf, ReadToEnd, ReadUntil, Shutdown, WriteAll, WriteHalf,
-};
-
// Re-export io::Error so that users don't have to deal
// with conflicts when `use`ing `futures::io` and `std::io`.
-pub use std::io::{Error, ErrorKind, Read, Result, Write};
+pub use std::io::{Error, ErrorKind, Result};
diff --git a/tokio/src/lib.rs b/tokio/src/lib.rs
index 99fdae85..080e0463 100644
--- a/tokio/src/lib.rs
+++ b/tokio/src/lib.rs
@@ -1,7 +1,6 @@
#![doc(html_root_url = "https://docs.rs/tokio/0.1.20")]
#![deny(missing_docs, missing_debug_implementations, rust_2018_idioms)]
#![cfg_attr(test, deny(warnings))]
-#![cfg_attr(feature = "async-await-preview", feature(async_await, await_macro))]
#![doc(test(no_crate_inject, attr(deny(rust_2018_idioms))))]
//! A runtime for writing reliable, asynchronous, and slim applications.
@@ -92,7 +91,7 @@ pub mod reactor;
pub mod sync;
#[cfg(feature = "timer")]
pub mod timer;
-pub mod util;
+//pub mod util;
if_runtime! {
pub mod executor;
@@ -101,17 +100,3 @@ if_runtime! {
pub use crate::executor::spawn;
pub use crate::runtime::run;
}
-
-// ===== Experimental async/await support =====
-
-#[cfg(feature = "async-await-preview")]
-mod async_await;
-
-#[cfg(feature = "async-await-preview")]
-pub use async_await::{run_async, spawn_async};
-
-#[cfg(feature = "async-await-preview")]
-pub use tokio_futures::async_wait;
-
-#[cfg(feature = "async-await-preview")]
-pub use tokio_macros::{main, test};
diff --git a/tokio/src/net.rs b/tokio/src/net.rs
index a6b425da..6e7efb37 100644
--- a/tokio/src/net.rs
+++ b/tokio/src/net.rs
@@ -41,20 +41,11 @@ pub mod tcp {
//! [`TcpListener`]: struct.TcpListener.html
//! [incoming_method]: struct.TcpListener.html#method.incoming
//! [`Incoming`]: struct.Incoming.html
- pub use tokio_tcp::{ConnectFuture, Incoming, TcpListener, TcpStream};
+ pub use tokio_tcp::{TcpListener, TcpStream};
}
#[cfg(feature = "tcp")]
pub use self::tcp::{TcpListener, TcpStream};
-#[cfg(feature = "tcp")]
-#[deprecated(note = "use `tokio::net::tcp::ConnectFuture` instead")]
-#[doc(hidden)]
-pub type ConnectFuture = self::tcp::ConnectFuture;
-#[cfg(feature = "tcp")]
-#[deprecated(note = "use `tokio::net::tcp::Incoming` instead")]
-#[doc(hidden)]
-pub type Incoming = self::tcp::Incoming;
-
#[cfg(feature = "udp")]
pub mod udp {
//! UDP bindings for `tokio`.
@@ -76,15 +67,6 @@ pub mod udp {
#[cfg(feature = "udp")]
pub use self::udp::{UdpFramed, UdpSocket};
-#[cfg(feature = "udp")]
-#[deprecated(note = "use `tokio::net::udp::RecvDgram` instead")]
-#[doc(hidden)]
-pub type RecvDgram<T> = self::udp::RecvDgram<T>;
-#[cfg(feature = "udp")]
-#[deprecated(note = "use `tokio::net::udp::SendDgram` instead")]
-#[doc(hidden)]
-pub type SendDgram<T> = self::udp::SendDgram<T>;
-
#[cfg(all(unix, feature = "uds"))]
pub mod unix {
//! Unix domain socket bindings for `tokio` (only available on unix systems).
diff --git a/tokio/src/prelude.rs b/tokio/src/prelude.rs
index e364e8bf..000a680c 100644
--- a/tokio/src/prelude.rs
+++ b/tokio/src/prelude.rs
@@ -10,15 +10,8 @@
//!
//! The prelude may grow over time as additional items see ubiquitous use.
-pub use crate::util::{FutureExt, StreamExt};
-pub use futures::{future, stream, task, Async, AsyncSink, Future, IntoFuture, Poll, Sink, Stream};
-pub use std::io::{Read, Write};
-#[cfg(feature = "async-await-preview")]
-#[doc(inline)]
-pub use tokio_futures::{
- io::{AsyncReadExt, AsyncWriteExt},
- sink::SinkExt,
- stream::StreamExt as StreamAsyncExt,
-};
+pub use std::future::Future;
+pub use std::task::{self, Poll};
+//pub use crate::util::{FutureExt, StreamExt};
#[cfg(feature = "io")]
pub use tokio_io::{AsyncRead, AsyncWrite};
diff --git a/tokio/src/reactor/mod.rs b/tokio/src/reactor.rs
index eccb341f..e343bcf4 100644
--- a/tokio/src/reactor/mod.rs
+++ b/tokio/src/reactor.rs
@@ -134,10 +134,6 @@
//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
-mod poll_evented;
-
-#[allow(deprecated)]
-pub use self::poll_evented::PollEvented;
pub use tokio_reactor::{
- Background, Handle, PollEvented as PollEvented2, Reactor, Registration, Turn,
+ Handle, PollEvented, Reactor, Registration, Turn,
};
diff --git a/tokio/src/reactor/poll_evented.rs b/tokio/src/reactor/poll_evented.rs
deleted file mode 100644
index 2b055116..00000000
--- a/tokio/src/reactor/poll_evented.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-//! Readiness tracking streams, backing I/O objects.
-//!
-//! This module contains the core type which is used to back all I/O on object
-//! in `tokio-core`. The `PollEvented` type is the implementation detail of
-//! all I/O. Each `PollEvented` manages registration with a reactor,
-//! acquisition of a token, and tracking of the readiness state on the
-//! underlying I/O primitive.
-
-#![allow(deprecated, warnings)]
-
-use crate::reactor::{Handle, Registration};
-use futures::{task, Async, Poll};
-use mio::event::Evented;
-use mio::Ready;
-use std::fmt;
-use std::io::{self, Read, Write};
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::Relaxed;
-use std::sync::Mutex;
-use tokio_io::{AsyncRead, AsyncWrite};
-
-#[deprecated(since = "0.1.2", note = "PollEvented2 instead")]
-#[doc(hidden)]
-pub struct PollEvented<E> {
- io: E,
- inner: Inner,
- handle: Handle,
-}
-
-struct Inner {
- registration: Mutex<Registration>,
-
- /// Currently visible read readiness
- read_readiness: AtomicUsize,
-
- /// Currently visible write readiness
- write_readiness: AtomicUsize,
-}
-
-impl<E: fmt::Debug> fmt::Debug for PollEvented<E> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("PollEvented").field("io", &self.io).finish()
- }
-}
-
-impl<E> PollEvented<E> {
- /// Creates a new readiness stream associated with the provided
- /// `loop_handle` and for the given `source`.
- pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>>
- where
- E: Evented,
- {
- let registration = Registration::new();
- registration.register(&io)?;
-
- Ok(PollEvented {
- io: io,
- inner: Inner {
- registration: Mutex::new(registration),
- read_readiness: AtomicUsize::new(0),
- write_readiness: AtomicUsize::new(0),
- },
- handle: handle.clone(),
- })
- }
-
- /// Tests to see if this source is ready to be read from or not.
- ///
- /// If this stream is not ready for a read then `Async::NotReady` will be
- /// returned and the current task will be scheduled to receive a
- /// notification when the stream is readable again. In other words, this
- /// method is only safe to call from within the context of a future's task,
- /// typically done in a `Future::poll` method.
- ///
- /// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_read(&mut self) -> Async<()> {
- if self.poll_read2().is_ready() {
- return ().into();
- }
-
- Async::NotReady
- }
-
- fn poll_read2(&self) -> Async<Ready> {
- let r = self.inner.registration.lock().unwrap();
-
- // Load the cached readiness
- match self.inner.read_readiness.load(Relaxed) {
- 0 => {}
- mut n => {
- // Check what's new with the reactor.
- if let Some(ready) = r.take_read_ready().unwrap() {
- n |= ready2usize(ready);
- self.inner.read_readiness.store(n, Relaxed);
- }
-
- return usize2ready(n).into();
- }
- }
-
- let ready = match r.poll_read_ready().unwrap() {
- Async::Ready(r) => r,
- _ => return Async::NotReady,
- };
-
- // Cache the value
- self.inner.read_readiness.store(ready2usize(ready), Relaxed);
-
- ready.into()
- }
-
- /// Tests to see if this source is ready to be written to or not.
- ///
- /// If this stream is not ready for a write then `Async::NotReady` will be returned
- /// and the current task will be scheduled to receive a notification when
- /// the stream is writable again. In other words, this method is only safe
- /// to call from within the context of a future's task, typically done in a
- /// `Future::poll` method.
- ///
- /// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_write(&mut self) -> Async<()> {
- let r = self.inner.registration.lock().unwrap();
-
- match self.inner.write_readiness.load(Relaxed) {
- 0 => {}
- mut n => {
- // Check what's new with the reactor.
- if let Some(ready) = r.take_write_ready().unwrap() {
- n |= ready2usize(ready);
- self.inner.write_readiness.store(n, Relaxed);
- }
-
- return ().into();
- }
- }
-
- let ready = match r.poll_write_ready().unwrap() {
- Async::Ready(r) => r,
- _ => return Async::NotReady,
- };
-
- // Cache the value
- self.inner
- .write_readiness
- .store(ready2usize(ready), Relaxed);
-
- ().into()
- }
-
- /// Test to see whether this source fulfills any condition listed in `mask`
- /// provided.
- ///
- /// The `mask` given here is a mio `Ready` set of possible events. This can
- /// contain any events like read/write but also platform-specific events
- /// such as hup and error. The `mask` indicates events that are interested
- /// in being ready.
- ///
- /// If any event in `mask` is ready then it is returned through
- /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
- /// and contains all events that are currently ready in the `mask` provided.
- ///
- /// If no events are ready in the `mask` provided then the current task is
- /// scheduled to receive a notification when any of them become ready. If
- /// the `writable` event is contained within `mask` then this
- /// `PollEvented`'s `write` task will be blocked and otherwise the `read`
- /// task will be blocked. This is generally only relevant if you're working
- /// with this `PollEvented` object on multiple tasks.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> {
- let mut ret = Ready::empty();
-
- if mask.is_empty() {
- return ret.into();
- }
-
- if mask.is_writable() {
- if self.poll_write().is_ready() {
- ret = Ready::writable();
- }
- }
-
- let mask = mask - Ready::writable();
-
- if !mask.is_empty() {
- if let Async::Ready(v) = self.poll_read2() {
- ret |= v & mask;
- }
- }
-
- if ret.is_empty() {
- if mask.is_writable() {
- let _ = self.need_write();
- }
-
- if mask.is_readable() {
- let _ = self.need_read();
- }
-
- Async::NotReady
- } else {
- ret.into()
- }
- }
-
- /// Indicates to this source of events that the corresponding I/O object is
- /// no longer readable, but it needs to be.
- ///
- /// This function, like `poll_read`, is only safe to call from the context
- /// of a future's task (typically in a `Future::poll` implementation). It
- /// informs this readiness stream that the underlying object is no longer
- /// readable, typically because a "would block" error was seen.
- ///
- /// *All* readiness bits associated with this stream except the writable bit
- /// will be reset when this method is called. The current task is then
- /// scheduled to receive a notification whenever anything changes other than
- /// the writable bit. Note that this typically just means the readable bit
- /// is used here, but if you're using a custom I/O object for events like
- /// hup/error this may also be relevant.
- ///
- /// Note that it is also only valid to call this method if `poll_read`
- /// previously indicated that the object is readable. That is, this function
- /// must always be paired with calls to `poll_read` previously.
- ///
- /// # Errors
- ///
- /// This function will return an error if the `Reactor` that this `PollEvented`
- /// is associated with has gone away (been destroyed). The error means that
- /// the ambient futures task could not be scheduled to receive a
- /// notification and typically means that the error should be propagated
- /// outwards.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn need_read(&mut self) -> io::Result<()> {
- self.inner.read_readiness.store(0, Relaxed);
-
- if self.poll_read().is_ready() {
- // Notify the current task
- task::current().notify();
- }
-
- Ok(())
- }
-
- /// Indicates to this source of events that the corresponding I/O object is
- /// no longer writable, but it needs to be.
- ///
- /// This function, like `poll_write`, is only safe to call from the context
- /// of a future's task (typically in a `Future::poll` implementation). It
- /// informs this readiness stream that the underlying object is no longer
- /// writable, typically because a "would block" error was seen.
- ///
- /// The flag indicating that this stream is writable is unset and the
- /// current task is scheduled to receive a notification when the stream is
- /// then again writable.
- ///
- /// Note that it is also only valid to call this method if `poll_write`
- /// previously indicated that the object is writable. That is, this function
- /// must always be paired with calls to `poll_write` previously.
- ///
- /// # Errors
- ///
- /// This function will return an error if the `Reactor` that this `PollEvented`
- /// is associated with has gone away (been destroyed). The error means that
- /// the ambient futures task could not be scheduled to receive a
- /// notification and typically means that the error should be propagated
- /// outwards.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn need_write(&mut self) -> io::Result<()> {
- self.inner.write_readiness.store(0, Relaxed);
-
- if self.poll_write().is_ready() {
- // Notify the current task
- task::current().notify();
- }
-
- Ok(())
- }
-
- /// Returns a reference to the event loop handle that this readiness stream
- /// is associated with.
- pub fn handle(&self) -> &Handle {
- &self.handle
- }
-
- /// Returns a shared reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub fn get_ref(&self) -> &E {
- &self.io
- }
-
- /// Returns a mutable reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub fn get_mut(&mut self) -> &mut E {
- &mut self.io
- }
-
- /// Consumes the `PollEvented` and returns the underlying I/O object
- pub fn into_inner(self) -> E {
- self.io
- }
-
- /// Deregisters this source of events from the reactor core specified.
- ///
- /// This method can optionally be called to unregister the underlying I/O
- /// object with the event loop that the `handle` provided points to.
- /// Typically this method is not required as this automatically happens when
- /// `E` is dropped, but for some use cases the `E` object doesn't represent
- /// an owned reference, so dropping it won't automatically unregister with
- /// the event loop.
- ///
- /// This consumes `self` as it will no longer provide events after the
- /// method is called, and will likely return an error if this `PollEvented`
- /// was created on a separate event loop from the `handle` specified.
- pub fn deregister(&self) -> io::Result<()>
- where
- E: Evented,
- {
- self.inner.registration.lock().unwrap().deregister(&self.io)
- }
-}
-
-impl<E: Read> Read for PollEvented<E> {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_read() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().read(buf);
-
- if is_wouldblock(&r) {
- self.need_read()?;
- }
-
- return r;
- }
-}
-
-impl<E: Write> Write for PollEvented<E> {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_write() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().write(buf);
-
- if is_wouldblock(&r) {
- self.need_write()?;
- }
-
- return r;
- }
-
- fn flush(&mut self) -> io::Result<()> {
- if let Async::NotReady = self.poll_write() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().flush();
-
- if is_wouldblock(&r) {
- self.need_write()?;
- }
-
- return r;
- }
-}
-
-impl<E: Read> AsyncRead for PollEvented<E> {}
-
-impl<E: Write> AsyncWrite for PollEvented<E> {
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- Ok(().into())
- }
-}
-
-fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
- match *r {
- Ok(_) => false,
- Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
- }
-}
-
-const READ: usize = 1 << 0;
-const WRITE: usize = 1 << 1;
-
-fn ready2usize(ready: Ready) -> usize {
- let mut bits = 0;
- if ready.is_readable() {
- bits |= READ;
- }
- if ready.is_writable() {
- bits |= WRITE;
- }
- bits | platform::ready2usize(ready)
-}
-
-fn usize2ready(bits: usize) -> Ready {
- let mut ready = Ready::empty();
- if bits & READ != 0 {
- ready.insert(Ready::readable());
- }
- if bits & WRITE != 0 {
- ready.insert(Ready::writable());
- }
- ready | platform::usize2ready(bits)
-}
-
-#[cfg(unix)]
-mod platform {
- use mio::unix::UnixReady;
- use mio::Ready;
-
- const HUP: usize = 1 << 2;
- const ERROR: usize = 1 << 3;
- const AIO: usize = 1 << 4;
- const LIO: usize = 1 << 5;
-
- #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
- fn is_aio(ready: &Ready) -> bool {
- UnixReady::from(*ready).is_aio()
- }
-
- #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
- fn is_aio(_ready: &Ready) -> bool {
- false
- }
-
- #[cfg(target_os = "freebsd")]
- fn is_lio(ready: &Ready) -> bool {
- UnixReady::from(*ready).is_lio()
- }
-
- #[cfg(not(target_os = "freebsd"))]
- fn is_lio(_ready: &Ready) -> bool {
- false
- }
-
- pub fn ready2usize(ready: Ready) -> usize {
- let ready = UnixReady::from(ready);
- let mut bits = 0;
- if is_aio(&ready) {
- bits |= AIO;
- }
- if is_lio(&ready) {
- bits |= LIO;
- }
- if ready.is_error() {
- bits |= ERROR;
- }
- if ready.is_hup() {
- bits |= HUP;
- }
- bits
- }
-
- #[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
- ))]
- fn usize2ready_aio(ready: &mut UnixReady) {
- ready.insert(UnixReady::aio());
- }
-
- #[cfg(not(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
- )))]
- fn usize2ready_aio(_ready: &mut UnixReady) {
- // aio not available here → empty
- }
-
- #[cfg(target_os = "freebsd")]
- fn usize2ready_lio(ready: &mut UnixReady) {
- ready.insert(UnixReady::lio());
- }
-
- #[cfg(not(target_os = "freebsd"))]
- fn usize2ready_lio(_ready: &mut UnixReady) {
- // lio not available here → empty
- }
-
- pub fn usize2ready(bits: usize) -> Ready {
- let mut ready = UnixReady::from(Ready::empty());
- if bits & AIO != 0 {
- usize2ready_aio(&mut ready);
- }
- if bits & LIO != 0 {
- usize2ready_lio(&mut ready);
- }
- if bits & HUP != 0 {
- ready.insert(UnixReady::hup());
- }
- if bits & ERROR != 0 {
- ready.insert(UnixReady::error());
- }
- ready.into()
- }
-}
-
-#[cfg(windows)]
-mod platform {
- use mio::Ready;
-
- pub fn all() -> Ready {
- // No platform-specific Readinesses for Windows
- Ready::empty()
- }
-
- pub fn hup() -> Ready {
- Ready::empty()
- }
-
- pub fn ready2usize(_r: Ready) -> usize {
- 0
- }