summaryrefslogtreecommitdiffstats
path: root/plugins/plugin_fdman/src/plugin.rs
blob: 0c95916221b5a89f740481c0e1f179e0c514b80d (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
use std::sync::{atomic::AtomicU64, Arc};

use async_trait::async_trait;

use tedge_api::{
    plugin::{Handle, Plugin},
    PluginError,
};

use crate::{
    error::Error,
    guard::Guard,
    message::{OpenOptions, OpenOptionsError, OpenOptionsResult},
};

/// Plugin that hands out file handles while keeping count of how many are
/// currently held, refusing requests once a configured limit is reached.
#[derive(Debug)]
pub struct FdManPlugin {
    /// Limit the held-handle counter is compared against when acquiring.
    max_fds: u64,
    /// Shared counter of handles currently held; a clone of this `Arc` is
    /// passed to every `Guard` created for an opened file.
    currently_held_count: Arc<AtomicU64>,
}

impl FdManPlugin {
    pub fn new(max_fds: u64) -> Self {
        Self {
            max_fds,
            currently_held_count: Arc::new(AtomicU64::from(0)),
        }
    }

    fn aquire_handles(&self, count: u64) -> Result<FdHandles, Error> {
        let old_value = self
            .currently_held_count
            .fetch_add(count, std::sync::atomic::Ordering::SeqCst);

        if old_value >= self.max_fds {
            self.currently_held_count
                .fetch_sub(count, std::sync::atomic::Ordering::SeqCst);
            Err(Error::InsufficientHandles {
                required: count,
                received: (old_value - self.max_fds),
            })
        } else {
            Ok(FdHandles(count))
        }
    }
}

/// Number of handles granted by a successful `FdManPlugin::aquire_handles` call.
struct FdHandles(u64);

/// Declares `OpenOptions` as the only message type this plugin handles.
impl tedge_api::plugin::PluginDeclaration for FdManPlugin {
    type HandledMessages = (OpenOptions,);
}

/// Marks `FdManPlugin` as a `tedge_api` plugin; no trait methods are
/// overridden here.
#[async_trait]
impl Plugin for FdManPlugin {}

#[async_trait::async_trait]
impl Handle<OpenOptions> for FdManPlugin {
    async fn handle_message(
        &self,
        message: OpenOptions,
        sender: tedge_api::address::ReplySenderFor<OpenOptions>,
    ) -> Result<(), PluginError> {
        let handles_to_aquire = 1;
        match self.aquire_handles(handles_to_aquire) {
            Ok(FdHandles(aquired_handles)) => {
                if aquired_handles != handles_to_aquire {
                    let err = Error::InsufficientHandles {
                        required: handles_to_aquire,
                        received: aquired_handles,
                    };
                    return sender
                        .reply(OpenOptionsResult::new(
                            message,
                            Err(OpenOptionsError::from(err)),
                        ))
                        .map_err(|_| Error::SendingReply)
                        .map_err(PluginError::from);
                }

                let file_result = message
                    .as_std()
                    .open(message.path())
                    .map(|file| {
                        crate::file::FileGuard::new(
                            file,
                            Guard::new(aquired_handles, self.currently_held_count.clone()),
                        )
                    })
                    .map_err(Error::from)
                    .map_err(OpenOptionsError::from);
                let res = OpenOptionsResult::new(message, file_result);
                sender
                    .reply(res)
                    .map_err(|_| Error::SendingReply)
                    .map_err(PluginError::from)
            }
            Err(err @ Error::InsufficientHandles { .. }) => sender
                .reply(OpenOptionsResult::new(
                    message,
                    Err(OpenOptionsError::from(err)),
                ))
                .map_err(|_| Error::SendingReply)
                .map_err(PluginError::from),
            Err(other) => Err(PluginError::from(other)),
        }
    }
}