diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-06-19 20:52:35 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-19 20:52:35 +0300 |
commit | 0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch) | |
tree | 641fcb81e9c84e08fbe08ca80776c6b593b218ba /collectors/plugins.d | |
parent | 35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff) |
/api/v2/nodes and streaming function (#15168)
* dummy streaming function
* expose global functions upstream
* separate function for pushing global functions
* add missing conditions
* allow streaming function to run async
* started internal API for functions
* cache host retention and expose it to /api/v2/nodes
* internal API for function table fields; more progress on streaming status
* abstracted and unified rrdhost status
* port old coverity warning fix - although it is not needed
* add ML information to rrdhost status
* add ML capability to streaming to signal the transmission of ML information; added ML information to host status
* protect host->receiver
* count metrics and instances per host
* exposed all inbound and outbound streaming
* fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754
* update ML dummy
* added all fields
* added streaming group by and cleaned up accepted values by cloud
* removed type
* Revert "removed type"
This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e.
* added context to db summary
* new /api/v2/nodes schema
* added ML type
* change default function charts
* log to trace new capa
* add more debug
* removed debugging code
* retry on receive interrupted read; respect sender reconnect delay in all cases
* set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver
* fix infinite loop
* send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time
* global cloud_status() call
* cloud should be a section, since it will contain error information
* put cloud capabilities into cloud
* aclk status in /api/v2 agents sections
* keep aclk_connection_counter
* updates on /api/v2/nodes
* final /api/v2/nodes and addition of /api/v2/nodes_instances
* parametrize all /api/v2/xxx output to control which info is outputed per endpoint
* always accept nodes selector
* st needs to be per instance, not per node
* fix merging of contexts; fix cups plugin priorities
* add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q
* give each libuv worker a unique id
* aclk http_api_v2 version 4
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r-- | collectors/plugins.d/pluginsd_parser.c | 35 |
1 files changed, 26 insertions, 9 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c index 097e5ea606..1bca604b4f 100644 --- a/collectors/plugins.d/pluginsd_parser.c +++ b/collectors/plugins.d/pluginsd_parser.c @@ -4,51 +4,66 @@ #define LOG_FUNCTIONS false -static int send_to_plugin(const char *txt, void *data) { +static ssize_t send_to_plugin(const char *txt, void *data) { PARSER *parser = data; if(!txt || !*txt) return 0; + errno = 0; + netdata_spinlock_lock(&parser->writer.spinlock); + ssize_t bytes = -1; + #ifdef ENABLE_HTTPS NETDATA_SSL *ssl = parser->ssl_output; if(ssl) { + if(SSL_connection(ssl)) - return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt)); + bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt)); - error("PLUGINSD: cannot send command (SSL)"); - return -1; + else + error("PLUGINSD: cannot send command (SSL)"); + + netdata_spinlock_unlock(&parser->writer.spinlock); + return bytes; } #endif if(parser->fp_output) { - int bytes = fprintf(parser->fp_output, "%s", txt); + + bytes = fprintf(parser->fp_output, "%s", txt); if(bytes <= 0) { error("PLUGINSD: cannot send command (FILE)"); - return -2; + bytes = -2; } - fflush(parser->fp_output); + else + fflush(parser->fp_output); + + netdata_spinlock_unlock(&parser->writer.spinlock); return bytes; } if(parser->fd != -1) { - size_t bytes = 0; - size_t total = strlen(txt); + bytes = 0; + ssize_t total = (ssize_t)strlen(txt); ssize_t sent; do { sent = write(parser->fd, &txt[bytes], total - bytes); if(sent <= 0) { error("PLUGINSD: cannot send command (fd)"); + netdata_spinlock_unlock(&parser->writer.spinlock); return -3; } bytes += sent; } while(bytes < total); + netdata_spinlock_unlock(&parser->writer.spinlock); return (int)bytes; } + netdata_spinlock_unlock(&parser->writer.spinlock); error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)"); return -4; } @@ -402,6 +417,8 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu false ); + rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST); + if(host->rrdlabels) { rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels); } |