summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--Cargo.lock5
-rw-r--r--crates/core/tedge_api/Cargo.toml1
-rw-r--r--crates/core/tedge_api/README.md127
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs121
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs123
-rw-r--r--crates/core/tedge_api/src/address.rs122
-rw-r--r--crates/core/tedge_api/src/config.rs8
-rw-r--r--crates/core/tedge_api/src/plugin.rs20
8 files changed, 386 insertions, 141 deletions
diff --git a/Cargo.lock b/Cargo.lock
index f161c6b4..9d568031 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -3408,6 +3408,7 @@ dependencies = [
"tokio",
"tokio-util 0.7.0",
"toml",
+ "tracing",
]
[[package]]
@@ -3920,9 +3921,9 @@ dependencies = [
[[package]]
name = "tracing-attributes"
-version = "0.1.18"
+version = "0.1.21"
source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f4f480b8f81512e825f337ad51e94c1eb5d3bbdf2b363dcd01e2b19a9ffe3f8e"
+checksum = "cc6b8ad3567499f98a1db7a752b07a7c8c7c7c34c332ec00effb2b0027974b7c"
dependencies = [
"proc-macro2 1.0.38",
"quote 1.0.18",
diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml
index 2f316c2f..c9f2bad4 100644
--- a/crates/core/tedge_api/Cargo.toml
+++ b/crates/core/tedge_api/Cargo.toml
@@ -16,6 +16,7 @@ tokio-util = "0.7.0"
toml = "0.5.8"
serde = { version = "1.0.136", features = ["derive"] }
tedge_config_derive = { version = "0.1.0", path = "tedge_config_derive" }
+tracing = "0.1"
[dev-dependencies]
pretty_assertions = "1.2.1"
diff --git a/crates/core/tedge_api/README.md b/crates/core/tedge_api/README.md
index d085a7e1..340faa23 100644
--- a/crates/core/tedge_api/README.md
+++ b/crates/core/tedge_api/README.md
@@ -1,9 +1,134 @@
# Thin Edge API
thin-edge is made up out of "Plugins"[^1] which pass messages to eachother.
-These plugins run on a "Core", which handles the message passing.
+These plugins run on a "Core", which handles the message passing and the plugin
+lifecycle.
This crate defines the interfaces a plugin author needs to implement so that a
plugin can be built into thin-edge.
+
+## What a Plugin is
+
+A Plugin is a piece of code that covers some usecase. That usecase has to be
+exposed to the thin-edge.io ecosystem with the following functionalities (from a
+high level):
+
+* There's a function to "instantiate" a `Plugin`
+* There's a function to "start" a `Plugin`
+* A plugin can expose a number of API endpoints that it can receive messages on
+* There's a function for cleanly shutting a `Plugin` down
+
+The implementation of what we call the "core" of thin-edge.io is then
+responsible of instantiating a plugin upon user request, start it, and send it
+messages or forward messages the plugin emits to other plugins.
+
+Messages are just objects that are defined via Rust structures. See below for
+more information.
+
+If shutdown of thin-edge.io is requested, the core is also responsible of
+shutting a plugin down.
+
+The core is responsible to protect each plugin from crashes of other plugins and
+guarantee safe operation of all plugins in the ecosystem.
+
+
+## API to implement
+
+To implement a Plugin and bring functionality to the thin-edge.io ecosystem,
+the following API endpoints need to be implemented:
+
+* The `PluginBuilder` trait defines the interface that will be used to
+ instantiate a `Plugin`
+* The `Plugin` trait defines the interface that is used to start the
+ instantiated `Plugin` and to shut it down
+* The `Handle` trait defines the interfaces a `Plugin` instance can receive
+ messages on. This trait can be implemented multiple times for each `Plugin`,
+ one time for each message type this plugin is able to receive.
+
+The following (simplified) diagram should describe this in a visual way:
+
+<!--
+the "aquamarine" crate does not yet support rendering this in rustdoc.
+See: https://github.com/mersinvald/aquamarine/issues/19
+-->
+```mermaid
+classDiagram
+ class PluginBuilder
+ <<interface>> PluginBuilder
+ PluginBuilder : +instantiate() ~Plugin~
+
+ class Plugin
+ <<interface>> Plugin
+ Plugin : +start()
+ Plugin : +shutdown()
+
+ class Handle
+ <<interface>> Handle~Message~
+ Handle : +handle_message(~Message~ message)
+
+ class MyPluginBuilder
+ MyPluginBuilder <|-- PluginBuilder : implements
+
+ class MyPlugin
+ MyPlugin <|-- Plugin : implements
+ MyPlugin <|-- Handle~MyMessage~ : implements
+```
+
+## What a Message is
+
+A message can be anything that is able to implement the `Message` trait.
+
+This trait does not require the message to implement functionality, it just
+requires an implementing structure to implement `Debug` and `Send` and some
+others.
+
+For example:
+
+```rust
+#[derive(Debug)]
+struct Value(f64);
+
+impl Message for Value {}
+```
+
+## How messages are send
+
+Messages can be send between plugins, but can also be send to the core of
+thin-edge.io and to the plugin itself (a Plugin can send messages to itself).
+
+To be able to send a Message, an `Address` of that Plugin needs to be known.
+That `Address` can be fetched during the instantiation of the Plugin. The
+`PluginBuilder::instantiate()` function gets a reference to a `PluginDirectory`.
+That `PluginDirectory` is basically an "address book", that can be asked for
+addresses to other plugins, by specifying their name.
+That name usually has to be configured.
+
+
+## Plugin lifecycle
+
+The above illustrated how a plugin lifecycle looks like. The following diagram
+is here to visualize the words from above (minus some details that are not
+required for understanding the grand scheme of things):
+
+```mermaid
+sequenceDiagram
+ thinedge->>+PluginBuilder: instantiate()
+ PluginBuilder->>Configuration: get_target_address()
+ Configuration->>PluginBuilder: target_address
+ PluginBuilder->>PluginDirectory: get_address_for<MeasurementReceiver>(target_address)
+ PluginDirectory->>PluginBuilder: Address<MeasurementReceiver>
+ PluginBuilder->>+Plugin: new(Address<MeasurementReceiver>)
+ Plugin-->PluginBuilder: Plugin
+ PluginBuilder->thinedge : Plugin
+ thinedge->>+Plugin : start()
+ Plugin-->-thinedge : start
+ loop Lifecycle
+ thinedge-->Plugin : handle_message()
+ end
+ thinedge->>+Plugin : shutdown()
+ Plugin-->-thinedge : shutdown
+```
+
+
[^1]: Name is subject to change.
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index a9305fde..454be2ea 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
@@ -9,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -294,11 +295,17 @@ impl Plugin for CriticalService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -309,13 +316,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -349,7 +355,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -418,59 +426,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
-
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -478,11 +482,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
index f2b3dcea..82ff3130 100644
--- a/crates/core/tedge_api/examples/universal_log.rs
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -1,6 +1,7 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySenderFor,
message::{AnyMessage, MessageType},
@@ -8,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -229,6 +231,7 @@ impl Plugin for LogService {
}
}
+//
// The following pieces of code would be implemented by a "core" component, that is responsible for
// setting up plugins and their communication.
//
@@ -238,11 +241,17 @@ impl Plugin for LogService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -253,13 +262,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -293,7 +301,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -356,59 +366,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
-
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -416,11 +422,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index ef50edb4..4f786ea5 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,6 +1,8 @@
-use std::{marker::PhantomData, time::Duration};
+use std::{marker::PhantomData, sync::Arc, time::Duration};
-use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
+use futures::future::BoxFuture;
+use tokio::sync::RwLock;
+use tracing::{instrument, trace};
use crate::{
message::MessageType,
@@ -11,15 +13,103 @@ use crate::{
pub type AnyMessageBox = Box<dyn Message>;
#[doc(hidden)]
-#[derive(Debug)]
pub struct InternalMessage {
pub(crate) data: AnyMessageBox,
pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
+impl std::fmt::Debug for InternalMessage {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InternalMessage")
+ .field("data", &self.data)
+ .finish_non_exhaustive()
+ }
+}
+
+#[doc(hidden)]
+#[derive(Debug)]
+pub enum ShouldWait {
+ Wait,
+ DontWait,
+ Timeout(std::time::Duration),
+}
+
+#[doc(hidden)]
+pub type MessageFutureProducer = dyn Fn(InternalMessage, ShouldWait) -> BoxFuture<'static, Result<(), InternalMessage>>
+ + Sync
+ + Send;
+
+#[doc(hidden)]
+#[derive(Clone)]
+pub struct InnerMessageSender {
+ #[doc(hidden)]
+ pub send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for InnerMessageSender {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InnerMessageSender").finish_non_exhaustive()
+ }
+}
+
+impl InnerMessageSender {
+ pub fn new(send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>) -> Self {
+ Self { send_provider }
+ }
+
+ pub async fn init_with(&self, producer: Box<MessageFutureProducer>) {
+ let mut lock = self.send_provider.write().await;
+ *lock = Some(producer);
+ }
+
+ pub async fn reset(&self) {
+ let mut lock = self.send_provider.write().await;
+ *lock = None;
+ }
+
+ #[instrument(skip_all, level = "trace")]
+ async fn send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ trace!(sender_exists = ?lock.is_some(), "Checking for internal sender");
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Wait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn try_send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::DontWait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn send_timeout(
+ &self,
+ message: InternalMessage,
+ timeout: Duration,
+ ) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Timeout(timeout));
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+}
+
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
-pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>;
+pub type MessageSender = InnerMessageSender;
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
@@ -94,7 +184,7 @@ impl<RB: ReceiverBundle> Address<RB> {
reply_sender: sender,
})
.await
- .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -114,7 +204,7 @@ impl<RB: ReceiverBundle> Address<RB> {
///
/// The error is returned if the receiving side (the plugin that is addressed) cannot currently
/// receive messages (either because it is closed or the queue is full).
- pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
+ pub async fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -122,11 +212,8 @@ impl<RB: ReceiverBundle> Address<RB> {
data: Box::new(msg),
reply_sender: sender,
})
- .map_err(|msg| match msg {
- TrySendError::Full(data) | TrySendError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .await
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -157,11 +244,7 @@ impl<RB: ReceiverBundle> Address<RB> {
timeout,
)
.await
- .map_err(|msg| match msg {
- SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -319,10 +402,13 @@ macro_rules! make_receiver_bundle {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use static_assertions::{assert_impl_all, assert_not_impl_any};
+ use tokio::sync::RwLock;
use crate::{
- address::{ReplyReceiverFor, ReplySenderFor},
+ address::{InnerMessageSender, ReplyReceiverFor, ReplySenderFor},
make_receiver_bundle,
plugin::{AcceptsReplies, Message},
Address,
@@ -370,7 +456,7 @@ mod tests {
#[test]
fn check_could_receive() {
- let (sender, _receiver) = tokio::sync::mpsc::channel(1);
+ let sender = InnerMessageSender::new(Arc::new(RwLock::new(None)));
let addr: Address<FooBar> = Address {
_pd: std::marker::PhantomData,
sender,
diff --git a/crates/core/tedge_api/src/config.rs b/crates/core/tedge_api/src/config.rs
index ff25e1b4..a2a42bb7 100644
--- a/crates/core/tedge_api/src/config.rs
+++ b/crates/core/tedge_api/src/config.rs
@@ -29,7 +29,7 @@ impl ConfigDescription {
&self.kind
}
- /// Set or replace the documentation of this [`Config`]
+ /// Set or replace the documentation of this [`ConfigDescription`]
#[must_use]
pub fn with_doc(mut self, doc: Option<&'static str>) -> Self {
self.doc = doc;
@@ -63,7 +63,7 @@ pub enum ConfigEnumKind {
Untagged,
}
-/// The specific kind a [`Config`] represents
+/// The specific kind a [`ConfigDescription`] represents
#[derive(Debug, Serialize, PartialEq)]
pub enum ConfigKind {
/// Config represents a boolean `true`/`false`
@@ -120,11 +120,11 @@ pub enum ConfigKind {
),
}
-/// Turn a plugin configuration into a [`Config`] object
+/// Turn a plugin configuration into a [`ConfigDescription`] object
///
/// Plugin authors are expected to implement this for their configurations to give users
pub trait AsConfig {
- /// Get a [`Config`] object from the type
+ /// Get a [`ConfigDescription`] object from the type
fn as_config() -> ConfigDescription;
}
diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs
index 92f14c2b..a342734a 100644
--- a/crates/core/tedge_api/src/plugin.rs
+++ b/crates/core/tedge_api/src/plugin.rs
@@ -173,8 +173,8 @@ pub trait PluginBuilder<PD: PluginDirectory>: Sync + Send + 'static {
/// Get a generic configuration description of what kind of input the
/// plugin expects.
///
- /// See [`Config`] as well as [`AsConfig`] for how to implement and use these types and
- /// interfaces.
+ /// See [`ConfigDescription`] as well as [`AsConfig`](crate::config::AsConfig) for how to
+ /// implement and use these types and interfaces.
fn kind_configuration() -> Option<ConfigDescription>
where
Self: Sized,
@@ -317,14 +317,26 @@ pub trait Plugin: Sync + Send + DowncastSync {
///
/// This function will be called by the core of thin-edge before any message-passing starts.
/// The plugin is free to for example spawn up background tasks here.
- async fn start(&mut self) -> Result<(), PluginError>;
+ async fn start(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+
+ /// The main function of the plugin
+ ///
+ /// This method is called once all plugins have [`start`](Plugin::start)ed. The plugin is free
+ /// to spawn new tasks or loop indefinitely (while still observing the cancel token!)
+ async fn main(&self) -> Result<(), PluginError> {
+ Ok(())
+ }
/// Gracefully handle shutdown
///
/// This function is called by the core of thin-edge before the software shuts down as a whole,
/// giving the plugin the opportunity to clear up resources (e.g. deallocate file handles
/// cleanly, shut down network connections properly, etc...).
- async fn shutdown(&mut self) -> Result<(), PluginError>;
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
}
impl_downcast!(sync Plugin);