summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorLeon Yang <lnyng@fb.com>2022-02-08 10:59:02 -0800
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2022-02-08 11:04:20 -0800
commitf039574e70138e58f607b8fac092d5c912a29437 (patch)
treeb1d156dd5109f00d9367c4238b3687cc5b996762
parentd3a9a98741377e58411bc7c63765ac0ecb48d01e (diff)
Improve thread usage
Summary: Add name to spawned threads to help understand thread usage. Reviewed By: brianc118 Differential Revision: D34064653 fbshipit-source-id: 54fe1f00a2af4afcb2a8fe628f7cf82184407fa5
-rw-r--r--below/src/main.rs219
1 files changed, 118 insertions, 101 deletions
diff --git a/below/src/main.rs b/below/src/main.rs
index f3801ec9..3641cdf8 100644
--- a/below/src/main.rs
+++ b/below/src/main.rs
@@ -345,12 +345,15 @@ fn start_exitstat(
let mut exit_driver = exitstat::ExitstatDriver::new(logger, debug);
let exit_buffer = exit_driver.get_buffer();
let (bpf_err_send, bpf_err_recv) = channel();
- thread::spawn(move || {
- match exit_driver.drive() {
- Ok(_) => {}
- Err(e) => bpf_err_send.send(e).unwrap(),
- };
- });
+ thread::Builder::new()
+ .name("exit_driver".to_owned())
+ .spawn(move || {
+ match exit_driver.drive() {
+ Ok(_) => {}
+ Err(e) => bpf_err_send.send(e).unwrap(),
+ };
+ })
+ .expect("Failed to spawn thread");
(exit_buffer, Some(bpf_err_recv))
}
@@ -443,18 +446,21 @@ where
Ok(mut signals) => {
let logger = logger.clone();
let err_sender = err_sender.clone();
- thread::spawn(move || {
- let mut term_now = false;
- for signal in signals.forever() {
- if term_now {
- error!(logger, "Below didn't stop in time. Terminate now!");
- std::process::exit(1);
+ thread::Builder::new()
+ .name("sighandler".to_owned())
+ .spawn(move || {
+ let mut term_now = false;
+ for signal in signals.forever() {
+ if term_now {
+ error!(logger, "Below didn't stop in time. Terminate now!");
+ std::process::exit(1);
+ }
+ term_now = true;
+ error!(logger, "Stop signal received: {}, exiting.", signal);
+ err_sender.send(anyhow!(StopSignal { signal })).unwrap();
}
- term_now = true;
- error!(logger, "Stop signal received: {}, exiting.", signal);
- err_sender.send(anyhow!(StopSignal { signal })).unwrap();
- }
- });
+ })
+ .expect("Failed to spawn thread");
}
Err(e) => {
error!(logger, "{:#}", e);
@@ -736,18 +742,21 @@ fn replay(
logutil::set_current_log_target(logutil::TargetLog::File);
let sink = view.cb_sink().clone();
- thread::spawn(move || {
- match errs.recv() {
- Ok(e) => {
- error!(logger, "{:#}", e);
- }
- Err(_) => {
- error!(logger, "error channel disconnected");
- }
- };
- sink.send(Box::new(|c| c.quit()))
- .expect("Failed to stop view");
- });
+ thread::Builder::new()
+ .name("replay_err_chan".to_owned())
+ .spawn(move || {
+ match errs.recv() {
+ Ok(e) => {
+ error!(logger, "{:#}", e);
+ }
+ Err(_) => {
+ error!(logger, "error channel disconnected");
+ }
+ };
+ sink.send(Box::new(|c| c.quit()))
+ .expect("Failed to stop view");
+ })
+ .expect("Failed to spawn thread");
view.run()
}
@@ -920,56 +929,60 @@ fn live_local(
let sink = view.cb_sink().clone();
- thread::spawn(move || {
- loop {
- if !bpf_err_warned {
- bpf_err_warned = check_for_exitstat_errors(
- &logger,
- bpf_errs
- .as_ref()
- .expect("Failed to unwrap bpf_errs receiver"),
- );
- }
-
- // Rely on timeout to guarantee interval between samples
- match errs.recv_timeout(interval) {
- Ok(e) => {
- error!(logger, "{:#}", e);
- sink.send(Box::new(|c| c.quit()))
- .expect("Failed to stop view");
- return;
- }
- Err(RecvTimeoutError::Disconnected) => {
- error!(logger, "error channel disconnected");
- sink.send(Box::new(|c| c.quit()))
- .expect("Failed to stop view");
- return;
+ thread::Builder::new()
+ .name("live_collector".to_owned())
+ .spawn(move || {
+ loop {
+ if !bpf_err_warned {
+ bpf_err_warned = check_for_exitstat_errors(
+ &logger,
+ bpf_errs
+ .as_ref()
+ .expect("Failed to unwrap bpf_errs receiver"),
+ );
}
- Err(RecvTimeoutError::Timeout) => {}
- };
- let res = collector.update_model(&logger);
- match res {
- Ok(model) => {
- // Error only happens if the other side disconnected - just terminate the thread
- let data_plane = Box::new(move |s: &mut Cursive| {
- let view_state = s.user_data::<ViewState>().expect("user data not set");
+ // Rely on timeout to guarantee interval between samples
+ match errs.recv_timeout(interval) {
+ Ok(e) => {
+ error!(logger, "{:#}", e);
+ sink.send(Box::new(|c| c.quit()))
+ .expect("Failed to stop view");
+ return;
+ }
+ Err(RecvTimeoutError::Disconnected) => {
+ error!(logger, "error channel disconnected");
+ sink.send(Box::new(|c| c.quit()))
+ .expect("Failed to stop view");
+ return;
+ }
+ Err(RecvTimeoutError::Timeout) => {}
+ };
- // When paused, no need to update model
- if !view_state.is_paused() {
- view_state.update(model);
+ let res = collector.update_model(&logger);
+ match res {
+ Ok(model) => {
+ // Error only happens if the other side disconnected - just terminate the thread
+ let data_plane = Box::new(move |s: &mut Cursive| {
+ let view_state = s.user_data::<ViewState>().expect("user data not set");
+
+ // When paused, no need to update model
+ if !view_state.is_paused() {
+ view_state.update(model);
+ }
+ });
+ if sink.send(data_plane).is_err() {
+ return;
}
- });
- if sink.send(data_plane).is_err() {
- return;
}
- }
- Err(e) => {
- error!(logger, "{:#}", e);
+ Err(e) => {
+ error!(logger, "{:#}", e);
+ }
}
}
- }
- });
+ })
+ .expect("Failed to spawn thread");
+
view.run()
}
@@ -997,40 +1010,44 @@ fn live_remote(
let sink = view.cb_sink().clone();
- thread::spawn(move || {
- loop {
- // Rely on timeout to guarantee interval between samples
- match errs.recv_timeout(interval) {
- Ok(e) => {
- error!(logger, "{:#}", e);
- sink.send(Box::new(|c| c.quit()))
- .expect("Failed to stop view");
- return;
- }
- Err(RecvTimeoutError::Disconnected) => {
- error!(logger, "error channel disconnected");
- sink.send(Box::new(|c| c.quit()))
- .expect("Failed to stop view");
- return;
- }
- Err(RecvTimeoutError::Timeout) => {}
- };
+ thread::Builder::new()
+ .name("live_remote_collector".to_owned())
+ .spawn(move || {
+ loop {
+ // Rely on timeout to guarantee interval between samples
+ match errs.recv_timeout(interval) {
+ Ok(e) => {
+ error!(logger, "{:#}", e);
+ sink.send(Box::new(|c| c.quit()))
+ .expect("Failed to stop view");
+ return;
+ }
+ Err(RecvTimeoutError::Disconnected) => {
+ error!(logger, "error channel disconnected");
+ sink.send(Box::new(|c| c.quit()))
+ .expect("Failed to stop view");
+ return;
+ }
+ Err(RecvTimeoutError::Timeout) => {}
+ };
- let data_plane = Box::new(move |s: &mut Cursive| {
- let view_state = s.user_data::<ViewState>().expect("user data not set");
+ let data_plane = Box::new(move |s: &mut Cursive| {
+ let view_state = s.user_data::<ViewState>().expect("user data not set");
- if let view::ViewMode::Live(adv) = view_state.mode.clone() {
- match adv.borrow_mut().advance(store::Direction::Forward) {
- Some(data) => view_state.update(data),
- None => {}
+ if let view::ViewMode::Live(adv) = view_state.mode.clone() {
+ match adv.borrow_mut().advance(store::Direction::Forward) {
+ Some(data) => view_state.update(data),
+ None => {}
+ }
}
+ });
+ if sink.send(data_plane).is_err() {
+ return;
}
- });
- if sink.send(data_plane).is_err() {
- return;
}
- }
- });
+ })
+ .expect("Failed to spawn thread");
+
logutil::set_current_log_target(logutil::TargetLog::File);
view.run()