summaryrefslogtreecommitdiffstats
path: root/tokio/src/io/registration.rs
diff options
context:
space:
mode:
Diffstat (limited to 'tokio/src/io/registration.rs')
-rw-r--r--tokio/src/io/registration.rs237
1 files changed, 21 insertions, 216 deletions
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs
index b805d2b9..e4ec096f 100644
--- a/tokio/src/io/registration.rs
+++ b/tokio/src/io/registration.rs
@@ -1,4 +1,4 @@
-use crate::io::driver::{platform, Direction, Handle, ScheduledIo};
+use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo};
use crate::util::slab;
use mio::{self, Evented};
@@ -38,7 +38,7 @@ cfg_io_driver! {
/// [`poll_read_ready`]: method@Self::poll_read_ready`
/// [`poll_write_ready`]: method@Self::poll_write_ready`
#[derive(Debug)]
- pub struct Registration {
+ pub(crate) struct Registration {
/// Handle to the associated driver.
handle: Handle,
@@ -53,28 +53,6 @@ unsafe impl Sync for Registration {}
// ===== impl Registration =====
impl Registration {
- /// Registers the I/O resource with the default reactor.
- ///
- /// # Return
- ///
- /// - `Ok` if the registration happened successfully
- /// - `Err` if an error was encountered during registration
- ///
- ///
- /// # Panics
- ///
- /// This function panics if thread-local runtime is not set.
- ///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
- pub fn new<T>(io: &T) -> io::Result<Registration>
- where
- T: Evented,
- {
- Registration::new_with_ready(io, mio::Ready::all())
- }
-
/// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state.
/// `new_with_ready` should be used over `new` when you need control over the readiness state,
/// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if
@@ -96,23 +74,6 @@ impl Registration {
///
/// - `Ok` if the registration happened successfully
/// - `Err` if an error was encountered during registration
- ///
- ///
- /// # Panics
- ///
- /// This function panics if thread-local runtime is not set.
- ///
- /// The runtime is usually set implicitly when this function is called
- /// from a future driven by a tokio runtime, otherwise runtime can be set
- /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function.
- pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration>
- where
- T: Evented,
- {
- Self::new_with_ready_and_handle(io, ready, Handle::current())
- }
-
- /// Same as `new_with_ready` but also accepts an explicit handle.
pub(crate) fn new_with_ready_and_handle<T>(
io: &T,
ready: mio::Ready,
@@ -149,7 +110,7 @@ impl Registration {
/// no longer result in notifications getting sent for this registration.
///
/// `Err` is returned if an error is encountered.
- pub fn deregister<T>(&mut self, io: &T) -> io::Result<()>
+ pub(super) fn deregister<T>(&mut self, io: &T) -> io::Result<()>
where
T: Evented,
{
@@ -160,192 +121,36 @@ impl Registration {
inner.deregister_source(io)
}
- /// Polls for events on the I/O resource's read readiness stream.
- ///
- /// If the I/O resource receives a new read readiness event since the last
- /// call to `poll_read_ready`, it is returned. If it has not, the current
- /// task is notified once a new event is received.
- ///
- /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
- /// the function will always return `Ready(HUP)`. This should be treated as
- /// the end of the readiness stream.
- ///
- /// # Return value
- ///
- /// There are several possible return values:
- ///
- /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
- /// a new readiness event. The readiness value is included.
- ///
- /// * `Poll::Pending` means that no new readiness events have been received
- /// since the last call to `poll_read_ready`.
- ///
- /// * `Poll::Ready(Err(err))` means that the registration has encountered an
- /// error. This could represent a permanent internal error for example.
- ///
- /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
- ///
- /// # Panics
- ///
- /// This function will panic if called from outside of a task context.
- pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| {
- coop.made_progress();
- e
- })?;
- match v {
- Some(v) => {
- coop.made_progress();
- Poll::Ready(Ok(v))
- }
- None => Poll::Pending,
- }
- }
-
- /// Consume any pending read readiness event.
- ///
- /// This function is identical to [`poll_read_ready`] **except** that it
- /// will not notify the current task when a new event is received. As such,
- /// it is safe to call this function from outside of a task context.
- ///
- /// [`poll_read_ready`]: method@Self::poll_read_ready
- pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> {
- self.poll_ready(Direction::Read, None)
- }
-
- /// Polls for events on the I/O resource's write readiness stream.
- ///
- /// If the I/O resource receives a new write readiness event since the last
- /// call to `poll_write_ready`, it is returned. If it has not, the current
- /// task is notified once a new event is received.
- ///
- /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned,
- /// the function will always return `Ready(HUP)`. This should be treated as
- /// the end of the readiness stream.
- ///
- /// # Return value
- ///
- /// There are several possible return values:
- ///
- /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received
- /// a new readiness event. The readiness value is included.
- ///
- /// * `Poll::Pending` means that no new readiness events have been received
- /// since the last call to `poll_write_ready`.
- ///
- /// * `Poll::Ready(Err(err))` means that the registration has encountered an
- /// error. This could represent a permanent internal error for example.
- ///
- /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered
- ///
- /// # Panics
- ///
- /// This function will panic if called from outside of a task context.
- pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> {
- // Keep track of task budget
- let coop = ready!(crate::coop::poll_proceed(cx));
-
- let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| {
- coop.made_progress();
- e
- })?;
- match v {
- Some(v) => {
- coop.made_progress();
- Poll::Ready(Ok(v))
- }
- None => Poll::Pending,
- }
- }
-
- /// Consumes any pending write readiness event.
- ///
- /// This function is identical to [`poll_write_ready`] **except** that it
- /// will not notify the current task when a new event is received. As such,
- /// it is safe to call this function from outside of a task context.
- ///
- /// [`poll_write_ready`]: method@Self::poll_write_ready
- pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> {
- self.poll_ready(Direction::Write, None)
+ pub(super) fn clear_readiness(&self, event: ReadyEvent) {
+ self.shared.clear_readiness(event);
}
/// Polls for events on the I/O resource's `direction` readiness stream.
///
/// If called with a task context, notify the task when a new event is
/// received.
- fn poll_ready(
+ pub(super) fn poll_readiness(
&self,
+ cx: &mut Context<'_>,
direction: Direction,
- cx: Option<&mut Context<'_>>,
- ) -> io::Result<Option<mio::Ready>> {
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
- };
-
- // If the task should be notified about new events, ensure that it has
- // been registered
- if let Some(ref cx) = cx {
- inner.register(&self.shared, direction, cx.waker().clone())
+ ) -> Poll<io::Result<ReadyEvent>> {
+ if self.handle.inner().is_none() {
+ return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone")));
}
- let mask = direction.mask();
- let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize();
-
- // This consumes the current readiness state **except** for HUP and
- // error. HUP and error are excluded because a) they are final states
- // and never transitition out and b) both the read AND the write
- // directions need to be able to obvserve these states.
- //
- // # Platform-specific behavior
- //
- // HUP and error readiness are platform-specific. On epoll platforms,
- // HUP has specific conditions that must be met by both peers of a
- // connection in order to be triggered.
- //
- // On epoll platforms, `EPOLLERR` is signaled through
- // `UnixReady::error()` and is important to be observable by both read
- // AND write. A specific case that `EPOLLERR` occurs is when the read
- // end of a pipe is closed. When this occurs, a peer blocked by
- // writing to the pipe should be notified.
- let curr_ready = self
- .shared
- .set_readiness(None, |curr| curr & (!mask_no_hup))
- .unwrap_or_else(|_| unreachable!());
-
- let mut ready = mask & mio::Ready::from_usize(curr_ready);
-
- if ready.is_empty() {
- if let Some(cx) = cx {
- // Update the task info
- match direction {
- Direction::Read => self.shared.reader.register_by_ref(cx.waker()),
- Direction::Write => self.shared.writer.register_by_ref(cx.waker()),
- }
-
- // Try again
- let curr_ready = self
- .shared
- .set_readiness(None, |curr| curr & (!mask_no_hup))
- .unwrap();
- ready = mask & mio::Ready::from_usize(curr_ready);
- }
- }
-
- if ready.is_empty() {
- Ok(None)
- } else {
- Ok(Some(ready))
- }
+ // Keep track of task budget
+ let coop = ready!(crate::coop::poll_proceed(cx));
+ let ev = ready!(self.shared.poll_readiness(cx, direction));
+ coop.made_progress();
+ Poll::Ready(Ok(ev))
}
}
-impl Drop for Registration {
- fn drop(&mut self) {
- drop(self.shared.reader.take_waker());
- drop(self.shared.writer.take_waker());
+cfg_io_readiness! {
+ impl Registration {
+ pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> {
+ // TODO: does this need to return a `Result`?
+ Ok(self.shared.readiness(interest).await)
+ }
}
}