From 627b1138dcd5daac87345f56894bdd0b03b8b75a Mon Sep 17 00:00:00 2001 From: vkalintiris Date: Sat, 20 Jan 2024 16:55:50 +0200 Subject: It works!!!! --- daemon/main.c | 36 ++---- packaging/dag/child_stream.conf | 10 ++ packaging/dag/main.py | 243 +++++++++++++++++++++++++++++++++++---- packaging/dag/parent_stream.conf | 7 ++ 4 files changed, 247 insertions(+), 49 deletions(-) create mode 100644 packaging/dag/child_stream.conf create mode 100644 packaging/dag/parent_stream.conf diff --git a/daemon/main.c b/daemon/main.c index a88fef5e1f..276da6959d 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -873,37 +873,21 @@ static void log_init(void) { nd_log_set_priority_level(config_get(CONFIG_SECTION_LOGS, "level", NDLP_INFO_STR)); - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "%s/debug.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_DEBUG, config_get(CONFIG_SECTION_LOGS, "debug", filename)); - - bool with_journal = is_stderr_connected_to_journal() /* || nd_log_journal_socket_available() */; - if(with_journal) - snprintfz(filename, FILENAME_MAX, "journal"); - else - snprintfz(filename, FILENAME_MAX, "%s/daemon.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_DAEMON, config_get(CONFIG_SECTION_LOGS, "daemon", filename)); + // char filename[FILENAME_MAX + 1]; + // snprintfz(filename, FILENAME_MAX, "%s/debug.log", netdata_configured_log_dir); + nd_log_set_user_settings(NDLS_DEBUG, config_get(CONFIG_SECTION_LOGS, "debug", "stderr")); - if(with_journal) - snprintfz(filename, FILENAME_MAX, "journal"); - else - snprintfz(filename, FILENAME_MAX, "%s/collector.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_COLLECTORS, config_get(CONFIG_SECTION_LOGS, "collector", filename)); - - snprintfz(filename, FILENAME_MAX, "%s/access.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_ACCESS, config_get(CONFIG_SECTION_LOGS, "access", filename)); - - if(with_journal) - snprintfz(filename, FILENAME_MAX, "journal"); - else - snprintfz(filename, FILENAME_MAX, "%s/health.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_HEALTH, config_get(CONFIG_SECTION_LOGS, "health", filename)); + // bool with_journal = is_stderr_connected_to_journal() /* || nd_log_journal_socket_available() */; + + nd_log_set_user_settings(NDLS_DAEMON, config_get(CONFIG_SECTION_LOGS, "daemon", "stderr")); + nd_log_set_user_settings(NDLS_COLLECTORS, config_get(CONFIG_SECTION_LOGS, "collector", "stderr")); + nd_log_set_user_settings(NDLS_ACCESS, config_get(CONFIG_SECTION_LOGS, "access", "stderr")); + nd_log_set_user_settings(NDLS_HEALTH, config_get(CONFIG_SECTION_LOGS, "health", "stderr")); #ifdef ENABLE_ACLK aclklog_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "conversation log", CONFIG_BOOLEAN_NO); if (aclklog_enabled) { - snprintfz(filename, FILENAME_MAX, "%s/aclk.log", netdata_configured_log_dir); - nd_log_set_user_settings(NDLS_ACLK, config_get(CONFIG_SECTION_CLOUD, "conversation log file", filename)); + nd_log_set_user_settings(NDLS_ACLK, config_get(CONFIG_SECTION_CLOUD, "conversation log file", "stderr")); } #endif } diff --git a/packaging/dag/child_stream.conf b/packaging/dag/child_stream.conf new file mode 100644 index 0000000000..ed78bd3fbd --- /dev/null +++ b/packaging/dag/child_stream.conf @@ -0,0 +1,10 @@ +[stream] + enabled = {{ enabled }} + destination = {{ destination }} + api key = {{ api_key }} + timeout seconds = {{ timeout_seconds }} + default port = {{ default_port }} + send charts matching = {{ send_charts_matching }} + buffer size bytes = {{ buffer_size_bytes }} + reconnect delay seconds = {{ reconnect_delay_seconds }} + initial clock resync iterations = {{ initial_clock_resync_iterations }} diff --git a/packaging/dag/main.py b/packaging/dag/main.py index 8a5b03da0b..fcbed24ef6 100755 --- a/packaging/dag/main.py +++ b/packaging/dag/main.py @@ -1,21 +1,23 @@ #!/usr/bin/env python3 +from typing import Callable, List, Tuple + import asyncio import enum -import click import os +import pathlib import sys +import tempfile import time +import uuid import anyio - +import click import dagger -from typing import Callable, List, Tuple +import jinja2 import images as oci_images -import pathlib - class Platform: def __init__(self, platform: str): @@ -199,8 +201,8 @@ class NetdataInstaller: externaldeps = self.distro._cache_volume(client, self.platform, "externaldeps") ctr = ( - ctr.with_directory(self.repo_root, client.host().directory(host_repo_root)) - .with_workdir(self.repo_root) + ctr.with_directory(self.repo_root.as_posix(), client.host().directory(host_repo_root)) + .with_workdir(self.repo_root.as_posix()) .with_mounted_cache(os.path.join(self.repo_root, "externaldeps"), externaldeps) ) @@ -227,10 +229,10 @@ class NetdataInstaller: if FeatureFlags.BundledProtobuf not in self.features: args.append("--use-system-protobuf") - args.extend(["--install-prefix", self.prefix]) + args.extend(["--install-prefix", self.prefix.as_posix()]) - ctr = self._mount_repo(client, ctr, self.repo_root) + ctr = self._mount_repo(client, ctr, self.repo_root.as_posix()) ctr = ( ctr.with_env_variable('NETDATA_CMAKE_OPTIONS', '-DCMAKE_BUILD_TYPE=Debug') @@ -239,34 +241,164 @@ class NetdataInstaller: # The installer will place everything under "/netdata" if self.prefix != "/": - self.prefix = os.path.join(self.prefix, "netdata") + self.prefix = self.prefix / "netdata" return ctr +class ChildStreamConf: + def __init__(self, installer: NetdataInstaller, destination: str, api_key: uuid.UUID): + self.installer = installer + self.substitutions = { + "enabled": "yes", + "destination": destination, + "api_key": api_key, + "timeout_seconds": 60, + "default_port": 19999, + "send_charts_matching": "*", + "buffer_size_bytes": 1024 * 1024, + "reconnect_delay_seconds": 5, + "initial_clock_resync_iterations": 60, + } + + def render(self) -> str: + tmpl_path = pathlib.Path(__file__).parent / "child_stream.conf" + with open(tmpl_path) as fp: + tmpl = jinja2.Template(fp.read()) + + return tmpl.render(**self.substitutions) + + +class ParentStreamConf: + def __init__(self, installer: NetdataInstaller, api_key: str): + self.installer = installer + self.substitutions = { + "api_key": api_key, + "enabled": "yes", + "allow_from": "*", + "default_history": 3600, + "health_enabled_by_default": "auto", + "default_postpone_alarms_on_connect_seconds": 60, + "multiple_connections": "allow", + } + + def render(self) -> str: + tmpl_path = pathlib.Path(__file__).parent / "parent_stream.conf" + with open(tmpl_path) as fp: + tmpl = jinja2.Template(fp.read()) + + return tmpl.render(**self.substitutions) + + +class StreamConf: + def __init__(self, child_conf: ChildStreamConf, parent_conf: ParentStreamConf): + self.child_conf = child_conf + self.parent_conf = parent_conf + + def render(self) -> str: + child_section = self.child_conf.render() if self.child_conf else '' + parent_section = self.parent_conf.render() if self.parent_conf else '' + return '\n'.join([child_section, parent_section]) + + class Agent: def __init__(self, installer: NetdataInstaller): + self.identifier = uuid.uuid4() self.installer = installer + def _binary(self) -> pathlib.Path: + return os.path.join(self.installer.prefix, "usr/sbin/netdata") + def buildinfo(self, ctr: dagger.Container, installer: NetdataInstaller, output: pathlib.Path) -> dagger.Container: - binary = os.path.join(installer.prefix, "usr/sbin/netdata") - ctr = ( - ctr.with_exec([binary, "-W", "buildinfo"], redirect_stdout=output) + ctr.with_exec([self._binary(), "-W", "buildinfo"], redirect_stdout=output) ) return ctr def unittest(self, ctr: dagger.Container) -> dagger.Container: - binary = os.path.join(self.installer.prefix, "usr/sbin/netdata") + ctr = ( + ctr.with_exec([self._binary(), "-W", "unittest"]) + ) + + return ctr + def run(self, client: dagger.Client, ctr: dagger.Container, stream_conf: StreamConf, port, parent) -> dagger.Container: + # Write stream.conf + if stream_conf: + host_stream_conf_path = str(self.identifier) + ".stream.conf" + + with open(host_stream_conf_path, 'w') as fp: + fp.write(stream_conf.render()) + + dest = self.installer.prefix / "etc/netdata/stream.conf" + ctr = ( + ctr.with_file(dest.as_posix(), client.host().file(host_stream_conf_path)) + ) + + if parent: + ctr = ctr.with_service_binding("tilestora", parent) + + # Exec the binary ctr = ( - ctr.with_exec([binary, "-W", "unittest"]) + ctr.with_exposed_port(port) + .with_exec([self._binary(), "-D", "-i", "0.0.0.0", "-p", str(port)]) ) return ctr +class Digraph: + def __init__(self): + self.nodes = {} # Stores Agent instances + self.children_of = {} # Stores children: {parent_id: [child_ids]} + self.parents_of = {} # Stores parents: {child_id: [parent_ids]} + + def add_node(self, node): + self.nodes[node.identifier] = node + if node.identifier not in self.children_of: + self.children_of[node.identifier] = [] + if node.identifier not in self.parents_of: + self.parents_of[node.identifier] = [] + + def add_children(self, node, children): + if node.identifier not in self.nodes : + raise ValueError("Node not found") + + for child in children: + if child.identifier not in self.nodes : + raise ValueError("Child node not found") + if node.identifier not in self.children_of[child.identifier]: + self.children_of[node.identifier].append(child.identifier) + if child.identifier not in self.parents_of[node.identifier]: + self.parents_of[child.identifier].append(node.identifier) + + def get_children(self, node): + return [self.nodes [child_id] for child_id in self.children_of.get(node.identifier, [])] + + def get_parents(self, node): + return [self.nodes [parent_id] for parent_id in self.parents_of.get(node.identifier, [])] + + def get_siblings(self, node): + siblings = set() + for parent_id in self.parents_of.get(node.identifier, []): + siblings.update(self.children_of.get(parent_id, [])) + siblings.discard(node.identifier) + return [self.nodes [sibling_id] for sibling_id in siblings] + + def render(self, filename="digraph"): + import graphviz + dot = graphviz.Digraph(comment='Agent Topology') + for identifier, node in self.nodes.items(): + dot.node(str(identifier), label=str(identifier)) + + for parent_id, children_ids in self.children_of.items(): + for child_id in children_ids: + dot.edge(str(parent_id), str(child_id)) + + dot.render(filename, format='svg', cleanup=True) + + class Context: def __init__(self, client: dagger.Client, @@ -303,6 +435,13 @@ class Context: ctr = self.agent.buildinfo(ctr, self.installer, output) return ctr + def exec(self, ctr: dagger.Container) -> dagger.Container: + if self.built_agent == False: + self.build_agent(ctr) + + ctr = self.agent.run(ctr) + return ctr + def run_async(func): def wrapper(*args, **kwargs): @@ -317,10 +456,14 @@ async def main(): async with dagger.Connection(config) as client: platform = dagger.Platform("linux/x86_64") distro = Distribution("debian10") - installer = NetdataInstaller(platform, distro, "/netdata", "/opt", FeatureFlags.DBEngine) - agent = Agent(installer) - ctx = Context(client, platform, distro, installer, agent) + repo_root = pathlib.Path("/netdata") + prefix_path = pathlib.Path("/opt") + installer = NetdataInstaller(platform, distro, repo_root, prefix_path, FeatureFlags.DBEngine) + parent_agent = Agent(installer) + child_agent = Agent(installer) + + ctx = Context(client, platform, distro, installer, parent_agent) # build base image with packages we need ctr = ctx.build_distro() @@ -329,14 +472,68 @@ async def main(): ctr = ctx.build_agent(ctr) # get the buildinfo - output = os.path.join(installer.prefix, "buildinfo.log") - ctr = ctx.buildinfo(ctr, output) + # output = os.path.join(installer.prefix, "buildinfo.log") + # ctr = ctx.buildinfo(ctr, output) - # run unittests - ctr = agent.unittest(ctr) + api_key = uuid.uuid4() - await ctr + def setup_parent(): + child_stream_conf = None + parent_stream_conf = ParentStreamConf(installer, api_key) + stream_conf = StreamConf(child_stream_conf, parent_stream_conf) + return stream_conf + + parent_stream_conf = setup_parent() + parent = parent_agent.run(client, ctr, parent_stream_conf, 19999, None) + parent_service = parent.as_service() + + def setup_child(): + child_stream_conf = ChildStreamConf(installer, "tilestora:19999", api_key) + parent_stream_conf = None + stream_conf = StreamConf(child_stream_conf, parent_stream_conf) + return stream_conf + + child_stream_conf = setup_child() + child = child_agent.run(client, ctr, child_stream_conf, 20000, parent_service) + + tunnel = await client.host().tunnel(parent_service, native=True).start() + endpoint = await tunnel.endpoint() + + await child + + # await child.with_service_binding("tilestora", parent_service) + # await child.with_service_binding("tilestora", parent_service) + + + + # tunnel = await client.host().tunnel(parent_service, native=True).start() + # endpoint = await tunnel.endpoint() + + # tunnel = await client.host().tunnel(child_service, native=True).start() + # endpoint = await tunnel.endpoint() + + time.sleep(600) + + # run unittests + # ctr = agent.unittest(ctr) + # await ctr if __name__ == '__main__': + # agent1 = Agent("Data1") + # agent2 = Agent("Data2") + # agent3 = Agent("Data3") + # agent4 = Agent("Data4") + + # dg = Digraph() + # dg.add_node(agent1) + # dg.add_node(agent2) + # dg.add_node(agent3) + # dg.add_node(agent4) + + # dg.add_children(agent1, [agent2, agent3]) + # dg.add_children(agent4, [agent2, agent3]) + + # dg.render() + main() diff --git a/packaging/dag/parent_stream.conf b/packaging/dag/parent_stream.conf new file mode 100644 index 0000000000..15f303f97b --- /dev/null +++ b/packaging/dag/parent_stream.conf @@ -0,0 +1,7 @@ +[{{ api_key }}] + enabled = {{ enabled }} + allow from = {{ allow_from }} + default history = {{ default_history }} + health enabled by default = {{ health_enabled_by_default }} + default postpone alarms on connect seconds = {{ default_postpone_alarms_on_connect_seconds }} + multiple connections = {{ multiple_connections }} -- cgit v1.2.3