summaryrefslogtreecommitdiffstats
path: root/crates/core/tedge_api/examples
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 13:27:13 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-05-20 13:27:13 +0200
commit4b4054563981c2a741cd29df6a937a5551178866 (patch)
treec505c2d7c263a831eea6a4e90124c9b1e3d8df6f /crates/core/tedge_api/examples
parente7ddb5a8323617fa92ac027fd29d6f3d920d0415 (diff)
parent1885e9f8883b9b4c86a896d33a0f73679fe36699 (diff)
Merge remote-tracking branch 'gitlab-marcel/feature/add_tedge_api_only' into feature/add_tedge_api/integrate-api
This merges the latest changes from the API branch into an integration branch for adapting the the core implementation to the changes. The changes, most notably commit 43ed68145e7044c0d8f170af794d71c2fa93599a ("Replace tokio MPSC with a direct Future") were the MPSC interfaces were removed from the API, will result in quite a few adaptions in the tedge_core crate implementation. Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
Diffstat (limited to 'crates/core/tedge_api/examples')
-rw-r--r--crates/core/tedge_api/examples/heartbeat.rs121
-rw-r--r--crates/core/tedge_api/examples/universal_log.rs123
2 files changed, 132 insertions, 112 deletions
diff --git a/crates/core/tedge_api/examples/heartbeat.rs b/crates/core/tedge_api/examples/heartbeat.rs
index a9305fde..454be2ea 100644
--- a/crates/core/tedge_api/examples/heartbeat.rs
+++ b/crates/core/tedge_api/examples/heartbeat.rs
@@ -1,4 +1,4 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
use futures::FutureExt;
@@ -9,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -294,11 +295,17 @@ impl Plugin for CriticalService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -309,13 +316,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -349,7 +355,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -418,59 +426,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
-
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -478,11 +482,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await
diff --git a/crates/core/tedge_api/examples/universal_log.rs b/crates/core/tedge_api/examples/universal_log.rs
index f2b3dcea..82ff3130 100644
--- a/crates/core/tedge_api/examples/universal_log.rs
+++ b/crates/core/tedge_api/examples/universal_log.rs
@@ -1,6 +1,7 @@
-use std::{collections::HashMap, time::Duration};
+use std::{collections::HashMap, sync::Arc, time::Duration};
use async_trait::async_trait;
+use futures::FutureExt;
use tedge_api::{
address::ReplySenderFor,
message::{AnyMessage, MessageType},
@@ -8,6 +9,7 @@ use tedge_api::{
Address, CancellationToken, Plugin, PluginBuilder, PluginConfiguration, PluginDirectory,
PluginError,
};
+use tokio::sync::RwLock;
/// A message that represents a heartbeat that gets sent to plugins
#[derive(Debug)]
@@ -229,6 +231,7 @@ impl Plugin for LogService {
}
}
+//
// The following pieces of code would be implemented by a "core" component, that is responsible for
// setting up plugins and their communication.
//
@@ -238,11 +241,17 @@ impl Plugin for LogService {
//
/// Helper type for keeping information about plugins during runtime
-#[derive(Debug)]
struct PluginInfo {
types: Vec<MessageType>,
- receiver: Option<tedge_api::address::MessageReceiver>,
- sender: tedge_api::address::MessageSender,
+ sender: Arc<RwLock<Option<Box<tedge_api::address::MessageFutureProducer>>>>,
+}
+
+impl std::fmt::Debug for PluginInfo {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("PluginInfo")
+ .field("types", &self.types)
+ .finish_non_exhaustive()
+ }
}
/// The type that provides the communication infrastructure to the plugins.
@@ -253,13 +262,12 @@ struct Communication {
impl Communication {
pub fn declare_plugin<PB: PluginBuilder<Self>>(&mut self, name: &str) {
- let (sender, receiver) = tokio::sync::mpsc::channel(10);
+ let sender = Arc::new(RwLock::new(None));
self.plugins.insert(
name.to_owned(),
PluginInfo {
types: PB::kind_message_types().into_types(),
sender,
- receiver: Some(receiver),
},
);
}
@@ -293,7 +301,9 @@ impl PluginDirectory for Communication {
asked_types, name, plug.types,
);
} else {
- Ok(Address::new(plug.sender.clone()))
+ Ok(Address::new(tedge_api::address::InnerMessageSender::new(
+ plug.sender.clone(),
+ )))
}
}
@@ -356,59 +366,55 @@ async fn main() {
let cancel_token = CancellationToken::new();
- let mut heartbeat = build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await;
- let mut critical_service = build_critical_plugin(&mut comms, cancel_token.child_token()).await;
-
- heartbeat.plugin_mut().start().await.unwrap();
- critical_service.plugin_mut().start().await.unwrap();
+ let mut heartbeat = Arc::new(RwLock::new(
+ build_heartbeat_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
+ let mut critical_service = Arc::new(RwLock::new(
+ build_critical_plugin(&mut comms, cancel_token.child_token()).await,
+ ));
- let mut recv = comms
- .plugins
- .get_mut("heartbeat")
- .unwrap()
- .receiver
- .take()
+ heartbeat.write().await.plugin_mut().start().await.unwrap();
+ critical_service
+ .write()
+ .await
+ .plugin_mut()
+ .start()
+ .await
.unwrap();
- let hb_cancel_token = cancel_token.child_token();
- let hb_handle = tokio::task::spawn(async move {
- let hb = heartbeat;
+ let recv = comms.plugins.get("heartbeat").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- hb.handle_message(msg).await.unwrap();
- }
- _ = hb_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let heartbeat = heartbeat.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let heartbeat = heartbeat.clone();
+ async move {
+ let heartbeat = heartbeat.read().await;
+ heartbeat.handle_message(msg).await;
+ Ok(())
}
- }
-
- hb
- });
-
- let mut recv = comms
- .plugins
- .get_mut("critical-service")
- .unwrap()
- .receiver
- .take()
- .unwrap();
+ .boxed()
+ }));
+ }
- let cs_cancel_token = cancel_token.child_token();
- let cs_handle = tokio::task::spawn(async move {
- let cs = critical_service;
+ let recv = comms.plugins.get("critical-service").unwrap();
- loop {
- tokio::select! {
- Some(msg) = recv.recv() => {
- cs.handle_message(msg).await.unwrap();
- }
- _ = cs_cancel_token.cancelled() => break,
+ {
+ let mut lock = recv.sender.write().await;
+ let critical_service = critical_service.clone();
+
+ *lock = Some(Box::new(move |msg, _wait_kind| {
+ let critical_service = critical_service.clone();
+ async move {
+ let critical_service = critical_service.read().await;
+ critical_service.handle_message(msg).await;
+ Ok(())
}
- }
-
- cs
- });
+ .boxed()
+ }));
+ }
println!("Core: Stopping everything in 10 seconds!");
tokio::time::sleep(Duration::from_secs(12)).await;
@@ -416,11 +422,16 @@ async fn main() {
println!("Core: SHUTTING DOWN");
cancel_token.cancel();
- let (heartbeat, critical_service) = tokio::join!(hb_handle, cs_handle);
-
- heartbeat.unwrap().plugin_mut().shutdown().await.unwrap();
+ heartbeat
+ .write()
+ .await
+ .plugin_mut()
+ .shutdown()
+ .await
+ .unwrap();
critical_service
- .unwrap()
+ .write()
+ .await
.plugin_mut()
.shutdown()
.await