summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMatthias Beyer <matthias.beyer@ifm.com>2022-09-09 17:48:16 +0200
committerMatthias Beyer <matthias.beyer@ifm.com>2022-09-10 11:07:04 +0200
commit74c412fd5891eeae44d5fb87d293b1a0b4055aef (patch)
tree03dd2112a89c930eed53ee6280182ea6f84ffd16
parentf9310fd3bfbbda599e8d5af024de0b6825a6caa1 (diff)
plugin_fdman: Add implementation with OpenOptions
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r--Cargo.lock1
-rw-r--r--plugins/plugin_fdman/Cargo.toml1
-rw-r--r--plugins/plugin_fdman/src/error.rs11
-rw-r--r--plugins/plugin_fdman/src/lib.rs3
-rw-r--r--plugins/plugin_fdman/src/message/mod.rs2
-rw-r--r--plugins/plugin_fdman/src/message/open_options.rs157
-rw-r--r--plugins/plugin_fdman/src/plugin.rs77
7 files changed, 247 insertions, 5 deletions
diff --git a/Cargo.lock b/Cargo.lock
index ce135180..2f2f93be 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -2450,6 +2450,7 @@ name = "plugin_fdman"
version = "0.1.0"
dependencies = [
"async-trait",
+ "bevy_reflect",
"miette",
"serde",
"tedge_api",
diff --git a/plugins/plugin_fdman/Cargo.toml b/plugins/plugin_fdman/Cargo.toml
index 78c7239c..22c10067 100644
--- a/plugins/plugin_fdman/Cargo.toml
+++ b/plugins/plugin_fdman/Cargo.toml
@@ -13,6 +13,7 @@ thiserror = "1"
toml = "0.5"
tracing = "0.1"
tokio-util = "0.7.0"
+bevy_reflect = "0.7.0"
tedge_api = { path = "../../crates/core/tedge_api" }
tedge_lib = { path = "../../crates/core/tedge_lib" }
diff --git a/plugins/plugin_fdman/src/error.rs b/plugins/plugin_fdman/src/error.rs
index 5b7f977a..77e63b37 100644
--- a/plugins/plugin_fdman/src/error.rs
+++ b/plugins/plugin_fdman/src/error.rs
@@ -1,5 +1,14 @@
#[derive(Debug, miette::Diagnostic, thiserror::Error)]
-pub(crate) enum Error {
+pub enum Error {
#[error("Failed to parse configuration")]
ConfigParseFailed(#[from] toml::de::Error),
+
+ #[error("IO")]
+ Io(#[from] std::io::Error),
+
+ #[error("Sending reply message failed")]
+ SendingReply,
+
+ #[error("Out of file handles")]
+ NoHandleLeft,
}
diff --git a/plugins/plugin_fdman/src/lib.rs b/plugins/plugin_fdman/src/lib.rs
index 259f7a13..71e12f2a 100644
--- a/plugins/plugin_fdman/src/lib.rs
+++ b/plugins/plugin_fdman/src/lib.rs
@@ -1,8 +1,9 @@
mod builder;
mod config;
mod error;
+pub mod message;
mod plugin;
pub use crate::builder::FdManPluginBuilder;
+pub use crate::error::Error;
pub use crate::plugin::FdManPlugin;
-
diff --git a/plugins/plugin_fdman/src/message/mod.rs b/plugins/plugin_fdman/src/message/mod.rs
new file mode 100644
index 00000000..72ef4570
--- /dev/null
+++ b/plugins/plugin_fdman/src/message/mod.rs
@@ -0,0 +1,2 @@
+mod open_options;
+pub use self::open_options::*;
diff --git a/plugins/plugin_fdman/src/message/open_options.rs b/plugins/plugin_fdman/src/message/open_options.rs
new file mode 100644
index 00000000..c8913052
--- /dev/null
+++ b/plugins/plugin_fdman/src/message/open_options.rs
@@ -0,0 +1,157 @@
+use std::{
+ path::PathBuf,
+ sync::{atomic::AtomicU64, Arc},
+};
+
+use tedge_api::Message;
+
+#[derive(Clone, Debug, bevy_reflect::TypeUuid)]
+#[uuid = "cf82a6ed-3fc1-4011-bbf0-c79fc6fdf781"]
+pub struct OpenOptions {
+ append: bool,
+ create: bool,
+ create_new: bool,
+ read: bool,
+ truncate: bool,
+ write: bool,
+ path: PathBuf,
+}
+
+impl OpenOptions {
+ pub fn new(path: PathBuf) -> Self {
+ Self {
+ append: false,
+ create: false,
+ create_new: false,
+ read: false,
+ truncate: false,
+ write: false,
+ path,
+ }
+ }
+
+ pub(crate) fn path(&self) -> &PathBuf {
+ &self.path
+ }
+
+ pub(crate) fn as_std(&self) -> std::fs::OpenOptions {
+ let mut o = std::fs::OpenOptions::new();
+ o.append(self.append);
+ o.create(self.create);
+ o.create_new(self.create_new);
+ o.read(self.read);
+ o.truncate(self.truncate);
+ o.write(self.write);
+ o
+ }
+
+ pub fn append(mut self, b: bool) -> Self {
+ self.append = b;
+ self
+ }
+
+ pub fn create(mut self, b: bool) -> Self {
+ self.create = b;
+ self
+ }
+
+ pub fn create_new(mut self, b: bool) -> Self {
+ self.create_new = b;
+ self
+ }
+
+ pub fn read(mut self, b: bool) -> Self {
+ self.read = b;
+ self
+ }
+
+ pub fn truncate(mut self, b: bool) -> Self {
+ self.truncate = b;
+ self
+ }
+
+ pub fn write(mut self, b: bool) -> Self {
+ self.write = b;
+ self
+ }
+}
+
+impl tedge_api::Message for OpenOptions {}
+impl tedge_api::message::AcceptsReplies for OpenOptions {
+ type Reply = OpenOptionsResult;
+}
+
+#[derive(Debug)]
+pub struct OpenOptionsFile {
+ file: std::fs::File,
+ currently_held_count: Arc<AtomicU64>,
+}
+
+impl OpenOptionsFile {
+ pub(crate) fn new(file: std::fs::File, currently_held_count: Arc<AtomicU64>) -> Self {
+ Self {
+ file,
+ currently_held_count,
+ }
+ }
+}
+
+impl Drop for OpenOptionsFile {
+ fn drop(&mut self) {
+ self.currently_held_count
+ .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+ }
+}
+
+impl std::ops::Deref for OpenOptionsFile {
+ type Target = std::fs::File;
+
+ fn deref(&self) -> &Self::Target {
+ &self.file
+ }
+}
+
+impl std::ops::DerefMut for OpenOptionsFile {
+ fn deref_mut(&mut self) -> &mut Self::Target {
+ &mut self.file
+ }
+}
+
+#[derive(Debug, thiserror::Error)]
+pub enum OpenOptionsError {
+ #[error("FdMan Plugin error")]
+ FdMan(#[from] crate::error::Error),
+}
+
+#[derive(Debug, bevy_reflect::TypeUuid)]
+#[uuid = "e259bed9-14b9-4edf-ab7b-e87d454b823f"]
+pub struct OpenOptionsResult {
+ open_options: OpenOptions,
+ result: Result<OpenOptionsFile, OpenOptionsError>,
+}
+
+impl OpenOptionsResult {
+ pub(crate) fn new(
+ open_options: OpenOptions,
+ result: Result<OpenOptionsFile, OpenOptionsError>,
+ ) -> Self {
+ Self {
+ open_options,
+ result,
+ }
+ }
+
+ pub fn open_options(&self) -> &OpenOptions {
+ &self.open_options
+ }
+
+ pub fn result(&self) -> &Result<OpenOptionsFile, OpenOptionsError> {
+ &self.result
+ }
+
+ pub fn into_result(self) -> Result<OpenOptionsFile, OpenOptionsError> {
+ self.result
+ }
+}
+
+impl Message for OpenOptionsResult {}
diff --git a/plugins/plugin_fdman/src/plugin.rs b/plugins/plugin_fdman/src/plugin.rs
index eaaecebf..5c02c37f 100644
--- a/plugins/plugin_fdman/src/plugin.rs
+++ b/plugins/plugin_fdman/src/plugin.rs
@@ -1,21 +1,92 @@
+use std::sync::{atomic::AtomicU64, Arc};
+
use async_trait::async_trait;
-use tedge_api::plugin::Plugin;
+use tedge_api::{
+ plugin::{Handle, Plugin},
+ PluginError,
+};
+
+use crate::{
+ error::Error,
+ message::{OpenOptions, OpenOptionsError, OpenOptionsResult},
+};
#[derive(Debug)]
pub struct FdManPlugin {
max_fds: u64,
+ currently_held_count: Arc<AtomicU64>,
}
impl FdManPlugin {
pub fn new(max_fds: u64) -> Self {
- Self { max_fds }
+ Self {
+ max_fds,
+ currently_held_count: Arc::new(AtomicU64::from(0)),
+ }
+ }
+
+ fn aquire_handle(&self) -> Result<FdHandle, Error> {
+ let old_value = self
+ .currently_held_count
+ .fetch_add(1, std::sync::atomic::Ordering::SeqCst);
+
+ if old_value >= self.max_fds {
+ self.currently_held_count
+ .fetch_sub(1, std::sync::atomic::Ordering::SeqCst);
+ Err(Error::NoHandleLeft)
+ } else {
+ Ok(FdHandle)
+ }
}
}
+struct FdHandle;
+
impl tedge_api::plugin::PluginDeclaration for FdManPlugin {
- type HandledMessages = ();
+ type HandledMessages = (OpenOptions,);
}
#[async_trait]
impl Plugin for FdManPlugin {}
+
+#[async_trait::async_trait]
+impl Handle<OpenOptions> for FdManPlugin {
+ async fn handle_message(
+ &self,
+ message: OpenOptions,
+ sender: tedge_api::address::ReplySenderFor<OpenOptions>,
+ ) -> Result<(), PluginError> {
+ match self.aquire_handle() {
+ Ok(FdHandle) => {
+ let file_result = message
+ .as_std()
+ .open(message.path())
+ .map(|file| {
+ crate::message::OpenOptionsFile::new(
+ file,
+ self.currently_held_count.clone(),
+ )
+ })
+ .map_err(Error::from)
+ .map_err(OpenOptionsError::from);
+ let res = OpenOptionsResult::new(message, file_result);
+ sender
+ .reply(res)
+ .map_err(|_| Error::SendingReply)
+ .map_err(PluginError::from)
+ }
+ Err(Error::NoHandleLeft) => {
+ let res = OpenOptionsResult::new(
+ message,
+ Err(OpenOptionsError::from(Error::NoHandleLeft)),
+ );
+ sender
+ .reply(res)
+ .map_err(|_| Error::SendingReply)
+ .map_err(PluginError::from)
+ }
+ Err(other) => Err(PluginError::from(other)),
+ }
+ }
+}