summaryrefslogtreecommitdiffstats
path: root/crates/core
diff options
context:
space:
mode:
authorMarcel Müller <m.mueller@ifm.com>2022-03-17 10:15:31 +0100
committerMarcel Müller <m.mueller@ifm.com>2022-03-21 08:53:49 +0100
commit7e710a7ea547833c21040d4dacac813fd2201f8f (patch)
tree9df6340d7d77afd09ae760d65af14c87afd1a0b1 /crates/core
parent9c165ac3fea7aab3be94798ccd7896b272bdcef1 (diff)
Add typed Address struct
Signed-off-by: Marcel Müller <m.mueller@ifm.com>
Diffstat (limited to 'crates/core')
-rw-r--r--crates/core/tedge_api/Cargo.toml1
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs258
-rw-r--r--crates/core/tedge_api/src/address.rs70
-rw-r--r--crates/core/tedge_api/src/plugin.rs240
4 files changed, 407 insertions, 162 deletions
diff --git a/crates/core/tedge_api/Cargo.toml b/crates/core/tedge_api/Cargo.toml
index bcf32c51..9309f2bc 100644
--- a/crates/core/tedge_api/Cargo.toml
+++ b/crates/core/tedge_api/Cargo.toml
@@ -8,6 +8,7 @@ edition = "2021"
[dependencies]
anyhow = "1.0.53"
async-trait = "0.1.52"
+downcast-rs = "1.2.0"
futures = "0.3.21"
thiserror = "1.0.30"
tokio = { version = "1.16.1", features = ["sync"] }
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index 0b5cc823..0fe0d308 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,29 +1,40 @@
+use std::{
+ any::TypeId,
+ collections::{HashMap, HashSet},
+};
+
use async_trait::async_trait;
use tedge_api::{
- address::EndpointKind,
- plugin::{Handle, HandleTypes, Message},
- Address, CoreCommunication, MessageKind, Plugin, PluginBuilder, PluginConfiguration,
- PluginError,
+ plugin::{BuiltPlugin, Handle, HandleTypes, Message, PluginExt},
+ Address, CoreCommunication, Plugin, PluginBuilder, PluginConfiguration, PluginError,
};
+#[derive(Debug)]
struct Heartbeat;
impl Message for Heartbeat {}
+#[derive(Debug)]
enum HeartbeatStatusReply {
Alive,
Degraded,
}
impl Message for HeartbeatStatusReply {}
+#[derive(Debug)]
struct HeartbeatServiceBuilder;
+type HeartbeatMessages = (HeartbeatStatusReply,);
+
#[async_trait]
-impl PluginBuilder for HeartbeatServiceBuilder {
+impl<CC: CoreCommunication> PluginBuilder<CC> for HeartbeatServiceBuilder {
fn kind_name(&self) -> &'static str {
todo!()
}
- fn kind_message_types(&self) -> tedge_api::plugin::HandleTypes {
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
HandleTypes::get_handlers_for::<(HeartbeatStatusReply,), HeartbeatService>()
}
@@ -37,46 +48,90 @@ impl PluginBuilder for HeartbeatServiceBuilder {
async fn instantiate(
&self,
config: PluginConfiguration,
- tedge_comms: tedge_api::plugin::CoreCommunication,
- ) -> Result<Box<dyn Plugin>, PluginError> {
+ tedge_comms: &CC,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ CC: 'async_trait,
+ {
let hb_config: HeartbeatConfig = toml::Value::try_into(config.into_inner())?;
- Ok(Box::new(HeartbeatService::new(tedge_comms, hb_config)))
+ let monitored_services = hb_config
+ .plugins
+ .iter()
+ .map(|name| tedge_comms.get_address_for::<CriticalServiceMessage>(name))
+ .collect::<Result<Vec<_>, _>>()?;
+ Ok(
+ HeartbeatService::new(hb_config, monitored_services)
+ .into_untyped::<HeartbeatMessages>(),
+ )
}
}
#[derive(serde::Deserialize, Debug)]
struct HeartbeatConfig {
interval: u64,
+ plugins: Vec<String>,
}
struct HeartbeatService {
- comms: tedge_api::plugin::CoreCommunication,
config: HeartbeatConfig,
+ monitored_services: Vec<Address<CriticalServiceMessage>>,
+}
+
+#[async_trait]
+impl Plugin for HeartbeatService {
+ async fn setup(&mut self) -> Result<(), PluginError> {
+ println!(
+ "Setting up heartbeat service with interval: {}!",
+ self.config.interval
+ );
+
+ for service in &self.monitored_services {
+ println!("Sending heartbeat to service");
+ service.send(Heartbeat).await.unwrap();
+ }
+ Ok(())
+ }
+
+ async fn shutdown(&mut self) -> Result<(), PluginError> {
+ println!("Shutting down heartbeat service!");
+ Ok(())
+ }
}
impl HeartbeatService {
- fn new(comms: tedge_api::plugin::CoreCommunication, config: HeartbeatConfig) -> Self {
- Self { comms, config }
+ fn new(
+ config: HeartbeatConfig,
+ monitored_services: Vec<Address<CriticalServiceMessage>>,
+ ) -> Self {
+ Self {
+ config,
+ monitored_services,
+ }
}
}
#[async_trait]
impl Handle<HeartbeatStatusReply> for HeartbeatService {
- async fn handle_message(&self, message: HeartbeatStatusReply) -> Result<(), PluginError> {
- println!("Received Heartbeat!");
+ async fn handle_message(&self, _message: HeartbeatStatusReply) -> Result<(), PluginError> {
+ println!("Received HeartbeatReply!");
Ok(())
}
}
struct CriticalServiceBuilder;
+tedge_api::make_message_bundle!(struct CriticalServiceMessage(Heartbeat));
+
#[async_trait]
-impl PluginBuilder for CriticalServiceBuilder {
+impl<CC: CoreCommunication> PluginBuilder<CC> for CriticalServiceBuilder {
fn kind_name(&self) -> &'static str {
todo!()
}
- fn kind_message_types(&self) -> tedge_api::plugin::HandleTypes {
+ fn kind_message_types() -> tedge_api::plugin::HandleTypes
+ where
+ Self: Sized,
+ {
HandleTypes::get_handlers_for::<(Heartbeat,), CriticalService>()
}
@@ -89,11 +144,13 @@ impl PluginBuilder for CriticalServiceBuilder {
async fn instantiate(
&self,
- config: PluginConfiguration,
- tedge_comms: tedge_api::plugin::CoreCommunication,
- ) -> Result<Box<dyn Plugin>, PluginError> {
- let hb_config: HeartbeatConfig = toml::Value::try_into(config.into_inner())?;
- Ok(Box::new(HeartbeatService::new(tedge_comms, hb_config)))
+ _config: PluginConfiguration,
+ _tedge_comms: &CC,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ CC: 'async_trait,
+ {
+ Ok(CriticalService {}.into_untyped::<(Heartbeat,)>())
}
}
@@ -101,67 +158,168 @@ struct CriticalService;
#[async_trait]
impl Handle<Heartbeat> for CriticalService {
- async fn handle_message(&self, message: Heartbeat) -> Result<(), PluginError> {
+ async fn handle_message(&self, _message: Heartbeat) -> Result<(), PluginError> {
println!("Received Heartbeat!");
Ok(())
}
}
#[async_trait]
-impl Plugin for HeartbeatService {
+impl Plugin for CriticalService {
async fn setup(&mut self) -> Result<(), PluginError> {
- println!(
- "Setting up heartbeat service with interval: {}!",
- self.config.interval
- );
+ println!("Setting up critical service!");
Ok(())
}
async fn shutdown(&mut self) -> Result<(), PluginError> {
- println!("Shutting down heartbeat service!");
+ println!("Shutting down critical service service!");
Ok(())
}
}
-#[tokio::main]
-async fn main() {
- let hsb = HeartbeatServiceBuilder;
- let (sender, mut receiver) = tokio::sync::mpsc::channel(10);
+#[derive(Debug)]
+struct PluginInfo {
+ types: HashSet<(&'static str, TypeId)>,
+ receiver: Option<tedge_api::address::MessageReceiver>,
+ sender: tedge_api::address::MessageSender,
+}
- let plugin_name = "heartbeat-service".to_string();
- let comms = CoreCommunication::new(plugin_name.clone(), sender);
+impl Clone for PluginInfo {
+ fn clone(&self) -> Self {
+ PluginInfo {
+ types: self.types.clone(),
+ receiver: None,
+ sender: self.sender.clone(),
+ }
+ }
+}
+
+#[derive(Clone, Debug)]
+struct Communication {
+ plugins: HashMap<String, PluginInfo>,
+}
+
+impl Communication {
+ pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
+ let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ self.plugins.insert(
+ name.to_owned(),
+ PluginInfo {
+ types: PB::kind_message_types().into(),
+ sender,
+ receiver: Some(receiver),
+ },
+ );
+ }
+}
+
+impl CoreCommunication for Communication {
+ fn get_address_for<MB: tedge_api::plugin::MessageBundle>(
+ &self,
+ name: &str,
+ ) -> Result<Address<MB>, PluginError> {
+ let types = MB::get_ids().into_iter().collect();
+
+ let plug = self.plugins.get(name).unwrap_or_else(|| {
+ panic!(
+ "Didn't find plugin with name {}, got: {:?}",
+ name,
+ self.plugins.keys().collect::<Vec<_>>()
+ )
+ });
+
+ if !plug.types.is_superset(&types) {
+ panic!(
+ "Asked for {:#?} but plugin {} only has types {:#?}",
+ types, name, plug.types,
+ );
+ } else {
+ Ok(Address::new(plug.sender.clone()))
+ }
+ }
+}
+
+async fn build_critical_plugin(comms: &mut Communication) -> BuiltPlugin {
+ let csb = CriticalServiceBuilder;
+
+ let config = toml::from_str("").unwrap();
+
+ csb.instantiate(config, comms).await.unwrap()
+}
+
+async fn build_heartbeat_plugin(comms: &mut Communication) -> BuiltPlugin {
+ let hsb = HeartbeatServiceBuilder;
let config = toml::from_str(
r#"
interval = 200
+ plugins = ["critical-service"]
"#,
)
.unwrap();
- let mut heartbeat = hsb.instantiate(config, comms.clone()).await.unwrap();
+ hsb.instantiate(config, comms).await.unwrap()
+}
- heartbeat.setup().await.unwrap();
+#[tokio::main]
+async fn main() {
+ let mut comms = Communication {
+ plugins: HashMap::new(),
+ };
- let handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ comms.declare_plugin::<CriticalServiceBuilder>("critical-service");
+ comms.declare_plugin::<HeartbeatServiceBuilder>("heartbeat");
- hb.handle_message(Message::new(
- Address::new(EndpointKind::Plugin { id: plugin_name }),
- Address::new(EndpointKind::Core),
- MessageKind::CheckReadyness,
- ))
- .await
+ let mut heartbeat = build_heartbeat_plugin(&mut comms).await;
+ let mut critical_service = build_critical_plugin(&mut comms).await;
+
+ heartbeat.plugin_mut().setup().await.unwrap();
+ critical_service.plugin_mut().setup().await.unwrap();
+
+ let mut recv = comms
+ .plugins
+ .get_mut("heartbeat")
+ .unwrap()
+ .receiver
+ .take()
.unwrap();
+ let hb_handle = tokio::task::spawn(async move {
+ let hb = heartbeat;
+
+ for msg in recv.recv().await {
+ hb.handle_message(msg).await.unwrap();
+ }
+
hb
});
- println!(
- "Receiving message from service: {:#?}",
- receiver.recv().await
- );
+ let mut recv = comms
+ .plugins
+ .get_mut("critical-service")
+ .unwrap()
+ .receiver
+ .take()
+ .unwrap();
+
+ let cs_handle = tokio::task::spawn(async move {
+ let cs = critical_service;
+
+ for msg in recv.recv().await {
+ println!("Critical service received message!");
+ cs.handle_message(msg).await.unwrap();
+ }
- let mut heartbeat = handle.await.unwrap();
+ cs
+ });
+
+ let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
- heartbeat.shutdown().await.unwrap();
+ heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ critical_service
+ .unwrap()
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
}
diff --git a/crates/core/tedge_api/src/address.rs b/crates/core/tedge_api/src/address.rs
index fed61ef7..fcda366c 100644
--- a/crates/core/tedge_api/src/address.rs
+++ b/crates/core/tedge_api/src/address.rs
@@ -1,31 +1,61 @@
-/// An address which specifices either the unique name of a plugin or the core of ThinEdge
-///
-/// This is used in the [`Comms::send`](crate::plugin::Comms::send) method to send messages to
-/// other plugins attached to the same core.
+use std::marker::PhantomData;
+
+use crate::plugin::{Contains, Message, MessageBundle};
+
+/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
+#[doc(hidden)]
+pub type MessageSender = tokio::sync::mpsc::Sender<Box<dyn std::any::Any + Send>>;
+
+/// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
+#[doc(hidden)]
+pub type MessageReceiver = tokio::sync::mpsc::Receiver<Box<dyn std::any::Any + Send>>;
+
+/// An address of a plugin that can receive messages of type `M`
#[derive(Debug, Clone)]
-pub struct Address {
- endpoint_kind: EndpointKind,
+pub struct Address<MB: MessageBundle> {
+ _pd: PhantomData<MB>,
+ sender: MessageSender,
}
-impl Address {
- /// Create a new address with the given destination/origin
- pub fn new(endpoint: EndpointKind) -> Address {
+impl<MB: MessageBundle> Address<MB> {
+ /// THIS IS NOT PART OF THE PUBLIC API, AND MAY CHANGE AT ANY TIME
+ #[doc(hidden)]
+ pub fn new(sender: MessageSender) -> Self {
Self {
- endpoint_kind: endpoint,
+ _pd: PhantomData,
+ sender,
}
}
- /// Get the endpoint kind associated to this address
- pub fn endpoint_kind(&self) -> &EndpointKind {
- &self.endpoint_kind
+ pub async fn send<M: Message>(&self, msg: M) -> Result<(), M>
+ where
+ MB: Contains<M>,
+ {
+ self.sender
+ .send(Box::new(msg))
+ .await
+ .map_err(|msg| *msg.0.downcast::<M>().unwrap())
}
}
-/// What kind of endpoint is it
-#[derive(Debug, Clone)]
-pub enum EndpointKind {
- /// The `tedge` core
- Core,
- /// A specific plugin
- Plugin { id: String },
+#[cfg(test)]
+mod tests {
+ use crate::{make_message_bundle, plugin::Message, Address};
+
+ struct Foo;
+
+ impl Message for Foo {}
+
+ struct Bar;
+
+ impl Message for Bar {}
+
+ make_message_bundle!(struct FooBar(Foo, Bar));
+
+ #[allow(unreachable_code, dead_code, unused)]
+ fn check_compile() {
+ let addr: Address<FooBar> = todo!();
+ addr.send(Foo);
+ addr.send(Bar);
+ }
}
diff --git a/crates/core/tedge_api/src/plugin.rs b/crates/core/tedge_api/src/plugin.rs
index 33c148c7..b803fa72 100644
--- a/crates/core/tedge_api/src/plugin.rs
+++ b/crates/core/tedge_api/src/plugin.rs
@@ -5,27 +5,20 @@
//! 2. Create your plugin struct that implements `Plugin`
use futures::future::BoxFuture;
-use std::any::{Any, TypeId};
+use std::{
+ any::{Any, TypeId},
+ collections::HashSet,
+};
+
+use downcast_rs::{impl_downcast, DowncastSync};
use async_trait::async_trait;
-use crate::error::PluginError;
+use crate::{error::PluginError, Address};
/// The communication struct to interface with the core of ThinEdge
-///
-/// It's main purpose is the [`send`](CoreCommunication::send) method, through which one plugin
-/// can communicate with another.
-#[derive(Clone)]
-pub struct CoreCommunication {
- plugin_name: String,
-}
-
-impl std::fmt::Debug for CoreCommunication {
- fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
- f.debug_struct("Comms")
- .field("plugin_name", &self.plugin_name)
- .finish_non_exhaustive()
- }
+pub trait CoreCommunication: Clone + Send + Sync {
+ fn get_address_for<MB: MessageBundle>(&self, name: &str) -> Result<Address<MB>, PluginError>;
}
/// The plugin configuration as a `toml::Spanned` table.
@@ -37,15 +30,16 @@ pub type PluginConfiguration = toml::Spanned<toml::value::Value>;
/// A plugin builder for a given plugin
#[async_trait]
-pub trait PluginBuilder: Sync + Send + 'static {
+pub trait PluginBuilder<CC: CoreCommunication>: Sync + Send + 'static {
/// The name for the kind of plugins this creates, this should be unique and will prevent startup otherwise
fn kind_name(&self) -> &'static str;
/// A list of message types the plugin this builder creates supports
///
- /// To create it, you must use the `HandleTypes::get_handlers_for` method. See there on how to
- /// use it.
- fn kind_message_types(&self) -> HandleTypes;
+ /// To create it, you must use the [`HandleTypes::get_handlers_for`] method.
+ fn kind_message_types() -> HandleTypes
+ where
+ Self: Sized;
/// This may be called anytime to verify whether a plugin could be instantiated with the
/// passed configuration.
@@ -57,13 +51,15 @@ pub trait PluginBuilder: Sync + Send + 'static {
async fn instantiate(
&self,
config: PluginConfiguration,
- core_comms: CoreCommunication,
- ) -> Result<BuiltPlugin, PluginError>;
+ core_comms: &CC,
+ ) -> Result<BuiltPlugin, PluginError>
+ where
+ CC: 'async_trait;
}
/// A functionality extension to ThinEdge
#[async_trait]
-pub trait Plugin: Sync + Send + std::any::Any {
+pub trait Plugin: Sync + Send + DowncastSync {
/// The plugin can set itself up here
async fn setup(&mut self) -> Result<(), PluginError>;
@@ -71,6 +67,8 @@ pub trait Plugin: Sync + Send + std::any::Any {
async fn shutdown(&mut self) -> Result<(), PluginError>;
}
+impl_downcast!(sync Plugin);
+
#[async_trait]
pub trait Handle<Msg> {
/// Handle a message specific to this plugin
@@ -81,6 +79,10 @@ pub trait Handle<Msg> {
pub struct HandleTypes(Vec<(&'static str, TypeId)>);
impl HandleTypes {
+ pub fn get_types(&self) -> &[(&'static str, TypeId)] {
+ &self.0
+ }
+
/// Get a list of message types this plugin is proven to handle
///
/// ## Example
@@ -147,30 +149,34 @@ impl HandleTypes {
/// XX | println!("{:#?}", HandleTypes::get_handlers_for::<(Heartbeat,), HeartbeatPlugin>());
/// | ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Handle<Heartbeat>` is not implemented for `HeartbeatPlugin`
/// ```
- pub fn get_handlers_for<M: MsgBundle, Plugin: DoesHandle<M>>() -> HandleTypes {
+ pub fn get_handlers_for<M: MessageBundle, Plugin: DoesHandle<M>>() -> HandleTypes {
HandleTypes(M::get_ids())
}
}
-pub trait MsgBundle {
- fn get_ids() -> Vec<(&'static str, TypeId)>;
+impl From<HandleTypes> for HashSet<(&'static str, TypeId)> {
+ fn from(ht: HandleTypes) -> Self {
+ ht.0.into_iter().collect()
+ }
}
-impl<A: Message> MsgBundle for (A,) {
- fn get_ids() -> Vec<(&'static str, TypeId)> {
- vec![(std::any::type_name::<A>(), TypeId::of::<A>())]
- }
+pub trait Message: 'static + Send {}
+
+pub trait MessageBundle {
+ fn get_ids() -> Vec<(&'static str, TypeId)>;
}
-impl<A: Message, B: Message> MsgBundle for (A, B) {
- fn get_ids() -> Vec<(&'static str, TypeId)> {
- vec![
- (std::any::type_name::<A>(), TypeId::of::<A>()),
- (std::any::type_name::<B>(), TypeId::of::<B>()),
- ]
+pub trait PluginExt: Plugin {
+ fn into_untyped<M: MessageBundle>(self) -> BuiltPlugin
+ where
+ Self: DoesHandle<M> + Sized,
+ {
+ self.into_built_plugin()
}
}
+impl<P: Plugin> PluginExt for P {}
+
type PluginHandlerFn =
for<'r> fn(&'r dyn Any, Box<dyn Any + Send>) -> BoxFuture<'r, Result<(), PluginError>>;
@@ -180,82 +186,132 @@ pub struct BuiltPlugin {
}
impl BuiltPlugin {
+ /// Call the plugin with the given types.
+ ///
+ /// ## Panics
+ ///
+ /// This method will panic when given a message it does not understand.
+ #[must_use]
pub fn handle_message(
&self,
message: Box<dyn Any + Send>,
) -> BoxFuture<'_, Result<(), PluginError>> {
- (self.handler)(&self.plugin, message)
+ (self.handler)((&*self.plugin).as_any(), message)
+ }
+
+ /// Get a mutable reference to the built plugin's plugin.
+ pub fn plugin_mut(&mut self) -> &mut Box<dyn Plugin> {
+ &mut self.plugin
+ }
+
+ /// Get a reference to the built plugin's plugin.
+ pub fn plugin(&self) -> &dyn Plugin {
+ self.plugin.as_ref()
}
}
-pub trait DoesHandle<M: MsgBundle> {
- fn into_untyped(self) -> BuiltPlugin;
+pub trait DoesHandle<M: MessageBundle> {
+ fn into_built_plugin(self) -> BuiltPlugin;
}
-// TODO: Implement these with a macro to cut down on repetition
-
-impl<A: Message, PLUG: Plugin + Handle<A>> DoesHandle<(A,)> for PLUG {
- fn into_untyped(self) -> BuiltPlugin {
- fn handle_message<'a, M: Message, PLUG: Plugin + Handle<M>>(
- plugin: &'a dyn Any,
- message: Box<dyn Any + Send>,
- ) -> BoxFuture<'a, Result<(), PluginError>> {
- let plug = plugin.downcast_ref::<PLUG>().unwrap();
- let message = {
- if let Ok(message) = message.downcast::<M>() {
- message
- } else {
- unreachable!()
- }
- };
+pub trait Contains<M: Message> {}
- futures::FutureExt::boxed(async move { plug.handle_message(*message).await })
- }
- BuiltPlugin {
- plugin: Box::new(self),
- handler: handle_message::<A, PLUG>,
+macro_rules! impl_does_handle_tuple {
+ () => {};
+ ($cur:ident $($rest:tt)*) => {
+ impl<$cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*> DoesHandle<($cur, $($rest),*)> for PLUG {
+ fn into_built_plugin(self) -> BuiltPlugin {
+ fn handle_message<'a, $cur: Message, $($rest: Message,)* PLUG: Plugin + Handle<$cur> $(+ Handle<$rest>)*>(
+ plugin: &'a dyn Any,
+ message: Box<dyn Any + Send>,
+ ) -> BoxFuture<'a, Result<(), PluginError>> {
+ let plug = match plugin.downcast_ref::<PLUG>() {
+ Some(p) => p,
+ None => {
+ panic!("Could not downcast to {}", std::any::type_name::<PLUG>());
+ }
+ };
+ futures::FutureExt::boxed(async move {
+ #![allow(unused)]
+
+ let message = match message.downcast::<$cur>() {
+ Ok(message) => return plug.handle_message(*message).await,
+ Err(m) => m,
+ };
+
+ $(
+ let message = match message.downcast::<$rest>() {
+ Ok(message) => return plug.handle_message(*message).await,
+ Err(m) => m,
+ };
+ )*
+
+ unreachable!();
+ })
+ }
+ BuiltPlugin {
+ plugin: Box::new(self),
+ handler: handle_message::<$cur, $($rest,)* PLUG>,
+ }
+ }
}
+
+ impl_does_handle_tuple!($($rest)*);
+ };
+}
+
+impl<M: Message> MessageBundle for M {
+ fn get_ids() -> Vec<(&'static str, TypeId)> {
+ vec![(std::any::type_name::<M>(), TypeId::of::<M>())]
}
}
-impl<A: Message, B: Message, PLUG: Plugin + Handle<A> + Handle<B>> DoesHandle<(A, B)> for PLUG {
- fn into_untyped(self) -> BuiltPlugin {
- fn handle_message<'a, A: Message, B: Message, PLUG: Plugin + Handle<A> + Handle<B>>(
- plugin: &'a dyn Any,
- message: Box<dyn Any + Send>,
- ) -> BoxFuture<'a, Result<(), PluginError>> {
- let plug = plugin.downcast_ref::<PLUG>().unwrap();
- futures::FutureExt::boxed(async move {
- let message = match message.downcast::<A>() {
- Ok(message) => return plug.handle_message(*message).await,
- Err(m) => m,
- };
-
- match message.downcast::<B>() {
- Ok(message) => return plug.handle_message(*message).await,
- Err(m) => m,
- };
-
- unreachable!();
- })
- }
- BuiltPlugin {
- plugin: Box::new(self),
- handler: handle_message::<A, B, PLUG>,
+
+macro_rules! impl_msg_bundle_tuple {
+ () => {};
+ (@rec_tuple $cur:ident) => {
+ ($cur, ())
+ };
+ (@rec_tuple $cur:ident $($rest:tt)*) => {
+ ($cur, impl_msg_bundle_tuple!(@rec_tuple $($rest)*))
+ };
+ ($cur:ident $($rest:tt)*) => {
+ impl<$cur: Message, $($rest: Message),*> MessageBundle for ($cur,$($rest),*) {
+ fn get_ids() -> Vec<(&'static str, TypeId)> {
+ vec![
+ (std::any::type_name::<$cur>(), TypeId::of::<$cur>()),
+ $((std::any::type_name::<$rest>(), TypeId::of::<$rest>())),*
+ ]
+ }
}
- }
+
+ impl_msg_bundle_tuple!($($rest)*);
+ };
}
-pub trait Message: 'static + Send {}
+impl_msg_bundle_tuple!(M10 M9 M8 M7 M6 M5 M4 M3 M2 M1);
+impl_does_handle_tuple!(M10 M9 M8 M7 M6 M5 M4 M3 M2 M1);
+
+#[macro_export]
+macro_rules! make_message_bundle {
+ (struct $name:ident($($msg:ty),+)) => {
+ struct $name;
+
+ impl $crate::plugin::MessageBundle for $name {
+ fn get_ids() -> Vec<(&'static str, std::any::TypeId)> {
+ <($($msg),+) as $crate::plugin::MessageBundle>::get_ids()
+ }
+ }
+
+ $(impl $crate::plugin::Contains<$msg> for $name {})+
+ };
+}
#[cfg(test)]
mod tests {
- use super::{CoreCommunication, Plugin, PluginBuilder};
- use static_assertions::{assert_impl_all, assert_obj_safe};
+ use super::{Plugin, PluginBuilder};
+ use static_assertions::assert_obj_safe;
// Object Safety
- assert_obj_safe!(PluginBuilder);
+ assert_obj_safe!(PluginBuilder<()>);
assert_obj_safe!(Plugin);
-
- // Sync + Send
- assert_impl_all!(CoreCommunication: Send, Clone);
}