diff options
-rw-r--r-- | Cargo.lock | 5 | ||||
-rw-r--r-- | crates/core/tedge_api/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_api/README.md | 127 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 121 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/universal_log.rs | 123 | ||||
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 122 | ||||
-rw-r--r-- | crates/core/tedge_api/src/config.rs | 8 | ||||
-rw-r--r-- | crates/core/tedge_api/src/plugin.rs | 20 |
8 files changed, 386 insertions, 141 deletions
@@ -3408,6 +3408,7 @@ dependencies = [ "tokio", "tokio-util 0.7.0", "toml", + "tracing", ] [[package]] @@ -3920,9 +3921,9 @@ dependencies = [ [[package]] name = "tracing-attributes" -version = "0.1.18" +version = "0.1.21" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e" +checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c" dependencies = [ "proc-macro2 1.0.38", "quote 1.0.18", diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml index 2f316c2f..c9f2bad4 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -16,6 +16,7 @@ tokio-util = "0.7.0" toml = "0.5.8" serde = { version = "1.0.136", features = ["derive"] } tedge_config_derive = { version = "0.1.0", path = "tedge_config_derive" } +tracing = "0.1" [dev-dependencies] pretty_assertions = "1.2.1" diff --git a/crates/core/tedge_api/README.md b/crates/core/tedge_api/README.md index d085a7e1..340faa23 100644 --- a/crates/core/tedge_api/README.md +++ b/crates/core/tedge_api/README.md @@ -1,9 +1,134 @@ # Thin Edge API thin-edge is made up out of "Plugins"[^1] which pass messages to eachother. -These plugins run on a "Core", which handles the message passing. +These plugins run on a "Core", which handles the message passing and the plugin +lifecycle. This crate defines the interfaces a plugin author needs to implement so that a plugin can be built into thin-edge. + +## What a Plugin is + +A Plugin is a piece of code that covers some usecase. That usecase has to be +exposed to the thin-edge.io ecosystem with the following functionalities (from a +high level): + +* There's a function to "instantiate" a `Plugin` +* There's a function to "start" a `Plugin` +* A plugin can expose a number of API endpoints that it can receive messages on +* There's a function for cleanly shutting a `Plugin` down + +The implementation of what we call the "core" of thin-edge.io is then +responsible of instantiating a plugin upon user request, start it, and send it +messages or forward messages the plugin emits to other plugins. + +Messages are just objects that are defined via Rust structures. See below for +more information. + +If shutdown of thin-edge.io is requested, the core is also responsible of +shutting a plugin down. + +The core is responsible to protect each plugin from crashes of other plugins and +guarantee safe operation of all plugins in the ecosystem. + + +## API to implement + +To implement a Plugin and bring functionality to the thin-edge.io ecosystem, +the following API endpoints need to be implemented: + +* The `PluginBuilder` trait defines the interface that will be used to + instantiate a `Plugin` +* The `Plugin` trait defines the interface that is used to start the + instantiated `Plugin` and to shut it down +* The `Handle` trait defines the interfaces a `Plugin` instance can receive + messages on. This trait can be implemented multiple times for each `Plugin`, + one time for each message type this plugin is able to receive. + +The following (simplified) diagram should describe this in a visual way: + +<!-- +the "aquamarine" crate does not yet support rendering this in rustdoc. +See: https://github.com/mersinvald/aquamarine/issues/19 +--> +```mermaid +classDiagram + class PluginBuilder + <<interface>> PluginBuilder + PluginBuilder : +instantiate() ~Plugin~ + + class Plugin + <<interface>> Plugin + Plugin : +start() + Plugin : +shutdown() + + class Handle + <<interface>> Handle~Message~ + Handle : +handle_message(~Message~ message) + + class MyPluginBuilder + MyPluginBuilder <|-- PluginBuilder : implements + + class MyPlugin + MyPlugin <|-- Plugin : implements + MyPlugin <|-- Handle~MyMessage~ : implements +``` + +## What a Message is + +A message can be anything that is able to implement the `Message` trait. + +This trait does not require the message to implement functionality, it just +requires an implementing structure to implement `Debug` and `Send` and some +others. + +For example: + +```rust +#[derive(Debug)] +struct Value(f64); + +impl Message for Value {} +``` + +## How messages are send + +Messages can be send between plugins, but can also be send to the core of +thin-edge.io and to the plugin itself (a Plugin can send messages to itself). + +To be able to send a Message, an `Address` of that Plugin needs to be known. +That `Address` can be fetched during the instantiation of the Plugin. The +`PluginBuilder::instantiate()` function gets a reference to a `PluginDirectory`. +That `PluginDirectory` is basically an "address book", that can be asked for +addresses to other plugins, by specifying their name. +That name usually has to be configured. + + +## Plugin lifecycle + +The above illustrated how a plugin lifecycle looks like. The following diagram +is here to visualize the words from above (minus some details that are not +required for understanding the grand scheme of things): + +```mermaid +sequenceDiagram + thinedge->>+PluginBuilder: instantiate() + PluginBuilder->>Configuration: get_target_address() + Configuration->>PluginBuilder: target_address + PluginBuilder->>PluginDirectory: get_address_for<MeasurementReceiver>(target_address) + PluginDirectory->>PluginBuilder: Address<MeasurementReceiver> + PluginBuilder->>+Plugin: new(Address<MeasurementReceiver>) + Plugin-->PluginBuilder: Plugin + PluginBuilder->thinedge : Plugin + thinedge->>+Plugin : start() + Plugin-->-thinedge : start + loop Lifecycle + thinedge-->Plugin : handle_message() + end + thinedge->>+Plugin : shutdown() + Plugin-->-thinedge : shutdown +``` + + [^1]: Name is subject to change. diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index a9305fde..454be2ea 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; use futures::FutureExt; @@ -9,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -294,11 +295,17 @@ impl Plugin for CriticalService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -309,13 +316,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -349,7 +355,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -418,59 +426,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); - - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -478,11 +482,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs index f2b3dcea..82ff3130 100644 --- a/crates/core/tedge_api/examples/universal_log.rs +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -1,6 +1,7 @@ -use std::{collections::HashMap, time::Duration}; +use std::{collections::HashMap, sync::Arc, time::Duration}; use async_trait::async_trait; +use futures::FutureExt; use tedge_api::{ address::ReplySenderFor, message::{AnyMessage, MessageType}, @@ -8,6 +9,7 @@ use tedge_api::{ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; +use tokio::sync::RwLock; /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] @@ -229,6 +231,7 @@ impl Plugin for LogService { } } +// // The following pieces of code would be implemented by a "core" component, that is responsible for // setting up plugins and their communication. // @@ -238,11 +241,17 @@ impl Plugin for LogService { // /// Helper type for keeping information about plugins during runtime -#[derive(Debug)] struct PluginInfo { types: Vec<MessageType>, - receiver: Option<tedge_api::address::MessageReceiver>, - sender: tedge_api::address::MessageSender, + sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for PluginInfo { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("PluginInfo") + .field("types", &self.types) + .finish_non_exhaustive() + } } /// The type that provides the communication infrastructure to the plugins. @@ -253,13 +262,12 @@ struct Communication { impl Communication { pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { - let (sender, receiver) = tokio::sync::mpsc::channel(10); + let sender = Arc::new(RwLock::new(None)); self.plugins.insert( name.to_owned(), PluginInfo { types: PB::kind_message_types().into_types(), sender, - receiver: Some(receiver), }, ); } @@ -293,7 +301,9 @@ impl PluginDirectory for Communication { asked_types, name, plug.types, ); } else { - Ok(Address::new(plug.sender.clone())) + Ok(Address::new(tedge_api::address::InnerMessageSender::new( + plug.sender.clone(), + ))) } } @@ -356,59 +366,55 @@ async fn main() { let cancel_token = CancellationToken::new(); - let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; - let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; - - heartbeat.plugin_mut().start().await.unwrap(); - critical_service.plugin_mut().start().await.unwrap(); + let mut heartbeat = Arc::new(RwLock::new( + build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await, + )); + let mut critical_service = Arc::new(RwLock::new( + build_critical_plugin(&mut comms, cancel_token.child_token()).await, + )); - let mut recv = comms - .plugins - .get_mut("heartbeat") - .unwrap() - .receiver - .take() + heartbeat.write().await.plugin_mut().start().await.unwrap(); + critical_service + .write() + .await + .plugin_mut() + .start() + .await .unwrap(); - let hb_cancel_token = cancel_token.child_token(); - let hb_handle = tokio::task::spawn(async move { - let hb = heartbeat; + let recv = comms.plugins.get("heartbeat").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - hb.handle_message(msg).await.unwrap(); - } - _ = hb_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let heartbeat = heartbeat.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let heartbeat = heartbeat.clone(); + async move { + let heartbeat = heartbeat.read().await; + heartbeat.handle_message(msg).await; + Ok(()) } - } - - hb - }); - - let mut recv = comms - .plugins - .get_mut("critical-service") - .unwrap() - .receiver - .take() - .unwrap(); + .boxed() + })); + } - let cs_cancel_token = cancel_token.child_token(); - let cs_handle = tokio::task::spawn(async move { - let cs = critical_service; + let recv = comms.plugins.get("critical-service").unwrap(); - loop { - tokio::select! { - Some(msg) = recv.recv() => { - cs.handle_message(msg).await.unwrap(); - } - _ = cs_cancel_token.cancelled() => break, + { + let mut lock = recv.sender.write().await; + let critical_service = critical_service.clone(); + + *lock = Some(Box::new(move |msg, _wait_kind| { + let critical_service = critical_service.clone(); + async move { + let critical_service = critical_service.read().await; + critical_service.handle_message(msg).await; + Ok(()) } - } - - cs - }); + .boxed() + })); + } println!("Core: Stopping everything in 10 seconds!"); tokio::time::sleep(Duration::from_secs(12)).await; @@ -416,11 +422,16 @@ async fn main() { println!("Core: SHUTTING DOWN"); cancel_token.cancel(); - let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - - heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + heartbeat + .write() + .await + .plugin_mut() + .shutdown() + .await + .unwrap(); critical_service - .unwrap() + .write() + .await .plugin_mut() .shutdown() .await diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index ef50edb4..4f786ea5 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,6 +1,8 @@ -use std::{marker::PhantomData, time::Duration}; +use std::{marker::PhantomData, sync::Arc, time::Duration}; -use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; +use futures::future::BoxFuture; +use tokio::sync::RwLock; +use tracing::{instrument, trace}; use crate::{ message::MessageType, @@ -11,15 +13,103 @@ use crate::{ pub type AnyMessageBox = Box<dyn Message>; #[doc(hidden)] -#[derive(Debug)] pub struct InternalMessage { pub(crate) data: AnyMessageBox, pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } +impl std::fmt::Debug for InternalMessage { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InternalMessage") + .field("data", &self.data) + .finish_non_exhaustive() + } +} + +#[doc(hidden)] +#[derive(Debug)] +pub enum ShouldWait { + Wait, + DontWait, + Timeout(std::time::Duration), +} + +#[doc(hidden)] +pub type MessageFutureProducer = dyn Fn(InternalMessage, ShouldWait) -> BoxFuture<'static, Result<(), InternalMessage>> + + Sync + + Send; + +#[doc(hidden)] +#[derive(Clone)] +pub struct InnerMessageSender { + #[doc(hidden)] + pub send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>, +} + +impl std::fmt::Debug for InnerMessageSender { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("InnerMessageSender").finish_non_exhaustive() + } +} + +impl InnerMessageSender { + pub fn new(send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>) -> Self { + Self { send_provider } + } + + pub async fn init_with(&self, producer: Box<MessageFutureProducer>) { + let mut lock = self.send_provider.write().await; + *lock = Some(producer); + } + + pub async fn reset(&self) { + let mut lock = self.send_provider.write().await; + *lock = None; + } + + #[instrument(skip_all, level = "trace")] + async fn send(&self, message: InternalMessage) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + trace!(sender_exists = ?lock.is_some(), "Checking for internal sender"); + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::Wait); + + sender.await + } else { + Err(message) + } + } + + async fn try_send(&self, message: InternalMessage) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::DontWait); + + sender.await + } else { + Err(message) + } + } + + async fn send_timeout( + &self, + message: InternalMessage, + timeout: Duration, + ) -> Result<(), InternalMessage> { + let lock = self.send_provider.read().await; + if let Some(sender) = &*lock { + let sender = (*sender)(message, ShouldWait::Timeout(timeout)); + + sender.await + } else { + Err(message) + } + } +} + /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] -pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>; +pub type MessageSender = InnerMessageSender; /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME #[doc(hidden)] @@ -94,7 +184,7 @@ impl<RB: ReceiverBundle> Address<RB> { reply_sender: sender, }) .await - .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?; + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -114,7 +204,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// /// The error is returned if the receiving side (the plugin that is addressed) cannot currently /// receive messages (either because it is closed or the queue is full). - pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { + pub async fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -122,11 +212,8 @@ impl<RB: ReceiverBundle> Address<RB> { data: Box::new(msg), reply_sender: sender, }) - .map_err(|msg| match msg { - TrySendError::Full(data) | TrySendError::Closed(data) => { - *data.data.downcast::<M>().unwrap() - } - })?; + .await + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -157,11 +244,7 @@ impl<RB: ReceiverBundle> Address<RB> { timeout, ) .await - .map_err(|msg| match msg { - SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => { - *data.data.downcast::<M>().unwrap() - } - })?; + .map_err(|msg| *msg.data.downcast::<M>().unwrap())?; Ok(ReplyReceiverFor { _pd: PhantomData, @@ -319,10 +402,13 @@ macro_rules! make_receiver_bundle { #[cfg(test)] mod tests { + use std::sync::Arc; + use static_assertions::{assert_impl_all, assert_not_impl_any}; + use tokio::sync::RwLock; use crate::{ - address::{ReplyReceiverFor, ReplySenderFor}, + address::{InnerMessageSender, ReplyReceiverFor, ReplySenderFor}, make_receiver_bundle, plugin::{AcceptsReplies, Message}, Address, @@ -370,7 +456,7 @@ mod tests { #[test] fn check_could_receive() { - let (sender, _receiver) = tokio::sync::mpsc::channel(1); + let sender = InnerMessageSender::new(Arc::new(RwLock::new(None))); let addr: Address<FooBar> = Address { _pd: std::marker::PhantomData, sender, diff --git a/crates/core/tedge_api/src/config.rs b/crates/core/tedge_api/src/config.rs index ff25e1b4..a2a42bb7 100644 --- a/crates/core/tedge_api/src/config.rs +++ b/crates/core/tedge_api/src/config.rs @@ -29,7 +29,7 @@ impl ConfigDescription { &self.kind } - /// Set or replace the documentation of this [`Config`] + /// Set or replace the documentation of this [`ConfigDescription`] #[must_use] pub fn with_doc(mut self, doc: Option<&'static str>) -> Self { self.doc = doc; @@ -63,7 +63,7 @@ pub enum ConfigEnumKind { Untagged, } -/// The specific kind a [`Config`] represents +/// The specific kind a [`ConfigDescription`] represents #[derive(Debug, Serialize, PartialEq)] pub enum ConfigKind { /// Config represents a boolean `true`/`false` @@ -120,11 +120,11 @@ pub enum ConfigKind { ), } -/// Turn a plugin configuration into a [`Config`] object +/// Turn a plugin configuration into a [`ConfigDescription`] object /// /// Plugin authors are expected to implement this for their configurations to give users pub trait AsConfig { - /// Get a [`Config`] object from the type + /// Get a [`ConfigDescription`] object from the type fn as_config() -> ConfigDescription; } diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs index 92f14c2b..a342734a 100644 --- a/crates/core/tedge_api/src/plugin.rs +++ b/crates/core/tedge_api/src/plugin.rs @@ -173,8 +173,8 @@ pub trait PluginBuilder<PD: PluginDirectory>: Sync + Send + 'static { /// Get a generic configuration description of what kind of input the /// plugin expects. /// - /// See [`Config`] as well as [`AsConfig`] for how to implement and use these types and - /// interfaces. + /// See [`ConfigDescription`] as well as [`AsConfig`](crate::config::AsConfig) for how to + /// implement and use these types and interfaces. fn kind_configuration() -> Option<ConfigDescription> where Self: Sized, @@ -317,14 +317,26 @@ pub trait Plugin: Sync + Send + DowncastSync { /// /// This function will be called by the core of thin-edge before any message-passing starts. /// The plugin is free to for example spawn up background tasks here. - async fn start(&mut self) -> Result<(), PluginError>; + async fn start(&mut self) -> Result<(), PluginError> { + Ok(()) + } + + /// The main function of the plugin + /// + /// This method is called once all plugins have [`start`](Plugin::start)ed. The plugin is free + /// to spawn new tasks or loop indefinitely (while still observing the cancel token!) + async fn main(&self) -> Result<(), PluginError> { + Ok(()) + } /// Gracefully handle shutdown /// /// This function is called by the core of thin-edge before the software shuts down as a whole, /// giving the plugin the opportunity to clear up resources (e.g. deallocate file handles /// cleanly, shut down network connections properly, etc...). - async fn shutdown(&mut self) -> Result<(), PluginError>; + async fn shutdown(&mut self) -> Result<(), PluginError> { + Ok(()) + } } impl_downcast!(sync Plugin); |