diff options
author | Leon Yang <lnyng@fb.com> | 2022-02-08 10:59:02 -0800 |
---|---|---|
committer | Facebook GitHub Bot <facebook-github-bot@users.noreply.github.com> | 2022-02-08 11:04:20 -0800 |
commit | f039574e70138e58f607b8fac092d5c912a29437 (patch) | |
tree | b1d156dd5109f00d9367c4238b3687cc5b996762 | |
parent | d3a9a98741377e58411bc7c63765ac0ecb48d01e (diff) |
Improve thread usage
Summary: Add name to spawned threads to help understand thread usage.
Reviewed By: brianc118
Differential Revision: D34064653
fbshipit-source-id: 54fe1f00a2af4afcb2a8fe628f7cf82184407fa5
-rw-r--r-- | below/src/main.rs | 219 |
1 files changed, 118 insertions, 101 deletions
diff --git a/below/src/main.rs b/below/src/main.rs index f3801ec9..3641cdf8 100644 --- a/below/src/main.rs +++ b/below/src/main.rs @@ -345,12 +345,15 @@ fn start_exitstat( let mut exit_driver = exitstat::ExitstatDriver::new(logger, debug); let exit_buffer = exit_driver.get_buffer(); let (bpf_err_send, bpf_err_recv) = channel(); - thread::spawn(move || { - match exit_driver.drive() { - Ok(_) => {} - Err(e) => bpf_err_send.send(e).unwrap(), - }; - }); + thread::Builder::new() + .name("exit_driver".to_owned()) + .spawn(move || { + match exit_driver.drive() { + Ok(_) => {} + Err(e) => bpf_err_send.send(e).unwrap(), + }; + }) + .expect("Failed to spawn thread"); (exit_buffer, Some(bpf_err_recv)) } @@ -443,18 +446,21 @@ where Ok(mut signals) => { let logger = logger.clone(); let err_sender = err_sender.clone(); - thread::spawn(move || { - let mut term_now = false; - for signal in signals.forever() { - if term_now { - error!(logger, "Below didn't stop in time. Terminate now!"); - std::process::exit(1); + thread::Builder::new() + .name("sighandler".to_owned()) + .spawn(move || { + let mut term_now = false; + for signal in signals.forever() { + if term_now { + error!(logger, "Below didn't stop in time. Terminate now!"); + std::process::exit(1); + } + term_now = true; + error!(logger, "Stop signal received: {}, exiting.", signal); + err_sender.send(anyhow!(StopSignal { signal })).unwrap(); } - term_now = true; - error!(logger, "Stop signal received: {}, exiting.", signal); - err_sender.send(anyhow!(StopSignal { signal })).unwrap(); - } - }); + }) + .expect("Failed to spawn thread"); } Err(e) => { error!(logger, "{:#}", e); @@ -736,18 +742,21 @@ fn replay( logutil::set_current_log_target(logutil::TargetLog::File); let sink = view.cb_sink().clone(); - thread::spawn(move || { - match errs.recv() { - Ok(e) => { - error!(logger, "{:#}", e); - } - Err(_) => { - error!(logger, "error channel disconnected"); - } - }; - sink.send(Box::new(|c| c.quit())) - .expect("Failed to stop view"); - }); + thread::Builder::new() + .name("replay_err_chan".to_owned()) + .spawn(move || { + match errs.recv() { + Ok(e) => { + error!(logger, "{:#}", e); + } + Err(_) => { + error!(logger, "error channel disconnected"); + } + }; + sink.send(Box::new(|c| c.quit())) + .expect("Failed to stop view"); + }) + .expect("Failed to spawn thread"); view.run() } @@ -920,56 +929,60 @@ fn live_local( let sink = view.cb_sink().clone(); - thread::spawn(move || { - loop { - if !bpf_err_warned { - bpf_err_warned = check_for_exitstat_errors( - &logger, - bpf_errs - .as_ref() - .expect("Failed to unwrap bpf_errs receiver"), - ); - } - - // Rely on timeout to guarantee interval between samples - match errs.recv_timeout(interval) { - Ok(e) => { - error!(logger, "{:#}", e); - sink.send(Box::new(|c| c.quit())) - .expect("Failed to stop view"); - return; - } - Err(RecvTimeoutError::Disconnected) => { - error!(logger, "error channel disconnected"); - sink.send(Box::new(|c| c.quit())) - .expect("Failed to stop view"); - return; + thread::Builder::new() + .name("live_collector".to_owned()) + .spawn(move || { + loop { + if !bpf_err_warned { + bpf_err_warned = check_for_exitstat_errors( + &logger, + bpf_errs + .as_ref() + .expect("Failed to unwrap bpf_errs receiver"), + ); } - Err(RecvTimeoutError::Timeout) => {} - }; - let res = collector.update_model(&logger); - match res { - Ok(model) => { - // Error only happens if the other side disconnected - just terminate the thread - let data_plane = Box::new(move |s: &mut Cursive| { - let view_state = s.user_data::<ViewState>().expect("user data not set"); + // Rely on timeout to guarantee interval between samples + match errs.recv_timeout(interval) { + Ok(e) => { + error!(logger, "{:#}", e); + sink.send(Box::new(|c| c.quit())) + .expect("Failed to stop view"); + return; + } + Err(RecvTimeoutError::Disconnected) => { + error!(logger, "error channel disconnected"); + sink.send(Box::new(|c| c.quit())) + .expect("Failed to stop view"); + return; + } + Err(RecvTimeoutError::Timeout) => {} + }; - // When paused, no need to update model - if !view_state.is_paused() { - view_state.update(model); + let res = collector.update_model(&logger); + match res { + Ok(model) => { + // Error only happens if the other side disconnected - just terminate the thread + let data_plane = Box::new(move |s: &mut Cursive| { + let view_state = s.user_data::<ViewState>().expect("user data not set"); + + // When paused, no need to update model + if !view_state.is_paused() { + view_state.update(model); + } + }); + if sink.send(data_plane).is_err() { + return; } - }); - if sink.send(data_plane).is_err() { - return; } - } - Err(e) => { - error!(logger, "{:#}", e); + Err(e) => { + error!(logger, "{:#}", e); + } } } - } - }); + }) + .expect("Failed to spawn thread"); + view.run() } @@ -997,40 +1010,44 @@ fn live_remote( let sink = view.cb_sink().clone(); - thread::spawn(move || { - loop { - // Rely on timeout to guarantee interval between samples - match errs.recv_timeout(interval) { - Ok(e) => { - error!(logger, "{:#}", e); - sink.send(Box::new(|c| c.quit())) - .expect("Failed to stop view"); - return; - } - Err(RecvTimeoutError::Disconnected) => { - error!(logger, "error channel disconnected"); - sink.send(Box::new(|c| c.quit())) - .expect("Failed to stop view"); - return; - } - Err(RecvTimeoutError::Timeout) => {} - }; + thread::Builder::new() + .name("live_remote_collector".to_owned()) + .spawn(move || { + loop { + // Rely on timeout to guarantee interval between samples + match errs.recv_timeout(interval) { + Ok(e) => { + error!(logger, "{:#}", e); + sink.send(Box::new(|c| c.quit())) + .expect("Failed to stop view"); + return; + } + Err(RecvTimeoutError::Disconnected) => { + error!(logger, "error channel disconnected"); + sink.send(Box::new(|c| c.quit())) + .expect("Failed to stop view"); + return; + } + Err(RecvTimeoutError::Timeout) => {} + }; - let data_plane = Box::new(move |s: &mut Cursive| { - let view_state = s.user_data::<ViewState>().expect("user data not set"); + let data_plane = Box::new(move |s: &mut Cursive| { + let view_state = s.user_data::<ViewState>().expect("user data not set"); - if let view::ViewMode::Live(adv) = view_state.mode.clone() { - match adv.borrow_mut().advance(store::Direction::Forward) { - Some(data) => view_state.update(data), - None => {} + if let view::ViewMode::Live(adv) = view_state.mode.clone() { + match adv.borrow_mut().advance(store::Direction::Forward) { + Some(data) => view_state.update(data), + None => {} + } } + }); + if sink.send(data_plane).is_err() { + return; } - }); - if sink.send(data_plane).is_err() { - return; } - } - }); + }) + .expect("Failed to spawn thread"); + logutil::set_current_log_target(logutil::TargetLog::File); view.run() |