summaryrefslogtreecommitdiffstats
path: root/tokio/tests/io_driver.rs
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-20 00:05:14 -0800
committerGitHub <noreply@github.com>2019-11-20 00:05:14 -0800
commit69975fb9601bbb21659db283d888470733bae660 (patch)
tree8db7c9a31e4125646634af09e197ae6479e10cc4 /tokio/tests/io_driver.rs
parent7c8b8877d440629ab9a27a2c9dcef859835d3536 (diff)
Refactor the I/O driver, extracting slab to `tokio::util`. (#1792)
The I/O driver is made private and moved to `tokio::io::driver`. `Registration` is moved to `tokio::io::Registration` and `PollEvented` is moved to `tokio::io::PollEvented`. Additionally, the concurrent slab used by the I/O driver is cleaned up and extracted to `tokio::util::slab`, allowing it to eventually be used by other types.
Diffstat (limited to 'tokio/tests/io_driver.rs')
-rw-r--r--tokio/tests/io_driver.rs83
1 files changed, 83 insertions, 0 deletions
diff --git a/tokio/tests/io_driver.rs b/tokio/tests/io_driver.rs
new file mode 100644
index 00000000..a97de8d2
--- /dev/null
+++ b/tokio/tests/io_driver.rs
@@ -0,0 +1,83 @@
+#![warn(rust_2018_idioms)]
+
+use tokio::net::TcpListener;
+use tokio::runtime;
+use tokio_test::{assert_ok, assert_pending};
+
+use futures::task::{waker_ref, ArcWake};
+use std::future::Future;
+use std::net::TcpStream;
+use std::pin::Pin;
+use std::sync::{mpsc, Arc, Mutex};
+use std::task::Context;
+
+struct Task<T> {
+ future: Mutex<Pin<Box<T>>>,
+}
+
+impl<T: Send> ArcWake for Task<T> {
+ fn wake_by_ref(_: &Arc<Self>) {
+ // Do nothing...
+ }
+}
+
+impl<T> Task<T> {
+ fn new(future: T) -> Task<T> {
+ Task {
+ future: Mutex::new(Box::pin(future)),
+ }
+ }
+}
+
+#[test]
+fn test_drop_on_notify() {
+ // When the reactor receives a kernel notification, it notifies the
+ // task that holds the associated socket. If this notification results in
+ // the task being dropped, the socket will also be dropped.
+ //
+ // Previously, there was a deadlock scenario where the reactor, while
+ // notifying, held a lock and the task being dropped attempted to acquire
+ // that same lock in order to clean up state.
+ //
+ // To simulate this case, we create a fake executor that does nothing when
+ // the task is notified. This simulates an executor in the process of
+ // shutting down. Then, when the task handle is dropped, the task itself is
+ // dropped.
+
+ let mut rt = runtime::Builder::new().basic_scheduler().build().unwrap();
+
+ let (addr_tx, addr_rx) = mpsc::channel();
+
+ // Define a task that just drains the listener
+ let task = Arc::new(Task::new(async move {
+ // Create a listener
+ let mut listener = assert_ok!(TcpListener::bind("127.0.0.1:0").await);
+
+ // Send the address
+ let addr = listener.local_addr().unwrap();
+ addr_tx.send(addr).unwrap();
+
+ loop {
+ let _ = listener.accept().await;
+ }
+ }));
+
+ {
+ rt.enter(|| {
+ let waker = waker_ref(&task);
+ let mut cx = Context::from_waker(&waker);
+ assert_pending!(task.future.lock().unwrap().as_mut().poll(&mut cx));
+ });
+ }
+
+ // Get the address
+ let addr = addr_rx.recv().unwrap();
+
+ drop(task);
+
+ // Establish a connection to the acceptor
+ let _s = TcpStream::connect(&addr).unwrap();
+
+ // Force the reactor to turn
+ rt.block_on(async {});
+}