summaryrefslogtreecommitdiffstats
path: root/tokio-net
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-08-15 15:04:21 -0700
committerGitHub <noreply@github.com>2019-08-15 15:04:21 -0700
commitf1f61a3b15d17767b12bfd8c0c5712db1b089b0b (patch)
treedd9cd51fee4b0b33bcbd079d1e96b4009e8f5fe8 /tokio-net
parentd0a8e5d6f2921fadc51a9612f6fe558e4213560f (diff)
net: reorganize crate in anticipation of #1264 (#1453)
Space is made to add `tcp`, `udp`, `uds`, ... modules.
Diffstat (limited to 'tokio-net')
-rw-r--r--tokio-net/src/driver/mod.rs140
-rw-r--r--tokio-net/src/driver/platform.rs28
-rw-r--r--tokio-net/src/driver/reactor.rs532
-rw-r--r--tokio-net/src/driver/registration.rs (renamed from tokio-net/src/registration.rs)6
-rw-r--r--tokio-net/src/driver/sharded_rwlock.rs (renamed from tokio-net/src/sharded_rwlock.rs)0
-rw-r--r--tokio-net/src/lib.rs569
-rw-r--r--tokio-net/src/util/mod.rs5
-rw-r--r--tokio-net/src/util/poll_evented.rs (renamed from tokio-net/src/poll_evented.rs)11
8 files changed, 715 insertions, 576 deletions
diff --git a/tokio-net/src/driver/mod.rs b/tokio-net/src/driver/mod.rs
new file mode 100644
index 00000000..2ba54d9a
--- /dev/null
+++ b/tokio-net/src/driver/mod.rs
@@ -0,0 +1,140 @@
+//! Event loop that drives Tokio I/O resources.
+//!
+//! This module contains [`Reactor`], which is the event loop that drives all
+//! Tokio I/O resources. It is the reactor's job to receive events from the
+//! operating system ([epoll], [kqueue], [IOCP], etc...) and forward them to
+//! waiting tasks. It is the bridge between operating system and the futures
+//! model.
+//!
+//! # Overview
+//!
+//! When using Tokio, all operations are asynchronous and represented by
+//! futures. These futures, representing the application logic, are scheduled by
+//! an executor (see [runtime model] for more details). Executors wait for
+//! notifications before scheduling the future for execution time, i.e., nothing
+//! happens until an event is received indicating that the task can make
+//! progress.
+//!
+//! The reactor receives events from the operating system and notifies the
+//! executor.
+//!
+//! Let's start with a basic example, establishing a TCP connection.
+//!
+//! ```
+//! #![feature(async_await)]
+//!
+//! use tokio::net::TcpStream;
+//!
+//! # async fn process<T>(t: T) {}
+//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
+//! let addr = "93.184.216.34:9243".parse()?;
+//!
+//! let stream = TcpStream::connect(&addr).await?;
+//!
+//! println!("successfully connected");
+//!
+//! process(stream).await;
+//! # Ok(())
+//! # }
+//! ```
+//!
+//! Establishing a TCP connection usually cannot be completed immediately.
+//! [`TcpStream::connect`] does not block the current thread. Instead, it
+//! returns a [future][connect-future] that resolves once the TCP connection has
+//! been established. The connect future itself has no way of knowing when the
+//! TCP connection has been established.
+//!
+//! Before returning the future, [`TcpStream::connect`] registers the socket
+//! with a reactor. This registration process, handled by [`Registration`], is
+//! what links the [`TcpStream`] with the [`Reactor`] instance. At this point,
+//! the reactor starts listening for connection events from the operating system
+//! for that socket.
+//!
+//! Once the connect future is passed to [`tokio::run`], it is spawned onto a
+//! thread pool. The thread pool waits until it is notified that the connection
+//! has completed.
+//!
+//! When the TCP connection is established, the reactor receives an event from
+//! the operating system. It then notifies the thread pool, telling it that the
+//! connect future can complete. At this point, the thread pool will schedule
+//! the task to run on one of its worker threads. This results in the `and_then`
+//! closure to get executed.
+//!
+//! ## Lazy registration
+//!
+//! Notice how the snippet above does not explicitly reference a reactor. When
+//! [`TcpStream::connect`] is called, it registers the socket with a reactor,
+//! but no reactor is specified. This works because the registration process
+//! mentioned above is actually lazy. It doesn't *actually* happen in the
+//! [`connect`] function. Instead, the registration is established the first
+//! time that the task is polled (again, see [runtime model]).
+//!
+//! A reactor instance is automatically made available when using the Tokio
+//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor
+//! sets a thread-local variable referencing the associated [`Reactor`] instance
+//! and [`Handle::current`] (used by [`Registration`]) returns the reference.
+//!
+//! ## Implementation
+//!
+//! The reactor implementation uses [`mio`] to interface with the operating
+//! system's event queue. A call to [`Reactor::poll`] results in a single
+//! call to [`Poll::poll`] which in turn results in a single call to the
+//! operating system's selector.
+//!
+//! The reactor maintains state for each registered I/O resource. This tracks
+//! the executor task to notify when events are provided by the operating
+//! system's selector. This state is stored in a `Sync` data structure and
+//! referenced by [`Registration`]. When the [`Registration`] instance is
+//! dropped, this state is cleaned up. Because the state is stored in a `Sync`
+//! data structure, the [`Registration`] instance is able to be moved to other
+//! threads.
+//!
+//! By default, a runtime's default reactor runs on a background thread. This
+//! ensures that application code cannot significantly impact the reactor's
+//! responsiveness.
+//!
+//! ## Integrating with the reactor
+//!
+//! Tokio comes with a number of I/O resources, like TCP and UDP sockets, that
+//! automatically integrate with the reactor. However, library authors or
+//! applications may wish to implement their own resources that are also backed
+//! by the reactor.
+//!
+//! There are a couple of ways to do this.
+//!
+//! If the custom I/O resource implements [`mio::Evented`] and implements
+//! [`std::io::Read`] and / or [`std::io::Write`], then [`PollEvented`] is the
+//! most suited.
+//!
+//! Otherwise, [`Registration`] can be used directly. This provides the lowest
+//! level primitive needed for integrating with the reactor: a stream of
+//! readiness events.
+//!
+//! [`Reactor`]: struct.Reactor.html
+//! [`Registration`]: struct.Registration.html
+//! [runtime model]: https://tokio.rs/docs/internals/runtime-model/
+//! [epoll]: http://man7.org/linux/man-pages/man7/epoll.7.html
+//! [kqueue]: https://www.freebsd.org/cgi/man.cgi?query=kqueue&sektion=2
+//! [IOCP]: https://msdn.microsoft.com/en-us/library/windows/desktop/aa365198(v=vs.85).aspx
+//! [`TcpStream::connect`]: ../net/struct.TcpStream.html#method.connect
+//! [`connect`]: ../net/struct.TcpStream.html#method.connect
+//! [connect-future]: ../net/struct.ConnectFuture.html
+//! [`tokio::run`]: ../runtime/fn.run.html
+//! [`TcpStream`]: ../net/struct.TcpStream.html
+//! [runtime]: ../runtime
+//! [`Handle::current`]: struct.Handle.html#method.current
+//! [`mio`]: https://github.com/carllerche/mio
+//! [`Reactor::poll`]: struct.Reactor.html#method.poll
+//! [`Poll::poll`]: https://docs.rs/mio/0.6/mio/struct.Poll.html#method.poll
+//! [`mio::Evented`]: https://docs.rs/mio/0.6/mio/trait.Evented.html
+//! [`PollEvented`]: struct.PollEvented.html
+//! [`std::io::Read`]: https://doc.rust-lang.org/std/io/trait.Read.html
+//! [`std::io::Write`]: https://doc.rust-lang.org/std/io/trait.Write.html
+
+pub(crate) mod platform;
+mod reactor;
+mod registration;
+mod sharded_rwlock;
+
+pub use self::reactor::{set_default, Handle, Reactor};
+pub use self::registration::Registration;
diff --git a/tokio-net/src/driver/platform.rs b/tokio-net/src/driver/platform.rs
new file mode 100644
index 00000000..4cfe7345
--- /dev/null
+++ b/tokio-net/src/driver/platform.rs
@@ -0,0 +1,28 @@
+pub(crate) use self::sys::*;
+
+#[cfg(unix)]
+mod sys {
+ use mio::unix::UnixReady;
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ UnixReady::hup().into()
+ }
+
+ pub(crate) fn is_hup(ready: Ready) -> bool {
+ UnixReady::from(ready).is_hup()
+ }
+}
+
+#[cfg(windows)]
+mod sys {
+ use mio::Ready;
+
+ pub(crate) fn hup() -> Ready {
+ Ready::empty()
+ }
+
+ pub(crate) fn is_hup(_: Ready) -> bool {
+ false
+ }
+}
diff --git a/tokio-net/src/driver/reactor.rs b/tokio-net/src/driver/reactor.rs
new file mode 100644
index 00000000..2aa60374
--- /dev/null
+++ b/tokio-net/src/driver/reactor.rs
@@ -0,0 +1,532 @@
+use super::platform;
+use super::sharded_rwlock::RwLock;
+
+use tokio_executor::park::{Park, Unpark};
+use tokio_sync::AtomicWaker;
+
+use log::{debug, log_enabled, trace, Level};
+use mio::event::Evented;
+use slab::Slab;
+use std::cell::RefCell;
+use std::io;
+use std::marker::PhantomData;
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+use std::os::unix::io::{AsRawFd, RawFd};
+use std::sync::atomic::AtomicUsize;
+use std::sync::atomic::Ordering::{Relaxed, SeqCst};
+use std::sync::{Arc, Weak};
+use std::task::Waker;
+use std::time::{Duration, Instant};
+use std::{fmt, usize};
+
+/// The core reactor, or event loop.
+///
+/// The event loop is the main source of blocking in an application which drives
+/// all other I/O events and notifications happening. Each event loop can have
+/// multiple handles pointing to it, each of which can then be used to create
+/// various I/O objects to interact with the event loop in interesting ways.
+pub struct Reactor {
+ /// Reuse the `mio::Events` value across calls to poll.
+ events: mio::Events,
+
+ /// State shared between the reactor and the handles.
+ inner: Arc<Inner>,
+
+ _wakeup_registration: mio::Registration,
+}
+
+/// A reference to a reactor.
+///
+/// A `Handle` is used for associating I/O objects with an event loop
+/// explicitly. Typically though you won't end up using a `Handle` that often
+/// and will instead use the default reactor for the execution context.
+///
+/// By default, most components bind lazily to reactors.
+/// To get this behavior when manually passing a `Handle`, use `default()`.
+#[derive(Clone)]
+pub struct Handle {
+ inner: Option<HandlePriv>,
+}
+
+/// Like `Handle`, but never `None`.
+#[derive(Clone)]
+pub(crate) struct HandlePriv {
+ inner: Weak<Inner>,
+}
+
+/// Return value from the `turn` method on `Reactor`.
+///
+/// Currently this value doesn't actually provide any functionality, but it may
+/// in the future give insight into what happened during `turn`.
+#[derive(Debug)]
+pub struct Turn {
+ _priv: (),
+}
+
+#[test]
+fn test_handle_size() {
+ use std::mem;
+ assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
+}
+
+pub(super) struct Inner {
+ /// The underlying system event queue.
+ io: mio::Poll,
+
+ /// ABA guard counter
+ next_aba_guard: AtomicUsize,
+
+ /// Dispatch slabs for I/O and futures events
+ pub(super) io_dispatch: RwLock<Slab<ScheduledIo>>,
+
+ /// Used to wake up the reactor from a call to `turn`
+ wakeup: mio::SetReadiness,
+}
+
+pub(super) struct ScheduledIo {
+ aba_guard: usize,
+ pub(super) readiness: AtomicUsize,
+ pub(super) reader: AtomicWaker,
+ pub(super) writer: AtomicWaker,
+}
+
+#[derive(Debug, Eq, PartialEq, Clone, Copy)]
+pub(super) enum Direction {
+ Read,
+ Write,
+}
+
+thread_local! {
+ /// Tracks the reactor for the current execution context.
+ static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)
+}
+
+const TOKEN_SHIFT: usize = 22;
+
+// Kind of arbitrary, but this reserves some token space for later usage.
+const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
+const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
+
+fn _assert_kinds() {
+ fn _assert<T: Send + Sync>() {}
+
+ _assert::<Handle>();
+}
+
+// ===== impl Reactor =====
+
+#[derive(Debug)]
+///Guard that resets current reactor on drop.
+pub struct DefaultGuard<'a> {
+ _lifetime: PhantomData<&'a u8>,
+}
+
+impl Drop for DefaultGuard<'_> {
+ fn drop(&mut self) {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+ *current = None;
+ });
+ }
+}
+
+///Sets handle for a default reactor, returning guard that unsets it on drop.
+pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
+ CURRENT_REACTOR.with(|current| {
+ let mut current = current.borrow_mut();
+
+ assert!(
+ current.is_none(),
+ "default Tokio reactor already set \
+ for execution context"
+ );
+
+ let handle = match handle.as_priv() {
+ Some(handle) => handle,
+ None => {
+ panic!("`handle` does not reference a reactor");
+ }
+ };
+
+ *current = Some(handle.clone());
+ });
+
+ DefaultGuard {
+ _lifetime: PhantomData,
+ }
+}
+
+impl Reactor {
+ /// Creates a new event loop, returning any error that happened during the
+ /// creation.
+ pub fn new() -> io::Result<Reactor> {
+ let io = mio::Poll::new()?;
+ let wakeup_pair = mio::Registration::new2();
+
+ io.register(
+ &wakeup_pair.0,
+ TOKEN_WAKEUP,
+ mio::Ready::readable(),
+ mio::PollOpt::level(),
+ )?;
+
+ Ok(Reactor {
+ events: mio::Events::with_capacity(1024),
+ _wakeup_registration: wakeup_pair.0,
+ inner: Arc::new(Inner {
+ io,
+ next_aba_guard: AtomicUsize::new(0),
+ io_dispatch: RwLock::new(Slab::with_capacity(1)),
+ wakeup: wakeup_pair.1,
+ }),
+ })
+ }
+
+ /// Returns a handle to this event loop which can be sent across threads
+ /// and can be used as a proxy to the event loop itself.
+ ///
+ /// Handles are cloneable and clones always refer to the same event loop.
+ /// This handle is typically passed into functions that create I/O objects
+ /// to bind them to this event loop.
+ pub fn handle(&self) -> Handle {
+ Handle {
+ inner: Some(HandlePriv {
+ inner: Arc::downgrade(&self.inner),
+ }),
+ }
+ }
+
+ /// Performs one iteration of the event loop, blocking on waiting for events
+ /// for at most `max_wait` (forever if `None`).
+ ///
+ /// This method is the primary method of running this reactor and processing
+ /// I/O events that occur. This method executes one iteration of an event
+ /// loop, blocking at most once waiting for events to happen.
+ ///
+ /// If a `max_wait` is specified then the method should block no longer than
+ /// the duration specified, but this shouldn't be used as a super-precise
+ /// timer but rather a "ballpark approximation"
+ ///
+ /// # Return value
+ ///
+ /// This function returns an instance of `Turn`
+ ///
+ /// `Turn` as of today has no extra information with it and can be safely
+ /// discarded. In the future `Turn` may contain information about what
+ /// happened while this reactor blocked.
+ ///
+ /// # Errors
+ ///
+ /// This function may also return any I/O error which occurs when polling
+ /// for readiness of I/O objects with the OS. This is quite unlikely to
+ /// arise and typically mean that things have gone horribly wrong at that
+ /// point. Currently this is primarily only known to happen for internal
+ /// bugs to `tokio` itself.
+ pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
+ self.poll(max_wait)?;
+ Ok(Turn { _priv: () })
+ }
+
+ /// Returns true if the reactor is currently idle.
+ ///
+ /// Idle is defined as all tasks that have been spawned have completed,
+ /// either successfully or with an error.
+ pub fn is_idle(&self) -> bool {
+ self.inner.io_dispatch.read().is_empty()
+ }
+
+ fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
+ // Block waiting for an event to happen, peeling out how many events
+ // happened.
+ match self.inner.io.poll(&mut self.events, max_wait) {
+ Ok(_) => {}
+ Err(e) => return Err(e),
+ }
+
+ let start = if log_enabled!(Level::Debug) {
+ Some(Instant::now())
+ } else {
+ None
+ };
+
+ // Process all the events that came in, dispatching appropriately
+ let mut events = 0;
+ for event in self.events.iter() {
+ events += 1;
+ let token = event.token();
+ trace!("event {:?} {:?}", event.readiness(), event.token());
+
+ if token == TOKEN_WAKEUP {
+ self.inner
+ .wakeup
+ .set_readiness(mio::Ready::empty())
+ .unwrap();
+ } else {
+ self.dispatch(token, event.readiness());
+ }
+ }
+
+ if let Some(start) = start {
+ let dur = start.elapsed();
+ trace!(
+ "loop process - {} events, {}.{:03}s",
+ events,
+ dur.as_secs(),
+ dur.subsec_millis()
+ );
+ }
+
+ Ok(())
+ }
+
+ fn dispatch(&self, token: mio::Token, ready: mio::Ready) {
+ let aba_guard = token.0 & !MAX_SOURCES;
+ let token = token.0 & MAX_SOURCES;
+
+ let mut rd = None;
+ let mut wr = None;
+
+ // Create a scope to ensure that notifying the tasks stays out of the
+ // lock's critical section.
+ {
+ let io_dispatch = self.inner.io_dispatch.read();
+
+ let io = match io_dispatch.get(token) {
+ Some(io) => io,
+ None => return,
+ };
+
+ if aba_guard != io.aba_guard {
+ return;
+ }
+
+ io.readiness.fetch_or(ready.as_usize(), Relaxed);
+
+ if ready.is_writable() || platform::is_hup(ready) {
+ wr = io.writer.take_waker();
+ }
+
+ if !(ready & (!mio::Ready::writable())).is_empty() {
+ rd = io.reader.take_waker();
+ }
+ }
+
+ if let Some(w) = rd {
+ w.wake();
+ }
+
+ if let Some(w) = wr {
+ w.wake();
+ }
+ }
+}
+
+#[cfg(all(unix, not(target_os = "fuchsia")))]
+impl AsRawFd for Reactor {
+ fn as_raw_fd(&self) -> RawFd {
+ self.inner.io.as_raw_fd()
+ }
+}
+
+impl Park for Reactor {
+ type Unpark = Handle;
+ type Error = io::Error;
+
+ fn unpark(&self) -> Self::Unpark {
+ self.handle()
+ }
+
+ fn park(&mut self) -> io::Result<()> {
+ self.turn(None)?;
+ Ok(())
+ }
+
+ fn park_timeout(&mut self, duration: Duration) -> io::Result<()> {
+ self.turn(Some(duration))?;
+ Ok(())
+ }
+}
+
+impl fmt::Debug for Reactor {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Reactor")
+ }
+}
+
+// ===== impl Handle =====
+
+impl Handle {
+ #[doc(hidden)]
+ #[deprecated(note = "semantics were sometimes surprising, use Handle::default()")]
+ pub fn current() -> Handle {
+ // TODO: Should this panic on error?
+ HandlePriv::try_current()
+ .map(|handle| Handle {
+ inner: Some(handle),
+ })
+ .unwrap_or(Handle {
+ inner: Some(HandlePriv { inner: Weak::new() }),
+ })
+ }
+
+ pub(crate) fn as_priv(&self) -> Option<&HandlePriv> {
+ self.inner.as_ref()
+ }
+}
+
+impl Unpark for Handle {
+ fn unpark(&self) {
+ if let Some(ref h) = self.inner {
+ h.wakeup();
+ }
+ }
+}
+
+impl Default for Handle {
+ /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
+ fn default() -> Handle {
+ Handle { inner: None }
+ }
+}
+
+impl fmt::Debug for Handle {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "Handle")
+ }
+}
+
+// ===== impl HandlePriv =====
+
+impl HandlePriv {
+ /// Try to get a handle to the current reactor.
+ ///
+ /// Returns `Err` if no handle is found.
+ pub(super) fn try_current() -> io::Result<HandlePriv> {
+ CURRENT_REACTOR.with(|current| match *current.borrow() {
+ Some(ref handle) => Ok(handle.clone()),
+ None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")),
+ })
+ }
+
+ /// Forces a reactor blocked in a call to `turn` to wakeup, or otherwise
+ /// makes the next call to `turn` return immediately.
+ ///
+ /// This method is intended to be used in situations where a notification
+ /// needs to otherwise be sent to the main reactor. If the reactor is
+ /// currently blocked inside of `turn` then it will wake up and soon return
+ /// after this method has been called. If the reactor is not currently
+ /// blocked in `turn`, then the next call to `turn` will not block and
+ /// return immediately.
+ fn wakeup(&self) {
+ if let Some(inner) = self.inner() {
+ inner.wakeup.set_readiness(mio::Ready::readable()).unwrap();
+ }
+ }
+
+ pub(super) fn inner(&self) -> Option<Arc<Inner>> {
+ self.inner.upgrade()
+ }
+}
+
+impl fmt::Debug for HandlePriv {
+ fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
+ write!(f, "HandlePriv")
+ }
+}
+
+// ===== impl Inner =====
+
+impl Inner {
+ /// Register an I/O resource with the reactor.
+ ///
+ /// The registration token is returned.
+ pub(super) fn add_source(&self, source: &dyn Evented) -> io::Result<usize> {
+ // Get an ABA guard value
+ let aba_guard = self.next_aba_guard.fetch_add(1 << TOKEN_SHIFT, Relaxed);
+
+ let key = {
+ // Block to contain the write lock
+ let mut io_dispatch = self.io_dispatch.write();
+
+ if io_dispatch.len() == MAX_SOURCES {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "reactor at max \
+ registered I/O resources",
+ ));
+ }
+
+ io_dispatch.insert(ScheduledIo {
+ aba_guard,
+ readiness: AtomicUsize::new(0),
+ reader: AtomicWaker::new(),
+ writer: AtomicWaker::new(),
+ })
+ };
+
+ let token = aba_guard | key;
+ debug!("adding I/O source: {}", token);
+
+ self.io.register(
+ source,
+ mio::Token(token),
+ mio::Ready::all(),
+ mio::PollOpt::edge(),
+ )?;
+
+ Ok(key)
+ }
+
+ /// Deregisters an I/O resource from the reactor.
+ pub(super) fn deregister_source(&self, source: &dyn Evented) -> io::Result<()> {
+ self.io.deregister(source)
+ }
+
+ pub(super) fn drop_source(&self, token: usize) {
+ debug!("dropping I/O source: {}", token);
+ self.io_dispatch.write().remove(token);
+ }
+
+ /// Registers interest in the I/O resource associated with `token`.
+ pub(super) fn register(&self, token: usize, dir: Direction, w: Waker) {
+ debug!("scheduling {:?} for: {}", dir, token);
+ let io_dispatch = self.io_dispatch.read();
+ let sched = io_dispatch.get(token).unwrap();
+
+ let (waker, ready) = match dir {
+ Direction::Read => (&sched.reader, !mio::Ready::writable()),
+ Direction::Write => (&sched.writer, mio::Ready::writable()),
+ };
+
+ waker.register(w);
+
+ if sched.readiness.load(SeqCst) & ready.as_usize() != 0 {
+ waker.wake();
+ }
+ }
+}
+
+impl Drop for Inner {
+ fn drop(&mut self) {
+ // When a reactor is dropped it needs to wake up all blocked tasks as
+ // they'll never receive a notification, and all connected I/O objects
+ // will start returning errors pretty quickly.
+ let io = self.io_dispatch.read();
+ for (_, io) in io.iter() {
+ io.writer.wake();
+ io.reader.wake();
+ }
+ }
+}
+
+impl Direction {
+ pub(super) fn mask(self) -> mio::Ready {
+ match self {
+ Direction::Read => {
+ // Everything except writable is signaled through read.
+ mio::Ready::all() - mio::Ready::writable()
+ }
+ Direction::Write => mio::Ready::writable() | platform::hup(),
+ }
+ }
+}
diff --git a/tokio-net/src/registration.rs b/tokio-net/src/driver/registration.rs
index d094e218..eb24d8f7 100644
--- a/tokio-net/src/registration.rs
+++ b/tokio-net/src/driver/registration.rs
@@ -1,4 +1,6 @@
-use crate::{Direction, Handle, HandlePriv};
+use super::platform;
+use super::reactor::{Direction, Handle, HandlePriv};
+
use log::debug;
use mio::{self, Evented};
use std::cell::UnsafeCell;
@@ -504,7 +506,7 @@ impl Inner {
};
let mask = direction.mask();
- let mask_no_hup = (mask - crate::platform::hup()).as_usize();
+ let mask_no_hup = (mask - platform::hup()).as_usize();
let io_dispatch = inner.io_dispatch.read();
let sched = &io_dispatch[self.token];
diff --git a/tokio-net/src/sharded_rwlock.rs b/tokio-net/src/driver/sharded_rwlock.rs
index 67892481..67892481 100644
--- a/tokio-net/src/sharded_rwlock.rs
+++ b/tokio-net/src/driver/sharded_rwlock.rs
diff --git a/tokio-net/src/lib.rs b/tokio-net/src/lib.rs
index 09a9d5e7..c1393300 100644
--- a/tokio-net/src/lib.rs
+++ b/tokio-net/src/lib.rs
@@ -36,570 +36,5 @@
//! [`PollEvented`]: struct.PollEvented.html
//! [reactor module]: https://docs.rs/tokio/0.1/tokio/reactor/index.html
-mod poll_evented;
-mod registration;
-mod sharded_rwlock;
-
-// ===== Public re-exports =====
-
-pub use self::poll_evented::PollEvented;
-pub use self::registration::Registration;
-
-// ===== Private imports =====
-
-use crate::sharded_rwlock::RwLock;
-use log::{debug, log_enabled, trace, Level};
-use mio::event::Evented;
-use slab::Slab;
-use std::cell::RefCell;
-use std::io;
-use std::marker::PhantomData;
-#[cfg(all(unix, not(target_os = "fuchsia")))]
-use std::os::unix::io::{AsRawFd, RawFd};
-use std::sync::atomic::AtomicUsize;
-use std::sync::atomic::Ordering::{Relaxed, SeqCst};
-use std::sync::{Arc, Weak};
-use std::task::Waker;
-use std::time::{Duration, Instant};
-use std::{fmt, usize};
-use tokio_executor::park::{Park, Unpark};
-use tokio_sync::AtomicWaker;
-
-/// The core reactor, or event loop.
-///
-/// The event loop is the main source of blocking in an application which drives
-/// all other I/O events and notifications happening. Each event loop can have
-/// multiple handles pointing to it, each of which can then be used to create
-/// various I/O objects to interact with the event loop in interesting ways.
-pub struct Reactor {
- /// Reuse the `mio::Events` value across calls to poll.
- events: mio::Events,
-
- /// State shared between the reactor and the handles.
- inner: Arc<Inner>,
-
- _wakeup_registration: mio::Registration,
-}
-
-/// A reference to a reactor.
-///
-/// A `Handle` is used for associating I/O objects with an event loop
-/// explicitly. Typically though you won't end up using a `Handle` that often
-/// and will instead use the default reactor for the execution context.
-///
-/// By default, most components bind lazily to reactors.
-/// To get this behavior when manually passing a `Handle`, use `default()`.
-#[derive(Clone)]
-pub struct Handle {
- inner: Option<HandlePriv>,
-}
-
-/// Like `Handle`, but never `None`.
-#[derive(Clone)]
-struct HandlePriv {
- inner: Weak<Inner>,
-}
-
-/// Return value from the `turn` method on `Reactor`.
-///
-/// Currently this value doesn't actually provide any functionality, but it may
-/// in the future give insight into what happened during `turn`.
-#[derive(Debug)]
-pub struct Turn {
- _priv: (),
-}
-
-#[test]
-fn test_handle_size() {
- use std::mem;
- assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
-}
-
-struct Inner {
- /// The underlying system event queue.
- io: mio::Poll,
-
- /// ABA guard counter
- next_aba_guard: AtomicUsize,
-
- /// Dispatch slabs for I/O and futures events
- io_dispatch: RwLock<Slab<ScheduledIo>>,
-
- /// Used to wake up the reactor from a call to `turn`
- wakeup: mio::SetReadiness,
-}
-
-struct ScheduledIo {
- aba_guard: usize,
- readiness: AtomicUsize,
- reader: AtomicWaker,
- writer: AtomicWaker,
-}
-
-#[derive(Debug, Eq, PartialEq, Clone, Copy)]
-pub(crate) enum Direction {
- Read,
- Write,
-}
-
-thread_local! {
- /// Tracks the reactor for the current execution context.
- static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)
-}
-
-const TOKEN_SHIFT: usize = 22;
-
-// Kind of arbitrary, but this reserves some token space for later usage.
-const MAX_SOURCES: usize = (1 << TOKEN_SHIFT) - 1;
-const TOKEN_WAKEUP: mio::Token = mio::Token(MAX_SOURCES);
-
-fn _assert_kinds() {
- fn _assert<T: Send + Sync>() {}
-
- _assert::<Handle>();
-}
-
-// ===== impl Reactor =====
-
-#[derive(Debug)]
-///Guard that resets current reactor on drop.
-pub struct DefaultGuard<'a> {
- _lifetime: PhantomData<&'a u8>,
-}
-
-impl Drop for DefaultGuard<'_> {
- fn drop(&mut self) {
- CURRENT_REACTOR.with(|current| {
- let mut current = current.borrow_mut();
- *current = None;
- });
- }
-}
-
-///Sets handle for a default reactor, returning guard that unsets it on drop.
-pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
- CURRENT_REACTOR.with(|current| {
- let mut current = current.borrow_mut();
-
- assert!(
- current.is_none(),
- "default Tokio reactor already set \
- for execution context"
- );
-
- let handle = match handle.as_priv() {
- Some(handle) => handle,
- None => {
- panic!("`handle` does not reference a reactor");
- }
- };
-
- *current = Some(handle.clone());
- });
-
- DefaultGuard {
- _lifetime: PhantomData,
- }
-}
-
-impl Reactor {
- /// Creates a new event loop, returning any error that happened during the
- /// creation.
- pub fn new() -> io::Result<Reactor> {
- let io = mio::Poll::new()?;
- let wakeup_pair = mio::Registration::new2();
-
- io.register(
- &wakeup_pair.0,
- TOKEN_WAKEUP,
- mio::Ready::readable(),
- mio::PollOpt::level(),
- )?;
-
- Ok(Reactor {
- events: mio::Events::with_capacity(1024),
- _wakeup_registration: wakeup_pair.0,
- inner: Arc::new(Inner {
- io,
- next_aba_guard: AtomicUsize::new(0),
- io_dispatch: RwLock::new(Slab::with_capacity(1)),
- wakeup: wakeup_pair.1,
- }),
- })
- }
-
- /// Returns a handle to this event loop which can be sent across threads
- /// and can be used as a proxy to the event loop itself.
- ///
- /// Handles are cloneable and clones always refer to the same event loop.
- /// This handle is typically passed into functions that create I/O objects
- /// to bind them to this event loop.
- pub fn handle(&self) -> Handle {
- Handle {
- inner: Some(HandlePriv {
- inner: Arc::downgrade(&self.inner),
- }),
- }
- }
-
- /// Performs one iteration of the event loop, blocking on waiting for events
- /// for at most `max_wait` (forever if `None`).
- ///
- /// This method is the primary method of running this reactor and processing
- /// I/O events that occur. This method executes one iteration of an event
- /// loop, blocking at most once waiting for events to happen.
- ///
- /// If a `max_wait` is specified then the method should block no longer than
- /// the duration specified, but this shouldn't be used as a super-precise
- /// timer but rather a "ballpark approximation"
- ///
- /// # Return value
- ///
- /// This function returns an instance of `Turn`
- ///
- /// `Turn` as of today has no extra information with it and can be safely
- /// discarded. In the future `Turn` may contain information about what
- /// happened while this reactor blocked.
- ///
- /// # Errors
- ///
- /// This function may also return any I/O error which occurs when polling
- /// for readiness of I/O objects with the OS. This is quite unlikely to
- /// arise and typically mean that things have gone horribly wrong at that
- /// point. Currently this is primarily only known to happen for internal
- /// bugs to `tokio` itself.
- pub fn turn(&mut self, max_wait: Option<Duration>) -> io::Result<Turn> {
- self.poll(max_wait)?;
- Ok(Turn { _priv: () })
- }
-
- /// Returns true if the reactor is currently idle.
- ///
- /// Idle is defined as all tasks that have been spawned have completed,
- /// either successfully or with an error.
- pub fn is_idle(&self) -> bool {
- self.inner.io_dispatch.read().is_empty()
- }
-
- fn poll(&mut self, max_wait: Option<Duration>) -> io::Result<()> {
- // Block waiting for an event to happen, peeling out how many events
- // happened.
- match self.inner.io.poll(&mut self.events, max_wait) {
- Ok(_) => {}
- Err(e) => return Err(e),
- }
-
- let start = if log_enabled!(Level::Debug) {
- Some(Instant::now())
- } else {