summaryrefslogtreecommitdiffstats
path: root/collectors/plugins.d
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-07-01 01:13:00 +0300
committerGitHub <noreply@github.com>2023-07-01 01:13:00 +0300
commitfdfc8fa0b13414898d1ac7d6e51808b418b951de (patch)
tree97adfd5bdbd1cfe6eadbe143c0517a59eb9f1e45 /collectors/plugins.d
parent5b56f09dbcfa159605268e731c02734486530507 (diff)
Optimizations part 3 (#15293)
* use madvise to speed up indexing * collect all rrddim members into a collector structure * use tier 0 virtual point for storing last stored value * reorganize key fields in rrddim * remove fgets from pluginsd and replace it with read() * properly uncork the web server sockets * Revert "reorganize key fields in rrddim" This reverts commit 2d45fa3959087e05462d387ff115a260f3a04b60. * Revert "use tier 0 virtual point for storing last stored value" This reverts commit a576cdd377ad4778a3b8608cabbb7ea7bb19a3a8. * fix cork names * fix compilation warnings
Diffstat (limited to 'collectors/plugins.d')
-rw-r--r--collectors/plugins.d/pluginsd_parser.c115
-rw-r--r--collectors/plugins.d/pluginsd_parser.h2
2 files changed, 53 insertions, 64 deletions
diff --git a/collectors/plugins.d/pluginsd_parser.c b/collectors/plugins.d/pluginsd_parser.c
index 985640f341..d7c4d137e1 100644
--- a/collectors/plugins.d/pluginsd_parser.c
+++ b/collectors/plugins.d/pluginsd_parser.c
@@ -1307,9 +1307,9 @@ static inline PARSER_RC pluginsd_replay_set(char **words, size_t num_words, PARS
}
rrddim_store_metric(rd, parser->user.replay.end_time_ut, value, flags);
- rd->last_collected_time.tv_sec = parser->user.replay.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->collections_counter++;
+ rd->collector.last_collected_time.tv_sec = parser->user.replay.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.counter++;
}
else {
error_limit_static_global_var(erl, 1, 0);
@@ -1340,16 +1340,16 @@ static inline PARSER_RC pluginsd_replay_rrddim_collection_state(char **words, si
RRDDIM *rd = pluginsd_acquire_dimension(host, st, dimension, PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE);
if(!rd) return PLUGINSD_DISABLE_PLUGIN(parser, NULL, NULL);
- usec_t dim_last_collected_ut = (usec_t)rd->last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->last_collected_time.tv_usec;
+ usec_t dim_last_collected_ut = (usec_t)rd->collector.last_collected_time.tv_sec * USEC_PER_SEC + (usec_t)rd->collector.last_collected_time.tv_usec;
usec_t last_collected_ut = last_collected_ut_str ? str2ull_encoded(last_collected_ut_str) : 0;
if(last_collected_ut > dim_last_collected_ut) {
- rd->last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
- rd->last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_sec = (time_t)(last_collected_ut / USEC_PER_SEC);
+ rd->collector.last_collected_time.tv_usec = (last_collected_ut % USEC_PER_SEC);
}
- rd->last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
- rd->last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
- rd->last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
+ rd->collector.last_collected_value = last_collected_value_str ? str2ll_encoded(last_collected_value_str) : 0;
+ rd->collector.last_calculated_value = last_calculated_value_str ? str2ndd_encoded(last_calculated_value_str, NULL) : 0;
+ rd->collector.last_stored_value = last_stored_value_str ? str2ndd_encoded(last_stored_value_str, NULL) : 0.0;
return PARSER_RC_OK;
}
@@ -1718,12 +1718,12 @@ static inline PARSER_RC pluginsd_set_v2(char **words, size_t num_words, PARSER *
// store it
rrddim_store_metric(rd, parser->user.v2.end_time * USEC_PER_SEC, value, flags);
- rd->last_collected_time.tv_sec = parser->user.v2.end_time;
- rd->last_collected_time.tv_usec = 0;
- rd->last_collected_value = collected_value;
- rd->last_stored_value = value;
- rd->last_calculated_value = value;
- rd->collections_counter++;
+ rd->collector.last_collected_time.tv_sec = parser->user.v2.end_time;
+ rd->collector.last_collected_time.tv_usec = 0;
+ rd->collector.last_collected_value = collected_value;
+ rd->collector.last_stored_value = value;
+ rd->collector.last_calculated_value = value;
+ rd->collector.counter++;
rrddim_set_updated(rd);
timing_step(TIMING_STEP_SET2_STORE);
@@ -1778,8 +1778,8 @@ static inline PARSER_RC pluginsd_end_v2(char **words __maybe_unused, size_t num_
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
- rd->calculated_value = 0;
- rd->collected_value = 0;
+ rd->collector.calculated_value = 0;
+ rd->collector.collected_value = 0;
rrddim_clear_updated(rd);
}
rrddim_foreach_done(rd);
@@ -1849,76 +1849,59 @@ static inline PARSER_RC streaming_claimed_id(char **words, size_t num_words, PAR
// ----------------------------------------------------------------------------
-typedef enum {
- PARSER_FGETS_RESULT_OK,
- PARSER_FGETS_RESULT_TIMEOUT,
- PARSER_FGETS_RESULT_ERROR,
- PARSER_FGETS_RESULT_EOF,
-} PARSER_FGETS_RESULT;
+static inline bool buffered_reader_read(struct buffered_reader *reader, int fd) {
+#ifdef NETDATA_INTERNAL_CHECKS
+ if(reader->read_buffer[reader->read_len] != '\0')
+ fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
+#endif
-static inline PARSER_FGETS_RESULT parser_fgets(char *s, int size, FILE *stream) {
- errno = 0;
+ ssize_t bytes_read = read(fd, reader->read_buffer + reader->read_len, sizeof(reader->read_buffer) - reader->read_len - 1);
+ if(unlikely(bytes_read <= 0))
+ return false;
+
+ reader->read_len += bytes_read;
+ reader->read_buffer[reader->read_len] = '\0';
+
+ return true;
+}
+static inline bool buffered_reader_read_timeout(struct buffered_reader *reader, int fd, int timeout_ms) {
+ errno = 0;
struct pollfd fds[1];
- int timeout_msecs = 2 * 60 * MSEC_PER_SEC;
- fds[0].fd = fileno(stream);
+ fds[0].fd = fd;
fds[0].events = POLLIN;
- int ret = poll(fds, 1, timeout_msecs);
+ int ret = poll(fds, 1, timeout_ms);
if (ret > 0) {
/* There is data to read */
- if (fds[0].revents & POLLIN) {
- char *tmp = fgets(s, size, stream);
-
- if(unlikely(!tmp)) {
- if (feof(stream)) {
- error("PARSER: read failed: end of file.");
- return PARSER_FGETS_RESULT_EOF;
- }
-
- else if (ferror(stream)) {
- error("PARSER: read failed: input error.");
- return PARSER_FGETS_RESULT_ERROR;
- }
-
- error("PARSER: read failed: unknown error.");
- return PARSER_FGETS_RESULT_ERROR;
- }
+ if (fds[0].revents & POLLIN)
+ return buffered_reader_read(reader, fd);
- return PARSER_FGETS_RESULT_OK;
- }
else if(fds[0].revents & POLLERR) {
error("PARSER: read failed: POLLERR.");
- return PARSER_FGETS_RESULT_ERROR;
+ return false;
}
else if(fds[0].revents & POLLHUP) {
error("PARSER: read failed: POLLHUP.");
- return PARSER_FGETS_RESULT_ERROR;
+ return false;
}
else if(fds[0].revents & POLLNVAL) {
error("PARSER: read failed: POLLNVAL.");
- return PARSER_FGETS_RESULT_ERROR;
+ return false;
}
error("PARSER: poll() returned positive number, but POLLIN|POLLERR|POLLHUP|POLLNVAL are not set.");
- return PARSER_FGETS_RESULT_ERROR;
+ return false;
}
else if (ret == 0) {
error("PARSER: timeout while waiting for data.");
- return PARSER_FGETS_RESULT_TIMEOUT;
+ return false;
}
error("PARSER: poll() failed with code %d.", ret);
- return PARSER_FGETS_RESULT_ERROR;
-}
-
-static int parser_next(PARSER *parser, char *buffer, size_t buffer_size) {
- if(likely(parser_fgets(buffer, (int)buffer_size, (FILE *)parser->fp_input) == PARSER_FGETS_RESULT_OK))
- return 0;
-
- return 1;
+ return false;
}
void pluginsd_process_thread_cleanup(void *ptr) {
@@ -1979,10 +1962,14 @@ inline size_t pluginsd_process(RRDHOST *host, struct plugind *cd, FILE *fp_plugi
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- char buffer[PLUGINSD_LINE_MAX + 1];
-
- while (likely(!parser_next(parser, buffer, PLUGINSD_LINE_MAX))) {
- if (unlikely(!service_running(SERVICE_COLLECTORS) || parser_action(parser, buffer)))
+ buffered_reader_init(&parser->reader);
+ char buffer[PLUGINSD_LINE_MAX + 2];
+ while(likely(service_running(SERVICE_COLLECTORS))) {
+ if (unlikely(!buffered_reader_next_line(&parser->reader, buffer, PLUGINSD_LINE_MAX + 2))) {
+ if(unlikely(!buffered_reader_read_timeout(&parser->reader, fileno((FILE *)parser->fp_input), 2 * 60 * MSEC_PER_SEC)))
+ break;
+ }
+ else if(unlikely(parser_action(parser, buffer)))
break;
}
diff --git a/collectors/plugins.d/pluginsd_parser.h b/collectors/plugins.d/pluginsd_parser.h
index d45a45e8af..cfbb20dee3 100644
--- a/collectors/plugins.d/pluginsd_parser.h
+++ b/collectors/plugins.d/pluginsd_parser.h
@@ -95,6 +95,8 @@ typedef struct parser {
PARSER_USER_OBJECT user; // User defined structure to hold extra state between calls
+ struct buffered_reader reader;
+
struct {
const char *end_keyword;
BUFFER *response;