summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 13:27:13 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 13:27:13 +0200
commit4b4054563981c2a741cd29df6a937a5551178866 (patch)
treec505c2d7c263a831eea6a4e90124c9b1e3d8df6f /crates/core
parente7ddb5a8323617fa92ac027fd29d6f3d920d0415 (diff)
parent1885e9f8883b9b4c86a896d33a0f73679fe36699 (diff)
Merge remote-tracking branch 'gitlab-marcel/feature/add_tedge_api_only' into feature/add_tedge_api/integrate-api
This merges the latest changes from the API branch into an integration branch for adapting the the core implementation to the changes. The changes, most notably commit 43ed68145e7044c0d8f170af794d71c2fa93599a ("Replace tokio MPSC with a direct Future") were the MPSC interfaces were removed from the API, will result in quite a few adaptions in the tedge_core crate implementation. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/tedge_api/Cargo.toml1
-rw-r--r--crates/core/tedge_api/README.md127
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs121
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs123
-rw-r--r--crates/core/tedge_api/src/address.rs122
-rw-r--r--crates/core/tedge_api/src/config.rs8
-rw-r--r--crates/core/tedge_api/src/plugin.rs20
7 files changed, 383 insertions, 139 deletions
diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml
index 2f316c2f..c9f2bad4 100644
--- a/crates/core/tedge_api/Cargo.toml
+++ b/crates/core/tedge_api/Cargo.toml
@@ -16,6 +16,7 @@ tokio-util = "0.7.0"
toml = "0.5.8"
serde = { version = "1.0.136", features = ["derive"] }
tedge_config_derive = { version = "0.1.0", path = "tedge_config_derive" }
+tracing = "0.1"
[dev-dependencies]
pretty_assertions = "1.2.1"
diff --git a/crates/core/tedge_api/README.md b/crates/core/tedge_api/README.md
index d085a7e1..340faa23 100644
--- a/crates/core/tedge_api/README.md
+++ b/crates/core/tedge_api/README.md
@@ -1,9 +1,134 @@
# Thin Edge API
thin-edge is made up out of "Plugins"[^1] which pass messages to eachother.
-These plugins run on a "Core", which handles the message passing.
+These plugins run on a "Core", which handles the message passing and the plugin
+lifecycle.
This crate defines the interfaces a plugin author needs to implement so that a
plugin can be built into thin-edge.
+
+## What a Plugin is
+
+A Plugin is a piece of code that covers some usecase. That usecase has to be
+exposed to the thin-edge.io ecosystem with the following functionalities (from a
+high level):
+
+* There's a function to "instantiate" a `Plugin`
+* There's a function to "start" a `Plugin`
+* A plugin can expose a number of API endpoints that it can receive messages on
+* There's a function for cleanly shutting a `Plugin` down
+
+The implementation of what we call the "core" of thin-edge.io is then
+responsible of instantiating a plugin upon user request, start it, and send it
+messages or forward messages the plugin emits to other plugins.
+
+Messages are just objects that are defined via Rust structures. See below for
+more information.
+
+If shutdown of thin-edge.io is requested, the core is also responsible of
+shutting a plugin down.
+
+The core is responsible to protect each plugin from crashes of other plugins and
+guarantee safe operation of all plugins in the ecosystem.
+
+
+## API to implement
+
+To implement a Plugin and bring functionality to the thin-edge.io ecosystem,
+the following API endpoints need to be implemented:
+
+* The `PluginBuilder` trait defines the interface that will be used to
+ instantiate a `Plugin`
+* The `Plugin` trait defines the interface that is used to start the
+ instantiated `Plugin` and to shut it down
+* The `Handle` trait defines the interfaces a `Plugin` instance can receive
+ messages on. This trait can be implemented multiple times for each `Plugin`,
+ one time for each message type this plugin is able to receive.
+
+The following (simplified) diagram should describe this in a visual way:
+
+<!--
+the "aquamarine" crate does not yet support rendering this in rustdoc.
+See: https://github.com/mersinvald/aquamarine/issues/19
+-->
+```mermaid
+classDiagram
+ class PluginBuilder
+ <<interface>> PluginBuilder
+ PluginBuilder : +instantiate() ~Plugin~
+
+ class Plugin
+ <<interface>> Plugin
+ Plugin : +start()
+ Plugin : +shutdown()
+
+ class Handle
+ <<interface>> Handle~Message~
+ Handle : +handle_message(~Message~ message)
+
+ class MyPluginBuilder
+ MyPluginBuilder <|-- PluginBuilder : implements
+
+ class MyPlugin
+ MyPlugin <|-- Plugin : implements
+ MyPlugin <|-- Handle~MyMessage~ : implements
+```
+
+## What a Message is
+
+A message can be anything that is able to implement the `Message` trait.
+
+This trait does not require the message to implement functionality, it just
+requires an implementing structure to implement `Debug` and `Send` and some
+others.
+
+For example:
+
+```rust
+#[derive(Debug)]
+struct Value(f64);
+
+impl Message for Value {}
+```
+
+## How messages are send
+
+Messages can be send between plugins, but can also be send to the core of
+thin-edge.io and to the plugin itself (a Plugin can send messages to itself).
+
+To be able to send a Message, an `Address` of that Plugin needs to be known.
+That `Address` can be fetched during the instantiation of the Plugin. The
+`PluginBuilder::instantiate()` function gets a reference to a `PluginDirectory`.
+That `PluginDirectory` is basically an "address book", that can be asked for
+addresses to other plugins, by specifying their name.
+That name usually has to be configured.
+
+
+## Plugin lifecycle
+
+The above illustrated how a plugin lifecycle looks like. The following diagram
+is here to visualize the words from above (minus some details that are not
+required for understanding the grand scheme of things):
+
+```mermaid
+sequenceDiagram
+ thinedge->>+PluginBuilder: instantiate()
+ PluginBuilder->>Configuration: get_target_address()
+ Configuration->>PluginBuilder: target_address
+ PluginBuilder->>PluginDirectory: get_address_for<MeasurementReceiver>(target_address)
+ PluginDirectory->>PluginBuilder: Address<MeasurementReceiver>
+ PluginBuilder->>+Plugin: new(Address<MeasurementReceiver>)
+ Plugin-->PluginBuilder: Plugin
+ PluginBuilder->thinedge : Plugin
+ thinedge->>+Plugin : start()
+ Plugin-->-thinedge : start
+ loop Lifecycle
+ thinedge-->Plugin : handle_message()
+ end
+ thinedge->>+Plugin : shutdown()
+ Plugin-->-thinedge : shutdown
+```
+
+
[^1]: Name is subject to change.
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index a9305fde..454be2ea 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
@@ -9,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -294,11 +295,17 @@ impl Plugin for CriticalService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -309,13 +316,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -349,7 +355,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -418,59 +426,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
-
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -478,11 +482,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
index f2b3dcea..82ff3130 100644
--- a/crates/core/tedge_api/examples/universal_log.rs
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -1,6 +1,7 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySenderFor,
message::{AnyMessage, MessageType},
@@ -8,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -229,6 +231,7 @@ impl Plugin for LogService {
}
}
+//
// The following pieces of code would be implemented by a "core" component, that is responsible for
// setting up plugins and their communication.
//
@@ -238,11 +241,17 @@ impl Plugin for LogService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -253,13 +262,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -293,7 +301,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -356,59 +366,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
-
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -416,11 +422,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index ef50edb4..4f786ea5 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,6 +1,8 @@
-use std::{marker::PhantomData, time::Duration};
+use std::{marker::PhantomData, sync::Arc, time::Duration};
-use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
+use futures::future::BoxFuture;
+use tokio::sync::RwLock;
+use tracing::{instrument, trace};
use crate::{
message::MessageType,
@@ -11,15 +13,103 @@ use crate::{
pub type AnyMessageBox = Box<dyn Message>;
#[doc(hidden)]
-#[derive(Debug)]
pub struct InternalMessage {
pub(crate) data: AnyMessageBox,
pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
+impl std::fmt::Debug for InternalMessage {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InternalMessage")
+ .field("data", &self.data)
+ .finish_non_exhaustive()
+ }
+}
+
+#[doc(hidden)]
+#[derive(Debug)]
+pub enum ShouldWait {
+ Wait,
+ DontWait,
+ Timeout(std::time::Duration),
+}
+
+#[doc(hidden)]
+pub type MessageFutureProducer = dyn Fn(InternalMessage, ShouldWait) -> BoxFuture<'static, Result<(), InternalMessage>>
+ + Sync
+ + Send;
+
+#[doc(hidden)]
+#[derive(Clone)]
+pub struct InnerMessageSender {
+ #[doc(hidden)]
+ pub send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for InnerMessageSender {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InnerMessageSender").finish_non_exhaustive()
+ }
+}
+
+impl InnerMessageSender {
+ pub fn new(send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>) -> Self {
+ Self { send_provider }
+ }
+
+ pub async fn init_with(&self, producer: Box<MessageFutureProducer>) {
+ let mut lock = self.send_provider.write().await;
+ *lock = Some(producer);
+ }
+
+ pub async fn reset(&self) {
+ let mut lock = self.send_provider.write().await;
+ *lock = None;
+ }
+
+ #[instrument(skip_all, level = "trace")]
+ async fn send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ trace!(sender_exists = ?lock.is_some(), "Checking for internal sender");
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Wait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn try_send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::DontWait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn send_timeout(
+ &self,
+ message: InternalMessage,
+ timeout: Duration,
+ ) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Timeout(timeout));
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+}
+
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
-pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>;
+pub type MessageSender = InnerMessageSender;
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
@@ -94,7 +184,7 @@ impl<RB: ReceiverBundle> Address<RB> {
reply_sender: sender,
})
.await
- .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -114,7 +204,7 @@ impl<RB: ReceiverBundle> Address<RB> {
///
/// The error is returned if the receiving side (the plugin that is addressed) cannot currently
/// receive messages (either because it is closed or the queue is full).
- pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
+ pub async fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -122,11 +212,8 @@ impl<RB: ReceiverBundle> Address<RB> {
data: Box::new(msg),
reply_sender: sender,
})
- .map_err(|msg| match msg {
- TrySendError::Full(data) | TrySendError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .await
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -157,11 +244,7 @@ impl<RB: ReceiverBundle> Address<RB> {
timeout,
)
.await
- .map_err(|msg| match msg {
- SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -319,10 +402,13 @@ macro_rules! make_receiver_bundle {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use static_assertions::{assert_impl_all, assert_not_impl_any};
+ use tokio::sync::RwLock;
use crate::{
- address::{ReplyReceiverFor, ReplySenderFor},
+ address::{InnerMessageSender, ReplyReceiverFor, ReplySenderFor},
make_receiver_bundle,
plugin::{AcceptsReplies, Message},
Address,
@@ -370,7 +456,7 @@ mod tests {
#[test]
fn check_could_receive() {
- let (sender, _receiver) = tokio::sync::mpsc::channel(1);
+ let sender = InnerMessageSender::new(Arc::new(RwLock::new(None)));
let addr: Address<FooBar> = Address {
_pd: std::marker::PhantomData,
sender,
diff --git a/crates/core/tedge_api/src/config.rs b/crates/core/tedge_api/src/config.rs
index ff25e1b4..a2a42bb7 100644
--- a/crates/core/tedge_api/src/config.rs
+++ b/crates/core/tedge_api/src/config.rs
@@ -29,7 +29,7 @@ impl ConfigDescription {
&self.kind
}
- /// Set or replace the documentation of this [`Config`]
+ /// Set or replace the documentation of this [`ConfigDescription`]
#[must_use]
pub fn with_doc(mut self, doc: Option<&'static str>) -> Self {
self.doc = doc;
@@ -63,7 +63,7 @@ pub enum ConfigEnumKind {
Untagged,
}
-/// The specific kind a [`Config`] represents
+/// The specific kind a [`ConfigDescription`] represents
#[derive(Debug, Serialize, PartialEq)]
pub enum ConfigKind {
/// Config represents a boolean `true`/`false`
@@ -120,11 +120,11 @@ pub enum ConfigKind {
),
}
-/// Turn a plugin configuration into a [`Config`] object
+/// Turn a plugin configuration into a [`ConfigDescription`] object
///
/// Plugin authors are expected to implement this for their configurations to give users
pub trait AsConfig {
- /// Get a [`Config`] object from the type
+ /// Get a [`ConfigDescription`] object from the type
fn as_config() -> ConfigDescription;
}
diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs
index 92f14c2b..a342734a 100644
--- a/crates/core/tedge_api/src/plugin.rs
+++ b/crates/core/tedge_api/src/plugin.rs
@@ -173,8 +173,8 @@ pub trait PluginBuilder<PD: PluginDirectory>: Sync + Send + 'static {
/// Get a generic configuration description of what kind of input the
/// plugin expects.
///
- /// See [`Config`] as well as [`AsConfig`] for how to implement and use these types and
- /// interfaces.
+ /// See [`ConfigDescription`] as well as [`AsConfig`](crate::config::AsConfig) for how to
+ /// implement and use these types and interfaces.
fn kind_configuration() -> Option<ConfigDescription>
where
Self: Sized,
@@ -317,14 +317,26 @@ pub trait Plugin: Sync + Send + DowncastSync {
///
/// This function will be called by the core of thin-edge before any message-passing starts.
/// The plugin is free to for example spawn up background tasks here.
- async fn start(&mut self) -> Result<(), PluginError>;
+ async fn start(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
+
+ /// The main function of the plugin
+ ///
+ /// This method is called once all plugins have [`start`](Plugin::start)ed. The plugin is free
+ /// to spawn new tasks or loop indefinitely (while still observing the cancel token!)
+ async fn main(&self) -> Result<(), PluginError> {
+ Ok(())
+ }
/// Gracefully handle shutdown
///
/// This function is called by the core of thin-edge before the software shuts down as a whole,
/// giving the plugin the opportunity to clear up resources (e.g. deallocate file handles
/// cleanly, shut down network connections properly, etc...).
- async fn shutdown(&mut self) -> Result<(), PluginError>;
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ Ok(())
+ }
}
impl_downcast!(sync Plugin);