summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs121
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs123
-rw-r--r--crates/core/tedge_api/src/address.rs99
3 files changed, 214 insertions, 129 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 37e9c8ec..654c4311 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
@@ -9,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -294,11 +295,17 @@ impl Plugin for CriticalService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -309,13 +316,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -349,7 +355,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -412,59 +420,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
-
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -472,11 +476,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
index f2b3dcea..82ff3130 100644
--- a/crates/core/tedge_api/examples/universal_log.rs
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -1,6 +1,7 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySenderFor,
message::{AnyMessage, MessageType},
@@ -8,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -229,6 +231,7 @@ impl Plugin for LogService {
}
}
+//
// The following pieces of code would be implemented by a "core" component, that is responsible for
// setting up plugins and their communication.
//
@@ -238,11 +241,17 @@ impl Plugin for LogService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -253,13 +262,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -293,7 +301,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -356,59 +366,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
-
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -416,11 +422,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index ef50edb4..007be13f 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,6 +1,7 @@
-use std::{marker::PhantomData, time::Duration};
+use std::{marker::PhantomData, sync::Arc, time::Duration};
-use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
+use futures::future::BoxFuture;
+use tokio::sync::RwLock;
use crate::{
message::MessageType,
@@ -17,9 +18,77 @@ pub struct InternalMessage {
pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
+#[doc(hidden)]
+#[derive(Debug)]
+pub enum ShouldWait {
+ Wait,
+ DontWait,
+ Timeout(std::time::Duration),
+}
+
+#[doc(hidden)]
+pub type MessageFutureProducer = dyn Fn(InternalMessage, ShouldWait) -> BoxFuture<'static, Result<(), InternalMessage>>
+ + Sync
+ + Send;
+
+#[doc(hidden)]
+#[derive(Clone)]
+pub struct InnerMessageSender {
+ send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for InnerMessageSender {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("InnerMessageSender").finish_non_exhaustive()
+ }
+}
+
+impl InnerMessageSender {
+ pub fn new(send_provider: Arc<RwLock<Option<Box<MessageFutureProducer>>>>) -> Self {
+ Self { send_provider }
+ }
+
+ async fn send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Wait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn try_send(&self, message: InternalMessage) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::DontWait);
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+
+ async fn send_timeout(
+ &self,
+ message: InternalMessage,
+ timeout: Duration,
+ ) -> Result<(), InternalMessage> {
+ let lock = self.send_provider.read().await;
+ if let Some(sender) = &*lock {
+ let sender = (*sender)(message, ShouldWait::Timeout(timeout));
+
+ sender.await
+ } else {
+ Err(message)
+ }
+ }
+}
+
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
-pub type MessageSender = tokio::sync::mpsc::Sender<InternalMessage>;
+pub type MessageSender = InnerMessageSender;
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
#[doc(hidden)]
@@ -94,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> {
reply_sender: sender,
})
.await
- .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -114,7 +183,7 @@ impl<RB: ReceiverBundle> Address<RB> {
///
/// The error is returned if the receiving side (the plugin that is addressed) cannot currently
/// receive messages (either because it is closed or the queue is full).
- pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
+ pub async fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -122,11 +191,8 @@ impl<RB: ReceiverBundle> Address<RB> {
data: Box::new(msg),
reply_sender: sender,
})
- .map_err(|msg| match msg {
- TrySendError::Full(data) | TrySendError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .await
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -157,11 +223,7 @@ impl<RB: ReceiverBundle> Address<RB> {
timeout,
)
.await
- .map_err(|msg| match msg {
- SendTimeoutError::Timeout(data) | SendTimeoutError::Closed(data) => {
- *data.data.downcast::<M>().unwrap()
- }
- })?;
+ .map_err(|msg| *msg.data.downcast::<M>().unwrap())?;
Ok(ReplyReceiverFor {
_pd: PhantomData,
@@ -319,10 +381,13 @@ macro_rules! make_receiver_bundle {
#[cfg(test)]
mod tests {
+ use std::sync::Arc;
+
use static_assertions::{assert_impl_all, assert_not_impl_any};
+ use tokio::sync::RwLock;
use crate::{
- address::{ReplyReceiverFor, ReplySenderFor},
+ address::{InnerMessageSender, ReplyReceiverFor, ReplySenderFor},
make_receiver_bundle,
plugin::{AcceptsReplies, Message},
Address,
@@ -370,7 +435,7 @@ mod tests {
#[test]
fn check_could_receive() {
- let (sender, _receiver) = tokio::sync::mpsc::channel(1);
+ let sender = InnerMessageSender::new(Arc::new(RwLock::new(None)));
let addr: Address<FooBar> = Address {
_pd: std::marker::PhantomData,
sender,