diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-04-29 09:30:10 +0200 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-05-05 15:06:20 +0200 |
commit | abab64c856c0c299b13fcc1857a5b78f449e4a9e (patch) | |
tree | 1dfacf02e269b25328275367f20c70369a2797db /crates | |
parent | a398c9783b54892aa1f625685beb7f153d7f7e6d (diff) |
Add AnyMessage & AnyMessages
This patch adds the ability to send any kind of messages to plugins that
declare handling `AnyMessage` + `AnyMessages`.
It does it with these changes:
- Remove the `Message::Reply` associated type
- This allows `Message` to be used as a trait object
- Add the `MessageType` which allows to identify messages
- This permits `AnyMessage` to be received by plugins
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates')
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 46 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/universal_log.rs | 430 | ||||
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 80 | ||||
-rw-r--r-- | crates/core/tedge_api/src/message.rs | 131 | ||||
-rw-r--r-- | crates/core/tedge_api/src/plugin.rs | 114 |
5 files changed, 685 insertions, 116 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 95e6ca15..37e9c8ec 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,15 +1,11 @@ -use std::{ - any::TypeId, - collections::{HashMap, HashSet}, - time::Duration, -}; +use std::{collections::HashMap, time::Duration}; use async_trait::async_trait; use futures::FutureExt; use tedge_api::{ - address::ReplySender, - message::NoReply, - plugin::{BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, + address::ReplySenderFor, + message::MessageType, + plugin::{AcceptsReplies, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, PluginError, }; @@ -17,7 +13,8 @@ use tedge_api::{ /// A message that represents a heartbeat that gets sent to plugins #[derive(Debug)] struct Heartbeat; -impl Message for Heartbeat { +impl Message for Heartbeat {} +impl AcceptsReplies for Heartbeat { type Reply = HeartbeatStatus; } @@ -27,9 +24,7 @@ enum HeartbeatStatus { Alive, Degraded, } -impl Message for HeartbeatStatus { - type Reply = NoReply; -} +impl Message for HeartbeatStatus {} /// A PluginBuilder that gets used to build a HeartbeatService plugin instance #[derive(Debug)] @@ -253,7 +248,7 @@ impl Handle<Heartbeat> for CriticalService { async fn handle_message( &self, _message: Heartbeat, - sender: ReplySender<HeartbeatStatus>, + sender: ReplySenderFor<Heartbeat>, ) -> Result<(), PluginError> { println!("CriticalService: Received Heartbeat!"); let mut status = self.status.lock().await; @@ -301,23 +296,13 @@ impl Plugin for CriticalService { /// Helper type for keeping information about plugins during runtime #[derive(Debug)] struct PluginInfo { - types: HashSet<(&'static str, TypeId)>, + types: Vec<MessageType>, receiver: Option<tedge_api::address::MessageReceiver>, sender: tedge_api::address::MessageSender, } -impl Clone for PluginInfo { - fn clone(&self) -> Self { - PluginInfo { - types: self.types.clone(), - receiver: None, - sender: self.sender.clone(), - } - } -} - /// The type that provides the communication infrastructure to the plugins. -#[derive(Clone, Debug)] +#[derive(Debug)] struct Communication { plugins: HashMap<String, PluginInfo>, } @@ -328,7 +313,7 @@ impl Communication { self.plugins.insert( name.to_owned(), PluginInfo { - types: PB::kind_message_types().into(), + types: PB::kind_message_types().into_types(), sender, receiver: Some(receiver), }, @@ -341,7 +326,7 @@ impl PluginDirectory for Communication { &self, name: &str, ) -> Result<Address<MB>, tedge_api::error::DirectoryError> { - let types = MB::get_ids().into_iter().collect(); + let asked_types: Vec<_> = MB::get_ids().into_iter().collect(); let plug = self.plugins.get(name).unwrap_or_else(|| { // This is an example, so we panic!() here. @@ -353,12 +338,15 @@ impl PluginDirectory for Communication { ) }); - if !plug.types.is_superset(&types) { + if !asked_types + .iter() + .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type))) + { // This is an example, so we panic!() here // In real-world, we would do some reporting and return an error panic!( "Asked for {:#?} but plugin {} only has types {:#?}", - types, name, plug.types, + asked_types, name, plug.types, ); } else { Ok(Address::new(plug.sender.clone())) diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs new file mode 100644 index 00000000..f2b3dcea --- /dev/null +++ b/crates/core/tedge_api/examples/universal_log.rs @@ -0,0 +1,430 @@ +use std::{collections::HashMap, time::Duration}; + +use async_trait::async_trait; +use tedge_api::{ + address::ReplySenderFor, + message::{AnyMessage, MessageType}, + plugin::{AnyMessages, BuiltPlugin, Handle, Message, PluginDeclaration, PluginExt}, + Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory, + PluginError, +}; + +/// A message that represents a heartbeat that gets sent to plugins +#[derive(Debug)] +struct Heartbeat; +impl Message for Heartbeat {} + +#[derive(Debug)] +struct RandomData; +impl Message for RandomData {} + +/// A PluginBuilder that gets used to build a HeartbeatService plugin instance +#[derive(Debug)] +struct HeartbeatServiceBuilder; + +#[derive(miette::Diagnostic, thiserror::Error, Debug)] +enum HeartbeatBuildError { + #[error(transparent)] + TomlParse(#[from] toml::de::Error), +} + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for HeartbeatServiceBuilder { + fn kind_name() -> &'static str { + todo!() + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + HeartbeatService::get_handled_types() + } + + async fn verify_configuration( + &self, + _config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + Ok(()) + } + + async fn instantiate( + &self, + config: PluginConfiguration, + cancellation_token: CancellationToken, + plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> + where + PD: 'async_trait, + { + let hb_config: HeartbeatConfig = + toml::Value::try_into(config).map_err(HeartbeatBuildError::from)?; + let monitored_services = hb_config + .plugins + .iter() + .map(|name| { + plugin_dir + .get_address_for::<HeartbeatMessages>(name) + .map(|addr| (name.clone(), addr)) + }) + .collect::<Result<Vec<_>, _>>()?; + Ok(HeartbeatService::new( + Duration::from_millis(hb_config.interval), + monitored_services, + cancellation_token, + ) + .finish()) + } +} + +/// The configuration a HeartbeatServices can receive is represented by this type +#[derive(serde::Deserialize, Debug)] +struct HeartbeatConfig { + interval: u64, + plugins: Vec<String>, +} + +/// The HeartbeatService type represents the actual plugin +struct HeartbeatService { + interval_duration: Duration, + monitored_services: Vec<(String, Address<HeartbeatMessages>)>, + cancel_token: CancellationToken, +} + +impl PluginDeclaration for HeartbeatService { + type HandledMessages = (); +} + +#[async_trait] +impl Plugin for HeartbeatService { + /// The setup function of the HeartbeatService can be used by the plugin author to setup for + /// example a connection to an external service. In this example, it is simply used to send the + /// heartbeat + /// + /// Because this example is _simple_, we do not spawn a background task that periodically sends + /// the heartbeat. In a real world scenario, that background task would be started here. + async fn start(&mut self) -> Result<(), PluginError> { + println!( + "HeartbeatService: Setting up heartbeat service with interval: {:?}!", + self.interval_duration + ); + + for service in &self.monitored_services { + let mut interval = tokio::time::interval(self.interval_duration); + let service = service.clone(); + let cancel_token = self.cancel_token.child_token(); + tokio::spawn(async move { + loop { + tokio::select! { + _ = interval.tick() => {} + _ = cancel_token.cancelled() => { + break + } + } + println!( + "HeartbeatService: Sending heartbeat to service: {:?}", + service + ); + service.1.send_and_wait(Heartbeat).await.unwrap(); + service.1.send_and_wait(RandomData).await.unwrap(); + } + }); + } + Ok(()) + } + + /// A plugin author can use this shutdown function to clean resources when thin-edge shuts down + async fn shutdown(&mut self) -> Result<(), PluginError> { + println!("HeartbeatService: Shutting down heartbeat service!"); + Ok(()) + } +} + +impl HeartbeatService { + fn new( + interval_duration: Duration, + monitored_services: Vec<(String, Address<HeartbeatMessages>)>, + cancel_token: CancellationToken, + ) -> Self { + Self { + interval_duration, + monitored_services, + cancel_token, + } + } +} + +/// A plugin that receives heartbeats +struct LogServiceBuilder; + +// declare a set of messages that the CriticalService can receive. +// In this example, it can only receive a Heartbeat. +tedge_api::make_receiver_bundle!(struct HeartbeatMessages(Heartbeat, RandomData)); + +#[async_trait] +impl<PD: PluginDirectory> PluginBuilder<PD> for LogServiceBuilder { + fn kind_name() -> &'static str { + todo!() + } + + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { + LogService::get_handled_types() + } + + async fn verify_configuration( + &self, + _config: &PluginConfiguration, + ) -> Result<(), tedge_api::error::PluginError> { + Ok(()) + } + + async fn instantiate( + &self, + _config: PluginConfiguration, + _cancellation_token: CancellationToken, + _plugin_dir: &PD, + ) -> Result<BuiltPlugin, PluginError> + where + PD: 'async_trait, + { + Ok(LogService {}.finish()) + } +} + +/// The actual "critical" plugin implementation +struct LogService {} + +/// The CriticalService can receive Heartbeat objects, thus it needs a Handle<Heartbeat> +/// implementation +#[async_trait] +impl Handle<AnyMessage> for LogService { + async fn handle_message( + &self, + message: AnyMessage, + _sender: ReplySenderFor<AnyMessage>, + ) -> Result<(), PluginError> { + println!("LogService: Received Message: {:?}", message); + Ok(()) + } +} + +impl PluginDeclaration for LogService { + type HandledMessages = AnyMessages; +} + +/// Because the CriticalService is of course a Plugin, it needs an implementation for that as well. +#[async_trait] +impl Plugin for LogService { + async fn start(&mut self) -> Result<(), PluginError> { + println!("CriticalService: Setting up critical service!"); + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + println!("CriticalService: Shutting down critical service service!"); + Ok(()) + } +} + +// The following pieces of code would be implemented by a "core" component, that is responsible for +// setting up plugins and their communication. +// +// Plugin authors do not need to write this code, but need a basic understanding what it does and +// how it works. +// As this is an example, we implement it here to showcase how it is done. +// + +/// Helper type for keeping information about plugins during runtime +#[derive(Debug)] +struct PluginInfo { + types: Vec<MessageType>, + receiver: Option<tedge_api::address::MessageReceiver>, + sender: tedge_api::address::MessageSender, +} + +/// The type that provides the communication infrastructure to the plugins. +#[derive(Debug)] +struct Communication { + plugins: HashMap<String, PluginInfo>, +} + +impl Communication { + pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { + let (sender, receiver) = tokio::sync::mpsc::channel(10); + self.plugins.insert( + name.to_owned(), + PluginInfo { + types: PB::kind_message_types().into_types(), + sender, + receiver: Some(receiver), + }, + ); + } +} + +impl PluginDirectory for Communication { + fn get_address_for<MB: tedge_api::address::ReceiverBundle>( + &self, + name: &str, + ) -> Result<Address<MB>, tedge_api::error::DirectoryError> { + let asked_types: Vec<_> = MB::get_ids().into_iter().collect(); + + let plug = self.plugins.get(name).unwrap_or_else(|| { + // This is an example, so we panic!() here. + // In real-world, we would do some reporting and return an error + panic!( + "Didn't find plugin with name {}, got: {:?}", + name, + self.plugins.keys().collect::<Vec<_>>() + ) + }); + + if !asked_types + .iter() + .all(|req_type| plug.types.iter().any(|ty| ty.satisfy(req_type))) + { + // This is an example, so we panic!() here + // In real-world, we would do some reporting and return an error + panic!( + "Asked for {:#?} but plugin {} only has types {:#?}", + asked_types, name, plug.types, + ); + } else { + Ok(Address::new(plug.sender.clone())) + } + } + + fn get_address_for_core(&self) -> Address<tedge_api::CoreMessages> { + todo!() + } +} + +/// Helper function +async fn build_critical_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { + let csb = LogServiceBuilder; + + let config = toml::from_str("").unwrap(); + + csb.instantiate(config, cancel_token, comms).await.unwrap() +} + +/// Helper function +async fn build_heartbeat_plugin( + comms: &mut Communication, + cancel_token: CancellationToken, +) -> BuiltPlugin { + let hsb = HeartbeatServiceBuilder; + + let config = toml::from_str( + r#" + interval = 5000 + plugins = ["critical-service"] + "#, + ) + .unwrap(); + + hsb.instantiate(config, cancel_token, comms).await.unwrap() +} + +#[tokio::main] +async fn main() { + // This implementation now ties everything together + // + // This would be implemented in a CLI binary using the "core" implementation to boot things up. + // + // Here, we just tie everything together in the minimal possible way, to showcase how such a + // setup would basically work. + + let mut comms = Communication { + plugins: HashMap::new(), + }; + + // in a main(), the core would be told what plugins are available. + // This would, in a real-world scenario, not happen on the "communication" type directly. + // Still, this needs to be done by a main()-author. + comms.declare_plugin::<LogServiceBuilder>("critical-service"); + comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat"); + + // The following would all be handled by the core implementation, a main() author would only + // need to call some kind of "run everything" function + + let cancel_token = CancellationToken::new(); + + let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await; + let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await; + + heartbeat.plugin_mut().start().await.unwrap(); + critical_service.plugin_mut().start().await.unwrap(); + + let mut recv = comms + .plugins + .get_mut("heartbeat") + .unwrap() + .receiver + .take() + .unwrap(); + + let hb_cancel_token = cancel_token.child_token(); + let hb_handle = tokio::task::spawn(async move { + let hb = heartbeat; + + loop { + tokio::select! { + Some(msg) = recv.recv() => { + hb.handle_message(msg).await.unwrap(); + } + _ = hb_cancel_token.cancelled() => break, + } + } + + hb + }); + + let mut recv = comms + .plugins + .get_mut("critical-service") + .unwrap() + .receiver + .take() + .unwrap(); + + let cs_cancel_token = cancel_token.child_token(); + let cs_handle = tokio::task::spawn(async move { + let cs = critical_service; + + loop { + tokio::select! { + Some(msg) = recv.recv() => { + cs.handle_message(msg).await.unwrap(); + } + _ = cs_cancel_token.cancelled() => break, + } + } + + cs + }); + + println!("Core: Stopping everything in 10 seconds!"); + tokio::time::sleep(Duration::from_secs(12)).await; + + println!("Core: SHUTTING DOWN"); + cancel_token.cancel(); + + let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); + + heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + critical_service + .unwrap() + .plugin_mut() + .shutdown() + .await + .unwrap(); + + println!("Core: Shut down"); +} diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index 619941ad..93a51450 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -2,16 +2,19 @@ use std::{marker::PhantomData, time::Duration}; use tokio::sync::mpsc::error::{SendTimeoutError, TrySendError}; -use crate::plugin::Message; +use crate::{ + message::MessageType, + plugin::{AcceptsReplies, Message}, +}; #[doc(hidden)] -pub type AnySendBox = Box<dyn std::any::Any + Send>; +pub type AnyMessageBox = Box<dyn Message>; #[doc(hidden)] #[derive(Debug)] pub struct InternalMessage { - pub(crate) data: AnySendBox, - pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, + pub(crate) data: AnyMessageBox, + pub(crate) reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME @@ -79,7 +82,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// could become an issue use something akin to timeout (like /// [`timeout`](tokio::time::timeout)). /// For details on sending and receiving, see `tokio::sync::mpsc::Sender`. - pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> + pub async fn send_and_wait<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> where RB: Contains<M>, { @@ -93,7 +96,7 @@ impl<RB: ReceiverBundle> Address<RB> { .await .map_err(|msg| *msg.0.data.downcast::<M>().unwrap())?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -111,7 +114,7 @@ impl<RB: ReceiverBundle> Address<RB> { /// /// The error is returned if the receiving side (the plugin that is addressed) cannot currently /// receive messages (either because it is closed or the queue is full). - pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiver<M::Reply>, M> { + pub fn try_send<M: Message>(&self, msg: M) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -125,7 +128,7 @@ impl<RB: ReceiverBundle> Address<RB> { } })?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -142,7 +145,7 @@ impl<RB: ReceiverBundle> Address<RB> { &self, msg: M, timeout: Duration, - ) -> Result<ReplyReceiver<M::Reply>, M> { + ) -> Result<ReplyReceiverFor<M>, M> { let (sender, receiver) = tokio::sync::oneshot::channel(); self.sender @@ -160,7 +163,7 @@ impl<RB: ReceiverBundle> Address<RB> { } })?; - Ok(ReplyReceiver { + Ok(ReplyReceiverFor { _pd: PhantomData, reply_recv: receiver, }) @@ -169,12 +172,12 @@ impl<RB: ReceiverBundle> Address<RB> { #[derive(Debug)] /// Listener that allows one to wait for a reply as sent through [`Address::send_and_wait`] -pub struct ReplyReceiver<M> { +pub struct ReplyReceiverFor<M> { _pd: PhantomData<fn(M)>, - reply_recv: tokio::sync::oneshot::Receiver<AnySendBox>, + reply_recv: tokio::sync::oneshot::Receiver<AnyMessageBox>, } -impl<M: Message> ReplyReceiver<M> { +impl<M: Message> ReplyReceiverFor<M> { /// Wait for a reply until for the duration given in `timeout` /// /// ## Note @@ -186,7 +189,11 @@ impl<M: Message> ReplyReceiver<M> { /// It is also important, that just because a given `M: Message` has a `M::Reply` type set, /// that the plugin that a message was sent to does _not_ have to reply with it. It can choose /// to not do so. - pub async fn wait_for_reply(self, timeout: Duration) -> Result<M, ReplyError> { + pub async fn wait_for_reply<R>(self, timeout: Duration) -> Result<R, ReplyError> + where + R: Message, + M: AcceptsReplies<Reply = R>, + { let data = tokio::time::timeout(timeout, self.reply_recv) .await .map_err(|_| ReplyError::Timeout)? @@ -199,13 +206,13 @@ impl<M: Message> ReplyReceiver<M> { #[derive(Debug)] /// Allows the [`Handle`](crate::plugin::Handle) implementation to reply with a given message as /// specified by the currently handled message. -pub struct ReplySender<M> { +pub struct ReplySenderFor<M> { _pd: PhantomData<fn(M)>, - reply_sender: tokio::sync::oneshot::Sender<AnySendBox>, + reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>, } -impl<M: Message> ReplySender<M> { - pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnySendBox>) -> Self { +impl<M: Message> ReplySenderFor<M> { + pub(crate) fn new(reply_sender: tokio::sync::oneshot::Sender<AnyMessageBox>) -> Self { Self { _pd: PhantomData, reply_sender, @@ -213,7 +220,11 @@ impl<M: Message> ReplySender<M> { } /// Reply to the originating plugin with the given message - pub fn reply(self, msg: M) -> Result<(), M> { + pub fn reply<R>(self, msg: R) -> Result<(), M> + where + R: Message, + M: AcceptsReplies<Reply = R>, + { self.reply_sender .send(Box::new(msg)) .map_err(|msg| *msg.downcast::<M>().unwrap()) @@ -244,7 +255,7 @@ pub enum ReplyError { #[doc(hidden)] pub trait ReceiverBundle: Send + 'static { - fn get_ids() -> Vec<(&'static str, std::any::TypeId)>; + fn get_ids() -> Vec<MessageType>; } #[doc(hidden)] @@ -258,21 +269,17 @@ pub trait Contains<M: Message> {} /// ## Example /// /// ```rust -/// # use tedge_api::{Message, make_receiver_bundle, message::NoReply}; +/// # use tedge_api::{Message, make_receiver_bundle}; /// /// #[derive(Debug)] /// struct IntMessage(u8); /// -/// impl Message for IntMessage { -/// type Reply = NoReply; -/// } +/// impl Message for IntMessage { } /// /// #[derive(Debug)] /// struct StatusMessage(String); /// -/// impl Message for StatusMessage { -/// type Reply = NoReply; -/// } +/// impl Message for StatusMessage { } /// /// make_receiver_bundle!(struct MessageReceiver(IntMessage, StatusMessage)); /// @@ -289,9 +296,9 @@ macro_rules! make_receiver_bundle { impl $crate::address::ReceiverBundle for $name { #[allow(unused_parens)] - fn get_ids() -> Vec<(&'static str, std::any::TypeId)> { + fn get_ids() -> Vec<$crate::message::MessageType> { vec![ - $((std::any::type_name::<$msg>(), std::any::TypeId::of::<$msg>())),+ + $($crate::message::MessageType::for_message::<$msg>()),+ ] } } @@ -305,25 +312,24 @@ mod tests { use static_assertions::{assert_impl_all, assert_not_impl_any}; use crate::{ - address::{ReplyReceiver, ReplySender}, + address::{ReplyReceiverFor, ReplySenderFor}, make_receiver_bundle, - plugin::Message, + plugin::{AcceptsReplies, Message}, Address, }; #[derive(Debug)] struct Foo; - impl Message for Foo { + impl Message for Foo {} + impl AcceptsReplies for Foo { type Reply = Bar; } #[derive(Debug)] struct Bar; - impl Message for Bar { - type Reply = Bar; - } + impl Message for Bar {} make_receiver_bundle!(struct FooBar(Foo, Bar)); @@ -344,6 +350,6 @@ mod tests { assert_impl_all!(Address<FooBar>: Clone, Send, Sync); assert_not_impl_any!(NotSync: Send, Sync); - assert_impl_all!(ReplySender<NotSync>: Send, Sync); - assert_impl_all!(ReplyReceiver<NotSync>: Send, Sync); + assert_impl_all!(ReplySenderFor<NotSync>: Send, Sync); + assert_impl_all!(ReplyReceiverFor<NotSync>: Send, Sync); } diff --git a/crates/core/tedge_api/src/message.rs b/crates/core/tedge_api/src/message.rs index decb41e5..2fd5faa4 100644 --- a/crates/core/tedge_api/src/message.rs +++ b/crates/core/tedge_api/src/message.rs @@ -1,19 +1,134 @@ -use crate::plugin::Message; +use crate::{address::AnyMessageBox, plugin::Message}; +/// A message that can contain any other message +/// +/// This is solely used in conjunction with [`AnyMessages`](crate::plugin::AnyMessages) and should not generally be used +/// otherwise. +/// +/// To construct it, you will need to have a message and call [`AnyMessage::from_message`] #[derive(Debug)] -/// A message which cannot be constructed and thus can be used when no reply is expected. -pub enum NoReply {} +pub struct AnyMessage(pub(crate) AnyMessageBox); -impl Message for NoReply { - type Reply = NoReply; +impl std::ops::Deref for AnyMessage { + type Target = dyn Message; + + fn deref(&self) -> &Self::Target { + &*self.0 + } +} + +impl AnyMessage { + /// Construct a new [`AnyMessage`] from a message + pub fn from_message<M: Message>(m: M) -> Self { + AnyMessage(Box::new(m)) + } + + /// Try to downcast this message to a specific message + pub fn downcast<M: Message>(self) -> Result<M, Self> { + Ok(*self.0.downcast().map_err(AnyMessage)?) + } + + /// Take out the raw boxed message + /// + /// Note + /// + /// This is an advanced API and should only be used if needed. + /// Prefer using `AnyMessage::downcast` if possible + pub fn into_raw(self) -> AnyMessageBox { + self.0 + } +} + +impl Message for AnyMessage {} + +/// The type of a message as used by `tedge_api` to represent a type +#[derive(Debug, Clone)] +pub struct MessageType { + name: &'static str, + kind: MessageKind, +} + +#[derive(Debug, Clone)] +enum MessageKind { + Wildcard, + Typed(std::any::TypeId), +} + +impl MessageType { + /// Does this [`MessageType`] satisfy another [`MessageType`] + /// + /// ## Note + /// A message type from [`AnyMessage`] acts as a 'wildcard', being satisfied by any other type + /// (even itself). + /// The reverse is not true, a specific type cannot be satisfied by a 'wildcard' (i.e. + /// [`AnyMessage`]). + /// + /// [`MessageType::satisfy`] is thus reflexive but not symmetric nor transitive, meaning that it cannot be + /// used for `PartialEq`. + #[must_use] + pub fn satisfy(&self, other: &Self) -> bool { + match (&self.kind, &other.kind) { + (MessageKind::Wildcard, _) => true, + (_, MessageKind::Wildcard) => false, + (MessageKind::Typed(ty_l), MessageKind::Typed(ty_r)) => ty_l.eq(ty_r), + } + } + + /// Get the [`MessageType`] for a `M`:[`Message`] + #[must_use] + pub fn for_message<M: Message>() -> Self { + let id = std::any::TypeId::of::<M>(); + MessageType { + name: std::any::type_name::<M>(), + kind: if id == std::any::TypeId::of::<AnyMessage>() { + MessageKind::Wildcard + } else { + MessageKind::Typed(id) + }, + } + } + + /// Get the type's name + #[must_use] + pub fn name(&self) -> &str { + self.name + } } /// A message to tell the core to stop thin-edge #[derive(Debug)] pub struct StopCore; -impl Message for StopCore { - type Reply = NoReply; -} +impl Message for StopCore {} crate::make_receiver_bundle!(pub struct CoreMessages(StopCore)); + +#[cfg(test)] +mod tests { + use crate::Message; + + use super::{AnyMessage, MessageType}; + + #[derive(Debug)] + struct Bar; + + impl Message for Bar {} + + #[derive(Debug)] + struct Foo; + + impl Message for Foo {} + + #[test] + fn assert_satisfy_laws_for_types() { + let bar_type = MessageType::for_message::<Bar>(); + let any_message_type = MessageType::for_message::<AnyMessage>(); + let foo_type = MessageType::for_message::<Foo>(); + + assert!(any_message |