summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorKevin Leimkuhler <kleimkuhler@icloud.com>2019-10-21 16:20:06 -0700
committerGitHub <noreply@github.com>2019-10-21 16:20:06 -0700
commitc9bcbe77b9fa36868cc94cc75426823c7ec205ac (patch)
tree796c52bbf53eedf72b9f4dd94370e9387a09f824
parent978013a215ebae63cd087139514de32bbd36ce11 (diff)
net: Eagerly bind resources to reactors (#1666)
## Motivation The `tokio_net` resources can be created outside of a runtime due to how tokio has been used with futures to date. For example, this allows a `TcpStream` to be created, and later passed into a runtime: ``` let stream = TcpStream::connect(...).and_then(|socket| { // do something }); tokio::run(stream); ``` In order to support this functionality, the reactor was lazily bound to the resource on the first call to `poll_read_ready`/`poll_write_ready`. This required a lot of additional complexity in the binding logic to support. With the tokio 0.2 common case, this is no longer necessary and can be removed. All resources are expected to be created from within a runtime, and should panic if not done so. Closes #1168 ## Solution The `tokio_net` crate now assumes there to be a `CURRENT_REACTOR` set on the worker thread creating a resource; this can be assumed if called within a tokio runtime. If there is no current reactor, the application will panic with a "no current reactor" message. With this assumption, all the unsafe and atomics have been removed from `tokio_net::driver::Registration` as it is no longer needed. There is no longer any reason to pass in handles to the family of `from_std` methods on `net` resources. `Handle::current` has therefore a more restricted private use where it is only used in `driver::Registration::new`. Signed-off-by: Kevin Leimkuhler <kleimkuhler@icloud.com>
-rw-r--r--tokio-net/src/driver/mod.rs22
-rw-r--r--tokio-net/src/driver/reactor.rs96
-rw-r--r--tokio-net/src/driver/registration.rs366
-rw-r--r--tokio-net/src/process/unix/mod.rs2
-rw-r--r--tokio-net/src/process/windows.rs2
-rw-r--r--tokio-net/src/signal/unix.rs2
-rw-r--r--tokio-net/src/tcp/listener.rs42
-rw-r--r--tokio-net/src/tcp/stream.rs42
-rw-r--r--tokio-net/src/udp/socket.rs20
-rw-r--r--tokio-net/src/uds/datagram.rs23
-rw-r--r--tokio-net/src/uds/listener.rs13
-rw-r--r--tokio-net/src/uds/stream.rs21
-rw-r--r--tokio-net/src/uds/ucred.rs6
-rw-r--r--tokio-net/src/util/poll_evented.rs34
-rw-r--r--tokio-net/tests/bind_resource.rs22
-rw-r--r--tokio-net/tests/process_issue_42.rs32
-rw-r--r--tokio-net/tests/signal_drop_multi_loop.rs31
-rw-r--r--tokio-net/tests/signal_multi_loop.rs8
-rw-r--r--tokio/tests/drop-core.rs14
19 files changed, 217 insertions, 581 deletions
diff --git a/tokio-net/src/driver/mod.rs b/tokio-net/src/driver/mod.rs
index a05a2f30..116bfc9a 100644
--- a/tokio-net/src/driver/mod.rs
+++ b/tokio-net/src/driver/mod.rs
@@ -24,6 +24,8 @@
//! use tokio::net::TcpStream;
//!
//! # async fn process<T>(_t: T) {}
+//!
+//! # #[tokio::main]
//! # async fn dox() -> Result<(), Box<dyn std::error::Error>> {
//! let stream = TcpStream::connect("93.184.216.34:9243").await?;
//!
@@ -56,19 +58,15 @@
//! the task to run on one of its worker threads. This results in the `and_then`
//! closure to get executed.
//!
-//! ## Lazy registration
-//!
-//! Notice how the snippet above does not explicitly reference a reactor. When
-//! [`TcpStream::connect`] is called, it registers the socket with a reactor,
-//! but no reactor is specified. This works because the registration process
-//! mentioned above is actually lazy. It doesn't *actually* happen in the
-//! [`connect`] function. Instead, the registration is established the first
-//! time that the task is polled (again, see [runtime model]).
+//! ## Eager registration
//!
-//! A reactor instance is automatically made available when using the Tokio
-//! [runtime], which is done using [`tokio::run`]. The Tokio runtime's executor
-//! sets a thread-local variable referencing the associated [`Reactor`] instance
-//! and [`Handle::current`] (used by [`Registration`]) returns the reference.
+//! Notice how the snippet does not explicitly reference a reactor. When
+//! [`TcpStream::connect`] is called, it registers the socket with the current
+//! reactor, but no reactor is specified. This works because a reactor
+//! instance is automatically made available when using the Tokio [runtime],
+//! which is done using [`tokio::main`]. The Tokio runtime's executor sets a
+//! thread-local variable referencing the associated [`Reactor`] instance and
+//! [`Handle::current`] (used by [`Registration`]) returns the reference.
//!
//! ## Implementation
//!
diff --git a/tokio-net/src/driver/reactor.rs b/tokio-net/src/driver/reactor.rs
index 800ab509..c83ce8d5 100644
--- a/tokio-net/src/driver/reactor.rs
+++ b/tokio-net/src/driver/reactor.rs
@@ -39,17 +39,8 @@ pub struct Reactor {
/// A `Handle` is used for associating I/O objects with an event loop
/// explicitly. Typically though you won't end up using a `Handle` that often
/// and will instead use the default reactor for the execution context.
-///
-/// By default, most components bind lazily to reactors.
-/// To get this behavior when manually passing a `Handle`, use `default()`.
#[derive(Clone)]
pub struct Handle {
- inner: Option<HandlePriv>,
-}
-
-/// Like `Handle`, but never `None`.
-#[derive(Clone)]
-pub(crate) struct HandlePriv {
inner: Weak<Inner>,
}
@@ -62,12 +53,6 @@ pub struct Turn {
_priv: (),
}
-#[test]
-fn test_handle_size() {
- use std::mem;
- assert_eq!(mem::size_of::<Handle>(), mem::size_of::<HandlePriv>());
-}
-
pub(super) struct Inner {
/// The underlying system event queue.
io: mio::Poll,
@@ -97,7 +82,7 @@ pub(super) enum Direction {
thread_local! {
/// Tracks the reactor for the current execution context.
- static CURRENT_REACTOR: RefCell<Option<HandlePriv>> = RefCell::new(None)
+ static CURRENT_REACTOR: RefCell<Option<Handle>> = RefCell::new(None)
}
const TOKEN_SHIFT: usize = 22;
@@ -115,7 +100,7 @@ fn _assert_kinds() {
// ===== impl Reactor =====
#[derive(Debug)]
-///Guard that resets current reactor on drop.
+/// Guard that resets current reactor on drop.
pub struct DefaultGuard<'a> {
_lifetime: PhantomData<&'a u8>,
}
@@ -129,7 +114,7 @@ impl Drop for DefaultGuard<'_> {
}
}
-///Sets handle for a default reactor, returning guard that unsets it on drop.
+/// Sets handle for a default reactor, returning guard that unsets it on drop.
pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
CURRENT_REACTOR.with(|current| {
let mut current = current.borrow_mut();
@@ -140,13 +125,6 @@ pub fn set_default(handle: &Handle) -> DefaultGuard<'_> {
for execution context"
);
- let handle = match handle.as_priv() {
- Some(handle) => handle,
- None => {
- panic!("`handle` does not reference a reactor");
- }
- };
-
*current = Some(handle.clone());
});
@@ -189,9 +167,7 @@ impl Reactor {
/// to bind them to this event loop.
pub fn handle(&self) -> Handle {
Handle {
- inner: Some(HandlePriv {
- inner: Arc::downgrade(&self.inner),
- }),
+ inner: Arc::downgrade(&self.inner),
}
}
@@ -349,55 +325,15 @@ impl fmt::Debug for Reactor {
// ===== impl Handle =====
impl Handle {
- #[doc(hidden)]
- #[deprecated(note = "semantics were sometimes surprising, use Handle::default()")]
- pub fn current() -> Handle {
- // TODO: Should this panic on error?
- HandlePriv::try_current()
- .map(|handle| Handle {
- inner: Some(handle),
- })
- .unwrap_or(Handle {
- inner: Some(HandlePriv { inner: Weak::new() }),
- })
- }
-
- pub(crate) fn as_priv(&self) -> Option<&HandlePriv> {
- self.inner.as_ref()
- }
-}
-
-impl Unpark for Handle {
- fn unpark(&self) {
- if let Some(ref h) = self.inner {
- h.wakeup();
- }
- }
-}
-
-impl Default for Handle {
- /// Returns a "default" handle, i.e., a handle that lazily binds to a reactor.
- fn default() -> Handle {
- Handle { inner: None }
- }
-}
-
-impl fmt::Debug for Handle {
- fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "Handle")
- }
-}
-
-// ===== impl HandlePriv =====
-
-impl HandlePriv {
- /// Try to get a handle to the current reactor.
+ /// Returns a handle to the current reactor
+ ///
+ /// # Panics
///
- /// Returns `Err` if no handle is found.
- pub(super) fn try_current() -> io::Result<HandlePriv> {
+ /// This function panics if there is no current reactor set.
+ pub(super) fn current() -> Self {
CURRENT_REACTOR.with(|current| match *current.borrow() {
- Some(ref handle) => Ok(handle.clone()),
- None => Err(io::Error::new(io::ErrorKind::Other, "no current reactor")),
+ Some(ref handle) => handle.clone(),
+ None => panic!("no current reactor"),
})
}
@@ -421,9 +357,15 @@ impl HandlePriv {
}
}
-impl fmt::Debug for HandlePriv {
+impl Unpark for Handle {
+ fn unpark(&self) {
+ self.wakeup();
+ }
+}
+
+impl fmt::Debug for Handle {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(f, "HandlePriv")
+ write!(f, "Handle")
}
}
diff --git a/tokio-net/src/driver/registration.rs b/tokio-net/src/driver/registration.rs
index 4d2cd9df..ed53b84e 100644
--- a/tokio-net/src/driver/registration.rs
+++ b/tokio-net/src/driver/registration.rs
@@ -1,12 +1,10 @@
use super::platform;
-use super::reactor::{Direction, Handle, HandlePriv};
+use super::reactor::{Direction, Handle};
use mio::{self, Evented};
-use std::cell::UnsafeCell;
-use std::sync::atomic::AtomicUsize;
use std::sync::atomic::Ordering::SeqCst;
-use std::task::{Context, Poll, Waker};
-use std::{io, ptr, usize};
+use std::task::{Context, Poll};
+use std::{io, usize};
/// Associates an I/O resource with the reactor instance that drives it.
///
@@ -14,10 +12,9 @@ use std::{io, ptr, usize};
/// that it will receive task notifications on readiness. This is the lowest
/// level API for integrating with a reactor.
///
-/// The association between an I/O resource is made by calling [`register`].
-/// Once the association is established, it remains established until the
-/// registration instance is dropped. Subsequent calls to [`register`] are
-/// no-ops.
+/// The association between an I/O resource is made by calling [`new`]. Once
+/// the association is established, it remains established until the
+/// registration instance is dropped.
///
/// A registration instance represents two separate readiness streams. One for
/// the read readiness and one for write readiness. These streams are
@@ -36,86 +33,38 @@ use std::{io, ptr, usize};
/// These events are included as part of the read readiness event stream. The
/// write readiness event stream is only for `Ready::writable()` events.
///
-/// [`register`]: #method.register
+/// [`new`]: #method.new
/// [`poll_read_ready`]: #method.poll_read_ready`]
/// [`poll_write_ready`]: #method.poll_write_ready`]
#[derive(Debug)]
pub struct Registration {
- /// Stores the handle. Once set, the value is not changed.
- ///
- /// Setting this requires acquiring the lock from state.
- inner: UnsafeCell<Option<Inner>>,
-
- /// Tracks the state of the registration.
- ///
- /// The least significant 2 bits are used to track the lifecycle of the
- /// registration. The rest of the `state` variable is a pointer to tasks
- /// that must be notified once the lock is released.
- state: AtomicUsize,
-}
-
-#[derive(Debug)]
-struct Inner {
- handle: HandlePriv,
+ handle: Handle,
token: usize,
}
-/// Tasks waiting on readiness notifications.
-#[derive(Debug)]
-struct Node {
- direction: Direction,
- waker: Waker,
- next: *mut Node,
-}
-
-/// Initial state. The handle is not set and the registration is idle.
-const INIT: usize = 0;
-
-/// A thread locked the state and will associate a handle.
-const LOCKED: usize = 1;
-
-/// A handle has been associated with the registration.
-const READY: usize = 2;
-
-/// Masks the lifecycle state
-const LIFECYCLE_MASK: usize = 0b11;
-
-/// A fake token used to identify error situations
-const ERROR: usize = usize::MAX;
-
// ===== impl Registration =====
impl Registration {
- /// Create a new `Registration`.
- ///
- /// This registration is not associated with a Reactor instance. Call
- /// `register` to establish the association.
- pub fn new() -> Registration {
- Registration {
- inner: UnsafeCell::new(None),
- state: AtomicUsize::new(INIT),
- }
- }
-
/// Register the I/O resource with the default reactor.
///
- /// This function is safe to call concurrently and repeatedly. However, only
- /// the first call will establish the registration. Subsequent calls will be
- /// no-ops.
- ///
/// # Return
///
- /// If the registration happened successfully, `Ok(true)` is returned.
- ///
- /// If an I/O resource has previously been successfully registered,
- /// `Ok(false)` is returned.
- ///
- /// If an error is encountered during registration, `Err` is returned.
- pub fn register<T>(&self, io: &T) -> io::Result<bool>
+ /// - `Ok` if the registration happened successfully
+ /// - `Err` if an error was encountered during registration
+ pub fn new<T>(io: &T) -> io::Result<Self>
where
T: Evented,
{
- self.register2(io, HandlePriv::try_current)
+ let handle = Handle::current();
+ let token = if let Some(inner) = handle.inner() {
+ inner.add_source(io)?
+ } else {
+ return Err(io::Error::new(
+ io::ErrorKind::Other,
+ "failed to find event loop",
+ ));
+ };
+ Ok(Self { handle, token })
}
/// Deregister the I/O resource from the reactor it is associated with.
@@ -138,114 +87,11 @@ impl Registration {
where
T: Evented,
{
- // The state does not need to be checked and coordination is not
- // necessary as this function takes `&mut self`. This guarantees a
- // single thread is accessing the instance.
- if let Some(inner) = unsafe { (*self.inner.get()).as_ref() } {
- inner.deregister(io)?;
- }
-
- Ok(())
- }
-
- /// Register the I/O resource with the specified reactor.
- ///
- /// This function is safe to call concurrently and repeatedly. However, only
- /// the first call will establish the registration. Subsequent calls will be
- /// no-ops.
- ///
- /// If the registration happened successfully, `Ok(true)` is returned.
- ///
- /// If an I/O resource has previously been successfully registered,
- /// `Ok(false)` is returned.
- ///
- /// If an error is encountered during registration, `Err` is returned.
- pub fn register_with<T>(&self, io: &T, handle: &Handle) -> io::Result<bool>
- where
- T: Evented,
- {
- self.register2(io, || match handle.as_priv() {
- Some(handle) => Ok(handle.clone()),
- None => HandlePriv::try_current(),
- })
- }
-
- pub(crate) fn register_with_priv<T>(&self, io: &T, handle: &HandlePriv) -> io::Result<bool>
- where
- T: Evented,
- {
- self.register2(io, || Ok(handle.clone()))
- }
-
- fn register2<T, F>(&self, io: &T, f: F) -> io::Result<bool>
- where
- T: Evented,
- F: Fn() -> io::Result<HandlePriv>,
- {
- let mut state = self.state.load(SeqCst);
-
- loop {
- match state {
- INIT => {
- // Registration is currently not associated with a handle.
- // Get a handle then attempt to lock the state.
- let handle = f()?;
-
- let actual = self.state.compare_and_swap(INIT, LOCKED, SeqCst);
-
- if actual != state {
- state = actual;
- continue;
- }
-
- // Create the actual registration
- let (inner, res) = Inner::new(io, handle);
-
- unsafe {
- *self.inner.get() = Some(inner);
- }
-
- // Transition out of the locked state. This acquires the
- // current value, potentially having a list of tasks that
- // are pending readiness notifications.
- let actual = self.state.swap(READY, SeqCst);
-
- // Consume the stack of nodes
-
- let mut read = false;
- let mut write = false;
- let mut ptr = (actual & !LIFECYCLE_MASK) as *mut Node;
-
- let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
-
- while !ptr.is_null() {
- let node = unsafe { Box::from_raw(ptr) };
- let node = *node;
- let Node {
- direction,
- waker,
- next,
- } = node;
-
- let flag = match direction {
- Direction::Read => &mut read,
- Direction::Write => &mut write,
- };
-
- if !*flag {
- *flag = true;
-
- inner.register(direction, waker);
- }
-
- ptr = next;
- }
-
- return res.map(|_| true);
- }
- _ => return Ok(false),
- }
- }
+ let inner = match self.handle.inner() {
+ Some(inner) => inner,
+ None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
+ };
+ inner.deregister_source(io)
}
/// Poll for events on the I/O resource's read readiness stream.
@@ -350,160 +196,26 @@ impl Registration {
self.poll_ready(Direction::Write, None)
}
+ /// Poll for events on the I/O resource's `direction` readiness stream.
+ ///
+ /// If called with a task context, notify the task when a new event is
+ /// received.
fn poll_ready(
&self,
direction: Direction,
cx: Option<&mut Context<'_>>,
) -> io::Result<Option<mio::Ready>> {
- let mut state = self.state.load(SeqCst);
-
- // Cache the node pointer
- let mut node = None;
-
- loop {
- match state {
- INIT => {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "must call register before poll_read_ready",
- ));
- }
- READY => {
- let inner = unsafe { (*self.inner.get()).as_ref().unwrap() };
- return inner.poll_ready(direction, cx);
- }
- LOCKED => {
- let cx = if let Some(ref cx) = cx {
- cx
- } else {
- // Skip the notification tracking junk.
- return Ok(None);
- };
-
- let next_ptr = (state & !LIFECYCLE_MASK) as *mut Node;
-
- // Get the node
- let mut n = node.take().unwrap_or_else(|| {
- Box::new(Node {
- direction,
- waker: cx.waker().clone(),
- next: ptr::null_mut(),
- })
- });
-
- n.next = next_ptr;
-
- let node_ptr = Box::into_raw(n);
- let next = node_ptr as usize | (state & LIFECYCLE_MASK);
-
- let actual = self.state.compare_and_swap(state, next, SeqCst);
-
- if actual != state {
- // Back out of the node boxing
- let n = unsafe { Box::from_raw(node_ptr) };
-
- // Save this for next loop
- node = Some(n);
-
- state = actual;
- continue;
- }
-
- return Ok(None);
- }
- _ => unreachable!(),
- }
- }
- }
-}
-
-impl Default for Registration {
- fn default() -> Self {
- Self::new()
- }
-}
-
-unsafe impl Send for Registration {}
-unsafe impl Sync for Registration {}
-
-// ===== impl Inner =====
-
-impl Inner {
- fn new<T>(io: &T, handle: HandlePriv) -> (Self, io::Result<()>)
- where
- T: Evented,
- {
- let mut res = Ok(());
-
- let token = match handle.inner() {
- Some(inner) => match inner.add_source(io) {
- Ok(token) => token,
- Err(e) => {
- res = Err(e);
- ERROR
- }
- },
- None => {
- res = Err(io::Error::new(io::ErrorKind::Other, "event loop gone"));
- ERROR
- }
- };
-
- let inner = Inner { handle, token };
-
- (inner, res)
- }
-
- fn register(&self, direction: Direction, waker: Waker) {
- if self.token == ERROR {
- waker.wake();
- return;
- }
-
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => {
- waker.wake();
- return;
- }
- };
-
- inner.register(self.token, direction, waker);
- }
-
- fn deregister<E: Evented>(&self, io: &E) -> io::Result<()> {
- if self.token == ERROR {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to associate with reactor",
- ));
- }
-
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
};
- inner.deregister_source(io)
- }
-
- fn poll_ready(
- &self,
- direction: Direction,
- cx: Option<&mut Context<'_>>,
- ) -> io::Result<Option<mio::Ready>> {
- if self.token == ERROR {
- return Err(io::Error::new(
- io::ErrorKind::Other,
- "failed to associate with reactor",
- ));
+ // If the task should be notified about new events, ensure that it has
+ // been registered
+ if let Some(ref cx) = cx {
+ inner.register(self.token, direction, cx.waker().clone())
}
- let inner = match self.handle.inner() {
- Some(inner) => inner,
- None => return Err(io::Error::new(io::ErrorKind::Other, "reactor gone")),
- };
-
let mask = direction.mask();
let mask_no_hup = (mask - platform::hup()).as_usize();
@@ -544,17 +256,15 @@ impl Inner {
}
}
-impl Drop for Inner {
- fn drop(&mut self) {
- if self.token == ERROR {
- return;
- }
+unsafe impl Send for Registration {}
+unsafe impl Sync for Registration {}
+impl Drop for Registration {
+ fn drop(&mut self) {
let inner = match self.handle.inner() {
Some(inner) => inner,
None => return,
};
-
inner.drop_source(self.token);
}
}
diff --git a/tokio-net/src/process/unix/mod.rs b/tokio-net/src/process/unix/mod.rs
index 822dfc2f..5328c377 100644
--- a/tokio-net/src/process/unix/mod.rs
+++ b/tokio-net/src/process/unix/mod.rs
@@ -221,5 +221,5 @@ where
return Err(io::Error::last_os_error());
}
}
- Ok(Some(PollEvented::new(Fd { inner: io })))
+ Ok(Some(PollEvented::new(Fd { inner: io })?))
}
diff --git a/tokio-net/src/process/windows.rs b/tokio-net/src/process/windows.rs
index f245340a..a0115356 100644
--- a/tokio-net/src/process/windows.rs
+++ b/tokio-net/src/process/windows.rs
@@ -188,5 +188,5 @@ where
None => return None,
};
let pipe = unsafe { NamedPipe::from_raw_handle(io.into_raw_handle()) };
- Some(PollEvented::new(pipe))
+ PollEvented::new(pipe).ok()
}
diff --git a/tokio-net/src/signal/unix.rs b/tokio-net/src/signal/unix.rs
index f485b4e9..db032b10 100644
--- a/tokio-net/src/signal/unix.rs
+++ b/tokio-net/src/signal/unix.rs
@@ -294,7 +294,7 @@ impl Driver {
// either, since we can't compare Handles or assume they will always
// point to the exact same reactor.
let stream = globals().receiver.try_clone()?;
- let wakeup = PollEvented::new(stream);
+ let wakeup = PollEvented::new(stream)?;
Ok(Driver { wakeup })
}
diff --git a/tokio-net/src/tcp/listener.rs b/tokio-net/src/tcp/listener.rs
index b25f2560..0b940f00 100644
--- a/tokio-net/src/tcp/listener.rs
+++ b/tokio-net/src/tcp/listener.rs
@@ -1,7 +1,6 @@
#[cfg(feature = "async-traits")]
use super::incoming::Incoming;
use super::TcpStream;
-use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
@@ -96,7 +95,7 @@ impl TcpListener {
fn bind_addr(addr: SocketAddr) -> io::Result<TcpListener> {
let listener = mio::net::TcpListener::bind(&addr)?;
- Ok(TcpListener::new(listener))
+ TcpListener::new(listener)
}
/// Accept a new incoming connection from this listener.
@@ -137,7 +136,7 @@ impl TcpListener {
let (io, addr) = ready!(self.poll_accept_std(cx))?;
let io = mio::net::TcpStream::from_stream(io)?;
- let io = TcpStream::new(io);
+ let io = TcpStream::new(io)?;
Poll::Ready(Ok((io, addr)))
}
@@ -173,10 +172,6 @@ impl TcpListener {
/// bound to and the listener will only be guaranteed to accept connections
/// of the same address type currently.
///
- /// Finally, the `handle` argument is the event loop that this listener will
- /// be bound to.
- /// Use [`Handle::default()`] to lazily bind to an event loop, just like `bind` does.
- ///
/// The platform specific behavior of this function looks like:
///
/// * On Unix, the socket is placed into nonblocking mode and connections
@@ -187,29 +182,28 @@ impl TcpListener {
/// `addr` is an IPv4 address then all sockets accepted will be IPv4 as
/// well (same for IPv6).
///
- /// [`Handle::default()`]: ../reactor/struct.Handle.html
/// # Examples
///
- /// ```no_run
+ /// ```rust,no_run
+ /// use std::error::Error;
/// use tokio::net::TcpListener;
- /// use tokio_net::driver::Handle;
///
- /// use std::net::TcpListener as StdTcpListener;
- ///
- /// let std_listener = StdTcpListener::bind("127.0.0.1:0")?;
- /// let listener = TcpListener::from_std(std_listener, &Handle::default())?;
- /// # let _ = listener;
- /// # Ok::<_, Box<dyn std::error::Error>>(())
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let std_listener = std::net::TcpListener::bind("127.0.0.1:0")?;
+ /// let listener = TcpListener::from_std(std_listener)?;
+ /// Ok(())
+ /// }
/// ```
- pub fn from_std(listener: net::TcpListener, handle: &Handle) -> io::Result<TcpListener> {
+ pub fn from_std(listener: net::TcpListener) -> io::Result<TcpListener> {
let io = mio::net::TcpListener::from_std(listener)?;
- let io = PollEvented::new_with_handle(io, handle)?;
+ let io = PollEvented::new(io)?;
Ok(TcpListener { io })
}
- fn new(listener: mio::net::TcpListener) -> TcpListener {
- let io = PollEvented::new(listener);
- TcpListener { io }
+ fn new(listener: mio::net::TcpListener) -> io::Result<TcpListener> {
+ let io = PollEvented::new(listener)?;
+ Ok(TcpListener { io })
}
/// Returns the local address that this listener is bound to.
@@ -219,7 +213,7 @@ impl TcpListener {
///
/// # Examples
///
- /// ```
+ /// ```rust,no_run
/// use tokio::net::TcpListener;
///
/// use std::io;
@@ -329,9 +323,9 @@ impl TryFrom<net::TcpListener> for TcpListener {
/// Consumes stream, returning the tokio I/O object.
///
/// This is equivalent to
- /// [`TcpListener::from_std(stream, &Handle::default())`](TcpListener::from_std).
+ /// [`TcpListener::from_std(stream)`](TcpListener::from_std).
fn try_from(stream: net::TcpListener) -> Result<Self, Self::Error> {
- Self::from_std(stream, &Handle::default())
+ Self::from_std(stream)
}
}
diff --git a/tokio-net/src/tcp/stream.rs b/tokio-net/src/tcp/stream.rs
index ea841c26..2634e690 100644
--- a/tokio-net/src/tcp/stream.rs
+++ b/tokio-net/src/tcp/stream.rs
@@ -1,5 +1,4 @@
use super::split::{split, ReadHalf, WriteHalf};
-use crate::driver::Handle;
use crate::util::PollEvented;
use crate::ToSocketAddrs;
@@ -101,7 +100,7 @@ impl TcpStream {
/// Establish a connection to the specified `addr`.
async fn connect_addr(addr: SocketAddr) -> io::Result<TcpStream> {
let sys = mio::net::TcpStream::connect(&addr)?;
- let stream = TcpStream::new(sys);
+ let stream = TcpStream::new(sys)?;
// Once we've connected, wait for the stream to be writable as
// that's when the actual connection has been initiated. Once we're
@@ -118,33 +117,32 @@ impl TcpStream {
Ok(stream)
}
- pub(crate) fn new(connected: mio::net::TcpStream) -> TcpStream {
- let io = PollEvented::new(connected);
- TcpStream { io }
+ pub(crate) fn new(connected: mio::net::TcpStream) -> io::Result<TcpStream> {
+ let io = PollEvented::new(connected)?;
+ Ok(TcpStream { io })
}
/// Create a new `TcpStream` from a `std::net::TcpStream`.
///
/// This function will convert a TCP stream created by the standard library
/// to a TCP stream ready to be used with the provided event loop handle.
- /// Use `Handle::default()` to lazily bind to an event loop, just like `connect` does.
///
/// # Examples
///
- /// ```no_run
+ /// ```rust,no_run
+ /// use std::error::Error;
/// use tokio::net::TcpStream;
- /// use tokio_net::driver::Handle;
///
- /// # fn dox() -> std::io::Result<()> {
- /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
- /// let stream = TcpStream::from_std(std_stream, &Handle::default())?;
- /// # Ok(())
- /// # }
+ /// #[tokio::main]
+ /// async fn main() -> Result<(), Box<dyn Error>> {
+ /// let std_stream = std::net::TcpStream::connect("127.0.0.1:34254")?;
+ /// let stream = TcpStream::from_std(std_stream)?;
+ /// Ok(())
+ /// }
/// ```
- pub fn from_std(stream: net::TcpStream, handle: &Handle) -> io::Result<TcpStream> {
+ pub fn from_std(stream: net::TcpStream) -> io::Result<TcpStream> {
let io = mio::net::TcpStream::from_stream(stream)?;
- let io = PollEvented::new_with_handle(io, handle)?;
-
+ let io = PollEvented::new(io)?;
Ok(TcpStream { io })
}
@@ -152,13 +150,9 @@ impl TcpStream {
//
// This should be removed in favor of some in-crate TcpSocket builder API.
#[doc(hidden)]
- pub async fn connect_std(
- stream: net::TcpStream,
- addr: &SocketAddr,
- handle: &Handle,
- ) -> io::Result<TcpStream> {
+ pub async fn connect_std(stream: net::TcpStream, addr: &SocketAddr) -> io::Result<TcpStream> {
let io = mio::net::TcpSt