From 8fc3b351a2e7fc96eced8f924de2e9cec9842128 Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Wed, 5 Oct 2022 14:13:46 +0300 Subject: Allow netdata plugins to expose functions for querying more information about specific charts (#13720) * function renames and code cleanup in popen.c; no actual code changes * netdata popen() now opens both child process stdin and stdout and returns FILE * for both * pass both input and output to parser structures * updated rrdset to call custom functions * RRDSET FUNCTION leading calls for both sync and async operation * put RRDSET functions to a separate file * added format and timeout at function definition * support for synchronous (internal plugins) and asynchronous (external plugins and children) functions * /api/v1/function endpoint * functions are now attached to the host and there is a dictionary view per chart * functions implemented at plugins.d * remove the defer until keyword hook from plugins.d when it is done * stream sender implementation of functions * sanitization of all functions so that certain characters are only allowed * strictier sanitization * common max size * 1st working plugins.d example * always init inflight dictionary * properly destroy dictionaries to avoid parallel insertion of items * add more debugging on disconnection reasons * add more debugging on disconnection reasons again * streaming receiver respects newlines * dont use the same fp for both streaming receive and send * dont free dbengine memory with internal checks * make sender proceed in the buffer * added timing info and garbage collection at plugins.d * added info about routing nodes * added info about routing nodes with delay * added more info about delays * added more info about delays again * signal sending thread to wake up * streaming version labeling and commented code to support capabilities * added functions to /api/v1/data, /api/v1/charts, /api/v1/chart, /api/v1/info * redirect top output to stdout * address coverity findings * fix resource leaks of popen * log attempts to connect to individual destinations * better messages * properly parse destinations * try to find a function from the most matching to the least matching * log added streaming destinations * rotate destinations bypassing a node in the middle that does not accept our connection * break the loops properly * use typedef to define callbacks * capabilities negotiation during streaming * functions exposed upstream based on capabilities; compression disabled per node persisting reconnects; always try to connect with all capabilities * restore functionality to lookup functions * better logging of capabilities * remove old versions from capabilities when a newer version is there * fix formatting * optimization for plugins.d rrdlabels to avoid creating and destructing dictionaries all the time * delayed health initialization for rrddim and rrdset * cleanup health initialization * fix for popen() not returning the right value * add health worker jobs for initializing rrdset and rrddim * added content type support for functions; apps.plugin permanent function to display all the processes * fixes for functions parameters parsing in apps.plugin * fix for process matching in apps.plugiin * first working function for apps.plugin * Dashboard ACL is disabled for functions; Function errors are all in JSON format * apps.plugin function processes returns json table * use json_escape_string() to escape message * fix formatting * apps.plugin exposes all its metrics to function processes * fix json formatting when filtering out some rows * reopen the internal pipe of rrdpush in case of errors * misplaced statement * do not use buffer->len * support for GLOBAL functions (functions that are not linked to a chart * added /api/v1/functions endpoint; removed format from the FUNCTIONS api; * swagger documentation about the new api end points * added plugins.d documentation about functions * never re-close a file * remove uncessesary ifdef * fixed issues identified by codacy * fix for null label value * make edit-config copy-and-paste friendly * Revert "make edit-config copy-and-paste friendly" This reverts commit 54500c0e0a97f65a0c66c4d34e966f6a9056698e. * reworked sender handshake to fix coverity findings * timeout is zero, for both send_timeout() and recv_timeout() * properly detect that parent closed the socket * support caching of function responses; limit function response to 10MB; added protection from malformed function responses * disabled excessive logging * added units to apps.plugin function processes and normalized all values to be human readable * shorter field names * fixed issues reported * fixed apps.plugin error response; tested that pluginsd can properly handle faulty responses * use double linked list macros for double linked list management * faster apps.plugin function printing by minimizing file operations * added memory percentage * fix compatibility issues with older compilers and FreeBSD * rrdpush sender code cleanup; rrhost structure cleanup from sender flags and variables; * fix letftover variable in ifdef * apps.plugin: do not call detach from the thread; exit immediately when input is broken * exclude AR charts from health * flush cleaner; prefer sender output * clarity * do not fill the cbuffer if not connected * fix * dont enabled host->sender if streaming is not enabled; send host label updates to parent; * functions are only available through ACLK * Prepared statement reports only in dev mode * fix AR chart detection * fix for streaming not being enabling itself * more cleanup of sender and receiver structures * moved read-only flags and configuration options to rrdhost->options * fixed merge with master * fix for incomplete rename * prevent service thread from working on charts that are being collected Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> --- parser/parser.c | 106 +++++++++++++++++++++++++++++++++++++++++++------------- parser/parser.h | 33 +++++++++++++----- 2 files changed, 105 insertions(+), 34 deletions(-) (limited to 'parser') diff --git a/parser/parser.c b/parser/parser.c index 45026e5143..1f1034add3 100644 --- a/parser/parser.c +++ b/parser/parser.c @@ -1,10 +1,11 @@ // SPDX-License-Identifier: GPL-3.0-or-later #include "parser.h" +#include "collectors/plugins.d/pluginsd_parser.h" -static inline int find_keyword(char *str, char *keyword, int max_size, int (*custom_isspace)(char)) +inline int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)) { - char *s = str, *keyword_start; + const char *s = str, *keyword_start; while (unlikely(custom_isspace(*s))) s++; keyword_start = s; @@ -28,7 +29,7 @@ static inline int find_keyword(char *str, char *keyword, int max_size, int (*cus * */ -PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags) +PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags) { PARSER *parser; @@ -36,9 +37,11 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl parser->plugins_action = callocz(1, sizeof(PLUGINSD_ACTION)); parser->user = user; parser->input = input; + parser->output = output; parser->flags = flags; parser->host = host; parser->worker_job_next_id = WORKER_PARSER_FIRST_JOB; + inflight_functions_init(parser); #ifdef ENABLE_HTTPS parser->bytesleft = 0; @@ -46,18 +49,34 @@ PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE fl #endif if (unlikely(!(flags & PARSER_NO_PARSE_INIT))) { - parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); - parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); - parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); - parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); - parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); - parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); - parser_add_keyword(parser, "CLABEL_COMMIT", pluginsd_clabel_commit); - parser_add_keyword(parser, "CLABEL", pluginsd_clabel); - parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); - parser_add_keyword(parser, "SET", pluginsd_set); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FLUSH, pluginsd_flush); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CHART, pluginsd_chart); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DIMENSION, pluginsd_dimension); + parser_add_keyword(parser, PLUGINSD_KEYWORD_DISABLE, pluginsd_disable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_VARIABLE, pluginsd_variable); + parser_add_keyword(parser, PLUGINSD_KEYWORD_LABEL, pluginsd_label); + parser_add_keyword(parser, PLUGINSD_KEYWORD_OVERWRITE, pluginsd_overwrite); + parser_add_keyword(parser, PLUGINSD_KEYWORD_END, pluginsd_end); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL_COMMIT, pluginsd_clabel_commit); + parser_add_keyword(parser, PLUGINSD_KEYWORD_CLABEL, pluginsd_clabel); + parser_add_keyword(parser, PLUGINSD_KEYWORD_BEGIN, pluginsd_begin); + parser_add_keyword(parser, PLUGINSD_KEYWORD_SET, pluginsd_set); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION, pluginsd_function); + parser_add_keyword(parser, PLUGINSD_KEYWORD_FUNCTION_RESULT_BEGIN, pluginsd_function_result_begin); + //parser_add_keyword(parser, PLUGINSD_KEYWORD_GAPS_REQUEST, pluginsd_gaps_request); + } + + if(unlikely(!(flags & PARSER_NO_ACTION_INIT))) { + parser->plugins_action->begin_action = &pluginsd_begin_action; + parser->plugins_action->flush_action = &pluginsd_flush_action; + parser->plugins_action->end_action = &pluginsd_end_action; + parser->plugins_action->disable_action = &pluginsd_disable_action; + parser->plugins_action->variable_action = &pluginsd_variable_action; + parser->plugins_action->dimension_action = &pluginsd_dimension_action; + parser->plugins_action->label_action = &pluginsd_label_action; + parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; + parser->plugins_action->chart_action = &pluginsd_chart_action; + parser->plugins_action->set_action = &pluginsd_set_action; } return parser; @@ -155,6 +174,8 @@ void parser_destroy(PARSER *parser) if (unlikely(!parser)) return; + dictionary_destroy(parser->inflight.functions); + PARSER_KEYWORD *tmp_keyword, *tmp_keyword_next; PARSER_DATA *tmp_parser_data, *tmp_parser_data_next; @@ -237,32 +258,65 @@ int parser_next(PARSER *parser) inline int parser_action(PARSER *parser, char *input) { - PARSER_RC rc = PARSER_RC_OK; + PARSER_RC rc = PARSER_RC_OK; char *words[PLUGINSD_MAX_WORDS] = { NULL }; char command[PLUGINSD_LINE_MAX + 1]; keyword_function action_function; keyword_function *action_function_list = NULL; - if (unlikely(!parser)) + if (unlikely(!parser)) { + internal_error(true, "parser is NULL"); return 1; + } + parser->recover_location[0] = 0x0; // if not direct input check if we have reprocessed this if (unlikely(!input && parser->flags & PARSER_INPUT_PROCESSED)) return 0; - PARSER_KEYWORD *tmp_keyword = parser->keyword; + PARSER_KEYWORD *tmp_keyword = parser->keyword; if (unlikely(!tmp_keyword)) { + internal_error(true, "called without a keyword"); return 1; } if (unlikely(!input)) input = parser->buffer; - if (unlikely(!find_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) + if(unlikely(parser->flags & PARSER_DEFER_UNTIL_KEYWORD)) { + bool has_keyword = find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space); + + if(!has_keyword || strcmp(command, parser->defer.end_keyword) != 0) { + if(parser->defer.response) { + buffer_strcat(parser->defer.response, input); + if(buffer_strlen(parser->defer.response) > 10 * 1024 * 1024) { + // more than 10MB of data + // a bad plugin that did not send the end_keyword + internal_error(true, "Deferred response is too big (%zu bytes). Stopping this plugin.", buffer_strlen(parser->defer.response)); + return 1; + } + } + return 0; + } + else { + // call the action + parser->defer.action(parser, parser->defer.action_data); + + // empty everything + parser->defer.action = NULL; + parser->defer.action_data = NULL; + parser->defer.end_keyword = NULL; + parser->defer.response = NULL; + parser->flags &= ~PARSER_DEFER_UNTIL_KEYWORD; + } + return 0; + } + + if (unlikely(!find_first_keyword(input, command, PLUGINSD_LINE_MAX, pluginsd_space))) return 0; - if ((parser->flags & PARSER_INPUT_ORIGINAL) == PARSER_INPUT_ORIGINAL) + if ((parser->flags & PARSER_INPUT_KEEP_ORIGINAL) == PARSER_INPUT_KEEP_ORIGINAL) pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, parser->recover_input, parser->recover_location, PARSER_MAX_RECOVER_KEYWORDS); else pluginsd_split_words(input, words, PLUGINSD_MAX_WORDS, NULL, NULL, 0); @@ -285,16 +339,17 @@ inline int parser_action(PARSER *parser, char *input) rc = parser->unknown_function(words, parser->user, NULL); else rc = PARSER_RC_ERROR; -#ifdef NETDATA_INTERNAL_CHECKS - error("Unknown keyword [%s]", input); -#endif + + internal_error(rc != PARSER_RC_OK, "Unknown keyword [%s]", input); } else { worker_is_busy(worker_job_id); while ((action_function = *action_function_list) != NULL) { rc = action_function(words, parser->user, parser->plugins_action); - if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) - break; + if (unlikely(rc == PARSER_RC_ERROR || rc == PARSER_RC_STOP)) { + internal_error(true, "action_function() failed with rc = %u", rc); + break; + } action_function_list++; } worker_is_idle(); @@ -303,6 +358,7 @@ inline int parser_action(PARSER *parser, char *input) if (likely(input == parser->buffer)) parser->flags |= PARSER_INPUT_PROCESSED; + internal_error(rc == PARSER_RC_ERROR, "parser_action() failed."); return (rc == PARSER_RC_ERROR); } diff --git a/parser/parser.h b/parser/parser.h index c64475f9c5..1be3e94f25 100644 --- a/parser/parser.h +++ b/parser/parser.h @@ -32,8 +32,6 @@ typedef struct pluginsd_action { PARSER_RC (*variable_action)(void *user, RRDHOST *host, RRDSET *st, char *name, int global, NETDATA_DOUBLE value); PARSER_RC (*label_action)(void *user, char *key, char *value, RRDLABEL_SRC source); PARSER_RC (*overwrite_action)(void *user, RRDHOST *host, DICTIONARY *new_labels); - PARSER_RC (*clabel_action)(void *user, char *key, char *value, RRDLABEL_SRC source); - PARSER_RC (*clabel_commit_action)(void *user, RRDHOST *host, DICTIONARY *new_labels); PARSER_RC (*guid_action)(void *user, uuid_t *uuid); PARSER_RC (*context_action)(void *user, uuid_t *uuid); @@ -43,11 +41,12 @@ typedef struct pluginsd_action { } PLUGINSD_ACTION; typedef enum parser_input_type { - PARSER_INPUT_SPLIT = 1 << 1, - PARSER_INPUT_ORIGINAL = 1 << 2, - PARSER_INPUT_PROCESSED = 1 << 3, - PARSER_NO_PARSE_INIT = 1 << 4, - PARSER_NO_ACTION_INIT = 1 << 5, + PARSER_INPUT_SPLIT = (1 << 1), + PARSER_INPUT_KEEP_ORIGINAL = (1 << 2), + PARSER_INPUT_PROCESSED = (1 << 3), + PARSER_NO_PARSE_INIT = (1 << 4), + PARSER_NO_ACTION_INIT = (1 << 5), + PARSER_DEFER_UNTIL_KEYWORD = (1 << 6), } PARSER_INPUT_TYPE; #define PARSER_INPUT_FULL (PARSER_INPUT_SPLIT|PARSER_INPUT_ORIGINAL) @@ -73,6 +72,7 @@ typedef struct parser { uint8_t version; // Parser version RRDHOST *host; void *input; // Input source e.g. stream + void *output; // Stream to send commands to plugin PARSER_DATA *data; // extra input PARSER_KEYWORD *keyword; // List of parse keywords and functions PLUGINSD_ACTION *plugins_action; @@ -90,9 +90,24 @@ typedef struct parser { char tmpbuffer[PLUGINSD_LINE_MAX]; char *readfrom; #endif + + struct { + const char *end_keyword; + BUFFER *response; + void (*action)(struct parser *parser, void *action_data); + void *action_data; + } defer; + + struct { + DICTIONARY *functions; + usec_t smaller_timeout; + } inflight; + } PARSER; -PARSER *parser_init(RRDHOST *host, void *user, void *input, PARSER_INPUT_TYPE flags); +extern int find_first_keyword(const char *str, char *keyword, int max_size, int (*custom_isspace)(char)); + +PARSER *parser_init(RRDHOST *host, void *user, void *input, void *output, PARSER_INPUT_TYPE flags); int parser_add_keyword(PARSER *working_parser, char *keyword, keyword_function func); int parser_next(PARSER *working_parser); int parser_action(PARSER *working_parser, char *input); @@ -100,7 +115,7 @@ int parser_push(PARSER *working_parser, char *line); void parser_destroy(PARSER *working_parser); int parser_recover_input(PARSER *working_parser); -extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp, int trust_durations); +extern size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugin_input, FILE *fp_plugin_output, int trust_durations); extern PARSER_RC pluginsd_set(char **words, void *user, PLUGINSD_ACTION *plugins_action); extern PARSER_RC pluginsd_begin(char **words, void *user, PLUGINSD_ACTION *plugins_action); -- cgit v1.2.3