summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorCarl Lerche <me@carllerche.com>2019-11-01 13:18:52 -0700
committerGitHub <noreply@github.com>2019-11-01 13:18:52 -0700
commitd70c928d88dff9e3e8d673b8ee02bce131598550 (patch)
tree6b079db2f80bd61764203a32ffe48769d18c1386
parent742d89b0f333150f6a550ae7840235851f4eb069 (diff)
runtime: merge multi & single threaded runtimes (#1716)
Simplify Tokio's runtime construct by combining both Runtime variants into a single type. The execution style can be controlled by a configuration setting on `Builder`. The implication of this change is that there is no longer any way to spawn `!Send` futures. This, however, is a temporary limitation. A different strategy will be employed for supporting `!Send` futures. Included in this patch is a rework of `task::JoinHandle` to support using this type from both the thread-pool and current-thread executors.
-rw-r--r--tokio-macros/src/lib.rs21
-rw-r--r--tokio-test/src/lib.rs14
-rw-r--r--tokio/Cargo.toml2
-rw-r--r--tokio/src/executor/blocking/mod.rs1
-rw-r--r--tokio/src/executor/current_thread/mod.rs1003
-rw-r--r--tokio/src/executor/current_thread/scheduler.rs808
-rw-r--r--tokio/src/executor/global.rs43
-rw-r--r--tokio/src/executor/loom/mod.rs6
-rw-r--r--tokio/src/executor/loom/std/causal_cell.rs1
-rw-r--r--tokio/src/executor/loom/std/mod.rs20
-rw-r--r--tokio/src/executor/mod.rs8
-rw-r--r--tokio/src/executor/task/core.rs29
-rw-r--r--tokio/src/executor/task/error.rs22
-rw-r--r--tokio/src/executor/task/harness.rs61
-rw-r--r--tokio/src/executor/task/join.rs25
-rw-r--r--tokio/src/executor/task/list.rs9
-rw-r--r--tokio/src/executor/task/mod.rs52
-rw-r--r--tokio/src/executor/task/raw.rs40
-rw-r--r--tokio/src/executor/task/stack.rs9
-rw-r--r--tokio/src/executor/task/tests/task.rs6
-rw-r--r--tokio/src/executor/task/waker.rs6
-rw-r--r--tokio/src/executor/tests/mock_schedule.rs2
-rw-r--r--tokio/src/executor/thread_pool/join.rs42
-rw-r--r--tokio/src/executor/thread_pool/mod.rs6
-rw-r--r--tokio/src/executor/thread_pool/pool.rs3
-rw-r--r--tokio/src/executor/thread_pool/queue/global.rs16
-rw-r--r--tokio/src/executor/thread_pool/set.rs6
-rw-r--r--tokio/src/executor/thread_pool/spawner.rs3
-rw-r--r--tokio/src/executor/thread_pool/tests/queue.rs2
-rw-r--r--tokio/src/runtime/builder.rs382
-rw-r--r--tokio/src/runtime/current_thread/builder.rs91
-rw-r--r--tokio/src/runtime/current_thread/mod.rs67
-rw-r--r--tokio/src/runtime/current_thread/runtime.rs206
-rw-r--r--tokio/src/runtime/mod.rs204
-rw-r--r--tokio/src/runtime/spawner.rs (renamed from tokio/src/runtime/threadpool/spawner.rs)26
-rw-r--r--tokio/src/runtime/threadpool/builder.rs288
-rw-r--r--tokio/src/runtime/threadpool/mod.rs202
-rw-r--r--tokio/src/signal/registry.rs8
-rw-r--r--tokio/src/signal/windows.rs13
-rw-r--r--tokio/tests/clock.rs20
-rw-r--r--tokio/tests/current_thread.rs781
-rw-r--r--tokio/tests/process_issue_42.rs5
-rw-r--r--tokio/tests/rt_current_thread.rs339
-rw-r--r--tokio/tests/rt_thread_pool.rs (renamed from tokio/tests/runtime_threaded.rs)29
-rw-r--r--tokio/tests/runtime_current_thread.rs137
-rw-r--r--tokio/tests/signal_drop_rt.rs13
-rw-r--r--tokio/tests/signal_multi_rt.rs11
-rw-r--r--tokio/tests/timer_hammer.rs13
-rw-r--r--tokio/tests/timer_rt.rs7
49 files changed, 1523 insertions, 3585 deletions
diff --git a/tokio-macros/src/lib.rs b/tokio-macros/src/lib.rs
index 00c0cc34..1cfc14c3 100644
--- a/tokio-macros/src/lib.rs
+++ b/tokio-macros/src/lib.rs
@@ -98,7 +98,7 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
}
let result = match runtime {
- RuntimeType::Multi => quote! {
+ RuntimeType::Multi | RuntimeType::Auto => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
tokio::runtime::Runtime::new().unwrap().block_on(async { #body })
@@ -107,14 +107,11 @@ pub fn main(args: TokenStream, item: TokenStream) -> TokenStream {
RuntimeType::Single => quote! {
#(#attrs)*
fn #name(#inputs) #ret {
- tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body })
- }
- },
- RuntimeType::Auto => quote! {
- #(#attrs)*
- fn #name() #ret {
- let mut rt = tokio::runtime::__main::Runtime::new().unwrap();
- rt.block_on(async { #body })
+ tokio::runtime::Builder::new()
+ .current_thread()
+ .build()
+ .unwrap()
+ .block_on(async { #body })
}
},
};
@@ -211,7 +208,11 @@ pub fn test(args: TokenStream, item: TokenStream) -> TokenStream {
#[test]
#(#attrs)*
fn #name() #ret {
- tokio::runtime::current_thread::Runtime::new().unwrap().block_on(async { #body })
+ tokio::runtime::Builder::new()
+ .current_thread()
+ .build()
+ .unwrap()
+ .block_on(async { #body })
}
},
};
diff --git a/tokio-test/src/lib.rs b/tokio-test/src/lib.rs
index 58499bbb..749112d8 100644
--- a/tokio-test/src/lib.rs
+++ b/tokio-test/src/lib.rs
@@ -26,15 +26,9 @@ pub mod task;
///
/// [runtime-block-on]: https://docs.rs/tokio/0.2.0-alpha.2/tokio/runtime/current_thread/struct.Runtime.html#method.block_on
pub fn block_on<F: std::future::Future>(future: F) -> F::Output {
- let mut rt = tokio::runtime::current_thread::Runtime::new().unwrap();
- rt.block_on(future)
-}
+ use tokio::runtime;
-/*
-#[doc(hidden)]
-pub mod codegen {
- pub mod futures {
- pub use futures::*;
- }
+ let mut rt = runtime::Builder::new().current_thread().build().unwrap();
+
+ rt.block_on(future)
}
-*/
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 48b27a39..b01d16af 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -47,7 +47,6 @@ net-full = ["tcp", "udp", "uds"]
net-driver = ["io-traits", "mio", "blocking", "lazy_static"]
rt-current-thread = [
"executor-core",
- "crossbeam-channel",
"timer",
"sync",
"net-driver",
@@ -95,7 +94,6 @@ futures-util-preview = { version = "=0.3.0-alpha.19", features = ["sink", "chann
# Everything else is optional...
bytes = { version = "0.4", optional = true }
-crossbeam-channel = { version = "0.3.8", optional = true }
fnv = { version = "1.0.6", optional = true }
iovec = { version = "0.1", optional = true }
lazy_static = { version = "1.0.2", optional = true }
diff --git a/tokio/src/executor/blocking/mod.rs b/tokio/src/executor/blocking/mod.rs
index 2ad573d8..92dc1c36 100644
--- a/tokio/src/executor/blocking/mod.rs
+++ b/tokio/src/executor/blocking/mod.rs
@@ -221,6 +221,7 @@ impl Pool {
}
}
+#[derive(Debug)]
pub(crate) struct PoolWaiter(Arc<Pool>);
impl From<Pool> for PoolWaiter {
diff --git a/tokio/src/executor/current_thread/mod.rs b/tokio/src/executor/current_thread/mod.rs
index 62619f2a..8ee4da82 100644
--- a/tokio/src/executor/current_thread/mod.rs
+++ b/tokio/src/executor/current_thread/mod.rs
@@ -1,873 +1,380 @@
-//! A single-threaded executor which executes tasks on the same thread from which
-//! they are spawned.
-//!
-//! [`CurrentThread`] is the main type of this crate. It executes tasks on the
-//! current thread. The easiest way to start a new [`CurrentThread`] executor
-//! is to call [`block_on_all`] with an initial task to seed the executor. All
-//! tasks that are being managed by a [`CurrentThread`] executor are able to
-//! spawn additional tasks by calling [`spawn`].
-//!
-//! Application authors will not use this crate directly. Instead, they will use
-//! the `tokio` crate. Library authors should only depend on
-//! `tokio-current-thread` if they are building a custom task executor.
-//!
-//! [`CurrentThread`]: struct.CurrentThread.html
-//! [`spawn`]: fn.spawn.html
-//! [`block_on_all`]: fn.block_on_all.html
-
-mod scheduler;
-use self::scheduler::{Scheduler, TickArgs};
-
-#[cfg(feature = "blocking")]
-use crate::executor::blocking::{Pool, PoolWaiter};
-use crate::executor::park::{Park, ParkThread, Unpark};
-use crate::executor::{EnterError, Executor, SpawnError, TypedExecutor};
-
-use std::cell::Cell;
-use std::error::Error;
+use crate::executor::park::{Park, Unpark};
+use crate::executor::task::{self, JoinHandle, Schedule, Task};
+use crate::executor::Executor;
+
+use std::cell::UnsafeCell;
+use std::collections::VecDeque;
use std::fmt;
use std::future::Future;
-use std::pin::Pin;
-use std::rc::Rc;
-use std::sync::{atomic, Arc};
-use std::task::{Context, Poll, Waker};
-use std::thread;
-use std::time::{Duration, Instant};
+use std::mem::ManuallyDrop;
+use std::sync::{Arc, Mutex};
+use std::task::{RawWaker, RawWakerVTable, Waker};
+use std::time::Duration;
/// Executes tasks on the current thread
-pub struct CurrentThread<P: Park = ParkThread> {
- /// Execute futures and receive unpark notifications.
- scheduler: Scheduler<P::Unpark>,
-
- /// Current number of futures being executed.
- ///
- /// The LSB is used to indicate that the runtime is preparing to shut down.
- /// Thus, to get the actual number of pending futures, `>>1`.
- num_futures: Arc<atomic::AtomicUsize>,
-
- /// Thread park handle
- park: P,
-
- /// Handle for spawning new futures from other threads
- spawn_handle: Handle,
-
- /// Receiver for futures spawned from other threads
- spawn_receiver: crossbeam_channel::Receiver<Pin<Box<dyn Future<Output = ()> + Send + 'static>>>,
-
- /// Handle to pool for handling blocking tasks
- #[cfg(feature = "blocking")]
- blocking: PoolWaiter,
-
- /// The thread-local ID assigned to this executor.
- id: u64,
-}
-
-/// Executes futures on the current thread.
-///
-/// All futures executed using this executor will be executed on the current
-/// thread. As such, `run` will wait for these futures to complete before
-/// returning.
-///
-/// For more details, see the [module level](index.html) documentation.
-#[derive(Debug, Clone)]
-pub struct TaskExecutor {
- // Prevent the handle from moving across threads.
- _p: ::std::marker::PhantomData<Rc<()>>,
-}
-
-/// Returned by the `turn` function.
#[derive(Debug)]
-pub struct Turn {
- polled: bool,
-}
+pub(crate) struct CurrentThread<P>
+where
+ P: Park,
+{
+ /// Scheduler component
+ scheduler: Arc<Scheduler>,
-impl Turn {
- /// `true` if any futures were polled at all and `false` otherwise.
- pub fn has_polled(&self) -> bool {
- self.polled
- }
+ /// Local state
+ local: Local<P>,
}
-/// A `CurrentThread` instance bound to a supplied execution context.
-pub struct Entered<'a, P: Park> {
- executor: &'a mut CurrentThread<P>,
+#[derive(Debug, Clone)]
+pub(crate) struct Spawner {
+ scheduler: Arc<Scheduler>,
}
-/// Error returned by the `run` function.
-#[derive(Debug)]
-pub struct RunError {
- _p: (),
-}
+/// The scheduler component.
+pub(super) struct Scheduler {
+ /// List of all active tasks spawned onto this executor.
+ ///
+ /// # Safety
+ ///
+ /// Must only be accessed from the primary thread
+ owned_tasks: UnsafeCell<task::OwnedList<Self>>,
-impl fmt::Display for RunError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "Run error")
- }
-}
+ /// Local run queue.
+ ///
+ /// Tasks notified from the current thread are pushed into this queue.
+ ///
+ /// # Safety
+ ///
+ /// References should not be handed out. Only call `push` / `pop` functions.
+ /// Only call from the owning thread.
+ local_queue: UnsafeCell<VecDeque<Task<Scheduler>>>,
-impl Error for RunError {}
+ /// Remote run queue.
+ ///
+ /// Tasks notified from another thread are pushed into this queue.
+ remote_queue: Mutex<RemoteQueue>,
-/// Error returned by the `run_timeout` function.
-#[derive(Debug)]
-pub struct RunTimeoutError {
- timeout: bool,
-}
+ /// Tasks pending drop
+ pending_drop: task::TransferStack<Self>,
-impl fmt::Display for RunTimeoutError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- let descr = if self.timeout {
- "Run timeout error (timeout)"
- } else {
- "Run timeout error (not timeout)"
- };
- write!(fmt, "{}", descr)
- }
+ /// Unpark the blocked thread
+ unpark: Box<dyn Unpark>,
}
-impl Error for RunTimeoutError {}
+unsafe impl Send for Scheduler {}
+unsafe impl Sync for Scheduler {}
-/// Error returned by the `turn` function.
+/// Local state
#[derive(Debug)]
-pub struct TurnError {
- _p: (),
-}
+struct Local<P> {
+ /// Current tick
+ tick: u8,
-impl fmt::Display for TurnError {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "Turn error")
- }
+ /// Thread park handle
+ park: P,
}
-impl Error for TurnError {}
-
-/// Error returned by the `block_on` function.
#[derive(Debug)]
-pub struct BlockError<T> {
- inner: Option<T>,
-}
+struct RemoteQueue {
+ /// FIFO list of tasks
+ queue: VecDeque<Task<Scheduler>>,
-impl<T> fmt::Display for BlockError<T> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- write!(fmt, "Block error")
- }
+ /// `true` when a task can be pushed into the queue, false otherwise.
+ open: bool,
}
-impl<T: fmt::Debug> Error for BlockError<T> {}
+/// Max number of tasks to poll per tick.
+const MAX_TASKS_PER_TICK: usize = 61;
-/// This is mostly split out to make the borrow checker happy.
-struct Borrow<'a, U> {
- spawner: BorrowSpawner<'a, U>,
- #[cfg(feature = "blocking")]
- blocking: &'a PoolWaiter,
-}
-
-/// As is this.
-struct BorrowSpawner<'a, U> {
- id: u64,
- num_futures: &'a atomic::AtomicUsize,
- scheduler: &'a mut Scheduler<U>,
-}
-
-trait SpawnLocal {
- fn spawn_local(&mut self, future: Pin<Box<dyn Future<Output = ()>>>, already_counted: bool);
-}
-
-struct CurrentRunner {
- spawn: Cell<Option<*mut dyn SpawnLocal>>,
- id: Cell<Option<u64>>,
-}
+/// How often to check the remote queue first
+const CHECK_REMOTE_INTERVAL: u8 = 13;
-thread_local! {
- /// Current thread's task runner. This is set in `TaskRunner::with`
- static CURRENT: CurrentRunner = CurrentRunner {
- spawn: Cell::new(None),
- id: Cell::new(None),
- }
-}
-
-thread_local! {
- /// Unique ID to assign to each new executor launched on this thread.
- ///
- /// The unique ID is used to determine if the currently running executor matches the one
- /// referred to by a `Handle` so that direct task dispatch can be used.
- static EXECUTOR_ID: Cell<u64> = Cell::new(0)
-}
-
-/// Run the executor bootstrapping the execution with the provided future.
-///
-/// This creates a new [`CurrentThread`] executor, spawns the provided future,
-/// and blocks the current thread until the provided future and **all**
-/// subsequently spawned futures complete. In other words:
-///
-/// * If the provided bootstrap future does **not** spawn any additional tasks,
-/// `block_on_all` returns once `future` completes.
-/// * If the provided bootstrap future **does** spawn additional tasks, then
-/// `block_on_all` returns once **all** spawned futures complete.
-///
-/// See [module level][mod] documentation for more details.
-///
-/// [`CurrentThread`]: struct.CurrentThread.html
-/// [mod]: index.html
-pub fn block_on_all<F>(future: F) -> F::Output
+impl<P> CurrentThread<P>
where
- F: Future,
+ P: Park,
{
- let mut current_thread = CurrentThread::new();
-
- let ret = current_thread.block_on(future);
- current_thread.run().unwrap();
- ret
-}
-
-/// Executes a future on the current thread.
-///
-/// The provided future must complete or be canceled before `run` will return.
-///
-/// Unlike [`tokio::spawn`], this function will always spawn on a
-/// `CurrentThread` executor and is able to spawn futures that are not `Send`.
-///
-/// # Panics
-///
-/// This function can only be invoked from the context of a `run` call; any
-/// other use will result in a panic.
-///
-/// [`tokio::spawn`]: ../fn.spawn.html
-pub fn spawn<F>(future: F)
-where
- F: Future<Output = ()> + 'static,
-{
- TaskExecutor::current()
- .spawn_local(Box::pin(future))
- .unwrap();
-}
-
-// ===== impl CurrentThread =====
-
-impl CurrentThread<ParkThread> {
- /// Create a new instance of `CurrentThread`.
- pub fn new() -> Self {
- CurrentThread::new_with_park(ParkThread::new())
- }
-}
-
-impl<P: Park> CurrentThread<P> {
- /// Create a new instance of `CurrentThread` backed by the given park
- /// handle.
- pub fn new_with_park(park: P) -> Self {
+ pub(crate) fn new(park: P) -> CurrentThread<P> {
let unpark = park.unpark();
- let (spawn_sender, spawn_receiver) = crossbeam_channel::unbounded();
- let thread = thread::current().id();
- let id = EXECUTOR_ID.with(|idc| {
- let id = idc.get();
- idc.set(id + 1);
- id
- });
-
- let scheduler = Scheduler::new(unpark);
- let waker = scheduler.waker();
-
- let num_futures = Arc::new(atomic::AtomicUsize::new(0));
-
CurrentThread {
- scheduler,
- num_futures: num_futures.clone(),
- park,
- id,
- spawn_handle: Handle {
- sender: spawn_sender,
- num_futures,
- waker,
- thread,
- id,
- },
- spawn_receiver,
-
- #[cfg(feature = "blocking")]
- blocking: PoolWaiter::from(Pool::default()),
+ scheduler: Arc::new(Scheduler {
+ owned_tasks: UnsafeCell::new(task::OwnedList::new()),
+ local_queue: UnsafeCell::new(VecDeque::with_capacity(64)),
+ remote_queue: Mutex::new(RemoteQueue {
+ queue: VecDeque::with_capacity(64),
+ open: true,
+ }),
+ pending_drop: task::TransferStack::new(),
+ unpark: Box::new(unpark),
+ }),
+ local: Local { tick: 0, park },
}
}
- /// Returns `true` if the executor is currently idle.
- ///
- /// An idle executor is defined by not currently having any spawned tasks.
- ///
- /// Note that this method is inherently racy -- if a future is spawned from a remote `Handle`,
- /// this method may return `true` even though there are more futures to be executed.
- pub fn is_idle(&self) -> bool {
- self.num_futures.load(atomic::Ordering::SeqCst) <= 1
+ pub(crate) fn spawner(&self) -> Spawner {
+ Spawner {
+ scheduler: self.scheduler.clone(),
+ }
}
- /// Spawn the future on the executor.
- ///
- /// This internally queues the future to be executed once `run` is called.
- pub fn spawn<F>(&mut self, future: F) -> &mut Self
+ /// Spawn a future onto the thread pool
+ pub(crate) fn spawn<F>(&self, future: F) -> JoinHandle<F::Output>
where
- F: Future<Output = ()> + 'static,
+ F: Future + Send + 'static,
+ F::Output: Send + 'static,
{
- self.borrow().spawner.spawn_local(Box::pin(future), false);
- self
+ let (task, handle) = task::joinable(future);
+ self.scheduler.schedule(task);
+ handle
}
- /// Synchronously waits for the provided `future` to complete.
- ///
- /// This function can be used to synchronously block the current thread
- /// until the provided `future` has resolved either successfully or with an
- /// error. The result of the future is then returned from this function
- /// call.
- ///
- /// Note that this function will **also** execute any spawned futures on the
- /// current thread, but will **not** block until these other spawned futures
- /// have completed.
- ///
- /// The caller is responsible for ensuring that other spawned futures
- /// complete execution.
- pub fn block_on<F>(&mut self, future: F) -> F::Output
+ pub(crate) fn block_on<F>(&mut self, mut future: F) -> F::Output
where
F: Future,
{
- let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
- self.enter().block_on(future)
- }
-
- /// Run the executor to completion, blocking the thread until **all**
- /// spawned futures have completed.
- pub fn run(&mut self) -> Result<(), RunError> {
- let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
- self.enter().run()
- }
-
- /// Run the executor to completion, blocking the thread until all
- /// spawned futures have completed **or** `duration` time has elapsed.
- pub fn run_timeout(&mut self, duration: Duration) -> Result<(), RunTimeoutError> {
- let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
- self.enter().run_timeout(duration)
- }
-
- /// Perform a single iteration of the event loop.
- ///
- /// This function blocks the current thread even if the executor is idle.
- pub fn turn(&mut self, duration: Option<Duration>) -> Result<Turn, TurnError> {
- let _enter = crate::executor::enter().expect("failed to start `current_thread::Runtime`");
- self.enter().turn(duration)
- }
-
- /// Bind `CurrentThread` instance with an execution context.
- fn enter(&mut self) -> Entered<'_, P> {
- Entered { executor: self }
- }
+ use std::pin::Pin;
+ use std::task::Context;
+ use std::task::Poll::Ready;
- /// Returns a reference to the underlying `Park` instance.
- pub fn get_park(&self) -> &P {
- &self.park
- }
+ let local = &mut self.local;
+ let scheduler = &*self.scheduler;
- /// Returns a mutable reference to the underlying `Park` instance.
- pub fn get_park_mut(&mut self) -> &mut P {
- &mut self.park
- }
+ crate::executor::global::with_current_thread(scheduler, || {
+ let mut _enter =
+ crate::executor::enter().expect("attempting to block while on a Tokio executor");
- fn borrow(&mut self) -> Borrow<'_, P::Unpark> {
- Borrow {
- spawner: BorrowSpawner {
- id: self.id,
- scheduler: &mut self.scheduler,
- num_futures: &*self.num_futures,
- },
- #[cfg(feature = "blocking")]
- blocking: &self.blocking,
- }
- }
+ let raw_waker = RawWaker::new(
+ scheduler as *const Scheduler as *const (),
+ &RawWakerVTable::new(sched_clone_waker, sched_noop, sched_wake_by_ref, sched_noop),
+ );
- /// Get a new handle to spawn futures on the executor
- ///
- /// Different to the executor itself, the handle can be sent to different
- /// threads and can be used to spawn futures on the executor.
- pub fn handle(&self) -> Handle {
- self.spawn_handle.clone()
- }
-}
+ let waker = ManuallyDrop::new(unsafe { Waker::from_raw(raw_waker) });
+ let mut cx = Context::from_waker(&waker);
-impl<P: Park> Drop for CurrentThread<P> {
- fn drop(&mut self) {
- // Signal to Handles that no more futures can be spawned by setting LSB.
- //
- // NOTE: this isn't technically necessary since the send on the mpsc will fail once the
- // receiver is dropped, but it's useful to illustrate how clean shutdown will be
- // implemented (e.g., by setting the LSB).
- let pending = self.num_futures.fetch_add(1, atomic::Ordering::SeqCst);
-
- // TODO: We currently ignore any pending futures at the time we shut down.
- //
- // The "proper" fix for this is to have an explicit shutdown phase (`shutdown_on_idle`)
- // which sets LSB (as above) do make Handle::spawn stop working, and then runs until
- // num_futures.load() == 1.
- let _ = pending;
-
- // We will wait for any blocking ops by virtue of dropping `blocking`.
- }
-}
+ // `block_on` takes ownership of `f`. Once it is pinned here, the
+ // original `f` binding can no longer be accessed, making the
+ // pinning safe.
+ let mut future = unsafe { Pin::new_unchecked(&mut future) };
-impl Executor for CurrentThread {
- fn spawn(
- &mut self,
- future: Pin<Box<dyn Future<Output = ()> + Send>>,
- ) -> Result<(), SpawnError> {
- self.borrow().spawner.spawn_local(future, false);
- Ok(())
- }
-}
+ loop {
+ if let Ready(v) = future.as_mut().poll(&mut cx) {
+ return v;
+ }
-impl<T> TypedExecutor<T> for CurrentThread
-where
- T: Future<Output = ()> + 'static,
-{
- fn spawn(&mut self, future: T) -> Result<(), SpawnError> {
- self.borrow().spawner.spawn_local(Box::pin(future), false);
- Ok(())
- }
-}
+ scheduler.tick(local);
-impl<P: Park> fmt::Debug for CurrentThread<P> {
- fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
- fmt.debug_struct("CurrentThread")
- .field("scheduler", &self.scheduler)
- .field(
- "num_futures",
- &self.num_futures.load(atomic::Ordering::SeqCst),
- )
- .finish()
- }
-}
-
-impl<P: Park + Default> Default for CurrentThread<P> {
- fn default() -> Self {
- CurrentThread::new_with_park(P::default())
+ // Maintenance work
+ scheduler.drain_pending_drop();
+ }