summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorMarkos Fountoulakis <44345837+mfundul@users.noreply.github.com>2020-05-14 11:57:20 +0300
committerGitHub <noreply@github.com>2020-05-14 11:57:20 +0300
commit6393b2f535c993de9f341d2245ad8ba327694281 (patch)
treed53a5b854c4075908c4c3921c372b9d75ba9617d
parenta606a27f164b1c704d850c838a7b89d6c6e0c17c (diff)
Improve the impact of health code on netdata scalability (#8407)
* Add support for spawning processes without pipes. * Port health_alarm_execute() from mypopen() to netdata_spawn() * Make alarm notifications asynchronous within a single health thread iteration * Initial version of spawn server. * preliminary integration of spawn client with health
-rw-r--r--CMakeLists.txt8
-rw-r--r--Makefile.am9
-rw-r--r--configure.ac1
-rw-r--r--daemon/common.h3
-rw-r--r--daemon/main.c8
-rw-r--r--database/rrd.h3
-rw-r--r--health/health.c86
-rw-r--r--health/health.h1
-rw-r--r--libnetdata/popen/popen.c118
-rw-r--r--libnetdata/popen/popen.h2
-rw-r--r--spawn/Makefile.am9
-rw-r--r--spawn/README.md0
-rw-r--r--spawn/spawn.c289
-rw-r--r--spawn/spawn.h109
-rw-r--r--spawn/spawn_client.c241
-rw-r--r--spawn/spawn_server.c377
16 files changed, 1216 insertions, 48 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 56f3a2416e..866e602b27 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -641,6 +641,13 @@ set(ACLK_PLUGIN_FILES
aclk/mqtt.h
)
+set(SPAWN_PLUGIN_FILES
+ spawn/spawn.c
+ spawn/spawn_server.c
+ spawn/spawn_client.c
+ spawn/spawn.h
+ )
+
set(ACLK_STATIC_LIBS
${CMAKE_SOURCE_DIR}/externaldeps/mosquitto/libmosquitto.a
${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/libwebsockets.a
@@ -741,6 +748,7 @@ set(NETDATA_FILES
${STREAMING_PLUGIN_FILES}
${WEB_PLUGIN_FILES}
${CLAIM_PLUGIN_FILES}
+ ${SPAWN_PLUGIN_FILES}
)
set(NETDATACLI_FILES
diff --git a/Makefile.am b/Makefile.am
index 8a6de4e0e8..da722b1670 100644
--- a/Makefile.am
+++ b/Makefile.am
@@ -112,6 +112,7 @@ SUBDIRS += \
web \
claim \
aclk \
+ spawn \
$(NULL)
@@ -497,6 +498,13 @@ endif
+SPAWN_PLUGIN_FILES = \
+ spawn/spawn.c \
+ spawn/spawn_server.c \
+ spawn/spawn_client.c \
+ spawn/spawn.h \
+ $(NULL)
+
EXPORTING_ENGINE_FILES = \
exporting/exporting_engine.c \
exporting/exporting_engine.h \
@@ -595,6 +603,7 @@ NETDATA_FILES = \
$(WEB_PLUGIN_FILES) \
$(CLAIM_FILES) \
$(ACLK_FILES) \
+ $(SPAWN_PLUGIN_FILES) \
$(NULL)
if FREEBSD
diff --git a/configure.ac b/configure.ac
index 18db26b731..be6fcbc373 100644
--- a/configure.ac
+++ b/configure.ac
@@ -1490,6 +1490,7 @@ AC_CONFIG_FILES([
web/server/static/Makefile
claim/Makefile
aclk/Makefile
+ spawn/Makefile
])
AC_OUTPUT
diff --git a/daemon/common.h b/daemon/common.h
index f86e61543f..742ca4a766 100644
--- a/daemon/common.h
+++ b/daemon/common.h
@@ -68,6 +68,9 @@
// netdata agent cloud link
#include "aclk/agent_cloud_link.h"
+// netdata agent spawn server
+#include "spawn/spawn.h"
+
// the netdata deamon
#include "daemon.h"
#include "main.h"
diff --git a/daemon/main.c b/daemon/main.c
index 098ddbac5b..d921f83a86 100644
--- a/daemon/main.c
+++ b/daemon/main.c
@@ -906,6 +906,11 @@ int main(int argc, char **argv) {
else i++;
}
}
+ if (argc > 1 && strcmp(argv[1], SPAWN_SERVER_COMMAND_LINE_ARGUMENT) == 0) {
+ // don't run netdata, this is the spawn server
+ spawn_server();
+ exit(0);
+ }
// parse options
{
@@ -1377,6 +1382,9 @@ int main(int argc, char **argv) {
netdata_threads_init_after_fork((size_t)config_get_number(CONFIG_SECTION_GLOBAL, "pthread stack size", (long)default_stacksize));
+ // fork the spawn server
+ spawn_init();
+
// ------------------------------------------------------------------------
// initialize rrd, registry, health, rrdpush, etc.
diff --git a/database/rrd.h b/database/rrd.h
index 62dbac39e2..8fb6ef14a6 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -576,6 +576,7 @@ struct alarm_entry {
char *recipient;
time_t exec_run_timestamp;
int exec_code;
+ uint64_t exec_spawn_serial;
char *source;
char *units;
@@ -601,6 +602,8 @@ struct alarm_entry {
time_t last_repeat;
struct alarm_entry *next;
+ struct alarm_entry *next_in_progress;
+ struct alarm_entry *prev_in_progress;
};
diff --git a/health/health.c b/health/health.c
index cbc7554cb4..0bf79e27d1 100644
--- a/health/health.c
+++ b/health/health.c
@@ -11,6 +11,46 @@ struct health_cmdapi_thread_status {
unsigned int default_health_enabled = 1;
char *silencers_filename;
+// the queue of executed alarm notifications that haven't been waited for yet
+static struct {
+ ALARM_ENTRY *head; // oldest
+ ALARM_ENTRY *tail; // latest
+} alarm_notifications_in_progress = {NULL, NULL};
+
+static inline void enqueue_alarm_notify_in_progress(ALARM_ENTRY *ae)
+{
+ ae->prev_in_progress = NULL;
+ ae->next_in_progress = NULL;
+
+ if (NULL != alarm_notifications_in_progress.tail) {
+ ae->prev_in_progress = alarm_notifications_in_progress.tail;
+ alarm_notifications_in_progress.tail->next_in_progress = ae;
+ }
+ if (NULL == alarm_notifications_in_progress.head) {
+ alarm_notifications_in_progress.head = ae;
+ }
+ alarm_notifications_in_progress.tail = ae;
+
+}
+
+static inline void unlink_alarm_notify_in_progress(ALARM_ENTRY *ae)
+{
+ struct alarm_entry *prev = ae->prev_in_progress;
+ struct alarm_entry *next = ae->next_in_progress;
+
+ if (NULL != prev) {
+ prev->next_in_progress = next;
+ }
+ if (NULL != next) {
+ next->prev_in_progress = prev;
+ }
+ if (ae == alarm_notifications_in_progress.head) {
+ alarm_notifications_in_progress.head = next;
+ }
+ if (ae == alarm_notifications_in_progress.tail) {
+ alarm_notifications_in_progress.tail = prev;
+ }
+}
// ----------------------------------------------------------------------------
// health initialization
@@ -265,7 +305,6 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) {
}
static char command_to_run[ALARM_EXEC_COMMAND_LENGTH + 1];
- pid_t command_pid;
const char *exec = (ae->exec) ? ae->exec : host->health_default_exec;
const char *recipient = (ae->recipient) ? ae->recipient : host->health_default_recipient;
@@ -321,25 +360,30 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) {
);
ae->flags |= HEALTH_ENTRY_FLAG_EXEC_RUN;
- ae->exec_run_timestamp = now_realtime_sec();
+ ae->exec_run_timestamp = now_realtime_sec(); /* will be updated by real time after spawning */
debug(D_HEALTH, "executing command '%s'", command_to_run);
- FILE *fp = mypopen(command_to_run, &command_pid);
- if(!fp) {
- error("HEALTH: Cannot popen(\"%s\", \"r\").", command_to_run);
- goto done;
- }
- debug(D_HEALTH, "HEALTH reading from command (discarding command's output)");
- char buffer[100 + 1];
- while(fgets(buffer, 100, fp) != NULL) ;
- ae->exec_code = mypclose(fp, command_pid);
+ ae->flags |= HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS;
+ ae->exec_spawn_serial = spawn_enq_cmd(command_to_run);
+ enqueue_alarm_notify_in_progress(ae);
+
+ return; //health_alarm_wait_for_execution
+done:
+ health_alarm_log_save(host, ae);
+}
+
+static inline void health_alarm_wait_for_execution(ALARM_ENTRY *ae) {
+ if (!(ae->flags & HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS))
+ return;
+
+ spawn_wait_cmd(ae->exec_spawn_serial, &ae->exec_code, &ae->exec_run_timestamp);
debug(D_HEALTH, "done executing command - returned with code %d", ae->exec_code);
+ ae->flags &= ~HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS;
if(ae->exec_code != 0)
ae->flags |= HEALTH_ENTRY_FLAG_EXEC_FAILED;
-done:
- health_alarm_log_save(host, ae);
+ unlink_alarm_notify_in_progress(ae);
}
static inline void health_process_notifications(RRDHOST *host, ALARM_ENTRY *ae) {
@@ -401,6 +445,7 @@ static inline void health_alarm_log_process(RRDHOST *host) {
ALARM_ENTRY *t = ae->next;
if(likely(!alarm_entry_isrepeating(host, ae))) {
+ health_alarm_wait_for_execution(ae);
health_alarm_log_free_one_nochecks_nounlink(ae);
host->health_log.count--;
}
@@ -945,6 +990,7 @@ void *health_main(void *ptr) {
rc->rrdcalc_flags |= RRDCALC_FLAG_RUN_ONCE;
health_process_notifications(host, ae);
debug(D_HEALTH, "Notification sent for the repeating alarm %u.", ae->alarm_id);
+ health_alarm_wait_for_execution(ae);
health_alarm_log_free_one_nochecks_nounlink(ae);
}
}
@@ -959,11 +1005,23 @@ void *health_main(void *ptr) {
// and cleanup
health_alarm_log_process(host);
- if (unlikely(netdata_exit))
+ if (unlikely(netdata_exit)) {
+ // wait for all notifications to finish before allowing health to be cleaned up
+ ALARM_ENTRY *ae;
+ while (NULL != (ae = alarm_notifications_in_progress.head)) {
+ health_alarm_wait_for_execution(ae);
+ }
break;
+ }
} /* rrdhost_foreach */
+ // wait for all notifications to finish before allowing health to be cleaned up
+ ALARM_ENTRY *ae;
+ while (NULL != (ae = alarm_notifications_in_progress.head)) {
+ health_alarm_wait_for_execution(ae);
+ }
+
rrd_unlock();
diff --git a/health/health.h b/health/health.h
index 78a6ab9eab..0423ab9aa8 100644
--- a/health/health.h
+++ b/health/health.h
@@ -24,6 +24,7 @@ extern unsigned int default_health_enabled;
#define HEALTH_ENTRY_FLAG_EXEC_FAILED 0x00000008
#define HEALTH_ENTRY_FLAG_SILENCED 0x00000010
#define HEALTH_ENTRY_RUN_ONCE 0x00000020
+#define HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS 0x00000040
#define HEALTH_ENTRY_FLAG_SAVED 0x10000000
#define HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION 0x80000000
diff --git a/libnetdata/popen/popen.c b/libnetdata/popen/popen.c
index 1c4ae64d6c..c0135cf406 100644
--- a/libnetdata/popen/popen.c
+++ b/libnetdata/popen/popen.c
@@ -78,8 +78,16 @@ static void myp_del(pid_t pid) {
#define PIPE_READ 0
#define PIPE_WRITE 1
-static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, char **env) {
- FILE *fp;
+/* custom_popene flag definitions */
+#define FLAG_CREATE_PIPE 1 // Create a pipe like popen() when set, otherwise set stdout to /dev/null
+#define FLAG_CLOSE_FD 2 // Close all file descriptors other than STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO
+
+/*
+ * Returns -1 on failure, 0 on success. When FLAG_CREATE_PIPE is set, on success set the FILE *fp pointer.
+ */
+static inline int custom_popene(const char *command, volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp) {
+ FILE *fp = NULL;
+ int ret = 0; // success by default
int pipefd[2], error;
pid_t pid;
char *const spawn_argv[] = {
@@ -91,23 +99,36 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
posix_spawnattr_t attr;
posix_spawn_file_actions_t fa;
- if (pipe(pipefd) == -1)
- return NULL;
- if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) {
- goto error_after_pipe;
+ if (flags & FLAG_CREATE_PIPE) {
+ if (pipe(pipefd) == -1)
+ return -1;
+ if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) {
+ goto error_after_pipe;
+ }
}
- // Mark all files to be closed by the exec() stage of posix_spawn()
- int i;
- for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--)
- if(i != STDIN_FILENO && i != STDERR_FILENO)
- (void)fcntl(i, F_SETFD, FD_CLOEXEC);
+ if (flags & FLAG_CLOSE_FD) {
+ // Mark all files to be closed by the exec() stage of posix_spawn()
+ int i;
+ for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--) {
+ if (i != STDIN_FILENO && i != STDERR_FILENO)
+ (void) fcntl(i, F_SETFD, FD_CLOEXEC);
+ }
+ }
if (!posix_spawn_file_actions_init(&fa)) {
- // move the pipe to stdout in the child
- if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) {
- error("posix_spawn_file_actions_adddup2() failed");
- goto error_after_posix_spawn_file_actions_init;
+ if (flags & FLAG_CREATE_PIPE) {
+ // move the pipe to stdout in the child
+ if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) {
+ error("posix_spawn_file_actions_adddup2() failed");
+ goto error_after_posix_spawn_file_actions_init;
+ }
+ } else {
+ // set stdout to /dev/null
+ if (posix_spawn_file_actions_addopen(&fa, STDOUT_FILENO, "/dev/null", O_WRONLY, 0)) {
+ error("posix_spawn_file_actions_addopen() failed");
+ // this is not a fatal error
+ }
}
} else {
error("posix_spawn_file_actions_init() failed.");
@@ -136,10 +157,16 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
} else {
myp_add_unlock();
error("Failed to spawn command: '%s' from parent pid %d.", command, getpid());
- fclose(fp);
- fp = NULL;
+ if (flags & FLAG_CREATE_PIPE) {
+ fclose(fp);
+ }
+ ret = -1;
+ }
+ if (flags & FLAG_CREATE_PIPE) {
+ close(pipefd[PIPE_WRITE]);
+ if (0 == ret) // on success set FILE * pointer
+ *fpp = fp;
}
- close(pipefd[PIPE_WRITE]);
if (!error) {
// posix_spawnattr_init() succeeded
@@ -149,19 +176,21 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c
if (posix_spawn_file_actions_destroy(&fa))
error("posix_spawn_file_actions_destroy");
- return fp;
+ return ret;
error_after_posix_spawn_file_actions_init:
if (posix_spawn_file_actions_destroy(&fa))
error("posix_spawn_file_actions_destroy");
error_after_pipe:
- if (fp)
- fclose(fp);
- else
- close(pipefd[PIPE_READ]);
+ if (flags & FLAG_CREATE_PIPE) {
+ if (fp)
+ fclose(fp);
+ else
+ close(pipefd[PIPE_READ]);
- close(pipefd[PIPE_WRITE]);
- return NULL;
+ close(pipefd[PIPE_WRITE]);
+ }
+ return -1;
}
// See man environ
@@ -222,26 +251,37 @@ int myp_reap(pid_t pid) {
}
FILE *mypopen(const char *command, volatile pid_t *pidptr) {
- return custom_popene(command, pidptr, environ);
+ FILE *fp = NULL;
+ (void)custom_popene(command, pidptr, environ, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp);
+ return fp;
}
FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env) {
- return custom_popene(command, pidptr, env);
+ FILE *fp = NULL;
+ (void)custom_popene(command, pidptr, env, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp);
+ return fp;
+}
+
+// returns 0 on success, -1 on failure
+int netdata_spawn(const char *command, volatile pid_t *pidptr) {
+ return custom_popene(command, pidptr, environ, 0, NULL);
}
-int mypclose(FILE *fp, pid_t pid) {
+int custom_pclose(FILE *fp, pid_t pid) {
int ret;
siginfo_t info;
debug(D_EXIT, "Request to mypclose() on pid %d", pid);
- // close the pipe fd
- // this is required in musl
- // without it the childs do not exit
- close(fileno(fp));
+ if (fp) {
+ // close the pipe fd
+ // this is required in musl
+ // without it the childs do not exit
+ close(fileno(fp));
- // close the pipe file pointer
- fclose(fp);
+ // close the pipe file pointer
+ fclose(fp);
+ }
errno = 0;
@@ -285,3 +325,13 @@ int mypclose(FILE *fp, pid_t pid) {
return 0;
}
+
+int mypclose(FILE *fp, pid_t pid)
+{
+ return custom_pclose(fp, pid);
+}
+
+int netdata_spawn_waitpid(pid_t pid)
+{
+ return custom_pclose(NULL, pid);
+} \ No newline at end of file
diff --git a/libnetdata/popen/popen.h b/libnetdata/popen/popen.h
index 32f64e460b..f387cff0a1 100644
--- a/libnetdata/popen/popen.h
+++ b/libnetdata/popen/popen.h
@@ -11,6 +11,8 @@
extern FILE *mypopen(const char *command, volatile pid_t *pidptr);
extern FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env);
extern int mypclose(FILE *fp, pid_t pid);
+extern int netdata_spawn(const char *command, volatile pid_t *pidptr);
+extern int netdata_spawn_waitpid(pid_t pid);
extern void myp_init(void);
extern void myp_free(void);
extern int myp_reap(pid_t pid);
diff --git a/spawn/Makefile.am b/spawn/Makefile.am
new file mode 100644
index 0000000000..02fe3a314f
--- /dev/null
+++ b/spawn/Makefile.am
@@ -0,0 +1,9 @@
+# SPDX-License-Identifier: GPL-3.0-or-later
+
+AUTOMAKE_OPTIONS = subdir-objects
+MAINTAINERCLEANFILES = $(srcdir)/Makefile.in
+
+dist_noinst_DATA = \
+ README.md \
+ $(NULL)
+
diff --git a/spawn/README.md b/spawn/README.md
new file mode 100644
index 0000000000..e69de29bb2
--- /dev/null
+++ b/spawn/README.md
diff --git a/spawn/spawn.c b/spawn/spawn.c
new file mode 100644
index 0000000000..8d57adc344
--- /dev/null
+++ b/spawn/spawn.c
@@ -0,0 +1,289 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#include "spawn.h"
+#include "../database/engine/rrdenginelib.h"
+
+static uv_thread_t thread;
+int spawn_thread_error;
+int spawn_thread_shutdown;
+
+struct spawn_queue spawn_cmd_queue;
+
+static struct spawn_cmd_info *create_spawn_cmd(char *command_to_run)
+{
+ struct spawn_cmd_info *cmdinfo;
+
+ cmdinfo = mallocz(sizeof(*cmdinfo));
+ assert(0 == uv_cond_init(&cmdinfo->cond));
+ assert(0 == uv_mutex_init(&cmdinfo->mutex));
+ cmdinfo->serial = 0; /* invalid */
+ cmdinfo->command_to_run = strdupz(command_to_run);
+ cmdinfo->exit_status = -1; /* invalid */
+ cmdinfo->pid = -1; /* invalid */
+ cmdinfo->flags = 0;
+
+ return cmdinfo;
+}
+
+void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo)
+{
+ uv_cond_destroy(&cmdinfo->cond);
+ uv_mutex_destroy(&cmdinfo->mutex);
+
+ freez(cmdinfo->command_to_run);
+ freez(cmdinfo);
+}
+
+int spawn_cmd_compare(void *a, void *b)
+{
+ struct spawn_cmd_info *cmda = a, *cmdb = b;
+
+ /* No need for mutex, serial will never change and the entries cannot be deallocated yet */
+ if (cmda->serial < cmdb->serial) return -1;
+ if (cmda->serial > cmdb->serial) return 1;
+
+ return 0;
+}
+
+static void init_spawn_cmd_queue(void)
+{
+ spawn_cmd_queue.cmd_tree.root = NULL;
+ spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare;
+ spawn_cmd_queue.size = 0;
+ spawn_cmd_queue.latest_serial = 0;
+ assert(0 == uv_cond_init(&spawn_cmd_queue.cond));
+ assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex));
+}
+
+/*
+ * Returns serial number of the enqueued command
+ */
+uint64_t spawn_enq_cmd(char *command_to_run)
+{
+ unsigned queue_size;
+ uint64_t serial;
+ avl *avl_ret;
+ struct spawn_cmd_info *cmdinfo;
+
+ cmdinfo = create_spawn_cmd(command_to_run);
+
+ /* wait for free space in queue */
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) {
+ uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex);
+ }
+ assert(queue_size < SPAWN_MAX_OUTSTANDING);
+ spawn_cmd_queue.size = queue_size + 1;
+
+ serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */
+ cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */
+
+ /* enqueue command */
+ avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
+ assert(avl_ret == (avl *)cmdinfo);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ /* wake up event loop */
+ assert(0 == uv_async_send(&spawn_async));
+ return serial;
+}
+
+/*
+ * Blocks until command with serial finishes running. Only one thread is allowed to wait per command.
+ */
+void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp)
+{
+ avl *avl_ret;
+ struct spawn_cmd_info tmp, *cmdinfo;
+
+ tmp.serial = serial;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl *)&tmp);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */
+ cmdinfo = (struct spawn_cmd_info *)avl_ret;
+
+ uv_mutex_lock(&cmdinfo->mutex);
+ while (!(cmdinfo->flags & SPAWN_CMD_DONE)) {
+ /* Only 1 thread is allowed to wait for this command to finish */
+ uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex);
+ }
+ uv_mutex_unlock(&cmdinfo->mutex);
+
+ spawn_deq_cmd(cmdinfo);
+ *exit_status = cmdinfo->exit_status;
+ *exec_run_timestamp = cmdinfo->exec_run_timestamp;
+
+ destroy_spawn_cmd(cmdinfo);
+}
+
+void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo)
+{
+ unsigned queue_size;
+ avl *avl_ret;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ queue_size = spawn_cmd_queue.size;
+ assert(queue_size);
+ /* dequeue command */
+ avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo);
+ assert(avl_ret);
+
+ spawn_cmd_queue.size = queue_size - 1;
+
+ /* wake up callers */
+ uv_cond_signal(&spawn_cmd_queue.cond);
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+}
+
+/*
+ * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the
+ * only writer as far as struct spawn_cmd_info entries are concerned.
+ */
+static int find_unprocessed_spawn_cmd_cb(void *entry, void *data)
+{
+ struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry;
+
+ if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) {
+ *cmdinfop = cmdinfo;
+ return -1; /* break tree traversal */
+ }
+ return 0; /* continue traversing */
+}
+
+struct spawn_cmd_info *spawn_get_unprocessed_cmd(void)
+{
+ struct spawn_cmd_info *cmdinfo;
+ unsigned queue_size;
+ int ret;
+
+ uv_mutex_lock(&spawn_cmd_queue.mutex);
+ queue_size = spawn_cmd_queue.size;
+ if (queue_size == 0) {
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+ return NULL;
+ }
+ /* find command */
+ cmdinfo = NULL;
+ ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo);
+ if (-1 != ret) { /* no commands available for processing */
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+ return NULL;
+ }
+ uv_mutex_unlock(&spawn_cmd_queue.mutex);
+
+ return cmdinfo;
+}
+
+/**
+ * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties.
+ * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD.
+ * The caller has to be the netdata user as configured.
+ *
+ * @param loop the libuv loop of the caller context
+ * @param spawn_channel the birectional libuv IPC pipe that the server and the caller will share
+ * @param process the spawn server libuv process context
+ * @return 0 on success or the libuv error code
+ */
+int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process)
+{
+ uv_process_options_t options = {0};
+ size_t exepath_size;
+ char exepath[FILENAME_MAX];
+ char *args[3];
+ int ret;
+#define SPAWN_SERVER_DESCRIPTORS (3)
+ uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS];
+
+ exepath_size = sizeof(exepath);
+ ret = uv_exepath(exepath, &exepath_size);
+ assert(ret == 0);
+
+ exepath[exepath_size] = '\0';
+ args[0] = exepath;
+ args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT;
+ args[2] = NULL;
+
+ memset(&options, 0, sizeof(options));
+ options.file = exepath;
+ options.args = args;
+ options.exit_cb = NULL; //exit_cb;
+ options.stdio = stdio;
+ options.stdio_count = SPAWN_SERVER_DESCRIPTORS;
+
+ stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE;
+ stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */
+ stdio[1].flags = UV_INHERIT_FD;
+ stdio[1].data.fd = 1 /* UV_STDOUT_FD */;
+ stdio[2].flags = UV_INHERIT_FD;
+ stdio[2].data.fd = 2 /* UV_STDERR_FD */;
+
+ ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */
+ assert(ret == 0);
+
+ return ret;
+}
+
+#define CONCURRENT_SPAWNS 16
+#define SPAWN_ITERATIONS 10000
+#undef CONCURRENT_STRESS_TEST
+
+void spawn_init(void)
+{
+ struct completion completion;
+ int error;
+
+ info("Initializing spawn client.");
+
+ init_spawn_cmd_queue();
+
+ init_completion(&completion);
+ error = uv_thread_create(&thread, spawn_client, &completion);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ goto after_error;
+ }
+ /* wait for spawn client thread to initialize */
+ wait_for_completion(&completion);
+ destroy_completion(&completion);
+ uv_thread_set_name_np(thread, "DAEMON_SPAWN");
+
+ if (spawn_thread_error) {
+ error = uv_thread_join(&thread);
+ if (error) {
+ error("uv_thread_create(): %s", uv_strerror(error));
+ }
+ goto after_error;
+ }
+#ifdef CONCURRENT_STRESS_TEST
+ signals_reset();
+ signals_unblock();
+
+ sleep(60);
+ uint64_t serial[CONCURRENT_SPAWNS];
+ for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) {
+ for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
+ char cmd[64];
+ sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1);
+ serial[i] = spawn_enq_cmd(cmd);
+ info("Queued command %s for spawning.", cmd);
+ }
+ int exit_status;
+ time_t exec_run_timestamp;
+ for (int i = 0; i < CONCURRENT_SPAWNS; ++i) {
+ info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
+ exec_run_timestamp);
+ spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp);
+ info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status,
+ exec_run_timestamp);
+ }
+ }
+ exit(0);
+#endif
+ return;
+
+ after_error:
+ error("Failed to initialize spawn service. The alarms notifications will not be spawned.");
+}
diff --git a/spawn/spawn.h b/spawn/spawn.h
new file mode 100644
index 0000000000..aaeadb7a69
--- /dev/null
+++ b/spawn/spawn.h
@@ -0,0 +1,109 @@
+// SPDX-License-Identifier: GPL-3.0-or-later
+
+#ifndef NETDATA_SPAWN_H
+#define NETDATA_SPAWN_H 1
+
+#include "../daemon/common.h"
+
+#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server"
+
+typedef enum spawn_protocol {
+ SPAWN_PROT_EXEC_CMD = 0,
+ SPAWN_PROT_SPAWN_RESULT,
+ SPAWN_PROT_CMD_EXIT_STATUS
+} spawn_prot_t;
+
+struct spawn_prot_exec_cmd {
+ uint16_t command_length;
+ char command_to_run[];
+};
+
+struct spawn_prot_spawn_result {
+ pid_t exec_pid; /* 0 if failed to spawn */
+ time_t exec_run_timestamp; /* time of successfully spawning the command */
+};
+
+struct spawn_prot_cmd_exit_status {
+ int exec_exit_status;
+};
+
+struct spawn_prot_header {
+ spawn_prot_t opcode;
+ void *handle;
+};
+
+#undef SPAWN_DEBUG /* define to enable debug prints */
+
+#define SPAWN_MAX_OUTSTANDING (32768)
+
+#define SPAWN_CMD_PROCESSED 0x00000001
+#define SPAWN_CMD_IN_PROGRESS 0x00000002
+#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004
+#define SPAWN_CMD_DONE 0x00000008
+
+struct spawn_cmd_info {
+ avl avl;
+
+ /* concurrency control per command */
+ uv_mutex_t mutex;
+ uv_cond_t cond; /* users block here until command has finished */
+
+ uint64_t serial;
+ char *command_to_run;
+ int exit_status;
+ pid_t pid;
+ unsigned long flags;
+ time_t exec_run_timestamp; /* time of successfully spawning the command */
+};
+
+/* spawn command queue */
+struct spawn_queue {
+ avl_tree cmd_tree;
+
+ /* concurrency control of command queue */
+ uv_mutex_t mutex;
+ uv_cond_t cond;
+
+ volatile unsigned size;
+ uint64_t latest_serial;
+};
+
+struct write_context {
+ uv_write_t write_req;
+ struct spawn_prot_header header;
+ struct spawn_prot_cmd_exit_status exit_status;
+ struct spawn_prot_spawn_result spawn_result;
+ struct spawn_prot_exec_cmd payload;
+};
+
+extern int spawn_thread_error;
+extern int spawn_thread_shutdown;
+extern uv_async_t spawn_async;
+
+void spawn_init(void);
+void spawn_server(void);
+void spawn_client(void *arg);
+void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo);
+uint64_t spawn_enq_cmd(char *command_to_run);
+void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp);
+void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo);
+struct spawn_cmd_info *spawn_get_unprocessed_cmd(void);
+int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process);
+
+/*
+ * Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It
+ * subtracts the amount copied from the source length.
+ */
+static inline void copy_to_prot_b