summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorEliza Weisman <eliza@buoyant.io>2019-08-20 12:44:26 -0700
committerGitHub <noreply@github.com>2019-08-20 12:44:26 -0700
commit7e7a5147a3f5de83ee540ebaad0928183e61df2c (patch)
tree838bee058074bc160eb7eb7ddd8281b268fb4952
parent2d56312b89ba407272b161290d563551efc896a6 (diff)
executor: switch from `log` to `tracing` (#1454)
## Motivation The `tracing` crate implements scoped, structured, context-aware diagnostics, which can add significant debugging value over unstructured log messages. `tracing` is part of the Tokio project. As part of the `tokio` 0.2 changes, I thought it would be good to move over from `log` to `tracing` in the tokio runtime. Updating the executor crate is an obvious starting point. ## Solution This branch replaces the use of `log` in `tokio-executor` with `tracing`. I've tried to leave all the instrumentation points more or less the same, but modified to use structured fields instead of string interpolation. I've also added a few `tracing` spans, primarily in places where a variable is added to all the log messages in a scope. ## Notes For users who are using the legacy `log` output, there is a feature flag to enable `log` support in `tracing`. I thought about making this on by default, but that would also enable the `tracing` dependency by default, and it is only pulled in when the `threadpool` feature flag is enabled. The `tokio` crate could enable the log feature in its default features instead, since the threadpool feature is on by default in `tokio`. If this isn't the right approach, I can change how `log` back-compatibility is enabled. We might want to consider adding more `tracing` spans in the threadpool later. This could be useful for profiling, and for helping users debug the way their applications interact with the executor. This branch is just intended as a starting point so that we can begin emitting `tracing` data from the executor; we should revisit what instrumentation should be exposed, as well. Signed-off-by: Eliza Weisman <eliza@buoyant.io>
-rw-r--r--tokio-executor/Cargo.toml4
-rw-r--r--tokio-executor/src/lib.rs3
-rw-r--r--tokio-executor/src/threadpool/builder.rs3
-rw-r--r--tokio-executor/src/threadpool/park/boxed.rs14
-rw-r--r--tokio-executor/src/threadpool/pool/mod.rs26
-rw-r--r--tokio-executor/src/threadpool/sender.rs3
-rw-r--r--tokio-executor/src/threadpool/task/mod.rs16
-rw-r--r--tokio-executor/src/threadpool/worker/mod.rs25
-rw-r--r--tokio-executor/src/tracing.rs82
-rw-r--r--tokio/Cargo.toml4
10 files changed, 138 insertions, 42 deletions
diff --git a/tokio-executor/Cargo.toml b/tokio-executor/Cargo.toml
index ba231013..7b579f32 100644
--- a/tokio-executor/Cargo.toml
+++ b/tokio-executor/Cargo.toml
@@ -30,7 +30,6 @@ threadpool = [
"futures-core-preview",
"futures-util-preview",
"num_cpus",
- "log",
"lazy_static",
"slab",
]
@@ -38,6 +37,8 @@ threadpool = [
[dependencies]
tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-sync" }
+tracing = { version = "0.1.5", optional = true }
+
# current-thread dependencies
crossbeam-channel = { version = "0.3.8", optional = true }
@@ -48,7 +49,6 @@ crossbeam-utils = { version = "0.6.4", optional = true }
futures-core-preview = { version = "=0.3.0-alpha.18", optional = true }
futures-util-preview = { version = "=0.3.0-alpha.18", optional = true }
num_cpus = { version = "1.2", optional = true }
-log = { version = "0.4", optional = true }
lazy_static = { version = "1", optional = true }
slab = { version = "0.4.1", optional = true }
diff --git a/tokio-executor/src/lib.rs b/tokio-executor/src/lib.rs
index 22093732..e867af4d 100644
--- a/tokio-executor/src/lib.rs
+++ b/tokio-executor/src/lib.rs
@@ -56,6 +56,9 @@
//! [`DefaultExecutor`]: struct.DefaultExecutor.html
//! [`Park`]: park/index.html
//! [`Future::poll`]: https://doc.rust-lang.org/std/future/trait.Future.html#tymethod.poll
+#[cfg(any(feature = "current-thread", feature = "threadpool"))]
+#[macro_use]
+mod tracing;
mod enter;
mod error;
diff --git a/tokio-executor/src/threadpool/builder.rs b/tokio-executor/src/threadpool/builder.rs
index c7a82a2d..7a7081e3 100644
--- a/tokio-executor/src/threadpool/builder.rs
+++ b/tokio-executor/src/threadpool/builder.rs
@@ -8,7 +8,6 @@ use super::worker::{self, Worker, WorkerId};
use crate::park::Park;
use crossbeam_deque::Injector;
-use log::trace;
use num_cpus;
use std::any::Any;
use std::cmp::max;
@@ -374,7 +373,7 @@ impl Builder {
/// .build();
/// ```
pub fn build(&self) -> ThreadPool {
- trace!("build; num-workers={}", self.pool_size);
+ trace!(message = "build;", num_workers = self.pool_size);
// Create the worker entry list
let workers: Arc<[worker::Entry]> = {
diff --git a/tokio-executor/src/threadpool/park/boxed.rs b/tokio-executor/src/threadpool/park/boxed.rs
index dae1820d..14dbc363 100644
--- a/tokio-executor/src/threadpool/park/boxed.rs
+++ b/tokio-executor/src/threadpool/park/boxed.rs
@@ -1,6 +1,5 @@
use crate::park::{Park, Unpark};
-use log::warn;
use std::error::Error;
use std::time::Duration;
@@ -27,19 +26,20 @@ where
}
fn park(&mut self) -> Result<(), Self::Error> {
- self.0.park().map_err(|e| {
+ self.0.park().map_err(|_e| {
+ // if tracing is disabled, the compiler will flag this as unused.
warn!(
- "calling `park` on worker thread errored -- shutting down thread: {}",
- e
+ message = "calling `park` on worker thread errored -- shutting down thread",
+ error = %_e
);
})
}
fn park_timeout(&mut self, duration: Duration) -> Result<(), Self::Error> {
- self.0.park_timeout(duration).map_err(|e| {
+ self.0.park_timeout(duration).map_err(|_e| {
warn!(
- "calling `park` on worker thread errored -- shutting down thread: {}",
- e
+ message = "calling `park` on worker thread errored -- shutting down thread",
+ error = %_e,
);
})
}
diff --git a/tokio-executor/src/threadpool/pool/mod.rs b/tokio-executor/src/threadpool/pool/mod.rs
index e3601f1c..f5c7c37c 100644
--- a/tokio-executor/src/threadpool/pool/mod.rs
+++ b/tokio-executor/src/threadpool/pool/mod.rs
@@ -17,7 +17,6 @@ use super::BlockingError;
use crossbeam_deque::Injector;
use crossbeam_utils::CachePadded;
use lazy_static::lazy_static;
-use log::{debug, error, trace};
use std::cell::Cell;
use std::collections::hash_map::RandomState;
use std::hash::{BuildHasher, Hash, Hasher};
@@ -133,10 +132,10 @@ impl Pool {
/// Start shutting down the pool. This means that no new futures will be
/// accepted.
+ #[cfg_attr(feature = "tracing", tracing::instrument(level = "trace"))]
pub(crate) fn shutdown(&self, now: bool, purge_queue: bool) {
let mut state: State = self.state.load(Acquire).into();
-
- trace!("shutdown; state={:?}", state);
+ trace!(?state);
// For now, this must be true
debug_assert!(!purge_queue || now);
@@ -184,7 +183,7 @@ impl Pool {
state = actual;
}
- trace!(" -> transitioned to shutdown");
+ trace!("transitioned to shutdown");
// Only transition to terminate if there are no futures currently on the
// pool
@@ -205,7 +204,7 @@ impl Pool {
pub(crate) fn terminate_sleeping_workers(&self) {
use super::worker::Lifecycle::Signaled;
- trace!(" -> shutting down workers");
+ trace!("shutting down workers");
// Wakeup all sleeping workers. They will wake up, see the state
// transition, and terminate.
while let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, true) {
@@ -249,7 +248,7 @@ impl Pool {
if !worker.is_blocking() && *self == *worker.pool {
let idx = worker.id.0;
- trace!(" -> submit internal; idx={}", idx);
+ trace!(message = "submit internal;", idx);
worker.pool.workers[idx].submit_internal(task);
worker.pool.signal_work(pool);
@@ -268,7 +267,7 @@ impl Pool {
pub(crate) fn submit_external(&self, task: Arc<Task>, pool: &Arc<Pool>) {
debug_assert_eq!(*self, **pool);
- trace!(" -> submit external");
+ trace!("submit external");
self.queue.push(task);
self.signal_work(pool);
@@ -388,9 +387,9 @@ impl Pool {
}
});
- if let Err(e) = res {
- error!("failed to spawn worker thread; err={:?}", e);
- panic!("failed to spawn worker thread: {:?}", e);
+ if let Err(err) = res {
+ error!(message = "failed to spawn worker thread;", ?err);
+ panic!("failed to spawn worker thread: {:?}", err);
}
}
@@ -402,6 +401,9 @@ impl Pool {
use super::worker::Lifecycle::Signaled;
if let Some((idx, worker_state)) = self.sleep_stack.pop(&self.workers, Signaled, false) {
+ let span = trace_span!("signal_work", idx);
+ let _enter = span.enter();
+
let entry = &self.workers[idx];
debug_assert!(
@@ -410,10 +412,10 @@ impl Pool {
worker_state.lifecycle(),
);
- trace!("signal_work -- notify; idx={}", idx);
+ trace!("notify");
if !entry.notify(worker_state) {
- trace!("signal_work -- spawn; idx={}", idx);
+ trace!("spawn;");
self.spawn_thread(WorkerId(idx), pool);
}
}
diff --git a/tokio-executor/src/threadpool/sender.rs b/tokio-executor/src/threadpool/sender.rs
index c090dd7d..fb898f2f 100644
--- a/tokio-executor/src/threadpool/sender.rs
+++ b/tokio-executor/src/threadpool/sender.rs
@@ -3,7 +3,6 @@ use super::task::Task;
use crate::{Executor, SpawnError, TypedExecutor};
-use log::trace;
use std::future::Future;
use std::pin::Pin;
use std::sync::atomic::Ordering::{AcqRel, Acquire};
@@ -110,7 +109,7 @@ impl Sender {
.into();
if actual == state {
- trace!("execute; count={:?}", next.num_futures());
+ trace!(message = "execute;", count = next.num_futures());
break;
}
diff --git a/tokio-executor/src/threadpool/task/mod.rs b/tokio-executor/src/threadpool/task/mod.rs
index b4dc4339..06343417 100644
--- a/tokio-executor/src/threadpool/task/mod.rs
+++ b/tokio-executor/src/threadpool/task/mod.rs
@@ -9,11 +9,10 @@ use super::pool::Pool;
use super::waker::Waker;
use futures_util::task;
-use log::trace;
use std::cell::{Cell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
-use std::sync::atomic::Ordering::{AcqRel, Acquire, Relaxed, Release};
+use std::sync::atomic::Ordering::{AcqRel, Acquire, Release};
use std::sync::atomic::{AtomicPtr, AtomicUsize};
use std::sync::Arc;
use std::task::{Context, Poll};
@@ -95,8 +94,13 @@ impl Task {
/// Execute the task returning `Run::Schedule` if the task needs to be
/// scheduled again.
+ ///
+ // tracing macro expansion adds enough branches to make clippy angry here.
+ #[allow(clippy::cognitive_complexity)]
pub(crate) fn run(me: &Arc<Task>, pool: &Arc<Pool>) -> Run {
use self::State::*;
+ #[cfg(feature = "tracing")]
+ use std::sync::atomic::Ordering::Relaxed;
// Transition task to running state. At this point, the task must be
// scheduled.
@@ -109,8 +113,10 @@ impl Task {
Scheduled => {}
_ => panic!("unexpected task state; {:?}", actual),
}
+ let span = trace_span!("Task::run");
+ let _enter = span.enter();
- trace!("Task::run; state={:?}", State::from(me.state.load(Relaxed)));
+ trace!(state = ?State::from(me.state.load(Relaxed)));
// The transition to `Running` done above ensures that a lock on the
// future has been obtained.
@@ -151,7 +157,7 @@ impl Task {
match res {
Ok(Poll::Ready(_)) | Err(_) => {
- trace!(" -> task complete");
+ trace!("task complete");
// The future has completed. Drop it immediately to free
// resources and run drop handlers.
@@ -172,7 +178,7 @@ impl Task {
Run::Complete
}
Ok(Poll::Pending) => {
- trace!(" -> not ready");
+ trace!("not ready");
// Attempt to transition from Running -> Idle, if successful,
// then the task does not need to be scheduled again. If the CAS
diff --git a/tokio-executor/src/threadpool/worker/mod.rs b/tokio-executor/src/threadpool/worker/mod.rs
index 0ffb5ef5..63f170c6 100644
--- a/tokio-executor/src/threadpool/worker/mod.rs
+++ b/tokio-executor/src/threadpool/worker/mod.rs
@@ -12,7 +12,6 @@ use super::shutdown::ShutdownTrigger;
use super::task::{self, CanBlock, Task};
use super::BlockingError;
-use log::trace;
use std::cell::Cell;
use std::marker::PhantomData;
use std::ptr;
@@ -410,9 +409,9 @@ impl Worker {
self.run_task(task, pool);
trace!(
- "try_steal_task -- signal_work; self={}; from={}",
- self.id.0,
- idx
+ message = "try_steal_task -- signal_work;",
+ self = self.id.0,
+ from = idx,
);
// Signal other workers that work is available
@@ -487,7 +486,7 @@ impl Worker {
.into();
if actual == state {
- trace!("task complete; state={:?}", next);
+ trace!(message = "task complete;", state = ?next);
if state.num_futures() == 1 {
// If the thread pool has been flagged as shutdown,
@@ -564,14 +563,16 @@ impl Worker {
/// Put the worker to sleep
///
/// Returns `true` if woken up due to new work arriving.
+ // tracing macro expansion adds enough branches to make clippy angry here.
+ #[cfg_attr(feature = "tracing", allow(clippy::cognitive_complexity))]
fn sleep(&self) -> bool {
use self::Lifecycle::*;
// Putting a worker to sleep is a multipart operation. This is, in part,
// due to the fact that a worker can be notified without it being popped
// from the sleep stack. Extra care is needed to deal with this.
-
- trace!("Worker::sleep; worker={:?}", self.id);
+ let span = trace_span!("Worker::sleep", idx = self.id.0, id = ?self.id);
+ let _e = span.enter();
let mut state: State = self.entry().state.load(Acquire).into();
@@ -617,12 +618,12 @@ impl Worker {
if !state.is_pushed() {
debug_assert!(next.is_pushed());
- trace!(" sleeping -- push to stack; idx={}", self.id.0);
+ trace!("push to stack");
// We obtained permission to push the worker into the
// sleeper queue.
if self.pool.push_sleeper(self.id.0).is_err() {
- trace!(" sleeping -- push to stack failed; idx={}", self.id.0);
+ trace!("push to stack failed");
// The push failed due to the pool being terminated.
//
// This is true because the "work" being woken up for is
@@ -637,7 +638,7 @@ impl Worker {
state = actual;
}
- trace!(" -> starting to sleep; idx={}", self.id.0);
+ trace!("starting to sleep");
// Do a quick check to see if there are any notifications in the
// reactor or new tasks in the global queue. Since this call will
@@ -686,7 +687,7 @@ impl Worker {
self.entry().park();
- trace!(" -> wakeup; idx={}", self.id.0);
+ trace!("wakeup");
}
}
@@ -719,7 +720,7 @@ impl Worker {
impl Drop for Worker {
fn drop(&mut self) {
- trace!("shutting down thread; idx={}", self.id.0);
+ trace!(message = "shutting down thread", idx = self.id.0);
if self.should_finalize.get() {
// Drain the work queue
diff --git a/tokio-executor/src/tracing.rs b/tokio-executor/src/tracing.rs
new file mode 100644
index 00000000..4d36049b
--- /dev/null
+++ b/tokio-executor/src/tracing.rs
@@ -0,0 +1,82 @@
+//! This module provides a small facade that wraps the `tracing` APIs we use, so
+//! that when the `tracing` dependency is disabled, `tracing`'s macros expand to
+//! no-ops.
+//!
+//! This means we don't have to put a `#[cfg(feature = "tracing")]` on every
+//! individual use of a `tracing` macro.
+#[cfg(not(feature = "tracing"))]
+#[derive(Clone, Debug)]
+pub(crate) struct Span {}
+
+#[cfg(feature = "tracing")]
+macro_rules! trace {
+ ($($arg:tt)+) => {
+ tracing::trace!($($arg)+)
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! trace {
+ ($($arg:tt)+) => {};
+}
+
+#[cfg(feature = "tracing")]
+macro_rules! debug {
+ ($($arg:tt)+) => {
+ tracing::debug!($($arg)+)
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! debug {
+ ($($arg:tt)+) => {};
+}
+
+#[cfg(feature = "tracing")]
+macro_rules! warn {
+ ($($arg:tt)+) => {
+ tracing::warn!($($arg)+)
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! warn {
+ ($($arg:tt)+) => {};
+}
+
+#[cfg(feature = "tracing")]
+macro_rules! error {
+ ($($arg:tt)+) => {
+ tracing::error!($($arg)+)
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! error {
+ ($($arg:tt)+) => {};
+}
+
+#[cfg(feature = "tracing")]
+macro_rules! trace_span {
+ ($($arg:tt)+) => {
+ tracing::trace_span!($($arg)+)
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+macro_rules! trace_span {
+ ($($arg:tt)+) => {
+ crate::tracing::Span::new()
+ };
+}
+
+#[cfg(not(feature = "tracing"))]
+impl Span {
+ pub(crate) fn new() -> Self {
+ Span {}
+ }
+
+ pub(crate) fn enter(&self) -> Span {
+ Span {}
+ }
+}
diff --git a/tokio/Cargo.toml b/tokio/Cargo.toml
index 75e11724..a9251a0c 100644
--- a/tokio/Cargo.toml
+++ b/tokio/Cargo.toml
@@ -51,6 +51,7 @@ rt-full = [
sync = ["tokio-sync"]
tcp = ["io", "tokio-net/tcp"]
timer = ["tokio-timer"]
+tracing = ["tracing-core"]
udp = ["io", "tokio-net/udp"]
uds = ["io", "tokio-net/uds"]
@@ -72,6 +73,9 @@ tokio-sync = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-syn
tokio-timer = { version = "=0.3.0-alpha.2", optional = true, path = "../tokio-timer", features = ["async-traits"] }
tracing-core = { version = "0.1", optional = true }
+[target.'cfg(feature = "tracing")'.dependencies]
+tokio-executor = { version = "=0.2.0-alpha.2", optional = true, path = "../tokio-executor", features = ["tracing"] }
+
[dev-dependencies]
tokio-test = { version = "0.2.0-alpha.1", path = "../tokio-test" }