summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_core/src/reactor.rs
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core/tedge_core/src/reactor.rs')
-rw-r--r--crates/core/tedge_core/src/reactor.rs101
1 files changed, 57 insertions, 44 deletions
diff --git a/crates/core/tedge_core/src/reactor.rs b/crates/core/tedge_core/src/reactor.rs
index c97358e0..f845335c 100644
--- a/crates/core/tedge_core/src/reactor.rs
+++ b/crates/core/tedge_core/src/reactor.rs
@@ -212,55 +212,68 @@ impl Reactor {
.await;
debug!("Running 'main' for plugins");
- let _main_results = all_plugins
- .iter_mut()
- .map(|plugin_task| {
- async {
- let mut restarted_once = false;
- loop {
- let span =
- tracing::debug_span!("plugin.main", plugin.name = %plugin_task.plugin_name());
-
- match plugin_task.run_main().instrument(span).await {
- err @ Err(crate::errors::PluginLifecycleError::PluginMainPanicked(_)) => {
- let restart_policy = self.0.config().plugins().get(plugin_task.plugin_name())
- .map(|pl| pl.restart_policy());
-
- match restart_policy {
- Some(crate::configuration::RestartPolicy::Never) => {
- return err
- },
- Some(crate::configuration::RestartPolicy::Once) => {
- if restarted_once {
- return err
- }
-
- restarted_once = true;
- tracing::error!(error = ?err, "Error during plugin lifecycle");
- },
- Some(crate::configuration::RestartPolicy::Always) => {
- restarted_once = true;
- tracing::error!(error = ?err, "Error during plugin lifecycle");
- },
- None => {
- // cannot happen
- unimplemented!()
+ {
+ let main_cancellation = Arc::new(CancellationToken::new());
+ let _main_results = all_plugins
+ .iter_mut()
+ .map(|plugin_task| {
+ let cancellation_token = main_cancellation.child_token();
+ async {
+ let cancellation_token = cancellation_token;
+ let mut restarted_once = false;
+ loop {
+ let span =
+ tracing::debug_span!("plugin.main", plugin.name = %plugin_task.plugin_name());
+
+ tokio::select! {
+ _cancelled = cancellation_token.cancelled() => {
+ // Nothing for now
+ },
+ main_result = plugin_task.run_main().instrument(span) => {
+ match main_result {
+ err @ Err(crate::errors::PluginLifecycleError::PluginMainPanicked(_)) => {
+ let restart_policy = self.0.config().plugins().get(plugin_task.plugin_name())
+ .map(|pl| pl.restart_policy());
+
+ match restart_policy {
+ Some(crate::configuration::RestartPolicy::Never) => {
+ return err
+ },
+ Some(crate::configuration::RestartPolicy::Once) => {
+ if restarted_once {
+ return err
+ }
+
+ restarted_once = true;
+ tracing::error!(error = ?err, "Error during plugin lifecycle");
+ },
+ Some(crate::configuration::RestartPolicy::Always) => {
+ restarted_once = true;
+ tracing::error!(error = ?err, "Error during plugin lifecycle");
+ },
+ None => {
+ // cannot happen
+ unimplemented!()
+ }
+ }
+ },
+
+ Err(other_err) => return Err(other_err),
+ Ok(()) => break,
}
}
- },
+ }
- Err(other_err) => return Err(other_err),
- Ok(()) => break,
}
- }
- Ok(())
- }
- })
- .collect::<futures::stream::FuturesUnordered<_>>()
- .collect::<Vec<Result<(), _>>>()
- .instrument(tracing::info_span!("core.mainloop.plugins.main"))
- .await;
+ Ok(())
+ }
+ })
+ .collect::<futures::stream::FuturesUnordered<_>>()
+ .collect::<Vec<Result<(), _>>>()
+ .instrument(tracing::info_span!("core.mainloop.plugins.main"))
+ .await;
+ }
// And now we wait until all communication is finished.
//