summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/plugins_d.c76
-rw-r--r--collectors/plugins.d/plugins_d.h2
-rw-r--r--collectors/plugins.d/pluginsd_parser.c180
-rw-r--r--collectors/plugins.d/pluginsd_parser.h45
4 files changed, 158 insertions, 145 deletions
diff --git a/collectors/plugins.d/plugins_d.c b/collectors/plugins.d/plugins_d.c
index 08c26a198b..e482aa805b 100644
--- a/collectors/plugins.d/plugins_d.c
+++ b/collectors/plugins.d/plugins_d.c
@@ -47,8 +47,7 @@ static inline bool plugin_is_running(struct plugind *cd) {
return ret;
}
-static void pluginsd_worker_thread_cleanup(void *arg)
-{
+static void pluginsd_worker_thread_cleanup(void *arg) {
struct plugind *cd = (struct plugind *)arg;
worker_unregister();
@@ -143,41 +142,64 @@ static void *pluginsd_worker_thread(void *arg) {
netdata_thread_cleanup_push(pluginsd_worker_thread_cleanup, arg);
- struct plugind *cd = (struct plugind *)arg;
- plugin_set_running(cd);
+ {
+ struct plugind *cd = (struct plugind *) arg;
+ plugin_set_running(cd);
- size_t count = 0;
+ size_t count = 0;
- while (service_running(SERVICE_COLLECTORS)) {
- FILE *fp_child_input = NULL;
- FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
+ while(service_running(SERVICE_COLLECTORS)) {
+ FILE *fp_child_input = NULL;
+ FILE *fp_child_output = netdata_popen(cd->cmd, &cd->unsafe.pid, &fp_child_input);
- if (unlikely(!fp_child_input || !fp_child_output)) {
- netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").", rrdhost_hostname(cd->host), cd->cmd);
- break;
- }
+ if(unlikely(!fp_child_input || !fp_child_output)) {
+ netdata_log_error("PLUGINSD: 'host:%s', cannot popen(\"%s\", \"r\").",
+ rrdhost_hostname(cd->host), cd->cmd);
+ break;
+ }
- netdata_log_info("PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
- rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s' connected to '%s' running on pid %d",
+ rrdhost_hostname(cd->host),
+ cd->fullfilename, cd->unsafe.pid);
- count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
+ const char *plugin = strrchr(cd->fullfilename, '/');
+ if(plugin)
+ plugin++;
+ else
+ plugin = cd->fullfilename;
- netdata_log_info("PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
- rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
+ char module[100];
+ snprintfz(module, sizeof(module), "plugins.d[%s]", plugin);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_MODULE, module),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rrdhost_hostname(cd->host)),
+ ND_LOG_FIELD_TXT(NDF_SRC_TRANSPORT, "pluginsd"),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- killpid(cd->unsafe.pid);
+ count = pluginsd_process(cd->host, cd, fp_child_input, fp_child_output, 0);
- int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "PLUGINSD: 'host:%s', '%s' (pid %d) disconnected after %zu successful data collections (ENDs).",
+ rrdhost_hostname(cd->host), cd->fullfilename, cd->unsafe.pid, count);
- if (likely(worker_ret_code == 0))
- pluginsd_worker_thread_handle_success(cd);
- else
- pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+ killpid(cd->unsafe.pid);
- cd->unsafe.pid = 0;
- if (unlikely(!plugin_is_enabled(cd)))
- break;
- }
+ int worker_ret_code = netdata_pclose(fp_child_input, fp_child_output, cd->unsafe.pid);
+
+ if(likely(worker_ret_code == 0))
+ pluginsd_worker_thread_handle_success(cd);
+ else
+ pluginsd_worker_thread_handle_error(cd, worker_ret_code);
+
+ cd->unsafe.pid = 0;
+
+ if(unlikely(!plugin_is_enabled(cd)))
+ break;
+ }
+ }
netdata_thread_cleanup_pop(1);
return NULL;
diff --git a/collectors/plugins.d/plugins_d.h b/collectors/plugins.d/plugins_d.h
index c6202b1372..37c70f7e39 100644
--- a/collectors/plugins.d/plugins_d.h
+++ b/collectors/plugins.d/plugins_d.h
@@ -21,8 +21,6 @@
#define PLUGINSD_KEYWORD_REPORT_JOB_STATUS "REPORT_JOB_STATUS"
#define PLUGINSD_KEYWORD_DELETE_JOB "DELETE_JOB"
-#define PLUGINSD_MAX_WORDS 30
-
#define PLUGINSD_MAX_DIRECTORIES 20
extern char *plugin_directories[PLUGINSD_MAX_DIRECTORIES];
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 2932862c6e..f0fe9f6d58 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -153,11 +153,12 @@ static inline bool pluginsd_set_scope_chart(PARSER *parser, RRDSET *st, const ch
if(unlikely(old_collector_tid)) {
if(old_collector_tid != my_collector_tid) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
- keyword ? keyword : "UNKNOWN",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- my_collector_tid, old_collector_tid);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: keyword %s: 'host:%s/chart:%s' is collected twice (my tid %d, other collector tid %d)",
+ keyword ? keyword : "UNKNOWN",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ my_collector_tid, old_collector_tid);
return false;
}
@@ -389,8 +390,9 @@ static inline PARSER_RC PLUGINSD_DISABLE_PLUGIN(PARSER *parser, const char *keyw
parser->user.enabled = 0;
if(keyword && msg) {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: keyword %s: %s", keyword, msg);
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_INFO,
+ "PLUGINSD: keyword %s: %s", keyword, msg);
}
return PARSER_RC_ERROR;
@@ -1109,7 +1111,8 @@ void pluginsd_function_cancel(void *data) {
dfe_done(t);
if(sent <= 0)
- netdata_log_error("PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
+ nd_log(NDLS_DAEMON, NDLP_NOTICE,
+ "PLUGINSD: FUNCTION_CANCEL request didn't match any pending function requests in pluginsd.d.");
}
// this is the function that is called from
@@ -1626,9 +1629,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
if(!st) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
if(!parser->user.replay.rset_enabled) {
- error_limit_static_thread_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
- rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
+ nd_log_limit_static_thread_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_ERR,
+ "PLUGINSD: 'host:%s/chart:%s' got a %s but it is disabled by %s errors",
+ rrdhost_hostname(host), rrdset_id(st), PLUGINSD_KEYWORD_REPLAY_SET, PLUGINSD_KEYWORD_REPLAY_BEGIN);
// we have to return OK here
return PARSER_RC_OK;
@@ -1675,8 +1679,10 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
rd->collector.counter++;
}
else {
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. Ignoring data.",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_COLLECTORS, NDLP_WARNING,
+ "PLUGINSD: 'host:%s/chart:%s/dim:%s' has the ARCHIVED flag set, but it is replicated. "
+ "Ignoring data.",
rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_name(rd));
}
}
@@ -2832,61 +2838,6 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR
// ----------------------------------------------------------------------------
-static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
-#ifdef NETDATA_INTERNAL_CHECKS
- if(reader->read_buffer[reader->read_len] != '\0')
- fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
-#endif
-
- ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
- if(unlikely(bytes_read <= 0))
- return false;
-
- reader->read_len += bytes_read;
- reader->read_buffer[reader->read_len] = '\0';
-
- return true;
-}
-
-static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
- errno = 0;
- struct pollfd fds[1];
-
- fds[0].fd = fd;
- fds[0].events = POLLIN;
-
- int ret = poll(fds, 1, timeout_ms);
-
- if (ret > 0) {
- /* There is data to read */
- if (fds[0].revents & POLLIN)
- return buffered_reader_read(reader, fd);
-
- else if(fds[0].revents & POLLERR) {
- netdata_log_error("PARSER: read failed: POLLERR.");
- return false;
- }
- else if(fds[0].revents & POLLHUP) {
- netdata_log_error("PARSER: read failed: POLLHUP.");
- return false;
- }
- else if(fds[0].revents & POLLNVAL) {
- netdata_log_error("PARSER: read failed: POLLNVAL.");
- return false;
- }
-
- netdata_log_error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
- return false;
- }
- else if (ret == 0) {
- netdata_log_error("PARSER: timeout while waiting for data.");
- return false;
- }
-
- netdata_log_error("PARSER: poll() failed with code %d.", ret);
- return false;
-}
-
void pluginsd_process_thread_cleanup(void *ptr) {
PARSER *parser = (PARSER *)ptr;
@@ -2905,6 +2856,33 @@ void pluginsd_process_thread_cleanup(void *ptr) {
parser_destroy(parser);
}
+bool parser_reconstruct_node(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.host)
+ return false;
+
+ buffer_strcat(wb, rrdhost_hostname(parser->user.host));
+ return true;
+}
+
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
+
+ buffer_strcat(wb, rrdset_name(parser->user.st));
+ return true;
+}
+
+bool parser_reconstruct_context(BUFFER *wb, void *ptr) {
+ PARSER *parser = ptr;
+ if(!parser || !parser->user.st)
+ return false;
+
+ buffer_strcat(wb, string2str(parser->user.st->context));
+ return true;
+}
+
inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations)
{
int enabled = cd->unsafe.enabled;
@@ -2952,33 +2930,51 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- buffered_reader_init(&parser->reader);
- BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
- while(likely(service_running(SERVICE_COLLECTORS))) {
- if (unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
- if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
- break;
-
- continue;
- }
-
- if(unlikely(parser_action(parser, buffer->buffer)))
- break;
+ {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ buffered_reader_init(&parser->reader);
+ BUFFER *buffer = buffer_create(sizeof(parser->reader.read_buffer) + 2, NULL);
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+
+ if(unlikely(!buffered_reader_next_line(&parser->reader, buffer))) {
+ buffered_reader_ret_t ret = buffered_reader_read_timeout(
+ &parser->reader,
+ fileno((FILE *) parser->fp_input),
+ 2 * 60 * MSEC_PER_SEC, true
+ );
+
+ if(unlikely(ret != BUFFERED_READER_READ_OK))
+ break;
+
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer)))
+ break;
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ buffer_free(buffer);
- cd->unsafe.enabled = parser->user.enabled;
- count = parser->user.data_collections_count;
+ cd->unsafe.enabled = parser->user.enabled;
+ count = parser->user.data_collections_count;
- if (likely(count)) {
- cd->successful_collections += count;
- cd->serial_failures = 0;
- }
- else
- cd->serial_failures++;
+ if(likely(count)) {
+ cd->successful_collections += count;
+ cd->serial_failures = 0;
+ }
+ else
+ cd->serial_failures++;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index 7a7b30a521..532a12139d 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -97,7 +97,6 @@ typedef struct parser {
PARSER_REPERTOIRE repertoire;
uint32_t flags;
int fd; // Socket
- size_t line;
FILE *fp_input; // Input source e.g. stream
FILE *fp_output; // Stream to send commands to plugin
@@ -111,6 +110,8 @@ typedef struct parser {
PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
struct buffered_reader reader;
+ struct line_splitter line;
+ PARSER_KEYWORD *keyword;
struct {
const char *end_keyword;
@@ -162,13 +163,17 @@ static inline PARSER_KEYWORD *parser_find_keyword(PARSER *parser, const char *co
return NULL;
}
+bool parser_reconstruct_node(BUFFER *wb, void *ptr);
+bool parser_reconstruct_instance(BUFFER *wb, void *ptr);
+bool parser_reconstruct_context(BUFFER *wb, void *ptr);
+
static inline int parser_action(PARSER *parser, char *input) {
#ifdef NETDATA_LOG_STREAM_RECEIVE
static __thread char line[PLUGINSD_LINE_MAX + 1];
strncpyz(line, input, sizeof(line) - 1);
#endif
- parser->line++;
+ parser->line.count++;
if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) {
char command[100 + 1];
@@ -200,24 +205,25 @@ static inline int parser_action(PARSER *parser, char *input) {
return 0;
}
- static __thread char *words[PLUGINSD_MAX_WORDS];
- size_t num_words = quoted_strings_splitter_pluginsd(input, words, PLUGINSD_MAX_WORDS);
- const char *command = get_word(words, num_words, 0);
+ parser->line.num_words = quoted_strings_splitter_pluginsd(input, parser->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(parser->line.words, parser->line.num_words, 0);
- if(unlikely(!command))
+ if(unlikely(!command)) {
+ line_splitter_reset(&parser->line);
return 0;
+ }
PARSER_RC rc;
- PARSER_KEYWORD *t = parser_find_keyword(parser, command);
- if(likely(t)) {
- worker_is_busy(t->worker_job_id);
+ parser->keyword = parser_find_keyword(parser, command);
+ if(likely(parser->keyword)) {
+ worker_is_busy(parser->keyword->worker_job_id);
#ifdef NETDATA_LOG_STREAM_RECEIVE
- if(parser->user.stream_log_fp && t->repertoire & parser->user.stream_log_repertoire)
+ if(parser->user.stream_log_fp && parser->keyword->repertoire & parser->user.stream_log_repertoire)
fprintf(parser->user.stream_log_fp, "%s", line);
#endif
- rc = parser_execute(parser, t, words, num_words);
+ rc = parser_execute(parser, parser->keyword, parser->line.words, parser->line.num_words);
// rc = (*t->func)(words, num_words, parser);
worker_is_idle();
}
@@ -225,22 +231,13 @@ static inline int parser_action(PARSER *parser, char *input) {
rc = PARSER_RC_ERROR;
if(rc == PARSER_RC_ERROR) {
- BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
- for(size_t i = 0; i < num_words ;i++) {
- if(i) buffer_fast_strcat(wb, " ", 1);
-
- buffer_fast_strcat(wb, "\"", 1);
- const char *s = get_word(words, num_words, i);
- buffer_strcat(wb, s?s:"");
- buffer_fast_strcat(wb, "\"", 1);
- }
-
+ CLEAN_BUFFER *wb = buffer_create(PLUGINSD_LINE_MAX, NULL);
+ line_splitter_reconstruct_line(wb, &parser->line);
netdata_log_error("PLUGINSD: parser_action('%s') failed on line %zu: { %s } (quotes added to show parsing)",
- command, parser->line, buffer_tostring(wb));
-
- buffer_free(wb);
+ command, parser->line.count, buffer_tostring(wb));
}
+ line_splitter_reset(&parser->line);
return (rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP);
}