diff options
Diffstat (limited to 'tokio/src/io/registration.rs')
-rw-r--r-- | tokio/src/io/registration.rs | 237 |
1 files changed, 21 insertions, 216 deletions
diff --git a/tokio/src/io/registration.rs b/tokio/src/io/registration.rs index b805d2b9..e4ec096f 100644 --- a/tokio/src/io/registration.rs +++ b/tokio/src/io/registration.rs @@ -1,4 +1,4 @@ -use crate::io::driver::{platform, Direction, Handle, ScheduledIo}; +use crate::io::driver::{Direction, Handle, ReadyEvent, ScheduledIo}; use crate::util::slab; use mio::{self, Evented}; @@ -38,7 +38,7 @@ cfg_io_driver! { /// [`poll_read_ready`]: method@Self::poll_read_ready` /// [`poll_write_ready`]: method@Self::poll_write_ready` #[derive(Debug)] - pub struct Registration { + pub(crate) struct Registration { /// Handle to the associated driver. handle: Handle, @@ -53,28 +53,6 @@ unsafe impl Sync for Registration {} // ===== impl Registration ===== impl Registration { - /// Registers the I/O resource with the default reactor. - /// - /// # Return - /// - /// - `Ok` if the registration happened successfully - /// - `Err` if an error was encountered during registration - /// - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new<T>(io: &T) -> io::Result<Registration> - where - T: Evented, - { - Registration::new_with_ready(io, mio::Ready::all()) - } - /// Registers the I/O resource with the default reactor, for a specific `mio::Ready` state. /// `new_with_ready` should be used over `new` when you need control over the readiness state, /// such as when a file descriptor only allows reads. This does not add `hup` or `error` so if @@ -96,23 +74,6 @@ impl Registration { /// /// - `Ok` if the registration happened successfully /// - `Err` if an error was encountered during registration - /// - /// - /// # Panics - /// - /// This function panics if thread-local runtime is not set. - /// - /// The runtime is usually set implicitly when this function is called - /// from a future driven by a tokio runtime, otherwise runtime can be set - /// explicitly with [`Runtime::enter`](crate::runtime::Runtime::enter) function. - pub fn new_with_ready<T>(io: &T, ready: mio::Ready) -> io::Result<Registration> - where - T: Evented, - { - Self::new_with_ready_and_handle(io, ready, Handle::current()) - } - - /// Same as `new_with_ready` but also accepts an explicit handle. pub(crate) fn new_with_ready_and_handle<T>( io: &T, ready: mio::Ready, @@ -149,7 +110,7 @@ impl Registration { /// no longer result in notifications getting sent for this registration. /// /// `Err` is returned if an error is encountered. - pub fn deregister<T>(&mut self, io: &T) -> io::Result<()> + pub(super) fn deregister<T>(&mut self, io: &T) -> io::Result<()> where T: Evented, { @@ -160,192 +121,36 @@ impl Registration { inner.deregister_source(io) } - /// Polls for events on the I/O resource's read readiness stream. - /// - /// If the I/O resource receives a new read readiness event since the last - /// call to `poll_read_ready`, it is returned. If it has not, the current - /// task is notified once a new event is received. - /// - /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, - /// the function will always return `Ready(HUP)`. This should be treated as - /// the end of the readiness stream. - /// - /// # Return value - /// - /// There are several possible return values: - /// - /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received - /// a new readiness event. The readiness value is included. - /// - /// * `Poll::Pending` means that no new readiness events have been received - /// since the last call to `poll_read_ready`. - /// - /// * `Poll::Ready(Err(err))` means that the registration has encountered an - /// error. This could represent a permanent internal error for example. - /// - /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_read_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - let v = self.poll_ready(Direction::Read, Some(cx)).map_err(|e| { - coop.made_progress(); - e - })?; - match v { - Some(v) => { - coop.made_progress(); - Poll::Ready(Ok(v)) - } - None => Poll::Pending, - } - } - - /// Consume any pending read readiness event. - /// - /// This function is identical to [`poll_read_ready`] **except** that it - /// will not notify the current task when a new event is received. As such, - /// it is safe to call this function from outside of a task context. - /// - /// [`poll_read_ready`]: method@Self::poll_read_ready - pub fn take_read_ready(&self) -> io::Result<Option<mio::Ready>> { - self.poll_ready(Direction::Read, None) - } - - /// Polls for events on the I/O resource's write readiness stream. - /// - /// If the I/O resource receives a new write readiness event since the last - /// call to `poll_write_ready`, it is returned. If it has not, the current - /// task is notified once a new event is received. - /// - /// All events except `HUP` are [edge-triggered]. Once `HUP` is returned, - /// the function will always return `Ready(HUP)`. This should be treated as - /// the end of the readiness stream. - /// - /// # Return value - /// - /// There are several possible return values: - /// - /// * `Poll::Ready(Ok(readiness))` means that the I/O resource has received - /// a new readiness event. The readiness value is included. - /// - /// * `Poll::Pending` means that no new readiness events have been received - /// since the last call to `poll_write_ready`. - /// - /// * `Poll::Ready(Err(err))` means that the registration has encountered an - /// error. This could represent a permanent internal error for example. - /// - /// [edge-triggered]: struct@mio::Poll#edge-triggered-and-level-triggered - /// - /// # Panics - /// - /// This function will panic if called from outside of a task context. - pub fn poll_write_ready(&self, cx: &mut Context<'_>) -> Poll<io::Result<mio::Ready>> { - // Keep track of task budget - let coop = ready!(crate::coop::poll_proceed(cx)); - - let v = self.poll_ready(Direction::Write, Some(cx)).map_err(|e| { - coop.made_progress(); - e - })?; - match v { - Some(v) => { - coop.made_progress(); - Poll::Ready(Ok(v)) - } - None => Poll::Pending, - } - } - - /// Consumes any pending write readiness event. - /// - /// This function is identical to [`poll_write_ready`] **except** that it - /// will not notify the current task when a new event is received. As such, - /// it is safe to call this function from outside of a task context. - /// - /// [`poll_write_ready`]: method@Self::poll_write_ready - pub fn take_write_ready(&self) -> io::Result<Option<mio::Ready>> { - self.poll_ready(Direction::Write, None) + pub(super) fn clear_readiness(&self, event: ReadyEvent) { + self.shared.clear_readiness(event); } /// Polls for events on the I/O resource's `direction` readiness stream. /// /// If called with a task context, notify the task when a new event is /// received. - fn poll_ready( + pub(super) fn poll_readiness( &self, + cx: &mut Context<'_>, direction: Direction, - cx: Option<&mut Context<'_>>, - ) -> io::Result<Option<mio::Ready>> { - let inner = match self.handle.inner() { - Some(inner) => inner, - None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")), - }; - - // If the task should be notified about new events, ensure that it has - // been registered - if let Some(ref cx) = cx { - inner.register(&self.shared, direction, cx.waker().clone()) + ) -> Poll<io::Result<ReadyEvent>> { + if self.handle.inner().is_none() { + return Poll::Ready(Err(io::Error::new(io::ErrorKind::Other, "reactor gone"))); } - let mask = direction.mask(); - let mask_no_hup = (mask - platform::hup() - platform::error()).as_usize(); - - // This consumes the current readiness state **except** for HUP and - // error. HUP and error are excluded because a) they are final states - // and never transitition out and b) both the read AND the write - // directions need to be able to obvserve these states. - // - // # Platform-specific behavior - // - // HUP and error readiness are platform-specific. On epoll platforms, - // HUP has specific conditions that must be met by both peers of a - // connection in order to be triggered. - // - // On epoll platforms, `EPOLLERR` is signaled through - // `UnixReady::error()` and is important to be observable by both read - // AND write. A specific case that `EPOLLERR` occurs is when the read - // end of a pipe is closed. When this occurs, a peer blocked by - // writing to the pipe should be notified. - let curr_ready = self - .shared - .set_readiness(None, |curr| curr & (!mask_no_hup)) - .unwrap_or_else(|_| unreachable!()); - - let mut ready = mask & mio::Ready::from_usize(curr_ready); - - if ready.is_empty() { - if let Some(cx) = cx { - // Update the task info - match direction { - Direction::Read => self.shared.reader.register_by_ref(cx.waker()), - Direction::Write => self.shared.writer.register_by_ref(cx.waker()), - } - - // Try again - let curr_ready = self - .shared - .set_readiness(None, |curr| curr & (!mask_no_hup)) - .unwrap(); - ready = mask & mio::Ready::from_usize(curr_ready); - } - } - - if ready.is_empty() { - Ok(None) - } else { - Ok(Some(ready)) - } + // Keep track of task budget + let coop = ready!(crate::coop::poll_proceed(cx)); + let ev = ready!(self.shared.poll_readiness(cx, direction)); + coop.made_progress(); + Poll::Ready(Ok(ev)) } } -impl Drop for Registration { - fn drop(&mut self) { - drop(self.shared.reader.take_waker()); - drop(self.shared.writer.take_waker()); +cfg_io_readiness! { + impl Registration { + pub(super) async fn readiness(&self, interest: mio::Ready) -> io::Result<ReadyEvent> { + // TODO: does this need to return a `Result`? + Ok(self.shared.readiness(interest).await) + } } } |