summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-07-01 01:13:00 +0300
committerGitHub <noreply@github.com>2023-07-01 01:13:00 +0300
commitfdfc8fa0b13414898d1ac7d6e51808b418b951de (patch)
tree97adfd5bdbd1cfe6eadbe143c0517a59eb9f1e45 /streaming
parent5b56f09dbcfa159605268e731c02734486530507 (diff)
Optimizations part 3 (#15293)
* use madvise to speed up indexing * collect all rrddim members into a collector structure * use tier 0 virtual point for storing last stored value * reorganize key fields in rrddim * remove fgets from pluginsd and replace it with read() * properly uncork the web server sockets * Revert "reorganize key fields in rrddim" This reverts commit 2d45fa3959087e05462d387ff115a260f3a04b60. * Revert "use tier 0 virtual point for storing last stored value" This reverts commit a576cdd377ad4778a3b8608cabbb7ea7bb19a3a8. * fix cork names * fix compilation warnings
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c80
-rw-r--r--streaming/replication.c10
-rw-r--r--streaming/rrdpush.c6
-rw-r--r--streaming/rrdpush.h17
4 files changed, 62 insertions, 51 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 9cf69c00f7..424c983316 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -94,19 +94,19 @@ static inline int read_stream(struct receiver_state *r, char* buffer, size_t siz
static inline bool receiver_read_uncompressed(struct receiver_state *r) {
#ifdef NETDATA_INTERNAL_CHECKS
- if(r->read_buffer[r->read_len] != '\0')
+ if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
- int bytes_read = read_stream(r, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
if(unlikely(bytes_read <= 0))
return false;
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
- r->read_len += bytes_read;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += bytes_read;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
@@ -114,24 +114,24 @@ static inline bool receiver_read_uncompressed(struct receiver_state *r) {
#ifdef ENABLE_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
- internal_fatal(r->read_buffer[r->read_len] != '\0',
+ internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
"%s: read_buffer does not start with zero #2", __FUNCTION__ );
// first use any available uncompressed data
if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) {
- size_t available = sizeof(r->read_buffer) - r->read_len - 1;
+ size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1;
if (likely(available)) {
- size_t len = rrdpush_decompressor_get(&r->decompressor, r->read_buffer + r->read_len, available);
+ size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available);
if (unlikely(!len)) {
internal_error(true, "decompressor returned zero length #1");
return false;
}
- r->read_len += (int)len;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += (int)len;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
}
else
- internal_fatal(true, "The line to read is too big! Already have %d bytes in read_buffer.", r->read_len);
+ internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->reader.read_len);
return true;
}
@@ -139,9 +139,9 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
// no decompressed data available
// read the compression signature of the next block
- if(unlikely(r->read_len + r->decompressor.signature_size > sizeof(r->read_buffer) - 1)) {
+ if(unlikely(r->reader.read_len + r->decompressor.signature_size > sizeof(r->reader.read_buffer) - 1)) {
internal_error(true, "The last incomplete line does not leave enough room for the next compression header! "
- "Already have %d bytes in read_buffer.", r->read_len);
+ "Already have %zd bytes in read_buffer.", r->reader.read_len);
return false;
}
@@ -149,7 +149,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
// we have to do a loop here, because read_stream() may return less than the data we need
int bytes_read = 0;
do {
- int ret = read_stream(r, r->read_buffer + r->read_len + bytes_read, r->decompressor.signature_size - bytes_read);
+ int ret = read_stream(r, r->reader.read_buffer + r->reader.read_len + bytes_read, r->decompressor.signature_size - bytes_read);
if (unlikely(ret <= 0))
return false;
@@ -161,11 +161,11 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
if(unlikely(bytes_read != (int)r->decompressor.signature_size))
fatal("read %d bytes, but expected compression signature of size %zu", bytes_read, r->decompressor.signature_size);
- size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->read_buffer + r->read_len, bytes_read);
+ size_t compressed_message_size = rrdpush_decompressor_start(&r->decompressor, r->reader.read_buffer + r->reader.read_len, bytes_read);
if (unlikely(!compressed_message_size)) {
internal_error(true, "multiplexed uncompressed data in compressed stream!");
- r->read_len += bytes_read;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += bytes_read;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
@@ -176,7 +176,7 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
}
// delete compression header from our read buffer
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_buffer[r->reader.read_len] = '\0';
// Read the entire compressed block of compressed data
char compressed[compressed_message_size];
@@ -207,13 +207,13 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_to_parse);
// fill read buffer with decompressed data
- size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1);
+ size_t len = (int) rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
if (unlikely(!len)) {
internal_error(true, "decompressor returned zero length #2");
return false;
}
- r->read_len += (int)len;
- r->read_buffer[r->read_len] = '\0';
+ r->reader.read_len += (int)len;
+ r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
@@ -226,19 +226,19 @@ static inline bool receiver_read_compressed(struct receiver_state *r) {
/* Produce a full line if one exists, statefully return where we start next time.
* When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
*/
-static inline char *receiver_next_line(struct receiver_state *r, char *buffer, size_t buffer_length, size_t *pos) {
- size_t start = *pos;
+inline char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size) {
+ size_t start = reader->pos;
- char *ss = &r->read_buffer[start];
- char *se = &r->read_buffer[r->read_len];
- char *ds = buffer;
- char *de = &buffer[buffer_length - 2];
+ char *ss = &reader->read_buffer[start];
+ char *se = &reader->read_buffer[reader->read_len];
+ char *ds = dst;
+ char *de = &dst[dst_size - 2];
if(ss >= se) {
*ds = '\0';
- *pos = 0;
- r->read_len = 0;
- r->read_buffer[r->read_len] = '\0';
+ reader->pos = 0;
+ reader->read_len = 0;
+ reader->read_buffer[reader->read_len] = '\0';
return NULL;
}
@@ -253,25 +253,25 @@ static inline char *receiver_next_line(struct receiver_state *r, char *buffer, s
*ds++ = *ss++; // copy the newline too
*ds = '\0';
- *pos = ss - r->read_buffer;
- return buffer;
+ reader->pos = ss - reader->read_buffer;
+ return dst;
}
// if the destination is full, oops!
if(ds == de) {
error("STREAM: received line exceeds %d bytes. Truncating it.", PLUGINSD_LINE_MAX);
*ds = '\0';
- *pos = ss - r->read_buffer;
- return buffer;
+ reader->pos = ss - reader->read_buffer;
+ return dst;
}
// no newline found in the r->read_buffer
// move everything to the beginning
- memmove(r->read_buffer, &r->read_buffer[start], r->read_len - start);
- r->read_len -= (int)start;
- r->read_buffer[r->read_len] = '\0';
+ memmove(reader->read_buffer, &reader->read_buffer[start], reader->read_len - start);
+ reader->read_len -= (int)start;
+ reader->read_buffer[reader->read_len] = '\0';
*ds = '\0';
- *pos = 0;
+ reader->pos = 0;
return NULL;
}
@@ -340,14 +340,12 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
rrdpush_decompressor_destroy(&rpt->decompressor);
#endif
- rpt->read_buffer[0] = '\0';
- rpt->read_len = 0;
+ buffered_reader_init(&rpt->reader);
- size_t read_buffer_start = 0;
char buffer[PLUGINSD_LINE_MAX + 2] = "";
while(!receiver_should_stop(rpt)) {
- if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
+ if(!buffered_reader_next_line(&rpt->reader, buffer, PLUGINSD_LINE_MAX + 2)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
if(unlikely(!have_new_data)) {
diff --git a/streaming/replication.c b/streaming/replication.c
index 3ae00a10ba..cf4202f0b5 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -222,14 +222,14 @@ static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st, STRE
sizeof(PLUGINSD_KEYWORD_REPLAY_RRDDIM_STATE) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->last_collected_time.tv_sec * USEC_PER_SEC +
- (usec_t) rd->last_collected_time.tv_usec);
+ buffer_print_uint64_encoded(wb, encoding, (usec_t) rd->collector.last_collected_time.tv_sec * USEC_PER_SEC +
+ (usec_t) rd->collector.last_collected_time.tv_usec);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_int64_encoded(wb, encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_calculated_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_calculated_value);
buffer_fast_strcat(wb, " ", 1);
- buffer_print_netdata_double_encoded(wb, encoding, rd->last_stored_value);
+ buffer_print_netdata_double_encoded(wb, encoding, rd->collector.last_stored_value);
buffer_fast_strcat(wb, "\n", 1);
}
rrddim_foreach_done(rd);
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index f998b60ce6..ca6a23c896 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -362,7 +362,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, "SET \"", 5);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "\" = ", 4);
- buffer_print_int64(wb, rd->collected_value);
+ buffer_print_int64(wb, rd->collector.collected_value);
buffer_fast_strcat(wb, "\n", 1);
}
else {
@@ -436,10 +436,10 @@ void rrddim_push_metrics_v2(RRDSET_STREAM_BUFFER *rsb, RRDDIM *rd, usec_t point_
buffer_fast_strcat(wb, PLUGINSD_KEYWORD_SET_V2 " '", sizeof(PLUGINSD_KEYWORD_SET_V2) - 1 + 2);
buffer_fast_strcat(wb, rrddim_id(rd), string_strlen(rd->id));
buffer_fast_strcat(wb, "' ", 2);
- buffer_print_int64_encoded(wb, integer_encoding, rd->last_collected_value);
+ buffer_print_int64_encoded(wb, integer_encoding, rd->collector.last_collected_value);
buffer_fast_strcat(wb, " ", 1);
- if((NETDATA_DOUBLE)rd->last_collected_value == n)
+ if((NETDATA_DOUBLE)rd->collector.last_collected_value == n)
buffer_fast_strcat(wb, "#", 1);
else
buffer_print_netdata_double_encoded(wb, doubles_encoding, n);
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 7ee2e1bf85..f97b7f6912 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -350,6 +350,19 @@ typedef struct stream_node_instance {
} STREAM_NODE_INSTANCE;
*/
+struct buffered_reader {
+ ssize_t read_len;
+ ssize_t pos;
+ char read_buffer[PLUGINSD_LINE_MAX + 1];
+};
+
+char *buffered_reader_next_line(struct buffered_reader *reader, char *dst, size_t dst_size);
+static inline void buffered_reader_init(struct buffered_reader *reader) {
+ reader->read_buffer[0] = '\0';
+ reader->read_len = 0;
+ reader->pos = 0;
+}
+
struct receiver_state {
RRDHOST *host;
pid_t tid;
@@ -371,8 +384,8 @@ struct receiver_state {
struct rrdhost_system_info *system_info;
STREAM_CAPABILITIES capabilities;
time_t last_msg_t;
- char read_buffer[PLUGINSD_LINE_MAX + 1];
- int read_len;
+
+ struct buffered_reader reader;
uint16_t hops;