summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvkalintiris <vasilis@netdata.cloud>2024-01-20 16:55:50 +0200
committervkalintiris <vasilis@netdata.cloud>2024-01-20 16:55:50 +0200
commit627b1138dcd5daac87345f56894bdd0b03b8b75a (patch)
tree278c996ecbcf6b93526f33ffa072e591aa699d4c
parentf02001c5c3a28c03527eb2f598f232437286c20d (diff)
It works!!!!dagger
-rw-r--r--daemon/main.c36
-rw-r--r--packaging/dag/child_stream.conf10
-rwxr-xr-xpackaging/dag/main.py243
-rw-r--r--packaging/dag/parent_stream.conf7
4 files changed, 247 insertions, 49 deletions
diff --git a/daemon/main.c b/daemon/main.c
index a88fef5e1f..276da6959d 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -873,37 +873,21 @@ static void log_init(void) {
nd_log_set_priority_level(config_get(CONFIG_SECTION_LOGS, "level", NDLP_INFO_STR));
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "%s/debug.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_DEBUG, config_get(CONFIG_SECTION_LOGS, "debug", filename));
-
- bool with_journal = is_stderr_connected_to_journal() /* || nd_log_journal_socket_available() */;
- if(with_journal)
- snprintfz(filename, FILENAME_MAX, "journal");
- else
- snprintfz(filename, FILENAME_MAX, "%s/daemon.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_DAEMON, config_get(CONFIG_SECTION_LOGS, "daemon", filename));
+ // char filename[FILENAME_MAX + 1];
+ // snprintfz(filename, FILENAME_MAX, "%s/debug.log", netdata_configured_log_dir);
+ nd_log_set_user_settings(NDLS_DEBUG, config_get(CONFIG_SECTION_LOGS, "debug", "stderr"));
- if(with_journal)
- snprintfz(filename, FILENAME_MAX, "journal");
- else
- snprintfz(filename, FILENAME_MAX, "%s/collector.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_COLLECTORS, config_get(CONFIG_SECTION_LOGS, "collector", filename));
-
- snprintfz(filename, FILENAME_MAX, "%s/access.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_ACCESS, config_get(CONFIG_SECTION_LOGS, "access", filename));
-
- if(with_journal)
- snprintfz(filename, FILENAME_MAX, "journal");
- else
- snprintfz(filename, FILENAME_MAX, "%s/health.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_HEALTH, config_get(CONFIG_SECTION_LOGS, "health", filename));
+ // bool with_journal = is_stderr_connected_to_journal() /* || nd_log_journal_socket_available() */;
+
+ nd_log_set_user_settings(NDLS_DAEMON, config_get(CONFIG_SECTION_LOGS, "daemon", "stderr"));
+ nd_log_set_user_settings(NDLS_COLLECTORS, config_get(CONFIG_SECTION_LOGS, "collector", "stderr"));
+ nd_log_set_user_settings(NDLS_ACCESS, config_get(CONFIG_SECTION_LOGS, "access", "stderr"));
+ nd_log_set_user_settings(NDLS_HEALTH, config_get(CONFIG_SECTION_LOGS, "health", "stderr"));
#ifdef ENABLE_ACLK
aclklog_enabled = config_get_boolean(CONFIG_SECTION_CLOUD, "conversation log", CONFIG_BOOLEAN_NO);
if (aclklog_enabled) {
- snprintfz(filename, FILENAME_MAX, "%s/aclk.log", netdata_configured_log_dir);
- nd_log_set_user_settings(NDLS_ACLK, config_get(CONFIG_SECTION_CLOUD, "conversation log file", filename));
+ nd_log_set_user_settings(NDLS_ACLK, config_get(CONFIG_SECTION_CLOUD, "conversation log file", "stderr"));
}
#endif
}
diff --git a/packaging/dag/child_stream.conf b/packaging/dag/child_stream.conf
new file mode 100644
index 0000000000..ed78bd3fbd
--- /dev/null
+++ b/packaging/dag/child_stream.conf
@@ -0,0 +1,10 @@
+[stream]
+ enabled = {{ enabled }}
+ destination = {{ destination }}
+ api key = {{ api_key }}
+ timeout seconds = {{ timeout_seconds }}
+ default port = {{ default_port }}
+ send charts matching = {{ send_charts_matching }}
+ buffer size bytes = {{ buffer_size_bytes }}
+ reconnect delay seconds = {{ reconnect_delay_seconds }}
+ initial clock resync iterations = {{ initial_clock_resync_iterations }}
diff --git a/packaging/dag/main.py b/packaging/dag/main.py
index 8a5b03da0b..fcbed24ef6 100755
--- a/packaging/dag/main.py
+++ b/packaging/dag/main.py
@@ -1,21 +1,23 @@
#!/usr/bin/env python3
+from typing import Callable, List, Tuple
+
import asyncio
import enum
-import click
import os
+import pathlib
import sys
+import tempfile
import time
+import uuid
import anyio
-
+import click
import dagger
-from typing import Callable, List, Tuple
+import jinja2
import images as oci_images
-import pathlib
-
class Platform:
def __init__(self, platform: str):
@@ -199,8 +201,8 @@ class NetdataInstaller:
externaldeps = self.distro._cache_volume(client, self.platform, "externaldeps")
ctr = (
- ctr.with_directory(self.repo_root, client.host().directory(host_repo_root))
- .with_workdir(self.repo_root)
+ ctr.with_directory(self.repo_root.as_posix(), client.host().directory(host_repo_root))
+ .with_workdir(self.repo_root.as_posix())
.with_mounted_cache(os.path.join(self.repo_root, "externaldeps"), externaldeps)
)
@@ -227,10 +229,10 @@ class NetdataInstaller:
if FeatureFlags.BundledProtobuf not in self.features:
args.append("--use-system-protobuf")
- args.extend(["--install-prefix", self.prefix])
+ args.extend(["--install-prefix", self.prefix.as_posix()])
- ctr = self._mount_repo(client, ctr, self.repo_root)
+ ctr = self._mount_repo(client, ctr, self.repo_root.as_posix())
ctr = (
ctr.with_env_variable('NETDATA_CMAKE_OPTIONS', '-DCMAKE_BUILD_TYPE=Debug')
@@ -239,34 +241,164 @@ class NetdataInstaller:
# The installer will place everything under "<install-prefix>/netdata"
if self.prefix != "/":
- self.prefix = os.path.join(self.prefix, "netdata")
+ self.prefix = self.prefix / "netdata"
return ctr
+class ChildStreamConf:
+ def __init__(self, installer: NetdataInstaller, destination: str, api_key: uuid.UUID):
+ self.installer = installer
+ self.substitutions = {
+ "enabled": "yes",
+ "destination": destination,
+ "api_key": api_key,
+ "timeout_seconds": 60,
+ "default_port": 19999,
+ "send_charts_matching": "*",
+ "buffer_size_bytes": 1024 * 1024,
+ "reconnect_delay_seconds": 5,
+ "initial_clock_resync_iterations": 60,
+ }
+
+ def render(self) -> str:
+ tmpl_path = pathlib.Path(__file__).parent / "child_stream.conf"
+ with open(tmpl_path) as fp:
+ tmpl = jinja2.Template(fp.read())
+
+ return tmpl.render(**self.substitutions)
+
+
+class ParentStreamConf:
+ def __init__(self, installer: NetdataInstaller, api_key: str):
+ self.installer = installer
+ self.substitutions = {
+ "api_key": api_key,
+ "enabled": "yes",
+ "allow_from": "*",
+ "default_history": 3600,
+ "health_enabled_by_default": "auto",
+ "default_postpone_alarms_on_connect_seconds": 60,
+ "multiple_connections": "allow",
+ }
+
+ def render(self) -> str:
+ tmpl_path = pathlib.Path(__file__).parent / "parent_stream.conf"
+ with open(tmpl_path) as fp:
+ tmpl = jinja2.Template(fp.read())
+
+ return tmpl.render(**self.substitutions)
+
+
+class StreamConf:
+ def __init__(self, child_conf: ChildStreamConf, parent_conf: ParentStreamConf):
+ self.child_conf = child_conf
+ self.parent_conf = parent_conf
+
+ def render(self) -> str:
+ child_section = self.child_conf.render() if self.child_conf else ''
+ parent_section = self.parent_conf.render() if self.parent_conf else ''
+ return '\n'.join([child_section, parent_section])
+
+
class Agent:
def __init__(self, installer: NetdataInstaller):
+ self.identifier = uuid.uuid4()
self.installer = installer
+ def _binary(self) -> pathlib.Path:
+ return os.path.join(self.installer.prefix, "usr/sbin/netdata")
+
def buildinfo(self, ctr: dagger.Container, installer: NetdataInstaller, output: pathlib.Path) -> dagger.Container:
- binary = os.path.join(installer.prefix, "usr/sbin/netdata")
-
ctr = (
- ctr.with_exec([binary, "-W", "buildinfo"], redirect_stdout=output)
+ ctr.with_exec([self._binary(), "-W", "buildinfo"], redirect_stdout=output)
)
return ctr
def unittest(self, ctr: dagger.Container) -> dagger.Container:
- binary = os.path.join(self.installer.prefix, "usr/sbin/netdata")
+ ctr = (
+ ctr.with_exec([self._binary(), "-W", "unittest"])
+ )
+
+ return ctr
+ def run(self, client: dagger.Client, ctr: dagger.Container, stream_conf: StreamConf, port, parent) -> dagger.Container:
+ # Write stream.conf
+ if stream_conf:
+ host_stream_conf_path = str(self.identifier) + ".stream.conf"
+
+ with open(host_stream_conf_path, 'w') as fp:
+ fp.write(stream_conf.render())
+
+ dest = self.installer.prefix / "etc/netdata/stream.conf"
+ ctr = (
+ ctr.with_file(dest.as_posix(), client.host().file(host_stream_conf_path))
+ )
+
+ if parent:
+ ctr = ctr.with_service_binding("tilestora", parent)
+
+ # Exec the binary
ctr = (
- ctr.with_exec([binary, "-W", "unittest"])
+ ctr.with_exposed_port(port)
+ .with_exec([self._binary(), "-D", "-i", "0.0.0.0", "-p", str(port)])
)
return ctr
+class Digraph:
+ def __init__(self):
+ self.nodes = {} # Stores Agent instances
+ self.children_of = {} # Stores children: {parent_id: [child_ids]}
+ self.parents_of = {} # Stores parents: {child_id: [parent_ids]}
+
+ def add_node(self, node):
+ self.nodes[node.identifier] = node
+ if node.identifier not in self.children_of:
+ self.children_of[node.identifier] = []
+ if node.identifier not in self.parents_of:
+ self.parents_of[node.identifier] = []
+
+ def add_children(self, node, children):
+ if node.identifier not in self.nodes :
+ raise ValueError("Node not found")
+
+ for child in children:
+ if child.identifier not in self.nodes :
+ raise ValueError("Child node not found")
+ if node.identifier not in self.children_of[child.identifier]:
+ self.children_of[node.identifier].append(child.identifier)
+ if child.identifier not in self.parents_of[node.identifier]:
+ self.parents_of[child.identifier].append(node.identifier)
+
+ def get_children(self, node):
+ return [self.nodes [child_id] for child_id in self.children_of.get(node.identifier, [])]
+
+ def get_parents(self, node):
+ return [self.nodes [parent_id] for parent_id in self.parents_of.get(node.identifier, [])]
+
+ def get_siblings(self, node):
+ siblings = set()
+ for parent_id in self.parents_of.get(node.identifier, []):
+ siblings.update(self.children_of.get(parent_id, []))
+ siblings.discard(node.identifier)
+ return [self.nodes [sibling_id] for sibling_id in siblings]
+
+ def render(self, filename="digraph"):
+ import graphviz
+ dot = graphviz.Digraph(comment='Agent Topology')
+ for identifier, node in self.nodes.items():
+ dot.node(str(identifier), label=str(identifier))
+
+ for parent_id, children_ids in self.children_of.items():
+ for child_id in children_ids:
+ dot.edge(str(parent_id), str(child_id))
+
+ dot.render(filename, format='svg', cleanup=True)
+
+
class Context:
def __init__(self,
client: dagger.Client,
@@ -303,6 +435,13 @@ class Context:
ctr = self.agent.buildinfo(ctr, self.installer, output)
return ctr
+ def exec(self, ctr: dagger.Container) -> dagger.Container:
+ if self.built_agent == False:
+ self.build_agent(ctr)
+
+ ctr = self.agent.run(ctr)
+ return ctr
+
def run_async(func):
def wrapper(*args, **kwargs):
@@ -317,10 +456,14 @@ async def main():
async with dagger.Connection(config) as client:
platform = dagger.Platform("linux/x86_64")
distro = Distribution("debian10")
- installer = NetdataInstaller(platform, distro, "/netdata", "/opt", FeatureFlags.DBEngine)
- agent = Agent(installer)
- ctx = Context(client, platform, distro, installer, agent)
+ repo_root = pathlib.Path("/netdata")
+ prefix_path = pathlib.Path("/opt")
+ installer = NetdataInstaller(platform, distro, repo_root, prefix_path, FeatureFlags.DBEngine)
+ parent_agent = Agent(installer)
+ child_agent = Agent(installer)
+
+ ctx = Context(client, platform, distro, installer, parent_agent)
# build base image with packages we need
ctr = ctx.build_distro()
@@ -329,14 +472,68 @@ async def main():
ctr = ctx.build_agent(ctr)
# get the buildinfo
- output = os.path.join(installer.prefix, "buildinfo.log")
- ctr = ctx.buildinfo(ctr, output)
+ # output = os.path.join(installer.prefix, "buildinfo.log")
+ # ctr = ctx.buildinfo(ctr, output)
- # run unittests
- ctr = agent.unittest(ctr)
+ api_key = uuid.uuid4()
- await ctr
+ def setup_parent():
+ child_stream_conf = None
+ parent_stream_conf = ParentStreamConf(installer, api_key)
+ stream_conf = StreamConf(child_stream_conf, parent_stream_conf)
+ return stream_conf
+
+ parent_stream_conf = setup_parent()
+ parent = parent_agent.run(client, ctr, parent_stream_conf, 19999, None)
+ parent_service = parent.as_service()
+
+ def setup_child():
+ child_stream_conf = ChildStreamConf(installer, "tilestora:19999", api_key)
+ parent_stream_conf = None
+ stream_conf = StreamConf(child_stream_conf, parent_stream_conf)
+ return stream_conf
+
+ child_stream_conf = setup_child()
+ child = child_agent.run(client, ctr, child_stream_conf, 20000, parent_service)
+
+ tunnel = await client.host().tunnel(parent_service, native=True).start()
+ endpoint = await tunnel.endpoint()
+
+ await child
+
+ # await child.with_service_binding("tilestora", parent_service)
+ # await child.with_service_binding("tilestora", parent_service)
+
+
+
+ # tunnel = await client.host().tunnel(parent_service, native=True).start()
+ # endpoint = await tunnel.endpoint()
+
+ # tunnel = await client.host().tunnel(child_service, native=True).start()
+ # endpoint = await tunnel.endpoint()
+
+ time.sleep(600)
+
+ # run unittests
+ # ctr = agent.unittest(ctr)
+ # await ctr
if __name__ == '__main__':
+ # agent1 = Agent("Data1")
+ # agent2 = Agent("Data2")
+ # agent3 = Agent("Data3")
+ # agent4 = Agent("Data4")
+
+ # dg = Digraph()
+ # dg.add_node(agent1)
+ # dg.add_node(agent2)
+ # dg.add_node(agent3)
+ # dg.add_node(agent4)
+
+ # dg.add_children(agent1, [agent2, agent3])
+ # dg.add_children(agent4, [agent2, agent3])
+
+ # dg.render()
+
main()
diff --git a/packaging/dag/parent_stream.conf b/packaging/dag/parent_stream.conf
new file mode 100644
index 0000000000..15f303f97b
--- /dev/null
+++ b/packaging/dag/parent_stream.conf
@@ -0,0 +1,7 @@
+[{{ api_key }}]
+ enabled = {{ enabled }}
+ allow from = {{ allow_from }}
+ default history = {{ default_history }}
+ health enabled by default = {{ health_enabled_by_default }}
+ default postpone alarms on connect seconds = {{ default_postpone_alarms_on_connect_seconds }}
+ multiple connections = {{ multiple_connections }}