summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-06-03 08:38:25 +0200
committerGitHub <noreply@github.com>2020-06-03 08:38:25 +0200
commit49719a961d6c079004b65458ea8c5e08ada1c44c (patch)
tree258b25ac60c403696a72b1589d5fa8634dfc6764 /database
parent1aa2cd7c43f6dd68b4bb43a87eb8b2995687ca9c (diff)
Fix bugs in streaming and enable support for gap filling (#9214)
This PR adds (inactive) support that we will use to fill the gaps in charts when a receiving agent goes offline and the sender reconnects. The streaming component has been reworked to make the connection bi-directional and fix several outstanding bugs in the area. * Fixed an incorrect case of version negotiation. Removed fatal() on exhaustion of fds. * Fixed cases that fell through to polling the socket after closing. * Fixed locking of data related to sender and receiver in the host structure. * Added fine-grained locks to reduce contention. * Added circular buffer to sender to prevent starvation in high-latency conditions. * Fixed case where agent is a proxy and negotiated different streaming versions with sender and receiver. * Changed interface to new parser to put the buffering code in streaming. * Fixed the bug that stopped senders from reconnecting after their socket times out - this was part of the scaling fixes that provide an early shortcut path for rejecting connections without lock contention. * Uses fine-grained locking and a different approach to thread shutdown instead. * Added liveness detection to connections to allow selection of the best connection.
Diffstat (limited to 'database')
-rw-r--r--database/rrd.h14
-rw-r--r--database/rrdhost.c43
2 files changed, 44 insertions, 13 deletions
diff --git a/database/rrd.h b/database/rrd.h
index 8fb6ef14a6..7e1df252a8 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -29,6 +29,7 @@ struct pg_cache_page_index;
#include "rrddimvar.h"
#include "rrdcalc.h"
#include "rrdcalctemplate.h"
+#include "../streaming/rrdpush.h"
#define UPDATE_EVERY 1
#define UPDATE_EVERY_MAX 3600
@@ -692,6 +693,7 @@ struct rrdhost {
// the following are state information for the threading
// streaming metrics from this netdata to an upstream netdata
+ struct sender_state *sender;
volatile unsigned int rrdpush_sender_spawn:1; // 1 when the sender thread has been spawn
netdata_thread_t rrdpush_sender_thread; // the sender thread
@@ -703,13 +705,10 @@ struct rrdhost {
SIMPLE_PATTERN *rrdpush_send_charts_matching; // pattern to match the charts to be sent
- // metrics may be collected asynchronously
- // these synchronize all the threads willing the write to our sending buffer
- netdata_mutex_t rrdpush_sender_buffer_mutex; // exclusive access to rrdpush_sender_buffer
int rrdpush_sender_pipe[2]; // collector to sender thread signaling
- BUFFER *rrdpush_sender_buffer; // collector fills it, sender sends it
+ //BUFFER *rrdpush_sender_buffer; // collector fills it, sender sends it
- uint32_t stream_version; //Set the current version of the stream.
+ //uint32_t stream_version; //Set the current version of the stream.
// ------------------------------------------------------------------------
// streaming of data from remote hosts - rrdpush
@@ -719,6 +718,9 @@ struct rrdhost {
time_t senders_disconnected_time; // the time the last sender was disconnected
+ struct receiver_state *receiver;
+ netdata_mutex_t receiver_lock;
+
// ------------------------------------------------------------------------
// health monitoring options
@@ -974,6 +976,8 @@ static inline time_t rrdset_first_entry_t(RRDSET *st) {
}
}
+time_t rrdhost_last_entry_t(RRDHOST *h);
+
// get the last slot updated in the round robin database
#define rrdset_last_slot(st) ((size_t)(((st)->current_entry == 0) ? (st)->entries - 1 : (st)->current_entry - 1))
diff --git a/database/rrdhost.c b/database/rrdhost.c
index 1b5e460d02..c4613e3642 100644
--- a/database/rrdhost.c
+++ b/database/rrdhost.c
@@ -139,6 +139,11 @@ RRDHOST *rrdhost_create(const char *hostname,
host->disk_space_mb = default_rrdeng_disk_quota_mb;
#endif
host->health_enabled = (memory_mode == RRD_MEMORY_MODE_NONE)? 0 : health_enabled;
+
+ host->sender = mallocz(sizeof(*host->sender));
+ sender_init(host->sender, host);
+ netdata_mutex_init(&host->receiver_lock);
+
host->rrdpush_send_enabled = (rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) ? 1 : 0;
host->rrdpush_send_destination = (host->rrdpush_send_enabled)?strdupz(rrdpush_destination):NULL;
host->rrdpush_send_api_key = (host->rrdpush_send_enabled)?strdupz(rrdpush_api_key):NULL;
@@ -148,7 +153,7 @@ RRDHOST *rrdhost_create(const char *hostname,
host->rrdpush_sender_pipe[1] = -1;
host->rrdpush_sender_socket = -1;
- host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
+ //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused?
#ifdef ENABLE_HTTPS
host->ssl.conn = NULL;
host->ssl.flags = NETDATA_SSL_START;
@@ -156,7 +161,6 @@ RRDHOST *rrdhost_create(const char *hostname,
host->stream_ssl.flags = NETDATA_SSL_START;
#endif
- netdata_mutex_init(&host->rrdpush_sender_buffer_mutex);
netdata_rwlock_init(&host->rrdhost_rwlock);
netdata_rwlock_init(&host->labels_rwlock);
@@ -407,7 +411,7 @@ RRDHOST *rrdhost_find_or_create(
}
else {
host->health_enabled = health_enabled;
- host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION;
+ //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused?
if(strcmp(host->hostname, hostname) != 0) {
info("Host '%s' has been renamed to '%s'. If this is not intentional it may mean multiple hosts are using the same machine_guid.", host->hostname, hostname);
@@ -455,7 +459,7 @@ inline int rrdhost_should_be_removed(RRDHOST *host, RRDHOST *protected, time_t n
if(host != protected
&& host != localhost
&& rrdhost_flag_check(host, RRDHOST_FLAG_ORPHAN)
- && !host->connected_senders
+ && host->receiver
&& host->senders_disconnected_time
&& host->senders_disconnected_time + rrdhost_free_orphan_time < now)
return 1;
@@ -601,8 +605,13 @@ void rrdhost_free(RRDHOST *host) {
rrd_check_wrlock(); // make sure the RRDs are write locked
- // stop a possibly running thread
- rrdpush_sender_thread_stop(host);
+ // ------------------------------------------------------------------------
+ // clean up the sender
+ rrdpush_sender_thread_stop(host); // stop a possibly running thread
+ cbuffer_free(host->sender->buffer);
+ buffer_free(host->sender->build);
+ freez(host->sender);
+ host->sender = NULL;
rrdhost_wrlock(host); // lock this RRDHOST
@@ -668,6 +677,8 @@ void rrdhost_free(RRDHOST *host) {
else error("Request to free RRDHOST '%s': cannot find it", host->hostname);
}
+
+
// ------------------------------------------------------------------------
// free it
@@ -1193,11 +1204,12 @@ void reload_host_labels()
health_label_log_save(localhost);
rrdhost_unlock(localhost);
+/* TODO-GAPS - fix this so that it looks properly at the state and version of the sender
if(localhost->rrdpush_send_enabled && localhost->rrdpush_sender_buffer){
localhost->labels_flag |= LABEL_FLAG_UPDATE_STREAM;
rrdpush_send_labels(localhost);
}
-
+*/
health_reload();
}
@@ -1283,7 +1295,7 @@ void rrdhost_cleanup_all(void) {
RRDHOST *host;
rrdhost_foreach_read(host) {
- if(host != localhost && rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS) && !host->connected_senders)
+ if(host != localhost && rrdhost_flag_check(host, RRDHOST_FLAG_DELETE_OBSOLETE_CHARTS) && !host->receiver)
rrdhost_delete_charts(host);
else
rrdhost_cleanup_charts(host);
@@ -1482,3 +1494,18 @@ int alarm_compare_name(void *a, void *b) {
return strcmp(in1->name,in2->name);
}
+
+// Returns the largest rrdset_last_entry_t() value found among the charts of host h, or 0 when no chart reports a later time.
+// Added for gap-filling: every call walks all charts of the host, so if this proves to be a bottleneck in large-scale systems we will need to cache the last entry times as the metrics update, but let's see if it is a problem first.
+time_t rrdhost_last_entry_t(RRDHOST *h) {
+ rrdhost_rdlock(h); // hold the host read lock for the whole scan; released before returning
+ RRDSET *st;
+ time_t result = 0; // stays 0 if the loop body never runs or no chart reports a positive time
+ rrdset_foreach_read(st, h) { // presumably visits each chart (RRDSET) of h in turn - macro defined elsewhere, confirm
+ time_t st_last = rrdset_last_entry_t(st);
+ if (st_last > result) // keep the running maximum
+ result = st_last;
+ }
+ rrdhost_unlock(h);
+ return result;
+}