diff options
author | Matthias Beyer <matthias.beyer@ifm.com> | 2022-09-09 17:48:16 +0200 |
---|---|---|
committer | Matthias Beyer <matthias.beyer@ifm.com> | 2022-09-10 11:07:04 +0200 |
commit | 74c412fd5891eeae44d5fb87d293b1a0b4055aef (patch) | |
tree | 03dd2112a89c930eed53ee6280182ea6f84ffd16 | |
parent | f9310fd3bfbbda599e8d5af024de0b6825a6caa1 (diff) |
plugin_fdman: Add implementation with OpenOptions
Signed-off-by: Matthias Beyer <matthias.beyer@ifm.com>
-rw-r--r-- | Cargo.lock | 1 | ||||
-rw-r--r-- | plugins/plugin_fdman/Cargo.toml | 1 | ||||
-rw-r--r-- | plugins/plugin_fdman/src/error.rs | 11 | ||||
-rw-r--r-- | plugins/plugin_fdman/src/lib.rs | 3 | ||||
-rw-r--r-- | plugins/plugin_fdman/src/message/mod.rs | 2 | ||||
-rw-r--r-- | plugins/plugin_fdman/src/message/open_options.rs | 157 | ||||
-rw-r--r-- | plugins/plugin_fdman/src/plugin.rs | 77 |
7 files changed, 247 insertions, 5 deletions
@@ -2450,6 +2450,7 @@ name = "plugin_fdman" version = "0.1.0" dependencies = [ "async-trait", + "bevy_reflect", "miette", "serde", "tedge_api", diff --git a/plugins/plugin_fdman/Cargo.toml b/plugins/plugin_fdman/Cargo.toml index 78c7239c..22c10067 100644 --- a/plugins/plugin_fdman/Cargo.toml +++ b/plugins/plugin_fdman/Cargo.toml @@ -13,6 +13,7 @@ thiserror = "1" toml = "0.5" tracing = "0.1" tokio-util = "0.7.0" +bevy_reflect = "0.7.0" tedge_api = { path = "../../crates/core/tedge_api" } tedge_lib = { path = "../../crates/core/tedge_lib" } diff --git a/plugins/plugin_fdman/src/error.rs b/plugins/plugin_fdman/src/error.rs index 5b7f977a..77e63b37 100644 --- a/plugins/plugin_fdman/src/error.rs +++ b/plugins/plugin_fdman/src/error.rs @@ -1,5 +1,14 @@ #[derive(Debug, miette::Diagnostic, thiserror::Error)] -pub(crate) enum Error { +pub enum Error { #[error("Failed to parse configuration")] ConfigParseFailed(#[from] toml::de::Error), + + #[error("IO")] + Io(#[from] std::io::Error), + + #[error("Sending reply message failed")] + SendingReply, + + #[error("Out of file handles")] + NoHandleLeft, } diff --git a/plugins/plugin_fdman/src/lib.rs b/plugins/plugin_fdman/src/lib.rs index 259f7a13..71e12f2a 100644 --- a/plugins/plugin_fdman/src/lib.rs +++ b/plugins/plugin_fdman/src/lib.rs @@ -1,8 +1,9 @@ mod builder; mod config; mod error; +pub mod message; mod plugin; pub use crate::builder::FdManPluginBuilder; +pub use crate::error::Error; pub use crate::plugin::FdManPlugin; - diff --git a/plugins/plugin_fdman/src/message/mod.rs b/plugins/plugin_fdman/src/message/mod.rs new file mode 100644 index 00000000..72ef4570 --- /dev/null +++ b/plugins/plugin_fdman/src/message/mod.rs @@ -0,0 +1,2 @@ +mod open_options; +pub use self::open_options::*; diff --git a/plugins/plugin_fdman/src/message/open_options.rs b/plugins/plugin_fdman/src/message/open_options.rs new file mode 100644 index 00000000..c8913052 --- /dev/null +++ b/plugins/plugin_fdman/src/message/open_options.rs @@ -0,0 +1,157 @@ +use std::{ + path::PathBuf, + sync::{atomic::AtomicU64, Arc}, +}; + +use tedge_api::Message; + +#[derive(Clone, Debug, bevy_reflect::TypeUuid)] +#[uuid = "cf82a6ed-3fc1-4011-bbf0-c79fc6fdf781"] +pub struct OpenOptions { + append: bool, + create: bool, + create_new: bool, + read: bool, + truncate: bool, + write: bool, + path: PathBuf, +} + +impl OpenOptions { + pub fn new(path: PathBuf) -> Self { + Self { + append: false, + create: false, + create_new: false, + read: false, + truncate: false, + write: false, + path, + } + } + + pub(crate) fn path(&self) -> &PathBuf { + &self.path + } + + pub(crate) fn as_std(&self) -> std::fs::OpenOptions { + let mut o = std::fs::OpenOptions::new(); + o.append(self.append); + o.create(self.create); + o.create_new(self.create_new); + o.read(self.read); + o.truncate(self.truncate); + o.write(self.write); + o + } + + pub fn append(mut self, b: bool) -> Self { + self.append = b; + self + } + + pub fn create(mut self, b: bool) -> Self { + self.create = b; + self + } + + pub fn create_new(mut self, b: bool) -> Self { + self.create_new = b; + self + } + + pub fn read(mut self, b: bool) -> Self { + self.read = b; + self + } + + pub fn truncate(mut self, b: bool) -> Self { + self.truncate = b; + self + } + + pub fn write(mut self, b: bool) -> Self { + self.write = b; + self + } +} + +impl tedge_api::Message for OpenOptions {} +impl tedge_api::message::AcceptsReplies for OpenOptions { + type Reply = OpenOptionsResult; +} + +#[derive(Debug)] +pub struct OpenOptionsFile { + file: std::fs::File, + currently_held_count: Arc<AtomicU64>, +} + +impl OpenOptionsFile { + pub(crate) fn new(file: std::fs::File, currently_held_count: Arc<AtomicU64>) -> Self { + Self { + file, + currently_held_count, + } + } +} + +impl Drop for OpenOptionsFile { + fn drop(&mut self) { + self.currently_held_count + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + } +} + +impl std::ops::Deref for OpenOptionsFile { + type Target = std::fs::File; + + fn deref(&self) -> &Self::Target { + &self.file + } +} + +impl std::ops::DerefMut for OpenOptionsFile { + fn deref_mut(&mut self) -> &mut Self::Target { + &mut self.file + } +} + +#[derive(Debug, thiserror::Error)] +pub enum OpenOptionsError { + #[error("FdMan Plugin error")] + FdMan(#[from] crate::error::Error), +} + +#[derive(Debug, bevy_reflect::TypeUuid)] +#[uuid = "e259bed9-14b9-4edf-ab7b-e87d454b823f"] +pub struct OpenOptionsResult { + open_options: OpenOptions, + result: Result<OpenOptionsFile, OpenOptionsError>, +} + +impl OpenOptionsResult { + pub(crate) fn new( + open_options: OpenOptions, + result: Result<OpenOptionsFile, OpenOptionsError>, + ) -> Self { + Self { + open_options, + result, + } + } + + pub fn open_options(&self) -> &OpenOptions { + &self.open_options + } + + pub fn result(&self) -> &Result<OpenOptionsFile, OpenOptionsError> { + &self.result + } + + pub fn into_result(self) -> Result<OpenOptionsFile, OpenOptionsError> { + self.result + } +} + +impl Message for OpenOptionsResult {} diff --git a/plugins/plugin_fdman/src/plugin.rs b/plugins/plugin_fdman/src/plugin.rs index eaaecebf..5c02c37f 100644 --- a/plugins/plugin_fdman/src/plugin.rs +++ b/plugins/plugin_fdman/src/plugin.rs @@ -1,21 +1,92 @@ +use std::sync::{atomic::AtomicU64, Arc}; + use async_trait::async_trait; -use tedge_api::plugin::Plugin; +use tedge_api::{ + plugin::{Handle, Plugin}, + PluginError, +}; + +use crate::{ + error::Error, + message::{OpenOptions, OpenOptionsError, OpenOptionsResult}, +}; #[derive(Debug)] pub struct FdManPlugin { max_fds: u64, + currently_held_count: Arc<AtomicU64>, } impl FdManPlugin { pub fn new(max_fds: u64) -> Self { - Self { max_fds } + Self { + max_fds, + currently_held_count: Arc::new(AtomicU64::from(0)), + } + } + + fn aquire_handle(&self) -> Result<FdHandle, Error> { + let old_value = self + .currently_held_count + .fetch_add(1, std::sync::atomic::Ordering::SeqCst); + + if old_value >= self.max_fds { + self.currently_held_count + .fetch_sub(1, std::sync::atomic::Ordering::SeqCst); + Err(Error::NoHandleLeft) + } else { + Ok(FdHandle) + } } } +struct FdHandle; + impl tedge_api::plugin::PluginDeclaration for FdManPlugin { - type HandledMessages = (); + type HandledMessages = (OpenOptions,); } #[async_trait] impl Plugin for FdManPlugin {} + +#[async_trait::async_trait] +impl Handle<OpenOptions> for FdManPlugin { + async fn handle_message( + &self, + message: OpenOptions, + sender: tedge_api::address::ReplySenderFor<OpenOptions>, + ) -> Result<(), PluginError> { + match self.aquire_handle() { + Ok(FdHandle) => { + let file_result = message + .as_std() + .open(message.path()) + .map(|file| { + crate::message::OpenOptionsFile::new( + file, + self.currently_held_count.clone(), + ) + }) + .map_err(Error::from) + .map_err(OpenOptionsError::from); + let res = OpenOptionsResult::new(message, file_result); + sender + .reply(res) + .map_err(|_| Error::SendingReply) + .map_err(PluginError::from) + } + Err(Error::NoHandleLeft) => { + let res = OpenOptionsResult::new( + message, + Err(OpenOptionsError::from(Error::NoHandleLeft)), + ); + sender + .reply(res) + .map_err(|_| Error::SendingReply) + .map_err(PluginError::from) + } + Err(other) => Err(PluginError::from(other)), + } + } +} |