diff options
-rw-r--r-- | .gitignore | 1 | ||||
-rw-r--r-- | CMakeLists.txt | 17 | ||||
-rw-r--r-- | Makefile.am | 20 | ||||
-rw-r--r-- | cli/Makefile.am | 8 | ||||
-rw-r--r-- | cli/README.md | 26 | ||||
-rw-r--r-- | cli/cli.c | 201 | ||||
-rw-r--r-- | cli/cli.h | 8 | ||||
-rw-r--r-- | configure.ac | 9 | ||||
-rw-r--r-- | daemon/README.md | 2 | ||||
-rw-r--r-- | daemon/commands.c | 554 | ||||
-rw-r--r-- | daemon/commands.h | 76 | ||||
-rw-r--r-- | daemon/common.h | 1 | ||||
-rw-r--r-- | daemon/main.c | 5 | ||||
-rw-r--r-- | daemon/signals.c | 16 | ||||
-rw-r--r-- | database/engine/rrdengine.c | 3 | ||||
-rw-r--r-- | database/engine/rrdengine.h | 6 | ||||
-rw-r--r-- | database/engine/rrdengineapi.c | 2 | ||||
-rw-r--r-- | database/engine/rrdenginelib.h | 3 | ||||
-rw-r--r-- | libnetdata/libnetdata.h | 2 | ||||
-rw-r--r-- | packaging/installer/README.md | 2 | ||||
-rwxr-xr-x | packaging/installer/netdata-uninstaller.sh | 1 |
21 files changed, 940 insertions, 23 deletions
diff --git a/.gitignore b/.gitignore index 0d8ba70f0f..31021152e9 100644 --- a/.gitignore +++ b/.gitignore @@ -37,6 +37,7 @@ sha256sums.txt # netdata binaries netdata +netdatacli !netdata/ upload/ artifacts/ diff --git a/CMakeLists.txt b/CMakeLists.txt index f3989b4c47..9eaa015838 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -628,6 +628,8 @@ set(DAEMON_FILES daemon/main.h daemon/signals.c daemon/signals.h + daemon/commands.c + daemon/commands.h daemon/unit_test.c daemon/unit_test.h ) @@ -648,6 +650,12 @@ set(NETDATA_FILES ${WEB_PLUGIN_FILES} ) +set(NETDATACLI_FILES + daemon/commands.h + cli/cli.c + cli/cli.h + ) + include_directories(AFTER .) add_definitions( @@ -785,6 +793,15 @@ ENDIF() # ----------------------------------------------------------------------------- +# netdatacli + +add_executable(netdatacli config.h ${NETDATACLI_FILES}) +target_link_libraries (netdatacli libnetdata ${NETDATA_COMMON_LIBRARIES}) +target_include_directories(netdatacli PUBLIC ${NETDATA_COMMON_INCLUDE_DIRS}) +target_compile_options(netdatacli PUBLIC ${NETDATA_COMMON_CFLAGS}) + + +# ----------------------------------------------------------------------------- # apps.plugin IF(ENABLE_PLUGIN_APPS) diff --git a/Makefile.am b/Makefile.am index 3ecdb14064..accf2ac691 100644 --- a/Makefile.am +++ b/Makefile.am @@ -482,6 +482,8 @@ DAEMON_FILES = \ daemon/main.h \ daemon/signals.c \ daemon/signals.h \ + daemon/commands.c \ + daemon/commands.h \ daemon/unit_test.c \ daemon/unit_test.h \ $(NULL) @@ -537,6 +539,13 @@ NETDATA_COMMON_LIBS = \ $(OPTIONAL_JSONC_LIBS) \ $(NULL) +NETDATACLI_FILES = \ + daemon/commands.h \ + $(LIBNETDATA_FILES) \ + cli/cli.c \ + cli/cli.h \ + $(NULL) + sbin_PROGRAMS += netdata netdata_SOURCES = $(NETDATA_FILES) netdata_LDADD = \ @@ -548,6 +557,17 @@ else netdata_LINK = $(CCLD) $(CFLAGS) $(LDFLAGS) -o $@ endif +sbin_PROGRAMS += netdatacli +netdatacli_SOURCES = $(NETDATACLI_FILES) +netdatacli_LDADD = \ + $(NETDATA_COMMON_LIBS) \ + $(NULL) +if ENABLE_CXX_LINKER + netdatacli_LINK = $(CXXLD) $(CXXFLAGS) $(LDFLAGS) -o $@ +else + netdatacli_LINK = $(CCLD) $(CFLAGS) $(LDFLAGS) -o $@ +endif + if ENABLE_PLUGIN_APPS plugins_PROGRAMS += apps.plugin apps_plugin_SOURCES = $(APPS_PLUGIN_FILES) diff --git a/cli/Makefile.am b/cli/Makefile.am new file mode 100644 index 0000000000..161784b8f6 --- /dev/null +++ b/cli/Makefile.am @@ -0,0 +1,8 @@ +# SPDX-License-Identifier: GPL-3.0-or-later + +AUTOMAKE_OPTIONS = subdir-objects +MAINTAINERCLEANFILES = $(srcdir)/Makefile.in + +dist_noinst_DATA = \ + README.md \ + $(NULL) diff --git a/cli/README.md b/cli/README.md new file mode 100644 index 0000000000..3872dac1c1 --- /dev/null +++ b/cli/README.md @@ -0,0 +1,26 @@ +# Netdata cli + +You can see the commands netdatacli supports by executing it with `netdatacli` and entering `help` in +standard input. All commands are given as standard input to `netdatacli`. + +The commands that a running netdata agent can execute are the following: + +```sh +The commands are (arguments are in brackets): +help + Show this help menu. +reload-health + Reload health configuration. +save-database + Save internal DB to disk for for memory mode save. +reopen-logs + Close and reopen log files. +shutdown-agent + Cleanup and exit the netdata agent. +fatal-agent + Log the state and halt the netdata agent. +``` + +Those commands are the same that can be sent to netdata via [signals](../daemon#command-line-options). + +[![analytics](https://www.google-analytics.com/collect?v=1&aip=1&t=pageview&_s=1&ds=github&dr=https%3A%2F%2Fgithub.com%2Fnetdata%2Fnetdata&dl=https%3A%2F%2Fmy-netdata.io%2Fgithub%2Fcli%2FREADME&_u=MAC~&cid=5792dfd7-8dc4-476b-af31-da2fdb9f93d2&tid=UA-64295674-3)](<>) diff --git a/cli/cli.c b/cli/cli.c new file mode 100644 index 0000000000..4df0201787 --- /dev/null +++ b/cli/cli.c @@ -0,0 +1,201 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "cli.h" +#include "../libnetdata/required_dummies.h" + +static uv_pipe_t client_pipe; +static uv_write_t write_req; +static uv_shutdown_t shutdown_req; + +static char command_string[MAX_COMMAND_LENGTH]; +static unsigned command_string_size; + +static char response_string[MAX_COMMAND_LENGTH]; +static unsigned response_string_size; + +static int exit_status; + +struct command_context { + uv_work_t work; + uv_stream_t *client; + cmd_t idx; + char *args; + char *message; + cmd_status_t status; +}; + +static void parse_command_reply(void) +{ + FILE *stream = NULL; + char *pos; + int syntax_error = 0; + + for (pos = response_string ; + pos < response_string + response_string_size && !syntax_error ; + ++pos) { + /* Skip white-space characters */ + for ( ; isspace(*pos) && ('\0' != *pos); ++pos) {;} + + if ('\0' == *pos) + continue; + + switch (*pos) { + case CMD_PREFIX_EXIT_CODE: + exit_status = atoi(++pos); + break; + case CMD_PREFIX_INFO: + stream = stdout; + break; + case CMD_PREFIX_ERROR: + stream = stderr; + break; + default: + syntax_error = 1; + fprintf(stderr, "Syntax error, failed to parse command response.\n"); + break; + } + if (stream) { + fprintf(stream, "%s\n", ++pos); + pos += strlen(pos); + stream = NULL; + } + } +} + +static void pipe_read_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) +{ + if (0 == nread) { + fprintf(stderr, "%s: Zero bytes read by command pipe.\n", __func__); + } else if (UV_EOF == nread) { +// fprintf(stderr, "EOF found in command pipe.\n"); + parse_command_reply(); + } else if (nread < 0) { + fprintf(stderr, "%s: %s\n", __func__, uv_strerror(nread)); + } + + if (nread < 0) { /* stop stream due to EOF or error */ + (void)uv_read_stop((uv_stream_t *)client); + } else if (nread) { + size_t to_copy; + + to_copy = MIN(nread, MAX_COMMAND_LENGTH - 1 - response_string_size); + memcpy(response_string + response_string_size, buf->base, to_copy); + response_string_size += to_copy; + response_string[response_string_size] = '\0'; + } + if (buf && buf->len) { + free(buf->base); + } +} + +static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +{ + (void)handle; + + buf->base = malloc(suggested_size); + buf->len = suggested_size; +} + +static void shutdown_cb(uv_shutdown_t* req, int status) +{ + int ret; + + (void)req; + (void)status; + + /* receive reply */ + response_string_size = 0; + response_string[0] = '\0'; + + ret = uv_read_start((uv_stream_t *)&client_pipe, alloc_cb, pipe_read_cb); + if (ret) { + fprintf(stderr, "uv_read_start(): %s\n", uv_strerror(ret)); + uv_close((uv_handle_t *)&client_pipe, NULL); + return; + } + +} + +static void pipe_write_cb(uv_write_t* req, int status) +{ + int ret; + + (void)req; + (void)status; + + ret = uv_shutdown(&shutdown_req, (uv_stream_t *)&client_pipe, shutdown_cb); + if (ret) { + fprintf(stderr, "uv_shutdown(): %s\n", uv_strerror(ret)); + uv_close((uv_handle_t *)&client_pipe, NULL); + return; + } +} + +static void connect_cb(uv_connect_t* req, int status) +{ + int ret; + uv_buf_t write_buf; + char *s; + + (void)req; + if (status) { + fprintf(stderr, "uv_pipe_connect(): %s\n", uv_strerror(status)); + fprintf(stderr, "Make sure the netdata service is running.\n"); + exit(-1); + } + if (0 == command_string_size) { + s = fgets(command_string, MAX_COMMAND_LENGTH, stdin); + } + (void)s; /* We don't need input to communicate with the server */ + command_string_size = strlen(command_string); + + write_req.data = &client_pipe; + write_buf.base = command_string; + write_buf.len = command_string_size; + ret = uv_write(&write_req, (uv_stream_t *)&client_pipe, &write_buf, 1, pipe_write_cb); + if (ret) { + fprintf(stderr, "uv_write(): %s\n", uv_strerror(ret)); + } +// fprintf(stderr, "COMMAND: Sending command: \"%s\"\n", command_string); +} + +int main(int argc, char **argv) +{ + int ret, i; + static uv_loop_t* loop; + uv_connect_t req; + + exit_status = -1; /* default status for when there is no command response from server */ + + loop = uv_default_loop(); + + ret = uv_pipe_init(loop, &client_pipe, 1); + if (ret) { + fprintf(stderr, "uv_pipe_init(): %s\n", uv_strerror(ret)); + return exit_status; + } + + command_string_size = 0; + command_string[0] = '\0'; + for (i = 1 ; i < argc ; ++i) { + size_t to_copy; + + to_copy = MIN(strlen(argv[i]), MAX_COMMAND_LENGTH - 1 - command_string_size); + strncpyz(command_string + command_string_size, argv[i], to_copy); + command_string_size += to_copy; + + if (command_string_size < MAX_COMMAND_LENGTH - 1) { + command_string[command_string_size++] = ' '; + } else { + break; + } + } + + uv_pipe_connect(&req, &client_pipe, PIPENAME, connect_cb); + + uv_run(loop, UV_RUN_DEFAULT); + + uv_close((uv_handle_t *)&client_pipe, NULL); + + return exit_status; +}
\ No newline at end of file diff --git a/cli/cli.h b/cli/cli.h new file mode 100644 index 0000000000..9e730a3019 --- /dev/null +++ b/cli/cli.h @@ -0,0 +1,8 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#ifndef NETDATA_CLI_H +#define NETDATA_CLI_H 1 + +#include "../daemon/common.h" + +#endif //NETDATA_CLI_H diff --git a/configure.ac b/configure.ac index 35147b0e92..4ae08bffc9 100644 --- a/configure.ac +++ b/configure.ac @@ -279,6 +279,10 @@ AC_CHECK_LIB( [uv_fs_scandir_next], [UV_LIBS="-luv"] ) +test -z "${UV_LIBS}" && \ + AC_MSG_ERROR([libuv required but not found. Try installing 'libuv1-dev' or 'libuv-devel'.]) +OPTIONAL_UV_CFLAGS="${UV_CFLAGS}" +OPTIONAL_UV_LIBS="${UV_LIBS}" # ----------------------------------------------------------------------------- @@ -363,9 +367,6 @@ OPTIONAL_JSONC_LIBS="${JSONC_LIBS}" # ----------------------------------------------------------------------------- # DB engine and HTTPS -test "${enable_dbengine}" = "yes" -a -z "${UV_LIBS}" && \ - AC_MSG_ERROR([libuv required but not found. Try installing 'libuv1-dev' or 'libuv-devel'.]) - test "${enable_dbengine}" = "yes" -a -z "${LZ4_LIBS}" && \ AC_MSG_ERROR([liblz4 required but not found. Try installing 'liblz4-dev' or 'lz4-devel'.]) @@ -382,8 +383,6 @@ AC_MSG_CHECKING([if netdata dbengine should be used]) if test "${enable_dbengine}" != "no" -a "${UV_LIBS}" -a "${LZ4_LIBS}" -a "${JUDY_LIBS}" -a "${SSL_LIBS}"; then enable_dbengine="yes" AC_DEFINE([ENABLE_DBENGINE], [1], [netdata dbengine usability]) - OPTIONAL_UV_CFLAGS="${UV_CFLAGS}" - OPTIONAL_UV_LIBS="${UV_LIBS}" OPTIONAL_LZ4_CFLAGS="${LZ4_CFLAGS}" OPTIONAL_LZ4_LIBS="${LZ4_LIBS}" OPTIONAL_JUDY_CFLAGS="${JUDY_CFLAGS}" diff --git a/daemon/README.md b/daemon/README.md index 0d4b0cdbbc..246586addd 100644 --- a/daemon/README.md +++ b/daemon/README.md @@ -185,6 +185,8 @@ The command line options of the Netdata 1.10.0 version are the following: - USR2 Reload health configuration. ``` +You can send commands during runtime via [netdatacli](../cli). + ## Log files Netdata uses 3 log files: diff --git a/daemon/commands.c b/daemon/commands.c new file mode 100644 index 0000000000..fc68ed59a7 --- /dev/null +++ b/daemon/commands.c @@ -0,0 +1,554 @@ +// SPDX-License-Identifier: GPL-3.0-or-later + +#include "common.h" +#include "../database/engine/rrdenginelib.h" + +static uv_thread_t thread; +static uv_loop_t* loop; +static uv_async_t async; +static struct completion completion; +static uv_pipe_t server_pipe; + +char cmd_prefix_by_status[] = { + CMD_PREFIX_INFO, + CMD_PREFIX_ERROR, + CMD_PREFIX_ERROR +}; + +static int command_thread_error; +static int command_thread_shutdown; +static unsigned clients = 0; +static char command_string[MAX_COMMAND_LENGTH]; +static unsigned command_string_size; + +struct command_context { + uv_work_t work; + uv_stream_t *client; + cmd_t idx; + char *args; + char *message; + cmd_status_t status; +}; + +/* Forward declarations */ +static cmd_status_t cmd_help_execute(char *args, char **message); +static cmd_status_t cmd_reload_health_execute(char *args, char **message); +static cmd_status_t cmd_save_database_execute(char *args, char **message); +static cmd_status_t cmd_reopen_logs_execute(char *args, char **message); +static cmd_status_t cmd_exit_execute(char *args, char **message); +static cmd_status_t cmd_fatal_execute(char *args, char **message); + +static command_info_t command_info_array[] = { + {"help", cmd_help_execute, CMD_TYPE_HIGH_PRIORITY}, // show help menu + {"reload-health", cmd_reload_health_execute, CMD_TYPE_ORTHOGONAL}, // reload health configuration + {"save-database", cmd_save_database_execute, CMD_TYPE_ORTHOGONAL}, // save database for memory mode save + {"reopen-logs", cmd_reopen_logs_execute, CMD_TYPE_ORTHOGONAL}, // Close and reopen log files + {"shutdown-agent", cmd_exit_execute, CMD_TYPE_EXCLUSIVE}, // exit cleanly + {"fatal-agent", cmd_fatal_execute, CMD_TYPE_HIGH_PRIORITY}, // exit with fatal error +}; + +/* Mutexes for commands of type CMD_TYPE_ORTHOGONAL */ +static uv_mutex_t command_lock_array[CMD_TOTAL_COMMANDS]; +/* Commands of type CMD_TYPE_EXCLUSIVE are writers */ +static uv_rwlock_t exclusive_rwlock; +/* + * Locking order: + * 1. exclusive_rwlock + * 2. command_lock_array[] + */ + +/* Forward declarations */ +static void cmd_lock_exclusive(unsigned index); +static void cmd_lock_orthogonal(unsigned index); +static void cmd_lock_idempotent(unsigned index); +static void cmd_lock_high_priority(unsigned index); + +static command_lock_t *cmd_lock_by_type[] = { + cmd_lock_exclusive, + cmd_lock_orthogonal, + cmd_lock_idempotent, + cmd_lock_high_priority +}; + +/* Forward declarations */ +static void cmd_unlock_exclusive(unsigned index); +static void cmd_unlock_orthogonal(unsigned index); +static void cmd_unlock_idempotent(unsigned index); +static void cmd_unlock_high_priority(unsigned index); + +static command_lock_t *cmd_unlock_by_type[] = { + cmd_unlock_exclusive, + cmd_unlock_orthogonal, + cmd_unlock_idempotent, + cmd_unlock_high_priority +}; + +static cmd_status_t cmd_help_execute(char *args, char **message) +{ + (void)args; + + *message = mallocz(MAX_COMMAND_LENGTH); + strncpyz(*message, + "\nThe commands are (arguments are in brackets):\n" + "help\n" + " Show this help menu.\n" + "reload-health\n" + " Reload health configuration.\n" + "save-database\n" + " Save internal DB to disk for memory mode save.\n" + "reopen-logs\n" + " Close and reopen log files.\n" + "shutdown-agent\n" + " Cleanup and exit the netdata agent.\n" + "fatal-agent\n" + " Log the state and halt the netdata agent.\n", + MAX_COMMAND_LENGTH - 1); + return CMD_STATUS_SUCCESS; +} + +static cmd_status_t cmd_reload_health_execute(char *args, char **message) +{ + (void)args; + (void)message; + + error_log_limit_unlimited(); + info("COMMAND: Reloading HEALTH configuration."); + health_reload(); + error_log_limit_reset(); + + return CMD_STATUS_SUCCESS; +} + +static cmd_status_t cmd_save_database_execute(char *args, char **message) +{ + (void)args; + (void)message; + + error_log_limit_unlimited(); + info("COMMAND: Saving databases."); + rrdhost_save_all(); + info("COMMAND: Databases saved."); + error_log_limit_reset(); + + return CMD_STATUS_SUCCESS; +} + +static cmd_status_t cmd_reopen_logs_execute(char *args, char **message) +{ + (void)args; + (void)message; + + error_log_limit_unlimited(); + info("COMMAND: Reopening all log files."); + reopen_all_log_files(); + error_log_limit_reset(); + + return CMD_STATUS_SUCCESS; +} + +static cmd_status_t cmd_exit_execute(char *args, char **message) +{ + (void)args; + (void)message; + + error_log_limit_unlimited(); + info("COMMAND: Cleaning up to exit."); + netdata_cleanup_and_exit(0); + exit(0); + + return CMD_STATUS_SUCCESS; +} + +static cmd_status_t cmd_fatal_execute(char *args, char **message) +{ + (void)args; + (void)message; + + fatal("COMMAND: netdata now exits."); + + return CMD_STATUS_SUCCESS; +} + +static void cmd_lock_exclusive(unsigned index) +{ + (void)index; + + uv_rwlock_wrlock(&exclusive_rwlock); +} + +static void cmd_lock_orthogonal(unsigned index) +{ + uv_rwlock_rdlock(&exclusive_rwlock); + uv_mutex_lock(&command_lock_array[index]); +} + +static void cmd_lock_idempotent(unsigned index) +{ + (void)index; + + uv_rwlock_rdlock(&exclusive_rwlock); +} + +static void cmd_lock_high_priority(unsigned index) +{ + (void)index; +} + +static void cmd_unlock_exclusive(unsigned index) +{ + (void)index; + + uv_rwlock_wrunlock(&exclusive_rwlock); +} + +static void cmd_unlock_orthogonal(unsigned index) +{ + uv_rwlock_rdunlock(&exclusive_rwlock); + uv_mutex_unlock(&command_lock_array[index]); +} + +static void cmd_unlock_idempotent(unsigned index) +{ + (void)index; + + uv_rwlock_rdunlock(&exclusive_rwlock); +} + +static void cmd_unlock_high_priority(unsigned index) +{ + (void)index; +} + +static void pipe_write_cb(uv_write_t* req, int status) +{ + (void)status; + uv_pipe_t *client = req->data; + + uv_close((uv_handle_t *)client, NULL); + freez(client); + --clients; + info("Command Clients = %u\n", clients); +} + +static inline void add_char_to_command_reply(char *reply_string, unsigned *reply_string_size, char character) +{ + reply_string[(*reply_string_size)++] = character; +} + +static inline void add_string_to_command_reply(char *reply_string, unsigned *reply_string_size, char *str) +{ + unsigned len; + + len = strlen(str); + strncpyz(reply_string + *reply_string_size, str, len); + *reply_string_size += len; +} + +static void send_command_reply(uv_stream_t *client, cmd_status_t status, char *message) +{ + int ret; + char reply_string[MAX_COMMAND_LENGTH] = {'\0', }; + char exit_status_string[MAX_EXIT_STATUS_LENGTH + 1] = {'\0', }; + unsigned reply_string_size = 0; + uv_buf_t write_buf; + uv_write_t write_req; + + snprintfz(exit_status_string, MAX_EXIT_STATUS_LENGTH, "%u", status); + add_char_to_command_reply(reply_string, &reply_string_size, CMD_PREFIX_EXIT_CODE); + add_string_to_command_reply(reply_string, &reply_string_size, exit_status_string); + add_char_to_command_reply(reply_string, &reply_string_size, '\0'); + + if (message) { + add_char_to_command_reply(reply_string, &reply_string_size, cmd_prefix_by_status[status]); + add_string_to_command_reply(reply_string, &reply_string_size, message); + } + + write_req.data = client; + write_buf.base = reply_string; + write_buf.len = reply_string_size; + ret = uv_write(&write_req, (uv_stream_t *)client, &write_buf, 1, pipe_write_cb); + if (ret) { + error("uv_write(): %s", uv_strerror(ret)); + } + info("COMMAND: Sending reply: \"%s\"", reply_string); +} + +cmd_status_t execute_command(cmd_t idx, char *args, char **message) +{ + cmd_status_t status; + cmd_type_t type = command_info_array[idx].type; + + cmd_lock_by_type[type](idx); + status = command_info_array[idx].func(args, message); + cmd_unlock_by_type[type](idx); + + return status; +} + +static void after_schedule_command(uv_work_t *req, int status) +{ + struct command_context *cmd_ctx = req->data; + + (void)status; + + send_command_reply(cmd_ctx->client, cmd_ctx->status, cmd_ctx->message); + if (cmd_ctx->message) + freez(cmd_ctx->message); +} + +static void schedule_command(uv_work_t *req) +{ + struct command_context *cmd_ctx = req->data; + + cmd_ctx->status = execute_command(cmd_ctx->idx, cmd_ctx->args, &cmd_ctx->message); +} + +static void parse_commands(uv_stream_t *client) +{ + char *message = NULL, *pos; + cmd_t i; + cmd_status_t status; + struct command_context *cmd_ctx; + + status = CMD_STATUS_FAILURE; + + /* Skip white-space characters */ + for (pos = command_string ; isspace(*pos) && ('\0' != *pos) ; ++pos) {;} + for (i = 0 ; i < CMD_TOTAL_COMMANDS ; ++i) { + if (!strncmp(pos, command_info_array[i].cmd_str, strlen(command_info_array[i].cmd_str))) { + cmd_ctx = mallocz(sizeof(*cmd_ctx)); + cmd_ctx->work.data = cmd_ctx; + cmd_ctx->client = client; + cmd_ctx->idx = i; + cmd_ctx->args = pos + strlen(command_info_array[i].cmd_str); + cmd_ctx->message = NULL; + + assert(0 == uv_queue_work(loop, &cmd_ctx->work, schedule_command, after_schedule_command)); + break; + } + } + if (CMD_TOTAL_COMMANDS == i) { + /* no command found */ + message = strdupz("Illegal command. Please type \"help\" for instructions."); + send_command_reply(client, status, message); + freez(message); + } +} + +static void pipe_read_cb(uv_stream_t *client, ssize_t nread, const uv_buf_t *buf) +{ + if (0 == nread) { + info("%s: Zero bytes read by command pipe.", __func__); + } else if (UV_EOF == nread) { + info("EOF found in command pipe."); + parse_commands(client); + } else if (nread < 0) { + error("%s: %s", __func__, uv_strerror(nread)); + } + + if (nread < 0) { /* stop stream due to EOF or error */ + (void)uv_read_stop((uv_stream_t *)client); + } else if (nread) { + size_t to_copy; + + to_copy = MIN(nread, MAX_COMMAND_LENGTH - 1 - command_string_size); + strncpyz(command_string + command_string_size, buf->base, to_copy); + command_string_size += to_copy; + } + if (buf && buf->len) { + freez(buf->base); + } + + if (nread < 0 && UV_EOF != nread) { + uv_close((uv_handle_t *)client, NULL); + freez(client); + --clients; + info("Command Clients = %u\n", clients); + } +} + +static void alloc_cb(uv_handle_t *handle, size_t suggested_size, uv_buf_t *buf) +{ + (void)handle; + + buf->base = mallocz(suggested_size); + buf->len = suggested_size; +} + +static void connection_cb(uv_stream_t *server, int status) +{ + int ret; + uv_pipe_t *client; + assert(status == 0); + + client = mallocz(sizeof(*client)); + ret = uv_pipe_init(server->loop, client, 1); + if (ret) { + error("uv_pipe_init(): %s", uv_strerror(ret)); + freez(client); + return; + } + ret = uv_accept(server, (uv_stream_t *)client); + if (ret) { + error("uv_accept(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)client, NULL); + freez(client); + return; + } + + ++clients; + info("Command Clients = %u\n", clients); + /* Start parsing a new command */ + command_string_size = 0; + command_string[0] = '\0'; + + ret = uv_read_start((uv_stream_t*)client, alloc_cb, pipe_read_cb); + if (ret) { + error("uv_read_start(): %s", uv_strerror(ret)); + uv_close((uv_handle_t *)client, NULL); + freez(client); + --clients; + info("Command Clients = %u\n", clients); + return; + } +} + +static void async_cb(uv_async_t *handle) +{ + uv_stop(handle->loop); +} + +static void command_thread(void *arg) +{ + int ret; + uv_fs_t req; + + (void) arg; + loop = mallocz(sizeof(uv_loop_t)); + ret = uv_loop_init(loop); + if (ret) { + error("uv_loop_init(): %s", uv_strerror(ret)); + command_thread_error = ret; + goto error_after_loop_init; + } + loop->data = NULL; + + ret = uv_async_init(loop, &async, async_cb); + if (ret) { + error("uv_async_init(): %s", uv_strerror(ret)); + command_thread_error = ret; + goto error_after_async_init; + } + async.data = NULL; + + ret = uv_pipe_init(loop, &server_pipe, 1); + if (ret) { + error("uv_pipe_init(): %s", uv_strerror(ret)); + command_thread_error = ret; + goto error_after_pipe_init; + } + (void)uv_fs_unlink(loop, &req, PIPENAME, NULL); + uv_fs_req_cleanup(&req); + ret = uv_pipe_bind(&server_pipe, PIPENAME); + if (ret) { + error("uv_pipe_bind(): %s", uv_strerror(ret)); + command_thread_error = ret; + goto error_after_pipe_bind; + } + if ((ret = uv_listen((uv_stream_t *)&server_pipe, SOMAXCONN, connection_cb))) { + error("uv_listen(): %s", uv_strerror(ret)); + command_thread_error = ret; + goto error_after_uv_listen; + } + + command_thread_error = 0; + command_thread_shutdown = 0; + /* wake up initialization thread */ + complete(&completion); + + while (command_thread_shutdown == 0) { + uv_run(loop, UV_RUN_DEFAULT); + } + /* cleanup operations of the event loop */ + info("Shutting down command event loop."); + uv_close((uv_handle_t *)&async, NULL); + uv_close((uv_handle_t*)&server_pipe, NULL); + uv_run(loop, UV_RUN_DEFAULT); + + info("Shutting down command loop complete."); + assert(0 == uv_loop_close(loop)); + freez(loop); + + return; + +error_after_uv_listen: +error_after_pipe_bind: + uv_close((uv_handle_t*)&server_pipe, NULL); +error_after_pipe_init: + uv_close((uv_handle_t *)&async, NULL); +error_after_async_init: + assert(0 == uv_loop_close(loop)); +error_after_loop_init: + freez(loop); + + /* wake up initialization thread */ + complete(&completion); +} + +static void sanity_check(void) +{ + /* The size of command_info_array must be CMD_TOTAL_COMMANDS elements */ + BUILD_BUG_ON(CMD_TOTAL_COMMANDS != sizeof(command_info_array) / sizeof(command_info_array[0])); +} + +void commands_init(void) +{ + cmd_t i; + int error; + + sanity_check(); + info("Initializing command server."); + for (i = 0 ; i < CMD_TOTAL_COMMANDS ; ++i) { + uv_mutex_init(&command_lock_array[i]); + } + assert(0 == uv_rwlock_init(&exclusive_rwlock)); + + init_completion(&completion); + error = uv_thread_create(&thread, command_thread, NULL); + if (error) { + error("uv_thread_create(): %s", uv_strerror(error)); + goto after_error; + } + /* wait for worker thread to initialize */ + wait_for_completion(&completion); + destroy_completion(&completion); + + if (command_thread_error) { + error = uv_thread_join(&thread); + if (error) { + error( |