diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-26 15:49:10 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-08-30 13:54:48 +0200 |
commit | 7e91080f39c518db20d1e768b294e6ed568de290 (patch) | |
tree | 656b35726fa6a641c29cf37c944a22724a8cf33f /crates/core/tedge_core/src/reactor.rs | |
parent | fe94223dd2fb8fda045bec08795aa677e5aadd0f (diff) |
Add cancellation token for running plugin main()
This patch adds a cancellation token to the implementation that runs the
plugin main() functions asyncronously.
This is the preparation for implementing shutdown-on-panic, where
thin-edge.io shuts down cleanly if one plugin panics.
Right now the only way to implement this is by cancelling the execution
of the `main()` function of all plugins, as we cannot ensure that the
cancellation token that is handed to the plugins upon instantiation are
actually used.
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core/tedge_core/src/reactor.rs')
-rw-r--r-- | crates/core/tedge_core/src/reactor.rs | 101 |
1 files changed, 57 insertions, 44 deletions
diff --git a/crates/core/tedge_core/src/reactor.rs b/crates/core/tedge_core/src/reactor.rs index c97358e0..f845335c 100644 --- a/crates/core/tedge_core/src/reactor.rs +++ b/crates/core/tedge_core/src/reactor.rs @@ -212,55 +212,68 @@ impl Reactor { .await; debug!("Running 'main' for plugins"); - let _main_results = all_plugins - .iter_mut() - .map(|plugin_task| { - async { - let mut restarted_once = false; - loop { - let span = - tracing::debug_span!("plugin.main", plugin.name = %plugin_task.plugin_name()); - - match plugin_task.run_main().instrument(span).await { - err @ Err(crate::errors::PluginLifecycleError::PluginMainPanicked(_)) => { - let restart_policy = self.0.config().plugins().get(plugin_task.plugin_name()) - .map(|pl| pl.restart_policy()); - - match restart_policy { - Some(crate::configuration::RestartPolicy::Never) => { - return err - }, - Some(crate::configuration::RestartPolicy::Once) => { - if restarted_once { - return err - } - - restarted_once = true; - tracing::error!(error = ?err, "Error during plugin lifecycle"); - }, - Some(crate::configuration::RestartPolicy::Always) => { - restarted_once = true; - tracing::error!(error = ?err, "Error during plugin lifecycle"); - }, - None => { - // cannot happen - unimplemented!() + { + let main_cancellation = Arc::new(CancellationToken::new()); + let _main_results = all_plugins + .iter_mut() + .map(|plugin_task| { + let cancellation_token = main_cancellation.child_token(); + async { + let cancellation_token = cancellation_token; + let mut restarted_once = false; + loop { + let span = + tracing::debug_span!("plugin.main", plugin.name = %plugin_task.plugin_name()); + + tokio::select! { + _cancelled = cancellation_token.cancelled() => { + // Nothing for now + }, + main_result = plugin_task.run_main().instrument(span) => { + match main_result { + err @ Err(crate::errors::PluginLifecycleError::PluginMainPanicked(_)) => { + let restart_policy = self.0.config().plugins().get(plugin_task.plugin_name()) + .map(|pl| pl.restart_policy()); + + match restart_policy { + Some(crate::configuration::RestartPolicy::Never) => { + return err + }, + Some(crate::configuration::RestartPolicy::Once) => { + if restarted_once { + return err + } + + restarted_once = true; + tracing::error!(error = ?err, "Error during plugin lifecycle"); + }, + Some(crate::configuration::RestartPolicy::Always) => { + restarted_once = true; + tracing::error!(error = ?err, "Error during plugin lifecycle"); + }, + None => { + // cannot happen + unimplemented!() + } + } + }, + + Err(other_err) => return Err(other_err), + Ok(()) => break, } } - }, + } - Err(other_err) => return Err(other_err), - Ok(()) => break, } - } - Ok(()) - } - }) - .collect::<futures::stream::FuturesUnordered<_>>() - .collect::<Vec<Result<(), _>>>() - .instrument(tracing::info_span!("core.mainloop.plugins.main")) - .await; + Ok(()) + } + }) + .collect::<futures::stream::FuturesUnordered<_>>() + .collect::<Vec<Result<(), _>>>() + .instrument(tracing::info_span!("core.mainloop.plugins.main")) + .await; + } // And now we wait until all communication is finished. // |