summaryrefslogtreecommitdiffstats
path: root/.gitignore
blob: b93fb7eff94286c7a15d475fa581dac32a89b0db (plain)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
#
# NOTE! Don't add files that are generated in specific
# subdirectories here. Add them in the ".gitignore" file
# in that subdirectory instead.
#
# NOTE! Please run the 'git ls-files -i --exclude-standard'
# command after changing this file, to see if there are
# any tracked files which get ignored after the change.
#
# Normal rules
#
.*
*.o
*.o.*
*.a
*.s
*.ko
*.so
*.so.dbg
*.mod.c
*.i
*.lst
*.symtypes
*.order
*.elf
*.bin
*.gz
*.lzma
*.patch
*.gcno

#
# Top-level generic files
#
tags
TAGS
vmlinux
System.map
Module.markers
Module.symvers
!.gitignore
!.mailmap

#
# Generated include files
#
include/asm
include/asm-*/asm-offsets.h
include/config
include/linux/autoconf.h
include/linux/compile.h
include/linux/version.h
include/linux/utsrelease.h
include/linux/bounds.h
include/generated

# stgit generated dirs
patches-*

# quilt's files
patches
series

# cscope files
cscope.*
ncscope.*

# gnu global files
GPATH
GRTAGS
GSYMS
GTAGS

*.orig
*~
\#*#
08' href='#n108'>108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196
use std::any::TypeId;
use std::collections::HashSet;

use futures::StreamExt;

use futures::future::FutureExt;
use tedge_api::plugin::BuiltPlugin;
use tokio_util::sync::CancellationToken;
use tracing::debug;
use tracing::error;
use tracing::trace;

use crate::configuration::PluginInstanceConfiguration;
use crate::configuration::PluginKind;
use crate::errors::Result;
use crate::errors::TedgeApplicationError;
use crate::plugin_task::PluginTask;
use crate::task::Task;
use crate::TedgeApplication;
use crate::communication::PluginDirectory;
use crate::communication::PluginInfo;

/// Helper type for running a TedgeApplication
///
/// This type is only introduced for better separation of concerns in the codebase.
/// `Reactor::run()` is simply `TedgeApplication::run()`.
pub struct Reactor(pub TedgeApplication);

/// `Reactor` is a transparent wrapper, so its debug representation is exactly the debug
/// representation of the wrapped `TedgeApplication`.
impl std::fmt::Debug for Reactor {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Call the trait method by its full path: method-call syntax (`self.0.fmt(f)`) only
        // resolves when exactly one formatting trait with a `fmt` method is in scope.
        std::fmt::Debug::fmt(&self.0, f)
    }
}

/// Helper type for preparing a PluginTask
///
/// Bundles everything that is handed to `PluginTask::new()` for one instantiated plugin.
struct PluginTaskPrep {
    /// Name of the plugin instance, as used in the configuration
    name: String,
    /// The plugin as returned by its builder
    plugin: BuiltPlugin,
    /// Receiving end of the message channel of this plugin
    plugin_msg_receiver: tedge_api::address::MessageReceiver,
    /// Cancellation token that is passed on to the task of this plugin
    cancellation_token: CancellationToken,
}

impl Reactor {
    /// Run the application
    ///
    /// This builds the plugin directory from the configuration, instantiates all configured
    /// plugins and then drives the core task and all plugin tasks concurrently until all of
    /// them have finished.
    ///
    /// # Errors
    ///
    /// Returns an error if
    ///
    /// * a configured plugin is of a kind for which no builder is registered
    ///   (`TedgeApplicationError::UnknownPluginKind`),
    /// * `PluginDirectory::collect_from()` fails,
    /// * instantiating any of the plugins fails,
    /// * any plugin task finishes with an error, or, if all of them succeeded, the core task
    ///   finishes with an error.
    ///
    /// # Panics
    ///
    /// Panics if the message receiver for a configured plugin cannot be taken out of the
    /// plugin directory. That would be a bug in this crate.
    pub async fn run(self) -> Result<()> {
        let channel_size = self.0.config().communication_buffer_size().get();

        // One directory entry per configured plugin: the plugin name together with the message
        // types the builder of its kind declares and the size of its channel.
        let directory_iter = self.0
            .config()
            .plugins()
            .iter()
            .map(|(pname, pconfig)| {
                let handle_types = self.0
                    .plugin_builders()
                    .get(pconfig.kind().as_ref())
                    .map(|(handle_types, _)| {
                        handle_types.get_types()
                            .into_iter()
                            .cloned()
                            .collect::<HashSet<(&'static str, TypeId)>>()
                    })
                    .ok_or_else(|| {
                        TedgeApplicationError::UnknownPluginKind(pconfig.kind().as_ref().to_string())
                    })?;

                Ok((pname.to_string(), PluginInfo::new(handle_types, channel_size)))
            });
        let (core_sender, core_receiver) = tokio::sync::mpsc::channel(channel_size);
        let mut directory = PluginDirectory::collect_from(directory_iter, core_sender)?;

        let instantiated_plugins = self
            .0
            .config()
            .plugins()
            .iter()
            .map(|(pname, pconfig)| {
                let receiver = match directory.get_mut(pname).and_then(|pinfo| pinfo.receiver.take()) {
                    Some(receiver) => receiver,
                    None => unreachable!("Tried to take receiver twice. This is a FATAL bug, please report it"),
                };

                (pname, pconfig, receiver)
            })
            // Collect first, so that the mutable borrow of the directory ends before the
            // directory is shared (immutably) with all plugin instantiations below.
            .collect::<Vec<_>>()
            .into_iter()
            .map(|(pname, pconfig, receiver)| {
                self.instantiate_plugin(pname, pconfig, &directory, receiver, self.0.cancellation_token().child_token())
            })
            // Instantiate all plugins concurrently, then turn the list of results into a
            // result of a list so that the first error aborts the startup.
            .collect::<futures::stream::FuturesUnordered<_>>()
            .collect::<Vec<Result<_>>>()
            .await
            .into_iter()
            .collect::<Result<Vec<_>>>()?;
        debug!("Plugins instantiated");

        let running_core = {
            // we clone the cancellation_token here, because the core must be able to use the
            // "root" token to stop all plugins
            let core_cancel_token = self.0.cancellation_token().clone();
            crate::core_task::CoreTask::new(core_cancel_token, core_receiver).run()
        };
        debug!("Core task instantiated");

        // Nothing is awaited here yet: this only builds the future that runs all plugin tasks.
        let running_plugins = instantiated_plugins
            .into_iter()
            .map(|prep| {
                PluginTask::new(
                    prep.name,
                    prep.plugin,
                    prep.plugin_msg_receiver,
                    prep.cancellation_token,
                    self.0.config().plugin_shutdown_timeout(),
                )
            })
            .map(Task::run)
            .map(Box::pin)
            .collect::<futures::stream::FuturesUnordered<_>>() // main loop
            .collect::<Vec<Result<()>>>()
            .inspect(|res| debug!("All Plugin Tasks finished running: {:?}", res));
        debug!("Plugin tasks instantiated");

        debug!("Entering main loop");
        let (plugin_res, core_res) = tokio::join!(running_plugins, running_core);

        // An error from a plugin task takes precedence over the result of the core task.
        plugin_res
            .into_iter() // result type conversion
            .collect::<Result<Vec<()>>>()
            .and_then(|_| core_res)
    }

    /// Get the plugin-specific configuration of the plugin instance named `plugin_name`
    ///
    /// Returns `None` if no plugin with that name is configured.
    fn get_config_for_plugin(
        &self,
        plugin_name: &str,
    ) -> Option<&tedge_api::PluginConfiguration> {
        trace!("Searching config for plugin: {}", plugin_name);
        self.0
            .config()
            .plugins()
            .get(plugin_name)
            .map(|cfg| cfg.configuration())
    }

    /// Get the builder that is registered for plugins of kind `plugin_kind`
    ///
    /// Returns `None` if no builder is registered for that kind.
    fn find_plugin_builder(
        &self,
        plugin_kind: &PluginKind,
    ) -> Option<&Box<dyn tedge_api::PluginBuilder<PluginDirectory>>> {
        trace!("Searching builder for plugin: {}", plugin_kind.as_ref());
        self.0
            .plugin_builders()
            .get(plugin_kind.as_ref())
            .map(|(_, pb)| pb)
    }

    /// Verify the configuration of one plugin and instantiate the plugin
    ///
    /// On success, the built plugin is returned together with its name, the passed
    /// `plugin_msg_receiver` and the passed `cancellation_token`, ready to be turned into a
    /// `PluginTask`.
    ///
    /// # Errors
    ///
    /// * `TedgeApplicationError::UnknownPluginKind` if no builder is registered for the kind
    ///   of the plugin
    /// * `TedgeApplicationError::PluginConfigMissing` if there is no configuration for
    ///   `plugin_name`
    /// * `TedgeApplicationError::PluginConfigVerificationFailed` if the builder rejects the
    ///   configuration
    /// * the error of the builder, converted into a `TedgeApplicationError`, if the
    ///   instantiation itself fails
    async fn instantiate_plugin(
        &self,
        plugin_name: &str,
        plugin_config: &PluginInstanceConfiguration,
        directory: &PluginDirectory,
        plugin_msg_receiver: tedge_api::address::MessageReceiver,
        cancellation_token: CancellationToken,
    ) -> Result<PluginTaskPrep> {
        let builder = self
            .find_plugin_builder(plugin_config.kind())
            .ok_or_else(|| {
                let kind_name = plugin_config.kind().as_ref().to_string();
                TedgeApplicationError::UnknownPluginKind(kind_name)
            })?;

        let config = self.get_config_for_plugin(plugin_name).ok_or_else(|| {
            let pname = plugin_name.to_string();
            TedgeApplicationError::PluginConfigMissing(pname)
        })?;

        if let Err(e) = builder.verify_configuration(config).await {
            error!("Verification of configuration failed for plugin '{}'", plugin_name);
            return Err(TedgeApplicationError::PluginConfigVerificationFailed(e));
        }

        // The plugin gets its own child of the application's token; the `cancellation_token`
        // parameter is not handed to the plugin but stored for its task.
        // NOTE(review): these are two distinct tokens, so cancelling the task's token alone
        // does not cancel the one the plugin holds - confirm this is intended.
        let cancel_token = self.0.cancellation_token().child_token();

        trace!(
            "Instantiating plugin: {} of kind {}",
            plugin_name,
            plugin_config.kind().as_ref()
        );
        builder
            .instantiate(config.clone(), cancel_token, directory)
            .await
            .map_err(TedgeApplicationError::from)
            .map(|plugin| PluginTaskPrep {
                name: plugin_name.to_string(),
                plugin,
                plugin_msg_receiver,
                cancellation_token,
            })
    }
}