summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-19 20:52:35 +0300
committerGitHub <noreply@github.com>2023-06-19 20:52:35 +0300
commit0b4f820e9d42d10f64c3305d9c084261bc9880cf (patch)
tree641fcb81e9c84e08fbe08ca80776c6b593b218ba /collectors/plugins.d
parent35884c7a8447fbeb699cae6a2a20dc0a2137c659 (diff)
/api/v2/nodes and streaming function (#15168)
* dummy streaming function * expose global functions upstream * separate function for pushing global functions * add missing conditions * allow streaming function to run async * started internal API for functions * cache host retention and expose it to /api/v2/nodes * internal API for function table fields; more progress on streaming status * abstracted and unified rrdhost status * port old coverity warning fix - although it is not needed * add ML information to rrdhost status * add ML capability to streaming to signal the transmission of ML information; added ML information to host status * protect host->receiver * count metrics and instances per host * exposed all inbound and outbound streaming * fix for ML status and dependency of DATA_WITH_ML to INTERPOLATED, not IEEE754 * update ML dummy * added all fields * added streaming group by and cleaned up accepted values by cloud * removed type * Revert "removed type" This reverts commit faae4177e603d4f85b7433f33f92ef3ccd23976e. * added context to db summary * new /api/v2/nodes schema * added ML type * change default function charts * log to trace new capa * add more debug * removed debugging code * retry on receive interrupted read; respect sender reconnect delay in all cases * set disconnected host flag and manipulate localhost child count atomically, inside set/clear receiver * fix infinite loop * send_to_plugin() now has a spinlock to ensure that only 1 thread is writing to the plugin/child at the same time * global cloud_status() call * cloud should be a section, since it will contain error information * put cloud capabilities into cloud * aclk status in /api/v2 agents sections * keep aclk_connection_counter * updates on /api/v2/nodes * final /api/v2/nodes and addition of /api/v2/nodes_instances * parametrize all /api/v2/xxx output to control which info is outputed per endpoint * always accept nodes selector * st needs to be per instance, not per node * fix merging of contexts; fix cups plugin priorities * add after and before parameters to /api/v2/contexts/nodes/nodes_instances/q * give each libuv worker a unique id * aclk http_api_v2 version 4
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c35
1 files changed, 26 insertions, 9 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 097e5ea606..1bca604b4f 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -4,51 +4,66 @@
#define LOG_FUNCTIONS false
-static int send_to_plugin(const char *txt, void *data) {
+static ssize_t send_to_plugin(const char *txt, void *data) {
PARSER *parser = data;
if(!txt || !*txt)
return 0;
+ errno = 0;
+ netdata_spinlock_lock(&parser->writer.spinlock);
+ ssize_t bytes = -1;
+
#ifdef ENABLE_HTTPS
NETDATA_SSL *ssl = parser->ssl_output;
if(ssl) {
+
if(SSL_connection(ssl))
- return (int)netdata_ssl_write(ssl, (void *)txt, strlen(txt));
+ bytes = netdata_ssl_write(ssl, (void *) txt, strlen(txt));
- error("PLUGINSD: cannot send command (SSL)");
- return -1;
+ else
+ error("PLUGINSD: cannot send command (SSL)");
+
+ netdata_spinlock_unlock(&parser->writer.spinlock);
+ return bytes;
}
#endif
if(parser->fp_output) {
- int bytes = fprintf(parser->fp_output, "%s", txt);
+
+ bytes = fprintf(parser->fp_output, "%s", txt);
if(bytes <= 0) {
error("PLUGINSD: cannot send command (FILE)");
- return -2;
+ bytes = -2;
}
- fflush(parser->fp_output);
+ else
+ fflush(parser->fp_output);
+
+ netdata_spinlock_unlock(&parser->writer.spinlock);
return bytes;
}
if(parser->fd != -1) {
- size_t bytes = 0;
- size_t total = strlen(txt);
+ bytes = 0;
+ ssize_t total = (ssize_t)strlen(txt);
ssize_t sent;
do {
sent = write(parser->fd, &txt[bytes], total - bytes);
if(sent <= 0) {
error("PLUGINSD: cannot send command (fd)");
+ netdata_spinlock_unlock(&parser->writer.spinlock);
return -3;
}
bytes += sent;
}
while(bytes < total);
+ netdata_spinlock_unlock(&parser->writer.spinlock);
return (int)bytes;
}
+ netdata_spinlock_unlock(&parser->writer.spinlock);
error("PLUGINSD: cannot send command (no output socket/pipe/file given to plugins.d parser)");
return -4;
}
@@ -402,6 +417,8 @@ static PARSER_RC pluginsd_host_define_end(char **words __maybe_unused, size_t nu
false
);
+ rrdhost_option_set(host, RRDHOST_OPTION_VIRTUAL_HOST);
+
if(host->rrdlabels) {
rrdlabels_migrate_to_these(host->rrdlabels, u->host_define.rrdlabels);
}