diff options
author | Markos Fountoulakis <44345837+mfundul@users.noreply.github.com> | 2020-05-14 11:57:20 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-05-14 11:57:20 +0300 |
commit | 6393b2f535c993de9f341d2245ad8ba327694281 (patch) | |
tree | d53a5b854c4075908c4c3921c372b9d75ba9617d | |
parent | a606a27f164b1c704d850c838a7b89d6c6e0c17c (diff) |
Improve the impact of health code on netdata scalability (#8407)
* Add support for spawning processes without pipes.
* Port health_alarm_execute() from mypopen() to netdata_spawn()
* Make alarm notifications asynchronous within a single health thread iteration
* Initial version of spawn server.
* preliminary integration of spawn client with health
-rw-r--r-- | CMakeLists.txt | 8 | ||||
-rw-r--r-- | Makefile.am | 9 | ||||
-rw-r--r-- | configure.ac | 1 | ||||
-rw-r--r-- | daemon/common.h | 3 | ||||
-rw-r--r-- | daemon/main.c | 8 | ||||
-rw-r--r-- | database/rrd.h | 3 | ||||
-rw-r--r-- | health/health.c | 86 | ||||
-rw-r--r-- | health/health.h | 1 | ||||
-rw-r--r-- | libnetdata/popen/popen.c | 118 | ||||
-rw-r--r-- | libnetdata/popen/popen.h | 2 | ||||
-rw-r--r-- | spawn/Makefile.am | 9 | ||||
-rw-r--r-- | spawn/README.md | 0 | ||||
-rw-r--r-- | spawn/spawn.c | 289 | ||||
-rw-r--r-- | spawn/spawn.h | 109 | ||||
-rw-r--r-- | spawn/spawn_client.c | 241 | ||||
-rw-r--r-- | spawn/spawn_server.c | 377 |
16 files changed, 1216 insertions, 48 deletions
diff --git a/CMakeLists.txt b/CMakeLists.txt index 56f3a2416e..866e602b27 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -641,6 +641,13 @@ set(ACLK_PLUGIN_FILES aclk/mqtt.h ) +set(SPAWN_PLUGIN_FILES + spawn/spawn.c + spawn/spawn_server.c + spawn/spawn_client.c + spawn/spawn.h + ) + set(ACLK_STATIC_LIBS ${CMAKE_SOURCE_DIR}/externaldeps/mosquitto/libmosquitto.a ${CMAKE_SOURCE_DIR}/externaldeps/libwebsockets/libwebsockets.a @@ -741,6 +748,7 @@ set(NETDATA_FILES ${STREAMING_PLUGIN_FILES} ${WEB_PLUGIN_FILES} ${CLAIM_PLUGIN_FILES} + ${SPAWN_PLUGIN_FILES} ) set(NETDATACLI_FILES diff --git a/Makefile.am b/Makefile.am index 8a6de4e0e8..da722b1670 100644 --- a/Makefile.am +++ b/Makefile.am @@ -112,6 +112,7 @@ SUBDIRS += \ web \ claim \ aclk \ + spawn \ $(NULL) @@ -497,6 +498,13 @@ endif +SPAWN_PLUGIN_FILES = \ + spawn/spawn.c \ + spawn/spawn_server.c \ + spawn/spawn_client.c \ + spawn/spawn.h \ + $(NULL) + EXPORTING_ENGINE_FILES = \ exporting/exporting_engine.c \ exporting/exporting_engine.h \ @@ -595,6 +603,7 @@ NETDATA_FILES = \ $(WEB_PLUGIN_FILES) \ $(CLAIM_FILES) \ $(ACLK_FILES) \ + $(SPAWN_PLUGIN_FILES) \ $(NULL) if FREEBSD diff --git a/configure.ac b/configure.ac index 18db26b731..be6fcbc373 100644 --- a/configure.ac +++ b/configure.ac @@ -1490,6 +1490,7 @@ AC_CONFIG_FILES([ web/server/static/Makefile claim/Makefile aclk/Makefile + spawn/Makefile ]) AC_OUTPUT diff --git a/daemon/common.h b/daemon/common.h index f86e61543f..742ca4a766 100644 --- a/daemon/common.h +++ b/daemon/common.h @@ -68,6 +68,9 @@ // netdata agent cloud link #include "aclk/agent_cloud_link.h" +// netdata agent spawn server +#include "spawn/spawn.h" + // the netdata deamon #include "daemon.h" #include "main.h" diff --git a/daemon/main.c b/daemon/main.c index 098ddbac5b..d921f83a86 100644 --- a/daemon/main.c +++ b/daemon/main.c @@ -906,6 +906,11 @@ int main(int argc, char **argv) { else i++; } } + if (argc > 1 && strcmp(argv[1], SPAWN_SERVER_COMMAND_LINE_ARGUMENT) == 0) { + // don't run netdata, this is the spawn server + spawn_server(); + exit(0); + } // parse options { @@ -1377,6 +1382,9 @@ int main(int argc, char **argv) { netdata_threads_init_after_fork((size_t)config_get_number(CONFIG_SECTION_GLOBAL, "pthread stack size", (long)default_stacksize)); + // fork the spawn server + spawn_init(); + // ------------------------------------------------------------------------ // initialize rrd, registry, health, rrdpush, etc. diff --git a/database/rrd.h b/database/rrd.h index 62dbac39e2..8fb6ef14a6 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -576,6 +576,7 @@ struct alarm_entry { char *recipient; time_t exec_run_timestamp; int exec_code; + uint64_t exec_spawn_serial; char *source; char *units; @@ -601,6 +602,8 @@ struct alarm_entry { time_t last_repeat; struct alarm_entry *next; + struct alarm_entry *next_in_progress; + struct alarm_entry *prev_in_progress; }; diff --git a/health/health.c b/health/health.c index cbc7554cb4..0bf79e27d1 100644 --- a/health/health.c +++ b/health/health.c @@ -11,6 +11,46 @@ struct health_cmdapi_thread_status { unsigned int default_health_enabled = 1; char *silencers_filename; +// the queue of executed alarm notifications that haven't been waited for yet +static struct { + ALARM_ENTRY *head; // oldest + ALARM_ENTRY *tail; // latest +} alarm_notifications_in_progress = {NULL, NULL}; + +static inline void enqueue_alarm_notify_in_progress(ALARM_ENTRY *ae) +{ + ae->prev_in_progress = NULL; + ae->next_in_progress = NULL; + + if (NULL != alarm_notifications_in_progress.tail) { + ae->prev_in_progress = alarm_notifications_in_progress.tail; + alarm_notifications_in_progress.tail->next_in_progress = ae; + } + if (NULL == alarm_notifications_in_progress.head) { + alarm_notifications_in_progress.head = ae; + } + alarm_notifications_in_progress.tail = ae; + +} + +static inline void unlink_alarm_notify_in_progress(ALARM_ENTRY *ae) +{ + struct alarm_entry *prev = ae->prev_in_progress; + struct alarm_entry *next = ae->next_in_progress; + + if (NULL != prev) { + prev->next_in_progress = next; + } + if (NULL != next) { + next->prev_in_progress = prev; + } + if (ae == alarm_notifications_in_progress.head) { + alarm_notifications_in_progress.head = next; + } + if (ae == alarm_notifications_in_progress.tail) { + alarm_notifications_in_progress.tail = prev; + } +} // ---------------------------------------------------------------------------- // health initialization @@ -265,7 +305,6 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) { } static char command_to_run[ALARM_EXEC_COMMAND_LENGTH + 1]; - pid_t command_pid; const char *exec = (ae->exec) ? ae->exec : host->health_default_exec; const char *recipient = (ae->recipient) ? ae->recipient : host->health_default_recipient; @@ -321,25 +360,30 @@ static inline void health_alarm_execute(RRDHOST *host, ALARM_ENTRY *ae) { ); ae->flags |= HEALTH_ENTRY_FLAG_EXEC_RUN; - ae->exec_run_timestamp = now_realtime_sec(); + ae->exec_run_timestamp = now_realtime_sec(); /* will be updated by real time after spawning */ debug(D_HEALTH, "executing command '%s'", command_to_run); - FILE *fp = mypopen(command_to_run, &command_pid); - if(!fp) { - error("HEALTH: Cannot popen(\"%s\", \"r\").", command_to_run); - goto done; - } - debug(D_HEALTH, "HEALTH reading from command (discarding command's output)"); - char buffer[100 + 1]; - while(fgets(buffer, 100, fp) != NULL) ; - ae->exec_code = mypclose(fp, command_pid); + ae->flags |= HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS; + ae->exec_spawn_serial = spawn_enq_cmd(command_to_run); + enqueue_alarm_notify_in_progress(ae); + + return; //health_alarm_wait_for_execution +done: + health_alarm_log_save(host, ae); +} + +static inline void health_alarm_wait_for_execution(ALARM_ENTRY *ae) { + if (!(ae->flags & HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS)) + return; + + spawn_wait_cmd(ae->exec_spawn_serial, &ae->exec_code, &ae->exec_run_timestamp); debug(D_HEALTH, "done executing command - returned with code %d", ae->exec_code); + ae->flags &= ~HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS; if(ae->exec_code != 0) ae->flags |= HEALTH_ENTRY_FLAG_EXEC_FAILED; -done: - health_alarm_log_save(host, ae); + unlink_alarm_notify_in_progress(ae); } static inline void health_process_notifications(RRDHOST *host, ALARM_ENTRY *ae) { @@ -401,6 +445,7 @@ static inline void health_alarm_log_process(RRDHOST *host) { ALARM_ENTRY *t = ae->next; if(likely(!alarm_entry_isrepeating(host, ae))) { + health_alarm_wait_for_execution(ae); health_alarm_log_free_one_nochecks_nounlink(ae); host->health_log.count--; } @@ -945,6 +990,7 @@ void *health_main(void *ptr) { rc->rrdcalc_flags |= RRDCALC_FLAG_RUN_ONCE; health_process_notifications(host, ae); debug(D_HEALTH, "Notification sent for the repeating alarm %u.", ae->alarm_id); + health_alarm_wait_for_execution(ae); health_alarm_log_free_one_nochecks_nounlink(ae); } } @@ -959,11 +1005,23 @@ void *health_main(void *ptr) { // and cleanup health_alarm_log_process(host); - if (unlikely(netdata_exit)) + if (unlikely(netdata_exit)) { + // wait for all notifications to finish before allowing health to be cleaned up + ALARM_ENTRY *ae; + while (NULL != (ae = alarm_notifications_in_progress.head)) { + health_alarm_wait_for_execution(ae); + } break; + } } /* rrdhost_foreach */ + // wait for all notifications to finish before allowing health to be cleaned up + ALARM_ENTRY *ae; + while (NULL != (ae = alarm_notifications_in_progress.head)) { + health_alarm_wait_for_execution(ae); + } + rrd_unlock(); diff --git a/health/health.h b/health/health.h index 78a6ab9eab..0423ab9aa8 100644 --- a/health/health.h +++ b/health/health.h @@ -24,6 +24,7 @@ extern unsigned int default_health_enabled; #define HEALTH_ENTRY_FLAG_EXEC_FAILED 0x00000008 #define HEALTH_ENTRY_FLAG_SILENCED 0x00000010 #define HEALTH_ENTRY_RUN_ONCE 0x00000020 +#define HEALTH_ENTRY_FLAG_EXEC_IN_PROGRESS 0x00000040 #define HEALTH_ENTRY_FLAG_SAVED 0x10000000 #define HEALTH_ENTRY_FLAG_NO_CLEAR_NOTIFICATION 0x80000000 diff --git a/libnetdata/popen/popen.c b/libnetdata/popen/popen.c index 1c4ae64d6c..c0135cf406 100644 --- a/libnetdata/popen/popen.c +++ b/libnetdata/popen/popen.c @@ -78,8 +78,16 @@ static void myp_del(pid_t pid) { #define PIPE_READ 0 #define PIPE_WRITE 1 -static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, char **env) { - FILE *fp; +/* custom_popene flag definitions */ +#define FLAG_CREATE_PIPE 1 // Create a pipe like popen() when set, otherwise set stdout to /dev/null +#define FLAG_CLOSE_FD 2 // Close all file descriptors other than STDIN_FILENO, STDOUT_FILENO, STDERR_FILENO + +/* + * Returns -1 on failure, 0 on success. When FLAG_CREATE_PIPE is set, on success set the FILE *fp pointer. + */ +static inline int custom_popene(const char *command, volatile pid_t *pidptr, char **env, uint8_t flags, FILE **fpp) { + FILE *fp = NULL; + int ret = 0; // success by default int pipefd[2], error; pid_t pid; char *const spawn_argv[] = { @@ -91,23 +99,36 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c posix_spawnattr_t attr; posix_spawn_file_actions_t fa; - if (pipe(pipefd) == -1) - return NULL; - if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) { - goto error_after_pipe; + if (flags & FLAG_CREATE_PIPE) { + if (pipe(pipefd) == -1) + return -1; + if ((fp = fdopen(pipefd[PIPE_READ], "r")) == NULL) { + goto error_after_pipe; + } } - // Mark all files to be closed by the exec() stage of posix_spawn() - int i; - for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--) - if(i != STDIN_FILENO && i != STDERR_FILENO) - (void)fcntl(i, F_SETFD, FD_CLOEXEC); + if (flags & FLAG_CLOSE_FD) { + // Mark all files to be closed by the exec() stage of posix_spawn() + int i; + for (i = (int) (sysconf(_SC_OPEN_MAX) - 1); i >= 0; i--) { + if (i != STDIN_FILENO && i != STDERR_FILENO) + (void) fcntl(i, F_SETFD, FD_CLOEXEC); + } + } if (!posix_spawn_file_actions_init(&fa)) { - // move the pipe to stdout in the child - if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) { - error("posix_spawn_file_actions_adddup2() failed"); - goto error_after_posix_spawn_file_actions_init; + if (flags & FLAG_CREATE_PIPE) { + // move the pipe to stdout in the child + if (posix_spawn_file_actions_adddup2(&fa, pipefd[PIPE_WRITE], STDOUT_FILENO)) { + error("posix_spawn_file_actions_adddup2() failed"); + goto error_after_posix_spawn_file_actions_init; + } + } else { + // set stdout to /dev/null + if (posix_spawn_file_actions_addopen(&fa, STDOUT_FILENO, "/dev/null", O_WRONLY, 0)) { + error("posix_spawn_file_actions_addopen() failed"); + // this is not a fatal error + } } } else { error("posix_spawn_file_actions_init() failed."); @@ -136,10 +157,16 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c } else { myp_add_unlock(); error("Failed to spawn command: '%s' from parent pid %d.", command, getpid()); - fclose(fp); - fp = NULL; + if (flags & FLAG_CREATE_PIPE) { + fclose(fp); + } + ret = -1; + } + if (flags & FLAG_CREATE_PIPE) { + close(pipefd[PIPE_WRITE]); + if (0 == ret) // on success set FILE * pointer + *fpp = fp; } - close(pipefd[PIPE_WRITE]); if (!error) { // posix_spawnattr_init() succeeded @@ -149,19 +176,21 @@ static inline FILE *custom_popene(const char *command, volatile pid_t *pidptr, c if (posix_spawn_file_actions_destroy(&fa)) error("posix_spawn_file_actions_destroy"); - return fp; + return ret; error_after_posix_spawn_file_actions_init: if (posix_spawn_file_actions_destroy(&fa)) error("posix_spawn_file_actions_destroy"); error_after_pipe: - if (fp) - fclose(fp); - else - close(pipefd[PIPE_READ]); + if (flags & FLAG_CREATE_PIPE) { + if (fp) + fclose(fp); + else + close(pipefd[PIPE_READ]); - close(pipefd[PIPE_WRITE]); - return NULL; + close(pipefd[PIPE_WRITE]); + } + return -1; } // See man environ @@ -222,26 +251,37 @@ int myp_reap(pid_t pid) { } FILE *mypopen(const char *command, volatile pid_t *pidptr) { - return custom_popene(command, pidptr, environ); + FILE *fp = NULL; + (void)custom_popene(command, pidptr, environ, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp); + return fp; } FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env) { - return custom_popene(command, pidptr, env); + FILE *fp = NULL; + (void)custom_popene(command, pidptr, env, FLAG_CREATE_PIPE | FLAG_CLOSE_FD, &fp); + return fp; +} + +// returns 0 on success, -1 on failure +int netdata_spawn(const char *command, volatile pid_t *pidptr) { + return custom_popene(command, pidptr, environ, 0, NULL); } -int mypclose(FILE *fp, pid_t pid) { +int custom_pclose(FILE *fp, pid_t pid) { int ret; siginfo_t info; debug(D_EXIT, "Request to mypclose() on pid %d", pid); - // close the pipe fd - // this is required in musl - // without it the childs do not exit - close(fileno(fp)); + if (fp) { + // close the pipe fd + // this is required in musl + // without it the childs do not exit + close(fileno(fp)); - // close the pipe file pointer - fclose(fp); + // close the pipe file pointer + fclose(fp); + } errno = 0; @@ -285,3 +325,13 @@ int mypclose(FILE *fp, pid_t pid) { return 0; } + +int mypclose(FILE *fp, pid_t pid) +{ + return custom_pclose(fp, pid); +} + +int netdata_spawn_waitpid(pid_t pid) +{ + return custom_pclose(NULL, pid); +}
\ No newline at end of file diff --git a/libnetdata/popen/popen.h b/libnetdata/popen/popen.h index 32f64e460b..f387cff0a1 100644 --- a/libnetdata/popen/popen.h +++ b/libnetdata/popen/popen.h @@ -11,6 +11,8 @@ extern FILE *mypopen(const char *command, volatile pid_t *pidptr); extern FILE *mypopene(const char *command, volatile pid_t *pidptr, char **env); extern int mypclose(FILE *fp, pid_t pid); +extern int netdata_spawn(const char *command, volatile pid_t *pidptr); +extern int netdata_spawn_waitpid(pid_t pid); extern void myp_init(void); extern void myp_free(void); extern int myp_reap(pid_t pid); diff --git a/spawn/Makefile.am b/spawn/Makefile.am new file mode 100644 index 0000000000..02fe3a314f --- /dev/null +++ b/spawn/Makefile.am @@ -0,0 +1,9 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) + diff --git a/spawn/README.md b/spawn/README.md new file mode 100644 index 0000000000..e69de29bb2 --- /dev/null +++ b/spawn/README.md diff --git a/spawn/spawn.c b/spawn/spawn.c new file mode 100644 index 0000000000..8d57adc344 --- /dev/null +++ b/spawn/spawn.c @@ -0,0 +1,289 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "spawn.h" +#include "../database/engine/rrdenginelib.h" + +static uv_thread_t thread; +int spawn_thread_error; +int spawn_thread_shutdown; + +struct spawn_queue spawn_cmd_queue; + +static struct spawn_cmd_info *create_spawn_cmd(char *command_to_run) +{ + struct spawn_cmd_info *cmdinfo; + + cmdinfo = mallocz(sizeof(*cmdinfo)); + assert(0 == uv_cond_init(&cmdinfo->cond)); + assert(0 == uv_mutex_init(&cmdinfo->mutex)); + cmdinfo->serial = 0; /* invalid */ + cmdinfo->command_to_run = strdupz(command_to_run); + cmdinfo->exit_status = -1; /* invalid */ + cmdinfo->pid = -1; /* invalid */ + cmdinfo->flags = 0; + + return cmdinfo; +} + +void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo) +{ + uv_cond_destroy(&cmdinfo->cond); + uv_mutex_destroy(&cmdinfo->mutex); + + freez(cmdinfo->command_to_run); + freez(cmdinfo); +} + +int spawn_cmd_compare(void *a, void *b) +{ + struct spawn_cmd_info *cmda = a, *cmdb = b; + + /* No need for mutex, serial will never change and the entries cannot be deallocated yet */ + if (cmda->serial < cmdb->serial) return -1; + if (cmda->serial > cmdb->serial) return 1; + + return 0; +} + +static void init_spawn_cmd_queue(void) +{ + spawn_cmd_queue.cmd_tree.root = NULL; + spawn_cmd_queue.cmd_tree.compar = spawn_cmd_compare; + spawn_cmd_queue.size = 0; + spawn_cmd_queue.latest_serial = 0; + assert(0 == uv_cond_init(&spawn_cmd_queue.cond)); + assert(0 == uv_mutex_init(&spawn_cmd_queue.mutex)); +} + +/* + * Returns serial number of the enqueued command + */ +uint64_t spawn_enq_cmd(char *command_to_run) +{ + unsigned queue_size; + uint64_t serial; + avl *avl_ret; + struct spawn_cmd_info *cmdinfo; + + cmdinfo = create_spawn_cmd(command_to_run); + + /* wait for free space in queue */ + uv_mutex_lock(&spawn_cmd_queue.mutex); + while ((queue_size = spawn_cmd_queue.size) == SPAWN_MAX_OUTSTANDING) { + uv_cond_wait(&spawn_cmd_queue.cond, &spawn_cmd_queue.mutex); + } + assert(queue_size < SPAWN_MAX_OUTSTANDING); + spawn_cmd_queue.size = queue_size + 1; + + serial = ++spawn_cmd_queue.latest_serial; /* 0 is invalid */ + cmdinfo->serial = serial; /* No need to take the cmd mutex since it is unreachable at the moment */ + + /* enqueue command */ + avl_ret = avl_insert(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo); + assert(avl_ret == (avl *)cmdinfo); + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + /* wake up event loop */ + assert(0 == uv_async_send(&spawn_async)); + return serial; +} + +/* + * Blocks until command with serial finishes running. Only one thread is allowed to wait per command. + */ +void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp) +{ + avl *avl_ret; + struct spawn_cmd_info tmp, *cmdinfo; + + tmp.serial = serial; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + avl_ret = avl_search(&spawn_cmd_queue.cmd_tree, (avl *)&tmp); + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + assert(avl_ret); /* Could be NULL if more than 1 threads wait for the command */ + cmdinfo = (struct spawn_cmd_info *)avl_ret; + + uv_mutex_lock(&cmdinfo->mutex); + while (!(cmdinfo->flags & SPAWN_CMD_DONE)) { + /* Only 1 thread is allowed to wait for this command to finish */ + uv_cond_wait(&cmdinfo->cond, &cmdinfo->mutex); + } + uv_mutex_unlock(&cmdinfo->mutex); + + spawn_deq_cmd(cmdinfo); + *exit_status = cmdinfo->exit_status; + *exec_run_timestamp = cmdinfo->exec_run_timestamp; + + destroy_spawn_cmd(cmdinfo); +} + +void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo) +{ + unsigned queue_size; + avl *avl_ret; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + queue_size = spawn_cmd_queue.size; + assert(queue_size); + /* dequeue command */ + avl_ret = avl_remove(&spawn_cmd_queue.cmd_tree, (avl *)cmdinfo); + assert(avl_ret); + + spawn_cmd_queue.size = queue_size - 1; + + /* wake up callers */ + uv_cond_signal(&spawn_cmd_queue.cond); + uv_mutex_unlock(&spawn_cmd_queue.mutex); +} + +/* + * Must be called from the spawn client event loop context. This way no mutex is needed because the event loop is the + * only writer as far as struct spawn_cmd_info entries are concerned. + */ +static int find_unprocessed_spawn_cmd_cb(void *entry, void *data) +{ + struct spawn_cmd_info **cmdinfop = data, *cmdinfo = entry; + + if (!(cmdinfo->flags & SPAWN_CMD_PROCESSED)) { + *cmdinfop = cmdinfo; + return -1; /* break tree traversal */ + } + return 0; /* continue traversing */ +} + +struct spawn_cmd_info *spawn_get_unprocessed_cmd(void) +{ + struct spawn_cmd_info *cmdinfo; + unsigned queue_size; + int ret; + + uv_mutex_lock(&spawn_cmd_queue.mutex); + queue_size = spawn_cmd_queue.size; + if (queue_size == 0) { + uv_mutex_unlock(&spawn_cmd_queue.mutex); + return NULL; + } + /* find command */ + cmdinfo = NULL; + ret = avl_traverse(&spawn_cmd_queue.cmd_tree, find_unprocessed_spawn_cmd_cb, (void *)&cmdinfo); + if (-1 != ret) { /* no commands available for processing */ + uv_mutex_unlock(&spawn_cmd_queue.mutex); + return NULL; + } + uv_mutex_unlock(&spawn_cmd_queue.mutex); + + return cmdinfo; +} + +/** + * This function spawns a process that shares a libuv IPC pipe with the caller and performs spawn server duties. + * The spawn server process will close all open file descriptors except for the pipe, UV_STDOUT_FD, and UV_STDERR_FD. + * The caller has to be the netdata user as configured. + * + * @param loop the libuv loop of the caller context + * @param spawn_channel the birectional libuv IPC pipe that the server and the caller will share + * @param process the spawn server libuv process context + * @return 0 on success or the libuv error code + */ +int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process) +{ + uv_process_options_t options = {0}; + size_t exepath_size; + char exepath[FILENAME_MAX]; + char *args[3]; + int ret; +#define SPAWN_SERVER_DESCRIPTORS (3) + uv_stdio_container_t stdio[SPAWN_SERVER_DESCRIPTORS]; + + exepath_size = sizeof(exepath); + ret = uv_exepath(exepath, &exepath_size); + assert(ret == 0); + + exepath[exepath_size] = '\0'; + args[0] = exepath; + args[1] = SPAWN_SERVER_COMMAND_LINE_ARGUMENT; + args[2] = NULL; + + memset(&options, 0, sizeof(options)); + options.file = exepath; + options.args = args; + options.exit_cb = NULL; //exit_cb; + options.stdio = stdio; + options.stdio_count = SPAWN_SERVER_DESCRIPTORS; + + stdio[0].flags = UV_CREATE_PIPE | UV_READABLE_PIPE | UV_WRITABLE_PIPE; + stdio[0].data.stream = (uv_stream_t *)spawn_channel; /* bidirectional libuv pipe */ + stdio[1].flags = UV_INHERIT_FD; + stdio[1].data.fd = 1 /* UV_STDOUT_FD */; + stdio[2].flags = UV_INHERIT_FD; + stdio[2].data.fd = 2 /* UV_STDERR_FD */; + + ret = uv_spawn(loop, process, &options); /* execute the netdata binary again as the netdata user */ + assert(ret == 0); + + return ret; +} + +#define CONCURRENT_SPAWNS 16 +#define SPAWN_ITERATIONS 10000 +#undef CONCURRENT_STRESS_TEST + +void spawn_init(void) +{ + struct completion completion; + int error; + + info("Initializing spawn client."); + + init_spawn_cmd_queue(); + + init_completion(&completion); + error = uv_thread_create(&thread, spawn_client, &completion); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + goto after_error; + } + /* wait for spawn client thread to initialize */ + wait_for_completion(&completion); + destroy_completion(&completion); + uv_thread_set_name_np(thread, "DAEMON_SPAWN"); + + if (spawn_thread_error) { + error = uv_thread_join(&thread); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + } + goto after_error; + } +#ifdef CONCURRENT_STRESS_TEST + signals_reset(); + signals_unblock(); + + sleep(60); + uint64_t serial[CONCURRENT_SPAWNS]; + for (int j = 0 ; j < SPAWN_ITERATIONS ; ++j) { + for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { + char cmd[64]; + sprintf(cmd, "echo CONCURRENT_STRESS_TEST %d 1>&2", j * CONCURRENT_SPAWNS + i + 1); + serial[i] = spawn_enq_cmd(cmd); + info("Queued command %s for spawning.", cmd); + } + int exit_status; + time_t exec_run_timestamp; + for (int i = 0; i < CONCURRENT_SPAWNS; ++i) { + info("Started waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, + exec_run_timestamp); + spawn_wait_cmd(serial[i], &exit_status, &exec_run_timestamp); + info("Finished waiting for serial %llu exit status %d run timestamp %llu.", serial[i], exit_status, + exec_run_timestamp); + } + } + exit(0); +#endif + return; + + after_error: + error("Failed to initialize spawn service. The alarms notifications will not be spawned."); +} diff --git a/spawn/spawn.h b/spawn/spawn.h new file mode 100644 index 0000000000..aaeadb7a69 --- /dev/null +++ b/spawn/spawn.h @@ -0,0 +1,109 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_SPAWN_H +#define NETDATA_SPAWN_H 1 + +#include "../daemon/common.h" + +#define SPAWN_SERVER_COMMAND_LINE_ARGUMENT "--special-spawn-server" + +typedef enum spawn_protocol { + SPAWN_PROT_EXEC_CMD = 0, + SPAWN_PROT_SPAWN_RESULT, + SPAWN_PROT_CMD_EXIT_STATUS +} spawn_prot_t; + +struct spawn_prot_exec_cmd { + uint16_t command_length; + char command_to_run[]; +}; + +struct spawn_prot_spawn_result { + pid_t exec_pid; /* 0 if failed to spawn */ + time_t exec_run_timestamp; /* time of successfully spawning the command */ +}; + +struct spawn_prot_cmd_exit_status { + int exec_exit_status; +}; + +struct spawn_prot_header { + spawn_prot_t opcode; + void *handle; +}; + +#undef SPAWN_DEBUG /* define to enable debug prints */ + +#define SPAWN_MAX_OUTSTANDING (32768) + +#define SPAWN_CMD_PROCESSED 0x00000001 +#define SPAWN_CMD_IN_PROGRESS 0x00000002 +#define SPAWN_CMD_FAILED_TO_SPAWN 0x00000004 +#define SPAWN_CMD_DONE 0x00000008 + +struct spawn_cmd_info { + avl avl; + + /* concurrency control per command */ + uv_mutex_t mutex; + uv_cond_t cond; /* users block here until command has finished */ + + uint64_t serial; + char *command_to_run; + int exit_status; + pid_t pid; + unsigned long flags; + time_t exec_run_timestamp; /* time of successfully spawning the command */ +}; + +/* spawn command queue */ +struct spawn_queue { + avl_tree cmd_tree; + + /* concurrency control of command queue */ + uv_mutex_t mutex; + uv_cond_t cond; + + volatile unsigned size; + uint64_t latest_serial; +}; + +struct write_context { + uv_write_t write_req; + struct spawn_prot_header header; + struct spawn_prot_cmd_exit_status exit_status; + struct spawn_prot_spawn_result spawn_result; + struct spawn_prot_exec_cmd payload; +}; + +extern int spawn_thread_error; +extern int spawn_thread_shutdown; +extern uv_async_t spawn_async; + +void spawn_init(void); +void spawn_server(void); +void spawn_client(void *arg); +void destroy_spawn_cmd(struct spawn_cmd_info *cmdinfo); +uint64_t spawn_enq_cmd(char *command_to_run); +void spawn_wait_cmd(uint64_t serial, int *exit_status, time_t *exec_run_timestamp); +void spawn_deq_cmd(struct spawn_cmd_info *cmdinfo); +struct spawn_cmd_info *spawn_get_unprocessed_cmd(void); +int create_spawn_server(uv_loop_t *loop, uv_pipe_t *spawn_channel, uv_process_t *process); + +/* + * Copies from the source buffer to the protocol buffer. It advances the source buffer by the amount copied. It + * subtracts the amount copied from the source length. + */ +static inline void copy_to_prot_b |