summaryrefslogtreecommitdiffstats
path: root/tokio/src/reactor/poll_evented.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/reactor/poll_evented.rs')
-rw-r--r--tokio/src/reactor/poll_evented.rs545
1 files changed, 0 insertions, 545 deletions
diff --git a/tokio/src/reactor/poll_evented.rs b/tokio/src/reactor/poll_evented.rs
deleted file mode 100644
index 2b055116..00000000
--- a/tokio/src/reactor/poll_evented.rs
+++ /dev/null
@@ -1,545 +0,0 @@
-//! Readiness tracking streams, backing I/O objects.
-//!
-//! This module contains the core type which is used to back all I/O on object
-//! in `tokio-core`. The `PollEvented` type is the implementation detail of
-//! all I/O. Each `PollEvented` manages registration with a reactor,
-//! acquisition of a token, and tracking of the readiness state on the
-//! underlying I/O primitive.
-
-#![allow(deprecated, warnings)]
-
-use crate::reactor::{Handle, Registration};
-use futures::{task, Async, Poll};
-use mio::event::Evented;
-use mio::Ready;
-use std::fmt;
-use std::io::{self, Read, Write};
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::Relaxed;
-use std::sync::Mutex;
-use tokio_io::{AsyncRead, AsyncWrite};
-
-#[deprecated(since = "0.1.2", note = "PollEvented2 instead")]
-#[doc(hidden)]
-pub struct PollEvented<E> {
- io: E,
- inner: Inner,
- handle: Handle,
-}
-
-struct Inner {
- registration: Mutex<Registration>,
-
- /// Currently visible read readiness
- read_readiness: AtomicUsize,
-
- /// Currently visible write readiness
- write_readiness: AtomicUsize,
-}
-
-impl<E: fmt::Debug> fmt::Debug for PollEvented<E> {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- f.debug_struct("PollEvented").field("io", &self.io).finish()
- }
-}
-
-impl<E> PollEvented<E> {
- /// Creates a new readiness stream associated with the provided
- /// `loop_handle` and for the given `source`.
- pub fn new(io: E, handle: &Handle) -> io::Result<PollEvented<E>>
- where
- E: Evented,
- {
- let registration = Registration::new();
- registration.register(&io)?;
-
- Ok(PollEvented {
- io: io,
- inner: Inner {
- registration: Mutex::new(registration),
- read_readiness: AtomicUsize::new(0),
- write_readiness: AtomicUsize::new(0),
- },
- handle: handle.clone(),
- })
- }
-
- /// Tests to see if this source is ready to be read from or not.
- ///
- /// If this stream is not ready for a read then `Async::NotReady` will be
- /// returned and the current task will be scheduled to receive a
- /// notification when the stream is readable again. In other words, this
- /// method is only safe to call from within the context of a future's task,
- /// typically done in a `Future::poll` method.
- ///
- /// This is mostly equivalent to `self.poll_ready(Ready::readable())`.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_read(&mut self) -> Async<()> {
- if self.poll_read2().is_ready() {
- return ().into();
- }
-
- Async::NotReady
- }
-
- fn poll_read2(&self) -> Async<Ready> {
- let r = self.inner.registration.lock().unwrap();
-
- // Load the cached readiness
- match self.inner.read_readiness.load(Relaxed) {
- 0 => {}
- mut n => {
- // Check what's new with the reactor.
- if let Some(ready) = r.take_read_ready().unwrap() {
- n |= ready2usize(ready);
- self.inner.read_readiness.store(n, Relaxed);
- }
-
- return usize2ready(n).into();
- }
- }
-
- let ready = match r.poll_read_ready().unwrap() {
- Async::Ready(r) => r,
- _ => return Async::NotReady,
- };
-
- // Cache the value
- self.inner.read_readiness.store(ready2usize(ready), Relaxed);
-
- ready.into()
- }
-
- /// Tests to see if this source is ready to be written to or not.
- ///
- /// If this stream is not ready for a write then `Async::NotReady` will be returned
- /// and the current task will be scheduled to receive a notification when
- /// the stream is writable again. In other words, this method is only safe
- /// to call from within the context of a future's task, typically done in a
- /// `Future::poll` method.
- ///
- /// This is mostly equivalent to `self.poll_ready(Ready::writable())`.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_write(&mut self) -> Async<()> {
- let r = self.inner.registration.lock().unwrap();
-
- match self.inner.write_readiness.load(Relaxed) {
- 0 => {}
- mut n => {
- // Check what's new with the reactor.
- if let Some(ready) = r.take_write_ready().unwrap() {
- n |= ready2usize(ready);
- self.inner.write_readiness.store(n, Relaxed);
- }
-
- return ().into();
- }
- }
-
- let ready = match r.poll_write_ready().unwrap() {
- Async::Ready(r) => r,
- _ => return Async::NotReady,
- };
-
- // Cache the value
- self.inner
- .write_readiness
- .store(ready2usize(ready), Relaxed);
-
- ().into()
- }
-
- /// Test to see whether this source fulfills any condition listed in `mask`
- /// provided.
- ///
- /// The `mask` given here is a mio `Ready` set of possible events. This can
- /// contain any events like read/write but also platform-specific events
- /// such as hup and error. The `mask` indicates events that are interested
- /// in being ready.
- ///
- /// If any event in `mask` is ready then it is returned through
- /// `Async::Ready`. The `Ready` set returned is guaranteed to not be empty
- /// and contains all events that are currently ready in the `mask` provided.
- ///
- /// If no events are ready in the `mask` provided then the current task is
- /// scheduled to receive a notification when any of them become ready. If
- /// the `writable` event is contained within `mask` then this
- /// `PollEvented`'s `write` task will be blocked and otherwise the `read`
- /// task will be blocked. This is generally only relevant if you're working
- /// with this `PollEvented` object on multiple tasks.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn poll_ready(&mut self, mask: Ready) -> Async<Ready> {
- let mut ret = Ready::empty();
-
- if mask.is_empty() {
- return ret.into();
- }
-
- if mask.is_writable() {
- if self.poll_write().is_ready() {
- ret = Ready::writable();
- }
- }
-
- let mask = mask - Ready::writable();
-
- if !mask.is_empty() {
- if let Async::Ready(v) = self.poll_read2() {
- ret |= v & mask;
- }
- }
-
- if ret.is_empty() {
- if mask.is_writable() {
- let _ = self.need_write();
- }
-
- if mask.is_readable() {
- let _ = self.need_read();
- }
-
- Async::NotReady
- } else {
- ret.into()
- }
- }
-
- /// Indicates to this source of events that the corresponding I/O object is
- /// no longer readable, but it needs to be.
- ///
- /// This function, like `poll_read`, is only safe to call from the context
- /// of a future's task (typically in a `Future::poll` implementation). It
- /// informs this readiness stream that the underlying object is no longer
- /// readable, typically because a "would block" error was seen.
- ///
- /// *All* readiness bits associated with this stream except the writable bit
- /// will be reset when this method is called. The current task is then
- /// scheduled to receive a notification whenever anything changes other than
- /// the writable bit. Note that this typically just means the readable bit
- /// is used here, but if you're using a custom I/O object for events like
- /// hup/error this may also be relevant.
- ///
- /// Note that it is also only valid to call this method if `poll_read`
- /// previously indicated that the object is readable. That is, this function
- /// must always be paired with calls to `poll_read` previously.
- ///
- /// # Errors
- ///
- /// This function will return an error if the `Reactor` that this `PollEvented`
- /// is associated with has gone away (been destroyed). The error means that
- /// the ambient futures task could not be scheduled to receive a
- /// notification and typically means that the error should be propagated
- /// outwards.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn need_read(&mut self) -> io::Result<()> {
- self.inner.read_readiness.store(0, Relaxed);
-
- if self.poll_read().is_ready() {
- // Notify the current task
- task::current().notify();
- }
-
- Ok(())
- }
-
- /// Indicates to this source of events that the corresponding I/O object is
- /// no longer writable, but it needs to be.
- ///
- /// This function, like `poll_write`, is only safe to call from the context
- /// of a future's task (typically in a `Future::poll` implementation). It
- /// informs this readiness stream that the underlying object is no longer
- /// writable, typically because a "would block" error was seen.
- ///
- /// The flag indicating that this stream is writable is unset and the
- /// current task is scheduled to receive a notification when the stream is
- /// then again writable.
- ///
- /// Note that it is also only valid to call this method if `poll_write`
- /// previously indicated that the object is writable. That is, this function
- /// must always be paired with calls to `poll_write` previously.
- ///
- /// # Errors
- ///
- /// This function will return an error if the `Reactor` that this `PollEvented`
- /// is associated with has gone away (been destroyed). The error means that
- /// the ambient futures task could not be scheduled to receive a
- /// notification and typically means that the error should be propagated
- /// outwards.
- ///
- /// # Panics
- ///
- /// This function will panic if called outside the context of a future's
- /// task.
- pub fn need_write(&mut self) -> io::Result<()> {
- self.inner.write_readiness.store(0, Relaxed);
-
- if self.poll_write().is_ready() {
- // Notify the current task
- task::current().notify();
- }
-
- Ok(())
- }
-
- /// Returns a reference to the event loop handle that this readiness stream
- /// is associated with.
- pub fn handle(&self) -> &Handle {
- &self.handle
- }
-
- /// Returns a shared reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub fn get_ref(&self) -> &E {
- &self.io
- }
-
- /// Returns a mutable reference to the underlying I/O object this readiness
- /// stream is wrapping.
- pub fn get_mut(&mut self) -> &mut E {
- &mut self.io
- }
-
- /// Consumes the `PollEvented` and returns the underlying I/O object
- pub fn into_inner(self) -> E {
- self.io
- }
-
- /// Deregisters this source of events from the reactor core specified.
- ///
- /// This method can optionally be called to unregister the underlying I/O
- /// object with the event loop that the `handle` provided points to.
- /// Typically this method is not required as this automatically happens when
- /// `E` is dropped, but for some use cases the `E` object doesn't represent
- /// an owned reference, so dropping it won't automatically unregister with
- /// the event loop.
- ///
- /// This consumes `self` as it will no longer provide events after the
- /// method is called, and will likely return an error if this `PollEvented`
- /// was created on a separate event loop from the `handle` specified.
- pub fn deregister(&self) -> io::Result<()>
- where
- E: Evented,
- {
- self.inner.registration.lock().unwrap().deregister(&self.io)
- }
-}
-
-impl<E: Read> Read for PollEvented<E> {
- fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_read() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().read(buf);
-
- if is_wouldblock(&r) {
- self.need_read()?;
- }
-
- return r;
- }
-}
-
-impl<E: Write> Write for PollEvented<E> {
- fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
- if let Async::NotReady = self.poll_write() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().write(buf);
-
- if is_wouldblock(&r) {
- self.need_write()?;
- }
-
- return r;
- }
-
- fn flush(&mut self) -> io::Result<()> {
- if let Async::NotReady = self.poll_write() {
- return Err(io::ErrorKind::WouldBlock.into());
- }
-
- let r = self.get_mut().flush();
-
- if is_wouldblock(&r) {
- self.need_write()?;
- }
-
- return r;
- }
-}
-
-impl<E: Read> AsyncRead for PollEvented<E> {}
-
-impl<E: Write> AsyncWrite for PollEvented<E> {
- fn shutdown(&mut self) -> Poll<(), io::Error> {
- Ok(().into())
- }
-}
-
-fn is_wouldblock<T>(r: &io::Result<T>) -> bool {
- match *r {
- Ok(_) => false,
- Err(ref e) => e.kind() == io::ErrorKind::WouldBlock,
- }
-}
-
-const READ: usize = 1 << 0;
-const WRITE: usize = 1 << 1;
-
-fn ready2usize(ready: Ready) -> usize {
- let mut bits = 0;
- if ready.is_readable() {
- bits |= READ;
- }
- if ready.is_writable() {
- bits |= WRITE;
- }
- bits | platform::ready2usize(ready)
-}
-
-fn usize2ready(bits: usize) -> Ready {
- let mut ready = Ready::empty();
- if bits & READ != 0 {
- ready.insert(Ready::readable());
- }
- if bits & WRITE != 0 {
- ready.insert(Ready::writable());
- }
- ready | platform::usize2ready(bits)
-}
-
-#[cfg(unix)]
-mod platform {
- use mio::unix::UnixReady;
- use mio::Ready;
-
- const HUP: usize = 1 << 2;
- const ERROR: usize = 1 << 3;
- const AIO: usize = 1 << 4;
- const LIO: usize = 1 << 5;
-
- #[cfg(any(target_os = "dragonfly", target_os = "freebsd"))]
- fn is_aio(ready: &Ready) -> bool {
- UnixReady::from(*ready).is_aio()
- }
-
- #[cfg(not(any(target_os = "dragonfly", target_os = "freebsd")))]
- fn is_aio(_ready: &Ready) -> bool {
- false
- }
-
- #[cfg(target_os = "freebsd")]
- fn is_lio(ready: &Ready) -> bool {
- UnixReady::from(*ready).is_lio()
- }
-
- #[cfg(not(target_os = "freebsd"))]
- fn is_lio(_ready: &Ready) -> bool {
- false
- }
-
- pub fn ready2usize(ready: Ready) -> usize {
- let ready = UnixReady::from(ready);
- let mut bits = 0;
- if is_aio(&ready) {
- bits |= AIO;
- }
- if is_lio(&ready) {
- bits |= LIO;
- }
- if ready.is_error() {
- bits |= ERROR;
- }
- if ready.is_hup() {
- bits |= HUP;
- }
- bits
- }
-
- #[cfg(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
- ))]
- fn usize2ready_aio(ready: &mut UnixReady) {
- ready.insert(UnixReady::aio());
- }
-
- #[cfg(not(any(
- target_os = "dragonfly",
- target_os = "freebsd",
- target_os = "ios",
- target_os = "macos"
- )))]
- fn usize2ready_aio(_ready: &mut UnixReady) {
- // aio not available here → empty
- }
-
- #[cfg(target_os = "freebsd")]
- fn usize2ready_lio(ready: &mut UnixReady) {
- ready.insert(UnixReady::lio());
- }
-
- #[cfg(not(target_os = "freebsd"))]
- fn usize2ready_lio(_ready: &mut UnixReady) {
- // lio not available here → empty
- }
-
- pub fn usize2ready(bits: usize) -> Ready {
- let mut ready = UnixReady::from(Ready::empty());
- if bits & AIO != 0 {
- usize2ready_aio(&mut ready);
- }
- if bits & LIO != 0 {
- usize2ready_lio(&mut ready);
- }
- if bits & HUP != 0 {
- ready.insert(UnixReady::hup());
- }
- if bits & ERROR != 0 {
- ready.insert(UnixReady::error());
- }
- ready.into()
- }
-}
-
-#[cfg(windows)]
-mod platform {
- use mio::Ready;
-
- pub fn all() -> Ready {
- // No platform-specific Readinesses for Windows
- Ready::empty()
- }
-
- pub fn hup() -> Ready {
- Ready::empty()
- }
-
- pub fn ready2usize(_r: Ready) -> usize {
- 0
- }
-
- pub fn usize2ready(_r: usize) -> Ready {
- Ready::empty()
- }
-}