diff options
-rw-r--r-- | tokio-executor/Cargo.toml | 4 | ||||
-rw-r--r-- | tokio-executor/src/lib.rs | 3 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/builder.rs | 3 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/park/boxed.rs | 14 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/pool/mod.rs | 26 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/sender.rs | 3 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/task/mod.rs | 16 | ||||
-rw-r--r-- | tokio-executor/src/threadpool/worker/mod.rs | 25 | ||||
-rw-r--r-- | tokio-executor/src/tracing.rs | 82 | ||||
-rw-r--r-- | tokio/Cargo.toml | 4 |
10 files changed, 138 insertions, 42 deletions
diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml index ba231013..7b579f32 100644 --- a/tokio-executor/Cargo.toml +++ b/tokio-executor/Cargo.toml @@ -30,7 +30,6 @@ threadpool = [ "futures-core-preview", "futures-util-preview", "num_cpus", - "log", "lazy_static", "slab", ] @@ -38,6 +37,8 @@ threadpool = [ [dependencies] tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-sync" } +tracing = { version = "0.1.5", optional = true } + # current-thread dependencies crossbeam-channel = { version = "0.3.8", optional = true } @@ -48,7 +49,6 @@ crossbeam-utils = { version = "0.6.4", optional = true } futures-core-preview = { version = "=0.3.0-alpha.18", optional = true } futures-util-preview = { version = "=0.3.0-alpha.18", optional = true } num_cpus = { version = "1.2", optional = true } -log = { version = "0.4", optional = true } lazy_static = { version = "1", optional = true } slab = { version = "0.4.1", optional = true } diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs index 22093732..e867af4d 100644 --- a/tokio-executor/src/lib.rs +++ b/tokio-executor/src/lib.rs @@ -56,6 +56,9 @@ //! [`DefaultExecutor`]: struct.DefaultExecutor.html //! [`Park`]: park/index.html //! [`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll +#[cfg(any(feature = "current-thread", feature = "threadpool"))] +#[macro_use] +mod tracing; mod enter; mod error; diff --git a/tokio-executor/src/threadpool/builder.rs b/tokio-executor/src/threadpool/builder.rs index c7a82a2d..7a7081e3 100644 --- a/tokio-executor/src/threadpool/builder.rs +++ b/tokio-executor/src/threadpool/builder.rs @@ -8,7 +8,6 @@ use super::worker::{self, Worker, WorkerId}; use crate::park::Park; use crossbeam_deque::Injector; -use log::trace; use num_cpus; use std::any::Any; use std::cmp::max; @@ -374,7 +373,7 @@ impl Builder { /// .build(); /// ``` pub fn build(&self) -> ThreadPool { - trace!("build; num-workers={}", self.pool_size); + trace!(message = "build;", num_workers = self.pool_size); // Create the worker entry list let workers: Arc<[worker::Entry]> = { diff --git a/tokio-executor/src/threadpool/park/boxed.rs b/tokio-executor/src/threadpool/park/boxed.rs index dae1820d..14dbc363 100644 --- a/tokio-executor/src/threadpool/park/boxed.rs +++ b/tokio-executor/src/threadpool/park/boxed.rs @@ -1,6 +1,5 @@ use crate::park::{Park, Unpark}; -use log::warn; use std::error::Error; use std::time::Duration; @@ -27,19 +26,20 @@ where } fn park(&mut self) -> Result<(), Self::Error> { - self.0.park().map_err(|e| { + self.0.park().map_err(|_e| { + // if tracing is disabled, the compiler will flag this as unused. warn!( - "calling `park` on worker thread errored -- shutting down thread: {}", - e + message = "calling `park` on worker thread errored -- shutting down thread", + error = %_e ); }) } fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> { - self.0.park_timeout(duration).map_err(|e| { + self.0.park_timeout(duration).map_err(|_e| { warn!( - "calling `park` on worker thread errored -- shutting down thread: {}", - e + message = "calling `park` on worker thread errored -- shutting down thread", + error = %_e, ); }) } diff --git a/tokio-executor/src/threadpool/pool/mod.rs b/tokio-executor/src/threadpool/pool/mod.rs index e3601f1c..f5c7c37c 100644 --- a/tokio-executor/src/threadpool/pool/mod.rs +++ b/tokio-executor/src/threadpool/pool/mod.rs @@ -17,7 +17,6 @@ use super::BlockingError; use crossbeam_deque::Injector; use crossbeam_utils::CachePadded; use lazy_static::lazy_static; -use log::{debug, error, trace}; use std::cell::Cell; use std::collections::hash_map::RandomState; use std::hash::{BuildHasher, Hash, Hasher}; @@ -133,10 +132,10 @@ impl Pool { /// Start shutting down the pool. This means that no new futures will be /// accepted. + #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))] pub(crate) fn shutdown(&self, now: bool, purge_queue: bool) { let mut state: State = self.state.load(Acquire).into(); - - trace!("shutdown; state={:?}", state); + trace!(?state); // For now, this must be true debug_assert!(!purge_queue || now); @@ -184,7 +183,7 @@ impl Pool { state = actual; } - trace!(" -> transitioned to shutdown"); + trace!("transitioned to shutdown"); // Only transition to terminate if there are no futures currently on the // pool @@ -205,7 +204,7 @@ impl Pool { pub(crate) fn terminate_sleeping_workers(&self) { use super::worker::Lifecycle::Signaled; - trace!(" -> shutting down workers"); + trace!("shutting down workers"); // Wakeup all sleeping workers. They will wake up, see the state // transition, and terminate. while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) { @@ -249,7 +248,7 @@ impl Pool { if !worker.is_blocking() && *self == *worker.pool { let idx = worker.id.0; - trace!(" -> submit internal; idx={}", idx); + trace!(message = "submit internal;", idx); worker.pool.workers[idx].submit_internal(task); worker.pool.signal_work(pool); @@ -268,7 +267,7 @@ impl Pool { pub(crate) fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) { debug_assert_eq!(*self, **pool); - trace!(" -> submit external"); + trace!("submit external"); self.queue.push(task); self.signal_work(pool); @@ -388,9 +387,9 @@ impl Pool { } }); - if let Err(e) = res { - error!("failed to spawn worker thread; err={:?}", e); - panic!("failed to spawn worker thread: {:?}", e); + if let Err(err) = res { + error!(message = "failed to spawn worker thread;", ?err); + panic!("failed to spawn worker thread: {:?}", err); } } @@ -402,6 +401,9 @@ impl Pool { use super::worker::Lifecycle::Signaled; if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) { + let span = trace_span!("signal_work", idx); + let _enter = span.enter(); + let entry = &self.workers[idx]; debug_assert!( @@ -410,10 +412,10 @@ impl Pool { worker_state.lifecycle(), ); - trace!("signal_work -- notify; idx={}", idx); + trace!("notify"); if !entry.notify(worker_state) { - trace!("signal_work -- spawn; idx={}", idx); + trace!("spawn;"); self.spawn_thread(WorkerId(idx), pool); } } diff --git a/tokio-executor/src/threadpool/sender.rs b/tokio-executor/src/threadpool/sender.rs index c090dd7d..fb898f2f 100644 --- a/tokio-executor/src/threadpool/sender.rs +++ b/tokio-executor/src/threadpool/sender.rs @@ -3,7 +3,6 @@ use super::task::Task; use crate::{Executor, SpawnError, TypedExecutor}; -use log::trace; use std::future::Future; use std::pin::Pin; use std::sync::atomic::Ordering::{AcqRel, Acquire}; @@ -110,7 +109,7 @@ impl Sender { .into(); if actual == state { - trace!("execute; count={:?}", next.num_futures()); + trace!(message = "execute;", count = next.num_futures()); break; } diff --git a/tokio-executor/src/threadpool/task/mod.rs b/tokio-executor/src/threadpool/task/mod.rs index b4dc4339..06343417 100644 --- a/tokio-executor/src/threadpool/task/mod.rs +++ b/tokio-executor/src/threadpool/task/mod.rs @@ -9,11 +9,10 @@ use super::pool::Pool; use super::waker::Waker; use futures_util::task; -use log::trace; use std::cell::{Cell, UnsafeCell}; use std::future::Future; use std::pin::Pin; -use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release}; +use std::sync::atomic::Ordering::{AcqRel, Acquire, Release}; use std::sync::atomic::{AtomicPtr, AtomicUsize}; use std::sync::Arc; use std::task::{Context, Poll}; @@ -95,8 +94,13 @@ impl Task { /// Execute the task returning `Run::Schedule` if the task needs to be /// scheduled again. + /// + // tracing macro expansion adds enough branches to make clippy angry here. + #[allow(clippy::cognitive_complexity)] pub(crate) fn run(me: &Arc<Task>, pool: &Arc<Pool>) -> Run { use self::State::*; + #[cfg(feature = "tracing")] + use std::sync::atomic::Ordering::Relaxed; // Transition task to running state. At this point, the task must be // scheduled. @@ -109,8 +113,10 @@ impl Task { Scheduled => {} _ => panic!("unexpected task state; {:?}", actual), } + let span = trace_span!("Task::run"); + let _enter = span.enter(); - trace!("Task::run; state={:?}", State::from(me.state.load(Relaxed))); + trace!(state = ?State::from(me.state.load(Relaxed))); // The transition to `Running` done above ensures that a lock on the // future has been obtained. @@ -151,7 +157,7 @@ impl Task { match res { Ok(Poll::Ready(_)) | Err(_) => { - trace!(" -> task complete"); + trace!("task complete"); // The future has completed. Drop it immediately to free // resources and run drop handlers. @@ -172,7 +178,7 @@ impl Task { Run::Complete } Ok(Poll::Pending) => { - trace!(" -> not ready"); + trace!("not ready"); // Attempt to transition from Running -> Idle, if successful, // then the task does not need to be scheduled again. If the CAS diff --git a/tokio-executor/src/threadpool/worker/mod.rs b/tokio-executor/src/threadpool/worker/mod.rs index 0ffb5ef5..63f170c6 100644 --- a/tokio-executor/src/threadpool/worker/mod.rs +++ b/tokio-executor/src/threadpool/worker/mod.rs @@ -12,7 +12,6 @@ use super::shutdown::ShutdownTrigger; use super::task::{self, CanBlock, Task}; use super::BlockingError; -use log::trace; use std::cell::Cell; use std::marker::PhantomData; use std::ptr; @@ -410,9 +409,9 @@ impl Worker { self.run_task(task, pool); trace!( - "try_steal_task -- signal_work; self={}; from={}", - self.id.0, - idx + message = "try_steal_task -- signal_work;", + self = self.id.0, + from = idx, ); // Signal other workers that work is available @@ -487,7 +486,7 @@ impl Worker { .into(); if actual == state { - trace!("task complete; state={:?}", next); + trace!(message = "task complete;", state = ?next); if state.num_futures() == 1 { // If the thread pool has been flagged as shutdown, @@ -564,14 +563,16 @@ impl Worker { /// Put the worker to sleep /// /// Returns `true` if woken up due to new work arriving. + // tracing macro expansion adds enough branches to make clippy angry here. + #[cfg_attr(feature = "tracing", allow(clippy::cognitive_complexity))] fn sleep(&self) -> bool { use self::Lifecycle::*; // Putting a worker to sleep is a multipart operation. This is, in part, // due to the fact that a worker can be notified without it being popped // from the sleep stack. Extra care is needed to deal with this. - - trace!("Worker::sleep; worker={:?}", self.id); + let span = trace_span!("Worker::sleep", idx = self.id.0, id = ?self.id); + let _e = span.enter(); let mut state: State = self.entry().state.load(Acquire).into(); @@ -617,12 +618,12 @@ impl Worker { if !state.is_pushed() { debug_assert!(next.is_pushed()); - trace!(" sleeping -- push to stack; idx={}", self.id.0); + trace!("push to stack"); // We obtained permission to push the worker into the // sleeper queue. if self.pool.push_sleeper(self.id.0).is_err() { - trace!(" sleeping -- push to stack failed; idx={}", self.id.0); + trace!("push to stack failed"); // The push failed due to the pool being terminated. // // This is true because the "work" being woken up for is @@ -637,7 +638,7 @@ impl Worker { state = actual; } - trace!(" -> starting to sleep; idx={}", self.id.0); + trace!("starting to sleep"); // Do a quick check to see if there are any notifications in the // reactor or new tasks in the global queue. Since this call will @@ -686,7 +687,7 @@ impl Worker { self.entry().park(); - trace!(" -> wakeup; idx={}", self.id.0); + trace!("wakeup"); } } @@ -719,7 +720,7 @@ impl Worker { impl Drop for Worker { fn drop(&mut self) { - trace!("shutting down thread; idx={}", self.id.0); + trace!(message = "shutting down thread", idx = self.id.0); if self.should_finalize.get() { // Drain the work queue diff --git a/tokio-executor/src/tracing.rs b/tokio-executor/src/tracing.rs new file mode 100644 index 00000000..4d36049b --- /dev/null +++ b/tokio-executor/src/tracing.rs @@ -0,0 +1,82 @@ +//! This module provides a small facade that wraps the `tracing` APIs we use, so +//! that when the `tracing` dependency is disabled, `tracing`'s macros expand to +//! no-ops. +//! +//! This means we don't have to put a `#[cfg(feature = "tracing")]` on every +//! individual use of a `tracing` macro. +#[cfg(not(feature = "tracing"))] +#[derive(Clone, Debug)] +pub(crate) struct Span {} + +#[cfg(feature = "tracing")] +macro_rules! trace { + ($($arg:tt)+) => { + tracing::trace!($($arg)+) + }; +} + +#[cfg(not(feature = "tracing"))] +macro_rules! trace { + ($($arg:tt)+) => {}; +} + +#[cfg(feature = "tracing")] +macro_rules! debug { + ($($arg:tt)+) => { + tracing::debug!($($arg)+) + }; +} + +#[cfg(not(feature = "tracing"))] +macro_rules! debug { + ($($arg:tt)+) => {}; +} + +#[cfg(feature = "tracing")] +macro_rules! warn { + ($($arg:tt)+) => { + tracing::warn!($($arg)+) + }; +} + +#[cfg(not(feature = "tracing"))] +macro_rules! warn { + ($($arg:tt)+) => {}; +} + +#[cfg(feature = "tracing")] +macro_rules! error { + ($($arg:tt)+) => { + tracing::error!($($arg)+) + }; +} + +#[cfg(not(feature = "tracing"))] +macro_rules! error { + ($($arg:tt)+) => {}; +} + +#[cfg(feature = "tracing")] +macro_rules! trace_span { + ($($arg:tt)+) => { + tracing::trace_span!($($arg)+) + }; +} + +#[cfg(not(feature = "tracing"))] +macro_rules! trace_span { + ($($arg:tt)+) => { + crate::tracing::Span::new() + }; +} + +#[cfg(not(feature = "tracing"))] +impl Span { + pub(crate) fn new() -> Self { + Span {} + } + + pub(crate) fn enter(&self) -> Span { + Span {} + } +} diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml index 75e11724..a9251a0c 100644 --- a/tokio/Cargo.toml +++ b/tokio/Cargo.toml @@ -51,6 +51,7 @@ rt-full = [ sync = ["tokio-sync"] tcp = ["io", "tokio-net/tcp"] timer = ["tokio-timer"] +tracing = ["tracing-core"] udp = ["io", "tokio-net/udp"] uds = ["io", "tokio-net/uds"] @@ -72,6 +73,9 @@ tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-syn tokio-timer = { version = "=0.3.0-alpha.2", optional = true, path = "../tokio-timer", features = ["async-traits"] } tracing-core = { version = "0.1", optional = true } +[target.'cfg(feature = "tracing")'.dependencies] +tokio-executor = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-executor", features = ["tracing"] } + [dev-dependencies] tokio-test = { version = "0.2.0-alpha.1", path = "../tokio-test" } |