diff options
author | Marcel Müller <m.mueller@ifm.com> | 2022-03-17 10:15:31 +0100 |
---|---|---|
committer | Marcel Müller <m.mueller@ifm.com> | 2022-03-21 08:53:49 +0100 |
commit | 7e710a7ea547833c21040d4dacac813fd2201f8f (patch) | |
tree | 9df6340d7d77afd09ae760d65af14c87afd1a0b1 | |
parent | 9c165ac3fea7aab3be94798ccd7896b272bdcef1 (diff) |
Add typed Address struct
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
-rw-r--r-- | Cargo.lock | 7 | ||||
-rw-r--r-- | crates/core/tedge_api/Cargo.toml | 1 | ||||
-rw-r--r-- | crates/core/tedge_api/examples/heartbeat.rs | 258 | ||||
-rw-r--r-- | crates/core/tedge_api/src/address.rs | 70 | ||||
-rw-r--r-- | crates/core/tedge_api/src/plugin.rs | 240 |
5 files changed, 414 insertions, 162 deletions
@@ -706,6 +706,12 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4bb454f0228b18c7f4c3b0ebbee346ed9c52e7443b0999cd543ff3571205701d" [[package]] +name = "downcast-rs" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ea835d29036a4087793836fa931b08837ad5e957da9e23886b29586fb9b6650" + +[[package]] name = "download" version = "0.5.2" dependencies = [ @@ -2692,6 +2698,7 @@ version = "0.1.0" dependencies = [ "anyhow", "async-trait", + "downcast-rs", "futures", "serde", "static_assertions", diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml index bcf32c51..9309f2bc 100644 --- a/crates/core/tedge_api/Cargo.toml +++ b/crates/core/tedge_api/Cargo.toml @@ -8,6 +8,7 @@ edition = "2021" [dependencies] anyhow = "1.0.53" async-trait = "0.1.52" +downcast-rs = "1.2.0" futures = "0.3.21" thiserror = "1.0.30" tokio = { version = "1.16.1", features = ["sync"] } diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs index 0b5cc823..0fe0d308 100644 --- a/crates/core/tedge_api/examples/heartbeat.rs +++ b/crates/core/tedge_api/examples/heartbeat.rs @@ -1,29 +1,40 @@ +use std::{ + any::TypeId, + collections::{HashMap, HashSet}, +}; + use async_trait::async_trait; use tedge_api::{ - address::EndpointKind, - plugin::{Handle, HandleTypes, Message}, - Address, CoreCommunication, MessageKind, Plugin, PluginBuilder, PluginConfiguration, - PluginError, + plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt}, + Address, CoreCommunication, Plugin, PluginBuilder, PluginConfiguration, PluginError, }; +#[derive(Debug)] struct Heartbeat; impl Message for Heartbeat {} +#[derive(Debug)] enum HeartbeatStatusReply { Alive, Degraded, } impl Message for HeartbeatStatusReply {} +#[derive(Debug)] struct HeartbeatServiceBuilder; +type HeartbeatMessages = (HeartbeatStatusReply,); + #[async_trait] -impl PluginBuilder for HeartbeatServiceBuilder { +impl<CC: CoreCommunication> PluginBuilder<CC> for HeartbeatServiceBuilder { fn kind_name(&self) -> &'static str { todo!() } - fn kind_message_types(&self) -> tedge_api::plugin::HandleTypes { + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>() } @@ -37,46 +48,90 @@ impl PluginBuilder for HeartbeatServiceBuilder { async fn instantiate( &self, config: PluginConfiguration, - tedge_comms: tedge_api::plugin::CoreCommunication, - ) -> Result<Box<dyn Plugin>, PluginError> { + tedge_comms: &CC, + ) -> Result<BuiltPlugin, PluginError> + where + CC: 'async_trait, + { let hb_config: HeartbeatConfig = toml::Value::try_into(config.into_inner())?; - Ok(Box::new(HeartbeatService::new(tedge_comms, hb_config))) + let monitored_services = hb_config + .plugins + .iter() + .map(|name| tedge_comms.get_address_for::<CriticalServiceMessage>(name)) + .collect::<Result<Vec<_>, _>>()?; + Ok( + HeartbeatService::new(hb_config, monitored_services) + .into_untyped::<HeartbeatMessages>(), + ) } } #[derive(serde::Deserialize, Debug)] struct HeartbeatConfig { interval: u64, + plugins: Vec<String>, } struct HeartbeatService { - comms: tedge_api::plugin::CoreCommunication, config: HeartbeatConfig, + monitored_services: Vec<Address<CriticalServiceMessage>>, +} + +#[async_trait] +impl Plugin for HeartbeatService { + async fn setup(&mut self) -> Result<(), PluginError> { + println!( + "Setting up heartbeat service with interval: {}!", + self.config.interval + ); + + for service in &self.monitored_services { + println!("Sending heartbeat to service"); + service.send(Heartbeat).await.unwrap(); + } + Ok(()) + } + + async fn shutdown(&mut self) -> Result<(), PluginError> { + println!("Shutting down heartbeat service!"); + Ok(()) + } } impl HeartbeatService { - fn new(comms: tedge_api::plugin::CoreCommunication, config: HeartbeatConfig) -> Self { - Self { comms, config } + fn new( + config: HeartbeatConfig, + monitored_services: Vec<Address<CriticalServiceMessage>>, + ) -> Self { + Self { + config, + monitored_services, + } } } #[async_trait] impl Handle<HeartbeatStatusReply> for HeartbeatService { - async fn handle_message(&self, message: HeartbeatStatusReply) -> Result<(), PluginError> { - println!("Received Heartbeat!"); + async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> { + println!("Received HeartbeatReply!"); Ok(()) } } struct CriticalServiceBuilder; +tedge_api::make_message_bundle!(struct CriticalServiceMessage(Heartbeat)); + #[async_trait] -impl PluginBuilder for CriticalServiceBuilder { +impl<CC: CoreCommunication> PluginBuilder<CC> for CriticalServiceBuilder { fn kind_name(&self) -> &'static str { todo!() } - fn kind_message_types(&self) -> tedge_api::plugin::HandleTypes { + fn kind_message_types() -> tedge_api::plugin::HandleTypes + where + Self: Sized, + { HandleTypes::get_handlers_for::<(Heartbeat,), CriticalService>() } @@ -89,11 +144,13 @@ impl PluginBuilder for CriticalServiceBuilder { async fn instantiate( &self, - config: PluginConfiguration, - tedge_comms: tedge_api::plugin::CoreCommunication, - ) -> Result<Box<dyn Plugin>, PluginError> { - let hb_config: HeartbeatConfig = toml::Value::try_into(config.into_inner())?; - Ok(Box::new(HeartbeatService::new(tedge_comms, hb_config))) + _config: PluginConfiguration, + _tedge_comms: &CC, + ) -> Result<BuiltPlugin, PluginError> + where + CC: 'async_trait, + { + Ok(CriticalService {}.into_untyped::<(Heartbeat,)>()) } } @@ -101,67 +158,168 @@ struct CriticalService; #[async_trait] impl Handle<Heartbeat> for CriticalService { - async fn handle_message(&self, message: Heartbeat) -> Result<(), PluginError> { + async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> { println!("Received Heartbeat!"); Ok(()) } } #[async_trait] -impl Plugin for HeartbeatService { +impl Plugin for CriticalService { async fn setup(&mut self) -> Result<(), PluginError> { - println!( - "Setting up heartbeat service with interval: {}!", - self.config.interval - ); + println!("Setting up critical service!"); Ok(()) } async fn shutdown(&mut self) -> Result<(), PluginError> { - println!("Shutting down heartbeat service!"); + println!("Shutting down critical service service!"); Ok(()) } } -#[tokio::main] -async fn main() { - let hsb = HeartbeatServiceBuilder; - let (sender, mut receiver) = tokio::sync::mpsc::channel(10); +#[derive(Debug)] +struct PluginInfo { + types: HashSet<(&'static str, TypeId)>, + receiver: Option<tedge_api::address::MessageReceiver>, + sender: tedge_api::address::MessageSender, +} - let plugin_name = "heartbeat-service".to_string(); - let comms = CoreCommunication::new(plugin_name.clone(), sender); +impl Clone for PluginInfo { + fn clone(&self) -> Self { + PluginInfo { + types: self.types.clone(), + receiver: None, + sender: self.sender.clone(), + } + } +} + +#[derive(Clone, Debug)] +struct Communication { + plugins: HashMap<String, PluginInfo>, +} + +impl Communication { + pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) { + let (sender, receiver) = tokio::sync::mpsc::channel(10); + self.plugins.insert( + name.to_owned(), + PluginInfo { + types: PB::kind_message_types().into(), + sender, + receiver: Some(receiver), + }, + ); + } +} + +impl CoreCommunication for Communication { + fn get_address_for<MB: tedge_api::plugin::MessageBundle>( + &self, + name: &str, + ) -> Result<Address<MB>, PluginError> { + let types = MB::get_ids().into_iter().collect(); + + let plug = self.plugins.get(name).unwrap_or_else(|| { + panic!( + "Didn't find plugin with name {}, got: {:?}", + name, + self.plugins.keys().collect::<Vec<_>>() + ) + }); + + if !plug.types.is_superset(&types) { + panic!( + "Asked for {:#?} but plugin {} only has types {:#?}", + types, name, plug.types, + ); + } else { + Ok(Address::new(plug.sender.clone())) + } + } +} + +async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin { + let csb = CriticalServiceBuilder; + + let config = toml::from_str("").unwrap(); + + csb.instantiate(config, comms).await.unwrap() +} + +async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin { + let hsb = HeartbeatServiceBuilder; let config = toml::from_str( r#" interval = 200 + plugins = ["critical-service"] "#, ) .unwrap(); - let mut heartbeat = hsb.instantiate(config, comms.clone()).await.unwrap(); + hsb.instantiate(config, comms).await.unwrap() +} - heartbeat.setup().await.unwrap(); +#[tokio::main] +async fn main() { + let mut comms = Communication { + plugins: HashMap::new(), + }; - let handle = tokio::task::spawn(async move { - let hb = heartbeat; + comms.declare_plugin::<CriticalServiceBuilder>("critical-service"); + comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat"); - hb.handle_message(Message::new( - Address::new(EndpointKind::Plugin { id: plugin_name }), - Address::new(EndpointKind::Core), - MessageKind::CheckReadyness, - )) - .await + let mut heartbeat = build_heartbeat_plugin(&mut comms).await; + let mut critical_service = build_critical_plugin(&mut comms).await; + + heartbeat.plugin_mut().setup().await.unwrap(); + critical_service.plugin_mut().setup().await.unwrap(); + + let mut recv = comms + .plugins + .get_mut("heartbeat") + .unwrap() + .receiver + .take() .unwrap(); + let hb_handle = tokio::task::spawn(async move { + let hb = heartbeat; + + for msg in recv.recv().await { + hb.handle_message(msg).await.unwrap(); + } + hb }); - println!( - "Receiving message from service: {:#?}", - receiver.recv().await - ); + let mut recv = comms + .plugins + .get_mut("critical-service") + .unwrap() + .receiver + .take() + .unwrap(); + + let cs_handle = tokio::task::spawn(async move { + let cs = critical_service; + + for msg in recv.recv().await { + println!("Critical service received message!"); + cs.handle_message(msg).await.unwrap(); + } - let mut heartbeat = handle.await.unwrap(); + cs + }); + + let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle); - heartbeat.shutdown().await.unwrap(); + heartbeat.unwrap().plugin_mut().shutdown().await.unwrap(); + critical_service + .unwrap() + .plugin_mut() + .shutdown() + .await + .unwrap(); } diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs index fed61ef7..fcda366c 100644 --- a/crates/core/tedge_api/src/address.rs +++ b/crates/core/tedge_api/src/address.rs @@ -1,31 +1,61 @@ -/// An address which specifices either the unique name of a plugin or the core of ThinEdge -/// -/// This is used in the [`Comms::send`](crate::plugin::Comms::send) method to send messages to -/// other plugins attached to the same core. +use std::marker::PhantomData; + +use crate::plugin::{Contains, Message, MessageBundle}; + +/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME +#[doc(hidden)] +pub type MessageSender = tokio::sync::mpsc::Sender<Box<dyn std::any::Any + Send>>; + +/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME +#[doc(hidden)] +pub type MessageReceiver = tokio::sync::mpsc::Receiver<Box<dyn std::any::Any + Send>>; + +/// An address of a plugin that can receive messages of type `M` #[derive(Debug, Clone)] -pub struct Address { - endpoint_kind: EndpointKind, +pub struct Address<MB: MessageBundle> { + _pd: PhantomData<MB>, + sender: MessageSender, } -impl Address { - /// Create a new address with the given destination/origin - pub fn new(endpoint: EndpointKind) -> Address { +impl<MB: MessageBundle> Address<MB> { + /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME + #[doc(hidden)] + pub fn new(sender: MessageSender) -> Self { Self { - endpoint_kind: endpoint, + _pd: PhantomData, + sender, } } - /// Get the endpoint kind associated to this address - pub fn endpoint_kind(&self) -> &EndpointKind { - &self.endpoint_kind + pub async fn send<M: Message>(&self, msg: M) -> Result<(), M> + where + MB: Contains<M>, + { + self.sender + .send(Box::new(msg)) + .await + .map_err(|msg| *msg.0.downcast::<M>().unwrap()) } } -/// What kind of endpoint is it -#[derive(Debug, Clone)] -pub enum EndpointKind { - /// The `tedge` core - Core, - /// A specific plugin - Plugin { id: String }, +#[cfg(test)] +mod tests { + use crate::{make_message_bundle, plugin::Message, Address}; + + struct Foo; + + impl Message for Foo {} + + struct Bar; + + impl Message for Bar {} + + make_message_bundle!(struct FooBar(Foo, Bar)); + + #[allow(unreachable_code, dead_code, unused)] + fn check_compile() { + let addr: Address<FooBar> = todo!(); + addr.send(Foo); + addr.send(Bar); + } } diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs index 33c148c7..b803fa72 100644 --- a/crates/core/tedge_api/src/plugin.rs +++ b/crates/core/tedge_api/src/plugin.rs @@ -5,27 +5,20 @@ //! 2. Create your plugin struct that implements `Plugin` use futures::future::BoxFuture; -use std::any::{Any, TypeId}; +use std::{ + any::{Any, TypeId}, + collections::HashSet, +}; + +use downcast_rs::{impl_downcast, DowncastSync}; use async_trait::async_trait; -use crate::error::PluginError; +use crate::{error::PluginError, Address}; /// The communication struct to interface with the core of ThinEdge -/// -/// It's main purpose is the [`send`](CoreCommunication::send) method, through which one plugin -/// can communicate with another. -#[derive(Clone)] -pub struct CoreCommunication { - plugin_name: String, -} - -impl std::fmt::Debug for CoreCommunication { - fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result { - f.debug_struct("Comms") - .field("plugin_name", &self.plugin_name) - .finish_non_exhaustive() - } +pub trait CoreCommunication: Clone + Send + Sync { + fn get_address_for<MB: MessageBundle>(&self, name: &str) -> Result<Address<MB>, PluginError>; } /// The plugin configuration as a `toml::Spanned` table. @@ -37,15 +30,16 @@ pub type PluginConfiguration = toml::Spanned<toml::value::Value>; /// A plugin builder for a given plugin #[async_trait] -pub trait PluginBuilder: Sync + Send + 'static { +pub trait PluginBuilder<CC: CoreCommunication>: Sync + Send + 'static { /// The name for the kind of plugins this creates, this should be unique and will prevent startup otherwise fn kind_name(&self) -> &'static str; /// A list of message types the plugin this builder creates supports /// - /// To create it, you must use the `HandleTypes::get_handlers_for` method. See there on how to - /// use it. - fn kind_message_types(&self) -> HandleTypes; + /// To create it, you must use the [`HandleTypes::get_handlers_for`] method. + fn kind_message_types() -> HandleTypes + where + Self: Sized; /// This may be called anytime to verify whether a plugin could be instantiated with the /// passed configuration. @@ -57,13 +51,15 @@ pub trait PluginBuilder: Sync + Send + 'static { async fn instantiate( &self, config: PluginConfiguration, - core_comms: CoreCommunication, - ) -> Result<BuiltPlugin, PluginError>; + core_comms: &CC, + ) -> Result<BuiltPlugin, PluginError> + where + CC: 'async_trait; } /// A functionality extension to ThinEdge #[async_trait] -pub trait Plugin: Sync + Send + std::any::Any { +pub trait Plugin: Sync + Send + DowncastSync { /// The plugin can set itself up here async fn setup(&mut self) -> Result<(), PluginError>; @@ -71,6 +67,8 @@ pub trait Plugin: Sync + Send + std::any::Any { async fn shutdown(&mut self) -> Result<(), PluginError>; } +impl_downcast!(sync Plugin); + #[async_trait] pub trait Handle<Msg> { /// Handle a message specific to this plugin @@ -81,6 +79,10 @@ pub trait Handle<Msg> { pub struct HandleTypes(Vec<(&'static str, TypeId)>); impl HandleTypes { + pub fn get_types(&self) -> &[(&'static str, TypeId)] { + &self.0 + } + /// Get a list of message types this plugin is proven to handle /// /// ## Example @@ -147,30 +149,34 @@ impl HandleTypes { /// XX | println!("{:#?}", HandleTypes::get_handlers_for::<(Heartbeat,), HeartbeatPlugin>()); /// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Handle<Heartbeat>` is not implemented for `HeartbeatPlugin` /// ``` - pub fn get_handlers_for<M: MsgBundle, Plugin: DoesHandle<M>>() -> HandleTypes { + pub fn get_handlers_for<M: MessageBundle, Plugin: DoesHandle<M>>() -> HandleTypes { HandleTypes(M::get_ids()) } } -pub trait MsgBundle { - fn get_ids() -> Vec<(&'static str, TypeId)>; +impl From<HandleTypes> for HashSet<(&'static str, TypeId)> { + fn from(ht: HandleTypes) -> Self { + ht.0.into_iter().collect() + } } -impl<A: Message> MsgBundle for (A,) { - fn get_ids() -> Vec<(&'static str, TypeId)> { - vec![(std::any::type_name::<A>(), TypeId::of::<A>())] - } +pub trait Message: 'static + Send {} + +pub trait MessageBundle { + fn get_ids() -> Vec<(&'static str, TypeId)>; } -impl<A: Message, B: Message> MsgBundle for (A, B) { - fn get_ids() -> Vec<(&'static str, TypeId)> { - vec![ - (std::any::type_name::<A>(), TypeId::of::<A>()), - (std::any::type_name::<B>(), TypeId::of::<B>()), - ] +pub trait PluginExt: Plugin { + fn into_untyped<M: MessageBundle>(self) -> BuiltPlugin + where + Self: DoesHandle<M> + Sized, + { + self.into_built_plugin() } } +impl<P: Plugin> PluginExt for P {} + type PluginHandlerFn = for<'r> fn(&'r dyn Any, Box<dyn Any + Send>) -> BoxFuture<'r, Result<(), PluginError>>; @@ -180,82 +186,132 @@ pub struct BuiltPlugin { } impl BuiltPlugin { + /// Call the plugin with the given types. + /// + /// ## Panics + /// + /// This method will panic when given a message it does not understand. + #[must_use] pub fn handle_message( &self, message: Box<dyn Any + Send>, ) -> BoxFuture<'_, Result<(), PluginError>> { - (self.handler)(&self.plugin, message) + (self.handler)((&*self.plugin).as_any(), message) + } + + /// Get a mutable reference to the built plugin's plugin. + pub fn plugin_mut(&mut self) -> &mut Box<dyn Plugin> { + &mut self.plugin + } + + /// Get a reference to the built plugin's plugin. + pub fn plugin(&self) -> &dyn Plugin { + self.plugin.as_ref() } } -pub trait DoesHandle<M: MsgBundle> { - fn into_untyped(self) -> BuiltPlugin; +pub trait DoesHandle<M: MessageBundle> { + fn into_built_plugin(self) -> BuiltPlugin; } -// TODO: Implement these with a macro to cut down on repetition - -impl<A: Message, PLUG: Plugin + Handle<A>> DoesHandle<(A,)> for PLUG { - fn into_untyped(self) -> BuiltPlugin { - fn handle_message<'a, M: Message, PLUG: Plugin + Handle<M>>( - plugin: &'a dyn Any, - message: Box<dyn Any + Send>, - ) -> BoxFuture<'a, Result<(), PluginError>> { - let plug = plugin.downcast_ref::<PLUG>().unwrap(); - let message = { - if let Ok(message) = message.downcast::<M>() { - message - } else { - unreachable!() - } - }; +pub trait Contains<M: Message> {} - futures::FutureExt::boxed(async move { plug.handle_message(*message).await }) - } - BuiltPlugin { - plugin: Box::new(self), - handler: handle_message::<A, PLUG>, +macro_rules! impl_does_handle_tuple { + () => {}; + ($cur:ident $($rest:tt)*) => { + impl<$cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*> DoesHandle<($cur, $($rest),*)> for PLUG { + fn into_built_plugin(self) -> BuiltPlugin { + fn handle_message<'a, $cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*>( + plugin: &'a dyn Any, + message: Box<dyn Any + Send>, + ) -> BoxFuture<'a, Result<(), PluginError>> { + let plug = match plugin.downcast_ref::<PLUG>() { + Some(p) => p, + None => { + panic!("Could not downcast to {}", std::any::type_name::<PLUG>()); + } + }; + futures::FutureExt::boxed(async move { + #![allow(unused)] + + let message = match message.downcast::<$cur>() { + Ok(message) => return plug.handle_message(*message).await, + Err(m) => m, + }; + + $( + let message = match message.downcast::<$rest>() { + Ok(message) => return plug.handle_message(*message).await, + Err(m) => m, + }; + )* + + unreachable!(); + }) + } + BuiltPlugin { + plugin: Box::new(self), + handler: handle_message::<$cur, $($rest,)* PLUG>, + } + } } + + impl_does_handle_tuple!($($rest)*); + }; +} + +impl<M: Message> MessageBundle for M { + fn get_ids() -> Vec<(&'static str, TypeId)> { + vec![(std::any::type_name::<M>(), TypeId::of::<M>())] } } -impl<A: Message, B: Message, PLUG: Plugin + Handle<A> + Handle<B>> DoesHandle<(A, B)> for PLUG { - fn into_untyped(self) -> BuiltPlugin { - fn handle_message<'a, A: Message, B: Message, PLUG: Plugin + Handle<A> + Handle<B>>( - plugin: &'a dyn Any, - message: Box<dyn Any + Send>, - ) -> BoxFuture<'a, Result<(), PluginError>> { - let plug = plugin.downcast_ref::<PLUG>().unwrap(); - futures::FutureExt::boxed(async move { - let message = match message.downcast::<A>() { - Ok(message) => return plug.handle_message(*message).await, - Err(m) => m, - }; - - match message.downcast::<B>() { - Ok(message) => return plug.handle_message(*message).await, - Err(m) => m, - }; - - unreachable!(); - }) - } - BuiltPlugin { - plugin: Box::new(self), - handler: handle_message::<A, B, PLUG>, + +macro_rules! impl_msg_bundle_tuple { + () => {}; + (@rec_tuple $cur:ident) => { + ($cur, ()) + }; + (@rec_tuple $cur:ident $($rest:tt)*) => { + ($cur, impl_msg_bundle_tuple!(@rec_tuple $($rest)*)) + }; + ($cur:ident $($rest:tt)*) => { + impl<$cur: Message, $($rest: Message),*> MessageBundle for ($cur,$($rest),*) { + fn get_ids() -> Vec<(&'static str, TypeId)> { + vec![ + (std::any::type_name::<$cur>(), TypeId::of::<$cur>()), + $((std::any::type_name::<$rest>(), TypeId::of::<$rest>())),* + ] + } } - } + + impl_msg_bundle_tuple!($($rest)*); + }; } -pub trait Message: 'static + Send {} +impl_msg_bundle_tuple!(M10 M9 M8 M7 M6 M5 M4 M3 M2 M1); +impl_does_handle_tuple!(M10 M9 M8 M7 M6 M5 M4 M3 M2 M1); + +#[macro_export] +macro_rules! make_message_bundle { + (struct $name:ident($($msg:ty),+)) => { + struct $name; + + impl $crate::plugin::MessageBundle for $name { + fn get_ids() -> Vec<(&'static str, std::any::TypeId)> { + <($($msg),+) as $crate::plugin::MessageBundle>::get_ids() + } + } + + $(impl $crate::plugin::Contains<$msg> for $name {})+ + }; +} #[cfg(test)] mod tests { - use super::{CoreCommunication, Plugin, PluginBuilder}; - use static_assertions::{assert_impl_all, assert_obj_safe}; + use super::{Plugin, PluginBuilder}; + use static_assertions::assert_obj_safe; // Object Safety - assert_obj_safe!(PluginBuilder); + assert_obj_safe!(PluginBuilder<()>); assert_obj_safe!(Plugin); - - // Sync + Send - assert_impl_all!(CoreCommunication: Send, Clone); } |