summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs46
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs430
-rw-r--r--crates/core/tedge_api/src/address.rs80
-rw-r--r--crates/core/tedge_api/src/message.rs131
-rw-r--r--crates/core/tedge_api/src/plugin.rs114
5 files changed, 685 insertions, 116 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 95e6ca15..37e9c8ec 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,15 +1,11 @@
-use std::{
- any::TypeId,
- collections::{HashMap, HashSet},
- time::Duration,
-};
+use std::{collections::HashMap, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
use tedge_api::{
- address::ReplySender,
- message::NoReply,
- plugin::{BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt},
+ address::ReplySenderFor,
+ message::MessageType,
+ plugin::{AcceptsReplies, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt},
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
@@ -17,7 +13,8 @@ use tedge_api::{
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
struct Heartbeat;
-impl Message for Heartbeat {
+impl Message for Heartbeat {}
+impl AcceptsReplies for Heartbeat {
type Reply = HeartbeatStatus;
}
@@ -27,9 +24,7 @@ enum HeartbeatStatus {
Alive,
Degraded,
}
-impl Message for HeartbeatStatus {
- type Reply = NoReply;
-}
+impl Message for HeartbeatStatus {}
/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
#[derive(Debug)]
@@ -253,7 +248,7 @@ impl Handle<Heartbeat> for CriticalService {
async fn handle_message(
&self,
_message: Heartbeat,
- sender: ReplySender<HeartbeatStatus>,
+ sender: ReplySenderFor<Heartbeat>,
) -> Result<(), PluginError> {
println!("CriticalService: Received Heartbeat!");
let mut status = self.status.lock().await;
@@ -301,23 +296,13 @@ impl Plugin for CriticalService {
/// Helper type for keeping information about plugins during runtime
#[derive(Debug)]
struct PluginInfo {
- types: HashSet<(&'static str, TypeId)>,
+ types: Vec<MessageType>,
receiver: Option<tedge_api::address::MessageReceiver>,
sender: tedge_api::address::MessageSender,
}
-impl Clone for PluginInfo {
- fn clone(&self) -> Self {
- PluginInfo {
- types: self.types.clone(),
- receiver: None,
- sender: self.sender.clone(),
- }
- }
-}
-
/// The type that provides the communication infrastructure to the plugins.
-#[derive(Clone, Debug)]
+#[derive(Debug)]
struct Communication {
plugins: HashMap<String, PluginInfo>,
}
@@ -328,7 +313,7 @@ impl Communication {
self.plugins.insert(
name.to_owned(),
PluginInfo {
- types: PB::kind_message_types().into(),
+ types: PB::kind_message_types().into_types(),
sender,
receiver: Some(receiver),
},
@@ -341,7 +326,7 @@ impl PluginDirectory for Communication {
&self,
name: &str,
) -> Result<Address<MB>, tedge_api::error::DirectoryError> {
- let types = MB::get_ids().into_iter().collect();
+ let asked_types: Vec<_> = MB::get_ids().into_iter().collect();
let plug = self.plugins.get(name).unwrap_or_else(|| {
// This is an example, so we panic!() here.
@@ -353,12 +338,15 @@ impl PluginDirectory for Communication {
)
});
- if !plug.types.is_superset(&types) {
+ if !asked_types
+ .iter()
+ .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type)))
+ {
// This is an example, so we panic!() here
// In real-world, we would do some reporting and return an error
panic!(
"Asked for {:#?} but plugin {} only has types {:#?}",
- types, name, plug.types,
+ asked_types, name, plug.types,
);
} else {
Ok(Address::new(plug.sender.clone()))
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
new file mode 100644
index 00000000..f2b3dcea
--- /dev/null
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -0,0 +1,430 @@
+use std::{collections::HashMap, time::Duration};
+
+use async_trait::async_trait;
+use tedge_api::{
+ address::ReplySenderFor,
+ message::{AnyMessage, MessageType},
+ plugin::{AnyMessages, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt},
+ Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
+ PluginError,
+};
+
+/// A message that represents a heartbeat that gets sent to plugins
+#[derive(Debug)]
+struct Heartbeat;
+impl Message for Heartbeat {}
+
+#[derive(Debug)]
+struct RandomData;
+impl Message for RandomData {}
+
+/// A PluginBuilder that gets used to build a HeartbeatService plugin instance
+#[derive(Debug)]
+struct HeartbeatServiceBuilder;
+
+#[derive(miette::Diagnostic, thiserror::Error, Debug)]
+enum HeartbeatBuildError {
+ #[error(transparent)]
+ TomlParse(#[from] toml::de::Error),
+}
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder {
+ fn kind_name() -> &'static str {
+ todo!()
+ }
+
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
+ HeartbeatService::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ _config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ Ok(())
+ }
+
+ async fn instantiate(
+ &self,
+ config: PluginConfiguration,
+ cancellation_token: CancellationToken,
+ plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ PD: 'async_trait,
+ {
+ let hb_config: HeartbeatConfig =
+ toml::Value::try_into(config).map_err(HeartbeatBuildError::from)?;
+ let monitored_services = hb_config
+ .plugins
+ .iter()
+ .map(|name| {
+ plugin_dir
+ .get_address_for::<HeartbeatMessages>(name)
+ .map(|addr| (name.clone(), addr))
+ })
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(HeartbeatService::new(
+ Duration::from_millis(hb_config.interval),
+ monitored_services,
+ cancellation_token,
+ )
+ .finish())
+ }
+}
+
+/// The configuration a HeartbeatServices can receive is represented by this type
+#[derive(serde::Deserialize, Debug)]
+struct HeartbeatConfig {
+ interval: u64,
+ plugins: Vec<String>,
+}
+
+/// The HeartbeatService type represents the actual plugin
+struct HeartbeatService {
+ interval_duration: Duration,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
+}
+
+impl PluginDeclaration for HeartbeatService {
+ type HandledMessages = ();
+}
+
+#[async_trait]
+impl Plugin for HeartbeatService {
+ /// The setup function of the HeartbeatService can be used by the plugin author to setup for
+ /// example a connection to an external service. In this example, it is simply used to send the
+ /// heartbeat
+ ///
+ /// Because this example is _simple_, we do not spawn a background task that periodically sends
+ /// the heartbeat. In a real world scenario, that background task would be started here.
+ async fn start(&mut self) -> Result<(), PluginError> {
+ println!(
+ "HeartbeatService: Setting up heartbeat service with interval: {:?}!",
+ self.interval_duration
+ );
+
+ for service in &self.monitored_services {
+ let mut interval = tokio::time::interval(self.interval_duration);
+ let service = service.clone();
+ let cancel_token = self.cancel_token.child_token();
+ tokio::spawn(async move {
+ loop {
+ tokio::select! {
+ _ = interval.tick() => {}
+ _ = cancel_token.cancelled() => {
+ break
+ }
+ }
+ println!(
+ "HeartbeatService: Sending heartbeat to service: {:?}",
+ service
+ );
+ service.1.send_and_wait(Heartbeat).await.unwrap();
+ service.1.send_and_wait(RandomData).await.unwrap();
+ }
+ });
+ }
+ Ok(())
+ }
+
+ /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ println!("HeartbeatService: Shutting down heartbeat service!");
+ Ok(())
+ }
+}
+
+impl HeartbeatService {
+ fn new(
+ interval_duration: Duration,
+ monitored_services: Vec<(String, Address<HeartbeatMessages>)>,
+ cancel_token: CancellationToken,
+ ) -> Self {
+ Self {
+ interval_duration,
+ monitored_services,
+ cancel_token,
+ }
+ }
+}
+
+/// A plugin that receives heartbeats
+struct LogServiceBuilder;
+
+// declare a set of messages that the CriticalService can receive.
+// In this example, it can only receive a Heartbeat.
+tedge_api::make_receiver_bundle!(struct HeartbeatMessages(Heartbeat, RandomData));
+
+#[async_trait]
+impl<PD: PluginDirectory> PluginBuilder<PD> for LogServiceBuilder {
+ fn kind_name() -> &'static str {
+ todo!()
+ }
+
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
+ LogService::get_handled_types()
+ }
+
+ async fn verify_configuration(
+ &self,
+ _config: &PluginConfiguration,
+ ) -> Result<(), tedge_api::error::PluginError> {
+ Ok(())
+ }
+
+ async fn instantiate(
+ &self,
+ _config: PluginConfiguration,
+ _cancellation_token: CancellationToken,
+ _plugin_dir: &PD,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ PD: 'async_trait,
+ {
+ Ok(LogService {}.finish())
+ }
+}
+
+/// The actual "critical" plugin implementation
+struct LogService {}
+
+/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat>
+/// implementation
+#[async_trait]
+impl Handle<AnyMessage> for LogService {
+ async fn handle_message(
+ &self,
+ message: AnyMessage,
+ _sender: ReplySenderFor<AnyMessage>,
+ ) -> Result<(), PluginError> {
+ println!("LogService: Received Message: {:?}", message);
+ Ok(())
+ }
+}
+
+impl PluginDeclaration for LogService {
+ type HandledMessages = AnyMessages;
+}
+
+/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well.
+#[async_trait]
+impl Plugin for LogService {
+ async fn start(&mut self) -> Result<(), PluginError> {
+ println!("CriticalService: Setting up critical service!");
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ println!("CriticalService: Shutting down critical service service!");
+ Ok(())
+ }
+}
+
+// The following pieces of code would be implemented by a "core" component, that is responsible for
+// setting up plugins and their communication.
+//
+// Plugin authors do not need to write this code, but need a basic understanding what it does and
+// how it works.
+// As this is an example, we implement it here to showcase how it is done.
+//
+
+/// Helper type for keeping information about plugins during runtime
+#[derive(Debug)]
+struct PluginInfo {
+ types: Vec<MessageType>,
+ receiver: Option<tedge_api::address::MessageReceiver>,
+ sender: tedge_api::address::MessageSender,
+}
+
+/// The type that provides the communication infrastructure to the plugins.
+#[derive(Debug)]
+struct Communication {
+ plugins: HashMap<String, PluginInfo>,
+}
+
+impl Communication {
+ pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
+ let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ self.plugins.insert(
+ name.to_owned(),
+ PluginInfo {
+ types: PB::kind_message_types().into_types(),
+ sender,
+ receiver: Some(receiver),
+ },
+ );
+ }
+}
+
+impl PluginDirectory for Communication {
+ fn get_address_for<MB: tedge_api::address::ReceiverBundle>(
+ &self,
+ name: &str,
+ ) -> Result<Address<MB>, tedge_api::error::DirectoryError> {
+ let asked_types: Vec<_> = MB::get_ids().into_iter().collect();
+
+ let plug = self.plugins.get(name).unwrap_or_else(|| {
+ // This is an example, so we panic!() here.
+ // In real-world, we would do some reporting and return an error
+ panic!(
+ "Didn't find plugin with name {}, got: {:?}",
+ name,
+ self.plugins.keys().collect::<Vec<_>>()
+ )
+ });
+
+ if !asked_types
+ .iter()
+ .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type)))
+ {
+ // This is an example, so we panic!() here
+ // In real-world, we would do some reporting and return an error
+ panic!(
+ "Asked for {:#?} but plugin {} only has types {:#?}",
+ asked_types, name, plug.types,
+ );
+ } else {
+ Ok(Address::new(plug.sender.clone()))
+ }
+ }
+
+ fn get_address_for_core(&self) -> Address<tedge_api::CoreMessages> {
+ todo!()
+ }
+}
+
+/// Helper function
+async fn build_critical_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
+ let csb = LogServiceBuilder;
+
+ let config = toml::from_str("").unwrap();
+
+ csb.instantiate(config, cancel_token, comms).await.unwrap()
+}
+
+/// Helper function
+async fn build_heartbeat_plugin(
+ comms: &mut Communication,
+ cancel_token: CancellationToken,
+) -> BuiltPlugin {
+ let hsb = HeartbeatServiceBuilder;
+
+ let config = toml::from_str(
+ r#"
+ interval = 5000
+ plugins = ["critical-service"]
+ "#,
+ )
+ .unwrap();
+
+ hsb.instantiate(config, cancel_token, comms).await.unwrap()
+}
+
+#[tokio::main]
+async fn main() {
+ // This implementation now ties everything together
+ //
+ // This would be implemented in a CLI binary using the "core" implementation to boot things up.
+ //
+ // Here, we just tie everything together in the minimal possible way, to showcase how such a
+ // setup would basically work.
+
+ let mut comms = Communication {
+ plugins: HashMap::new(),
+ };
+
+ // in a main(), the core would be told what plugins are available.
+ // This would, in a real-world scenario, not happen on the "communication" type directly.
+ // Still, this needs to be done by a main()-author.
+ comms.declare_plugin::<LogServiceBuilder>("critical-service");
+ comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat");
+
+ // The following would all be handled by the core implementation, a main() author would only
+ // need to call some kind of "run everything" function
+
+ let cancel_token = CancellationToken::new();
+
+ let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+
+ heartbeat.plugin_mut().start().await.unwrap();
+ critical_service.plugin_mut().start().await.unwrap();
+
+ let mut recv = comms
+ .plugins
+ .get_mut("heartbeat")
+ .unwrap()
+ .receiver
+ .take()
+ .unwrap();
+
+ let hb_cancel_token = cancel_token.child_token();
+ let hb_handle = tokio::task::spawn(async move {
+ let hb = heartbeat;
+
+ loop {
+ tokio::select! {
+ Some(msg) = recv.recv() => {
+ hb.handle_message(msg).await.unwrap();
+ }
+ _ = hb_cancel_token.cancelled() => break,
+ }
+ }
+
+ hb
+ });
+
+ let mut recv = comms
+ .plugins
+ .get_mut("critical-service")
+ .unwrap()
+ .receiver
+ .take()
+ .unwrap();
+
+ let cs_cancel_token = cancel_token.child_token();
+ let cs_handle = tokio::task::spawn(async move {
+ let cs = critical_service;
+
+ loop {
+ tokio::select! {
+ Some(msg) = recv.recv() => {
+ cs.handle_message(msg).await.unwrap();
+ }
+ _ = cs_cancel_token.cancelled() => break,
+ }
+ }
+
+ cs
+ });
+
+ println!("Core: Stopping everything in 10 seconds!");
+ tokio::time::sleep(Duration::from_secs(12)).await;
+
+ println!("Core: SHUTTING DOWN");
+ cancel_token.cancel();
+
+ let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
+
+ heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ critical_service
+ .unwrap()
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
+
+ println!("Core: Shut down");
+}
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index 619941ad..93a51450 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -2,16 +2,19 @@ use std::{marker::PhantomData, time::Duration};
use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError};
-use crate::plugin::Message;
+use crate::{
+ message::MessageType,
+ plugin::{AcceptsReplies, Message},
+};
#[doc(hidden)]
-pub type AnySendBox = Box<dyn std::any::Any + Send>;
+pub type AnyMessageBox = Box<dyn Message>;
#[doc(hidden)]
#[derive(Debug)]
pub struct InternalMessage {
- pub(crate) data: AnySendBox,
- pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+ pub(crate) data: AnyMessageBox,
+ pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
@@ -79,7 +82,7 @@ impl<RB: ReceiverBundle> Address<RB> {
/// could become an issue use something akin to timeout (like
/// [`timeout`](tokio::time::timeout)).
/// For details on sending and receiving, see `tokio::sync::mpsc::Sender`.
- pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M>
+ pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M>
where
RB: Contains<M>,
{
@@ -93,7 +96,7 @@ impl<RB: ReceiverBundle> Address<RB> {
.await
.map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -111,7 +114,7 @@ impl<RB: ReceiverBundle> Address<RB> {
///
/// The error is returned if the receiving side (the plugin that is addressed) cannot currently
/// receive messages (either because it is closed or the queue is full).
- pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> {
+ pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -125,7 +128,7 @@ impl<RB: ReceiverBundle> Address<RB> {
}
})?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -142,7 +145,7 @@ impl<RB: ReceiverBundle> Address<RB> {
&self,
msg: M,
timeout: Duration,
- ) -> Result<ReplyReceiver<M::Reply>, M> {
+ ) -> Result<ReplyReceiverFor<M>, M> {
let (sender, receiver) = tokio::sync::oneshot::channel();
self.sender
@@ -160,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> {
}
})?;
- Ok(ReplyReceiver {
+ Ok(ReplyReceiverFor {
_pd: PhantomData,
reply_recv: receiver,
})
@@ -169,12 +172,12 @@ impl<RB: ReceiverBundle> Address<RB> {
#[derive(Debug)]
/// Listener that allows one to wait for a reply as sent through [`Address::send_and_wait`]
-pub struct ReplyReceiver<M> {
+pub struct ReplyReceiverFor<M> {
_pd: PhantomData<fn(M)>,
- reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>,
+ reply_recv: tokio::sync::oneshot::Receiver<AnyMessageBox>,
}
-impl<M: Message> ReplyReceiver<M> {
+impl<M: Message> ReplyReceiverFor<M> {
/// Wait for a reply until for the duration given in `timeout`
///
/// ## Note
@@ -186,7 +189,11 @@ impl<M: Message> ReplyReceiver<M> {
/// It is also important, that just because a given `M: Message` has a `M::Reply` type set,
/// that the plugin that a message was sent to does _not_ have to reply with it. It can choose
/// to not do so.
- pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> {
+ pub async fn wait_for_reply<R>(self, timeout: Duration) -> Result<R, ReplyError>
+ where
+ R: Message,
+ M: AcceptsReplies<Reply = R>,
+ {
let data = tokio::time::timeout(timeout, self.reply_recv)
.await
.map_err(|_| ReplyError::Timeout)?
@@ -199,13 +206,13 @@ impl<M: Message> ReplyReceiver<M> {
#[derive(Debug)]
/// Allows the [`Handle`](crate::plugin::Handle) implementation to reply with a given message as
/// specified by the currently handled message.
-pub struct ReplySender<M> {
+pub struct ReplySenderFor<M> {
_pd: PhantomData<fn(M)>,
- reply_sender: tokio::sync::oneshot::Sender<AnySendBox>,
+ reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>,
}
-impl<M: Message> ReplySender<M> {
- pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self {
+impl<M: Message> ReplySenderFor<M> {
+ pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>) -> Self {
Self {
_pd: PhantomData,
reply_sender,
@@ -213,7 +220,11 @@ impl<M: Message> ReplySender<M> {
}
/// Reply to the originating plugin with the given message
- pub fn reply(self, msg: M) -> Result<(), M> {
+ pub fn reply<R>(self, msg: R) -> Result<(), M>
+ where
+ R: Message,
+ M: AcceptsReplies<Reply = R>,
+ {
self.reply_sender
.send(Box::new(msg))
.map_err(|msg| *msg.downcast::<M>().unwrap())
@@ -244,7 +255,7 @@ pub enum ReplyError {
#[doc(hidden)]
pub trait ReceiverBundle: Send + 'static {
- fn get_ids() -> Vec<(&'static str, std::any::TypeId)>;
+ fn get_ids() -> Vec<MessageType>;
}
#[doc(hidden)]
@@ -258,21 +269,17 @@ pub trait Contains<M: Message> {}
/// ## Example
///
/// ```rust
-/// # use tedge_api::{Message, make_receiver_bundle, message::NoReply};
+/// # use tedge_api::{Message, make_receiver_bundle};
///
/// #[derive(Debug)]
/// struct IntMessage(u8);
///
-/// impl Message for IntMessage {
-/// type Reply = NoReply;
-/// }
+/// impl Message for IntMessage { }
///
/// #[derive(Debug)]
/// struct StatusMessage(String);
///
-/// impl Message for StatusMessage {
-/// type Reply = NoReply;
-/// }
+/// impl Message for StatusMessage { }
///
/// make_receiver_bundle!(struct MessageReceiver(IntMessage, StatusMessage));
///
@@ -289,9 +296,9 @@ macro_rules! make_receiver_bundle {
impl $crate::address::ReceiverBundle for $name {
#[allow(unused_parens)]
- fn get_ids() -> Vec<(&'static str, std::any::TypeId)> {
+ fn get_ids() -> Vec<$crate::message::MessageType> {
vec![
- $((std::any::type_name::<$msg>(), std::any::TypeId::of::<$msg>())),+
+ $($crate::message::MessageType::for_message::<$msg>()),+
]
}
}
@@ -305,25 +312,24 @@ mod tests {
use static_assertions::{assert_impl_all, assert_not_impl_any};
use crate::{
- address::{ReplyReceiver, ReplySender},
+ address::{ReplyReceiverFor, ReplySenderFor},
make_receiver_bundle,
- plugin::Message,
+ plugin::{AcceptsReplies, Message},
Address,
};
#[derive(Debug)]
struct Foo;
- impl Message for Foo {
+ impl Message for Foo {}
+ impl AcceptsReplies for Foo {
type Reply = Bar;
}
#[derive(Debug)]
struct Bar;
- impl Message for Bar {
- type Reply = Bar;
- }
+ impl Message for Bar {}
make_receiver_bundle!(struct FooBar(Foo, Bar));
@@ -344,6 +350,6 @@ mod tests {
assert_impl_all!(Address<FooBar>: Clone, Send, Sync);
assert_not_impl_any!(NotSync: Send, Sync);
- assert_impl_all!(ReplySender<NotSync>: Send, Sync);
- assert_impl_all!(ReplyReceiver<NotSync>: Send, Sync);
+ assert_impl_all!(ReplySenderFor<NotSync>: Send, Sync);
+ assert_impl_all!(ReplyReceiverFor<NotSync>: Send, Sync);
}
diff --git a/crates/core/tedge_api/src/message.rs b/crates/core/tedge_api/src/message.rs
index decb41e5..2fd5faa4 100644
--- a/crates/core/tedge_api/src/message.rs
+++ b/crates/core/tedge_api/src/message.rs
@@ -1,19 +1,134 @@
-use crate::plugin::Message;
+use crate::{address::AnyMessageBox, plugin::Message};
+/// A message that can contain any other message
+///
+/// This is solely used in conjunction with [`AnyMessages`](crate::plugin::AnyMessages) and should not generally be used
+/// otherwise.
+///
+/// To construct it, you will need to have a message and call [`AnyMessage::from_message`]
#[derive(Debug)]
-/// A message which cannot be constructed and thus can be used when no reply is expected.
-pub enum NoReply {}
+pub struct AnyMessage(pub(crate) AnyMessageBox);
-impl Message for NoReply {
- type Reply = NoReply;
+impl std::ops::Deref for AnyMessage {
+ type Target = dyn Message;
+
+ fn deref(&self) -> &Self::Target {
+ &*self.0
+ }
+}
+
+impl AnyMessage {
+ /// Construct a new [`AnyMessage`] from a message
+ pub fn from_message<M: Message>(m: M) -> Self {
+ AnyMessage(Box::new(m))
+ }
+
+ /// Try to downcast this message to a specific message
+ pub fn downcast<M: Message>(self) -> Result<M, Self> {
+ Ok(*self.0.downcast().map_err(AnyMessage)?)
+ }
+
+ /// Take out the raw boxed message
+ ///
+ /// Note
+ ///
+ /// This is an advanced API and should only be used if needed.
+ /// Prefer using `AnyMessage::downcast` if possible
+ pub fn into_raw(self) -> AnyMessageBox {
+ self.0
+ }
+}
+
+impl Message for AnyMessage {}
+
+/// The type of a message as used by `tedge_api` to represent a type
+#[derive(Debug, Clone)]
+pub struct MessageType {
+ name: &'static str,
+ kind: MessageKind,
+}
+
+#[derive(Debug, Clone)]
+enum MessageKind {
+ Wildcard,
+ Typed(std::any::TypeId),
+}
+
+impl MessageType {
+ /// Does this [`MessageType`] satisfy another [`MessageType`]
+ ///
+ /// ## Note
+ /// A message type from [`AnyMessage`] acts as a 'wildcard', being satisfied by any other type
+ /// (even itself).
+ /// The reverse is not true, a specific type cannot be satisfied by a 'wildcard' (i.e.
+ /// [`AnyMessage`]).
+ ///
+ /// [`MessageType::satisfy`] is thus reflexive but not symmetric nor transitive, meaning that it cannot be
+ /// used for `PartialEq`.
+ #[must_use]
+ pub fn satisfy(&self, other: &Self) -> bool {
+ match (&self.kind, &other.kind) {
+ (MessageKind::Wildcard, _) => true,
+ (_, MessageKind::Wildcard) => false,
+ (MessageKind::Typed(ty_l), MessageKind::Typed(ty_r)) => ty_l.eq(ty_r),
+ }
+ }
+
+ /// Get the [`MessageType`] for a `M`:[`Message`]
+ #[must_use]
+ pub fn for_message<M: Message>() -> Self {
+ let id = std::any::TypeId::of::<M>();
+ MessageType {
+ name: std::any::type_name::<M>(),
+ kind: if id == std::any::TypeId::of::<AnyMessage>() {
+ MessageKind::Wildcard
+ } else {
+ MessageKind::Typed(id)
+ },
+ }
+ }
+
+ /// Get the type's name
+ #[must_use]
+ pub fn name(&self) -> &str {
+ self.name
+ }
}
/// A message to tell the core to stop thin-edge
#[derive(Debug)]
pub struct StopCore;
-impl Message for StopCore {
- type Reply = NoReply;
-}
+impl Message for StopCore {}
crate::make_receiver_bundle!(pub struct CoreMessages(StopCore));
+
+#[cfg(test)]
+mod tests {
+ use crate::Message;
+
+ use super::{AnyMessage, MessageType};
+
+ #[derive(Debug)]
+ struct Bar;
+
+ impl Message for Bar {}
+
+ #[derive(Debug)]
+ struct Foo;
+
+ impl Message for Foo {}
+
+ #[test]
+ fn assert_satisfy_laws_for_types() {
+ let bar_type = MessageType::for_message::<Bar>();
+ let any_message_type = MessageType::for_message::<AnyMessage>();
+ let foo_type = MessageType::for_message::<Foo>();
+
+ assert!(any_message_type.satisfy(&bar_type));
+ assert!(any_message_type.satisfy(&foo_type));
+
+ assert!(!bar_type.satisfy(&any_message_type));
+ assert!(!bar_type.satisfy(&foo_type));
+ }
+}
diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs