summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorStelios Fragkakis <52996999+stelfrag@users.noreply.github.com>2022-11-15 23:00:53 +0200
committerGitHub <noreply@github.com>2022-11-15 23:00:53 +0200
commit224b051a2b2bab39a4b536e531ab9ca590bf31bb (patch)
treeadb3ca35d6d6a4d4f1b7aad50542619c3efb38c0 /streaming
parentb4a0298bd48f217c4a6f2eaf729e0684966ea7a3 (diff)
New journal disk based indexing for agent memory reduction (#13885)
* Add read only option to netdata_mmap so files are accessed ousing PROT_READ * Initial functions to write the new journal file and switch to the new indexing * Cleanup code, add parameters to pg_cache_punch_hole to avoid updating page latets oldest times pg_cache insert to have parameter if page index locked needs to be done Page eviction functions will try to deallocate the descriptor as well (pg_cache_punch_hole without page_index time updates) Cleanup messages during startup * Cleanup messages during startup * Disbale extent caching for now, add placeholder for journal indexing and activation while the agent is running * Add main function to populate descriptors by checking the new journal indexing * prevent crash * fix for binary search crash * Avoid Time-of-check time-of-use filesystem race condition * always add a page * populate fixes - it is still incomplete * pg_cache_insert returns the descriptor that ends up in the page_index * Add populate next (Fix 1) * Fix compilation warnings, reactivate extent caching * Add populate next (Fix 2) * Add populate next (Fix 3) switch to the next entry or journal file when asking to populate descriptor with next * Fix resource leak and wrong sizeof * Rework page population (part 1) * Additional checksums added / journal validation * Cleanup (part 1) * Locking added and Cleanup (part 2) * Close journal file after new journal index activation * Skip warning when compiling without NETDATA_INTERNAL_CHECKS * Ignore empty index file (header and trailer and no metrics) * Try to remove all evicted descriptors (may prevent slight memory increase) * Evict pages also when we succesfully do try_reserve * Precache pages and cleanup * Add a separate cleanup thread to release unused descriptors * Check existence of key correctly * Fix total file size calculation * Statistics for journal descriptors * Track and release jourval v2 descriptors * Do not try to allocate pages for locality if under pressure * Do not track v2 descriptors when populating the page_index * Track page descriptors as they are inserted in the page index (per journal file) Scan journal files for pending items to cleanup Cleanup v2 descriptors only if they are not populated Check before adding to page cache to avoid memory allocation /free * Close journal file that has been processed and migrated to the new index Check for valid file before trying to truncate / close. This file has been closed during startup * Better calculation for the number of prefetched data pages based on the query end time Code cleanup and comments Add v2 populated descriptor expiration based on journal access time * Code cleanup * Faster indexing Better journal validation (more sanity checks) Detect new datafile/ journal creation and trigger index generation Switch to the new index / mark descriptors in memory as needed Update journal access time when a descriptor is returned Code cleanup (part 1) * Re activate descriptor clean Code cleanup * Allow locality precaching * Allow locality precaching for the same page alignment * Descriptor cleanup internal changed * Disable locality precaching * Precache only if not under pressure / internal cleanup at 60 seconds * Remove unused functions * Migrate on startup always Make sure the metric uuid is valid (we have a page_index) Prevent crash if no datafile is available when logging an error Remove unused functions * New warn limit for precaching Stress test v2 descriptor cleanup - Every 1s cleanup if it doesnt exist in cache - 60s cache eviction * Arrayalloc internal checks on free activated with NETDATA_ARRAYALLOC_INTERNAL_CHECKS Ability to add DESCRIPTOR_EXPIRATION_TIME and DESCRIPTOR_INTERVAL_CLEANUP during compile Defaults DESCRIPTOR_INTERVAL_CLEANUP = 60 and DESCRIPTOR_EXPIRATION_TIME = 600 * Lookup page index correctly * Calculate index time once * Detect a duplicate page when doing cache insert and during flushing of pages * Better logging * Descriptor validation (extent vs page index) when building an index file while the agent is running * Mark invalid entries in the journal v2 file * Schedule an index rebuild if a descriptor is found without an extent in the timerange we are processing Release descriptor lock to prevent random shutdown locks * Proper unlock * Skip descriptor cleanup when journal file v2 migration is running * Fix page cache statistics Remove multiple entries of the page_index from the page cache Cleanup * Adjust preload pages on pg_cache_next. Handle invalid descriptor properly Unlock properly * Better handling of invalid pages Journal indexing during runtime will scan all files to find potential ones to index * Reactivate migration on startup Evict descriptors to cause migration Don't count the entries in page index (calculate when processing the extent list) Check for valid extent since we may set the extent to NULL on startup if it is invalid Better structure init Address valgrind issues * Add don't fork/dump option * Add separate lock to protect accessing a datafile's extent list Comment out some unused code (for now) Abort descriptor cleanup if we are force flushing pages (page cache under pressure) * Check for index and schedule when data flush completes Configure max datafile size during compilation Keep a separate JudyL array for descriptors Skip quota test if we are deleting descriptors or explicitly flushing pages under pressure * Fix * set function when waiters are waken up * add the line number to trace the deadlock * add thread id * add wait list * init to zero * disable thread cancelability inside dbengine rrdeng_load_page_next() * make sure the owner is the thread * disable thread cancelability for replication as a whole * Check and queue indexing after first page flush * Queue indexing after a small delay to allow some time for page flushing * tracing of waiters only when compiled with internal checks * Mark descr with extent_entry * Return page timeout * Check if a journalfile is ready to be indexed Migrate the descriptors or evict if possible Compilation warning fix * Use page index if indexing during startup Mark if journalfile should be checked depending on whether we can migrate or delete a page during indexing * require 3x max message size as sender buffer * fix for the msg of the adaptive buffer size * fix for the msg of the duplicate replication commands * Disable descriptor deletion during migration * Detect descriptor with same start page time * sender sorts replication requests before fullfilling them; receiver does not send duplicate replication requests * dbengine never allows past timestamps to be collected * do not accept values same as last data point stored in dbengine * replicate non-overlapping ranges * a better replication logic to avoid sending overlapping data to parents * Do not start journal migration in parallel * Always update page index times * Fix page index first / last times on load * internal log when replication responses do not match the requests or when replication commands are sent while there are others inflight * do not log out of bounds RBEGIN if it is the last replication command we sent * better checking of past data collection points * better checking of past data collection points - optimized * fix corruption during decompression of streaming * Add config to disable journal indexing Add config parameter for detailed journal integrity check (Metric chain validation check during startup) pg cache insert drop check for existing page Fix crc calculation for metric headers * children disable compression globally, only when the compression gives an error * turn boolean member into RRDHOST OPTION * Compilation warnings * Remove unused code * replication sender statistics * replication sender statistics set to 100% when no replication requests are pending * Fix casting warning Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/compression.c1
-rw-r--r--streaming/receiver.c25
-rw-r--r--streaming/replication.c16
-rw-r--r--streaming/rrdpush.h2
-rw-r--r--streaming/sender.c184
5 files changed, 166 insertions, 62 deletions
diff --git a/streaming/compression.c b/streaming/compression.c
index 1fddc02b91..9fa69c5cd2 100644
--- a/streaming/compression.c
+++ b/streaming/compression.c
@@ -5,6 +5,7 @@
#define STREAM_COMPRESSION_MSG "STREAM_COMPRESSION"
+// signature MUST end with a newline
#define SIGNATURE ((uint32_t)('z' | 0x80) | (0x80 << 8) | (0x80 << 16) | ('\n' << 24))
#define SIGNATURE_MASK ((uint32_t)0xff | (0x80 << 8) | (0x80 << 16) | (0xff << 24))
#define SIGNATURE_SIZE 4
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 40673f05b4..a872642a44 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -240,8 +240,11 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
return 0;
}
+ // for compressed streams, the compression signature header ends with a new line
+ // so, here we read a single line from the stream.
+
int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) {
internal_error(true, "read_stream() failed (1).");
return 1;
}
@@ -284,7 +287,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
}
// Fill read buffer with decompressed data
- r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
+ r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len);
return 0;
}
@@ -724,8 +727,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(++localhost->senders_count);
- rrdcontext_host_child_connected(rpt->host);
+ if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
+ RRDSET *st;
+ rrdset_foreach_read(st, rpt->host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ }
+ rrdset_foreach_done(st);
+ }
+ rrdcontext_host_child_connected(rpt->host);
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
@@ -746,6 +756,15 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
rpt->hostname, rpt->client_ip, rpt->client_port, count);
+ if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
+ RRDSET *st;
+ rrdset_foreach_read(st, rpt->host) {
+ rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
+ rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
+ }
+ rrdset_foreach_done(st);
+ }
+
rrdcontext_host_child_disconnected(rpt->host);
#ifdef ENABLE_ACLK
diff --git a/streaming/replication.c b/streaming/replication.c
index ef384f4e8f..39a115d44b 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -42,7 +42,7 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
rrddim_foreach_done(rd);
}
- time_t now = after, actual_after = 0, actual_before = 0;
+ time_t now = after + 1, actual_after = 0, actual_before = 0;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions && data[i].rd; i++) {
@@ -252,6 +252,18 @@ static bool send_replay_chart_cmd(send_command callback, void *callback_data, RR
}
#endif
+#ifdef NETDATA_INTERNAL_CHECKS
+ internal_error(
+ st->replay.after != 0 || st->replay.before != 0,
+ "REPLAY: host '%s', chart '%s': sending replication request, while there is another inflight",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st)
+ );
+
+ st->replay.start_streaming = start_streaming;
+ st->replay.after = after;
+ st->replay.before = before;
+#endif
+
debug(D_REPLICATION, PLUGINSD_KEYWORD_REPLAY_CHART " \"%s\" \"%s\" %llu %llu\n",
rrdset_id(st), start_streaming ? "true" : "false", (unsigned long long)after, (unsigned long long)before);
@@ -277,7 +289,7 @@ bool replicate_chart_request(send_command callback, void *callback_data, RRDHOST
// if replication is disabled, send an empty replication request
// asking no data
- if (!host->rrdpush_enable_replication) {
+ if (unlikely(!rrdhost_option_check(host, RRDHOST_OPTION_REPLICATION))) {
internal_error(true,
"REPLAY: host '%s', chart '%s': sending empty replication request because replication is disabled",
rrdhost_hostname(host), rrdset_id(st));
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 819a94cd27..5b277cf7ee 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -170,6 +170,8 @@ struct sender_state {
#endif
DICTIONARY *replication_requests;
+ time_t replication_first_time;
+ time_t replication_min_time;
};
struct receiver_state {
diff --git a/streaming/sender.c b/streaming/sender.c
index e0964f9e3a..b34132b75c 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -21,9 +21,14 @@
#define WORKER_SENDER_JOB_BUFFER_RATIO 15
#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
#define WORKER_SENDER_JOB_BYTES_SENT 17
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 18
-#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 18
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 18
+#define WORKER_SENDER_JOB_REPLAY_RESPONSE 19
+#define WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE 20
+#define WORKER_SENDER_JOB_REPLAY_COMPLETION 21
+#define WORKER_SENDER_JOB_FUNCTION 22
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 23
+#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 23
#endif
extern struct config stream_config;
@@ -81,6 +86,8 @@ static inline void deactivate_compression(struct sender_state *s) {
}
#endif
+#define SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE 3
+
// Collector thread finishing a transmission
void sender_commit(struct sender_state *s, BUFFER *wb) {
@@ -100,15 +107,15 @@ void sender_commit(struct sender_state *s, BUFFER *wb) {
netdata_mutex_lock(&s->mutex);
- if(unlikely(s->host->sender->buffer->max_size < (buffer_strlen(wb) + 1) * 2)) {
- error("STREAM %s [send to %s]: max buffer size of %zu is too small for data of size %zu. Increasing the max buffer size to twice the max data size.",
- rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1);
+ if(unlikely(s->host->sender->buffer->max_size < (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE)) {
+ info("STREAM %s [send to %s]: max buffer size of %zu is too small for a data message of size %zu. Increasing the max buffer size to %d times the max data message size.",
+ rrdhost_hostname(s->host), s->connected_to, s->host->sender->buffer->max_size, buffer_strlen(wb) + 1, SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE);
- s->host->sender->buffer->max_size = (buffer_strlen(wb) + 1) * 2;
+ s->host->sender->buffer->max_size = (src_len + 1) * SENDER_BUFFER_ADAPT_TO_TIMES_MAX_SIZE;
}
#ifdef ENABLE_COMPRESSION
- if (s->flags & SENDER_FLAG_COMPRESSION && s->compressor) {
+ if (stream_has_capability(s, STREAM_CAP_COMPRESSION) && s->compressor) {
while(src_len) {
size_t size_to_compress = src_len;
@@ -468,7 +475,7 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
#ifdef ENABLE_COMPRESSION
// If we don't want compression, remove it from our capabilities
- if(!(s->flags & SENDER_FLAG_COMPRESSION) && stream_has_capability(s, STREAM_CAP_COMPRESSION))
+ if(!(s->flags & SENDER_FLAG_COMPRESSION))
s->capabilities &= ~STREAM_CAP_COMPRESSION;
#endif // ENABLE_COMPRESSION
@@ -664,20 +671,12 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
#ifdef ENABLE_COMPRESSION
- // if the stream does not have compression capability,
- // shut it down for us too.
- // FIXME - this means that if there are multiple parents and one of them does not support compression
- // we are going to shut it down for all of them eventually...
- if(!stream_has_capability(s, STREAM_CAP_COMPRESSION))
- s->flags &= ~SENDER_FLAG_COMPRESSION;
-
- if(s->flags & SENDER_FLAG_COMPRESSION) {
- if(s->compressor)
+ if(stream_has_capability(s, STREAM_CAP_COMPRESSION)) {
+ if(!s->compressor)
+ s->compressor = create_compressor();
+ else
s->compressor->reset(s->compressor);
}
- else
- info("STREAM %s [send to %s]: compression is disabled on this connection.", rrdhost_hostname(host), s->connected_to);
-
#endif //ENABLE_COMPRESSION
log_sender_capabilities(s);
@@ -881,6 +880,8 @@ void execute_commands(struct sender_state *s) {
const char *keyword = get_word(words, num_words, 0);
if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_FUNCTION);
+
char *transaction = get_word(words, num_words, 1);
char *timeout_s = get_word(words, num_words, 2);
char *function = get_word(words, num_words, 3);
@@ -909,7 +910,10 @@ void execute_commands(struct sender_state *s) {
stream_execute_function_callback(wb, code, tmp);
}
}
- } else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ }
+ else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+
const char *chart_id = get_word(words, num_words, 1);
const char *start_streaming = get_word(words, num_words, 2);
const char *after = get_word(words, num_words, 3);
@@ -932,7 +936,8 @@ void execute_commands(struct sender_state *s) {
};
dictionary_set(s->replication_requests, chart_id, &tmp, sizeof(struct replication_request));
}
- } else {
+ }
+ else {
error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
}
@@ -1045,10 +1050,12 @@ static bool replication_request_conflict_callback(const DICTIONARY_ITEM *item, v
struct replication_request *rr = old_value;
struct replication_request *rr_new = new_value;
- error("STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
- rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
- (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
- (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
+ internal_error(
+ true,
+ "STREAM %s [send to %s]: duplicate replication command received for chart '%s' (existing from %llu to %llu [%s], new from %llu to %llu [%s])",
+ rrdhost_hostname(s->host), s->connected_to, dictionary_acquired_item_name(item),
+ (unsigned long long)rr->after, (unsigned long long)rr->before, rr->start_streaming?"true":"false",
+ (unsigned long long)rr_new->after, (unsigned long long)rr_new->before, rr_new->start_streaming?"true":"false");
bool updated = false;
@@ -1092,6 +1099,8 @@ void sender_init(RRDHOST *host)
host->sender->flags |= SENDER_FLAG_COMPRESSION;
host->sender->compressor = create_compressor();
}
+ else
+ host->sender->flags &= ~SENDER_FLAG_COMPRESSION;
#endif
netdata_mutex_init(&host->sender->mutex);
@@ -1110,45 +1119,98 @@ static size_t sender_buffer_used_percent(struct sender_state *s) {
return (s->host->sender->buffer->max_size - available) * 100 / s->host->sender->buffer->max_size;
}
+int replication_request_compar(const DICTIONARY_ITEM **item1, const DICTIONARY_ITEM **item2) {
+ struct replication_request *rr1 = dictionary_acquired_item_value(*item1);
+ struct replication_request *rr2 = dictionary_acquired_item_value(*item2);
+
+ time_t after1 = rr1->after;
+ time_t after2 = rr2->after;
+
+ if(after1 < after2)
+ return -1;
+ if(after1 > after2)
+ return 1;
+
+ return 0;
+}
+
+int process_one_replication_request(const DICTIONARY_ITEM *item, void *value, void *data) {
+ struct sender_state *s = data;
+
+ size_t used_percent = sender_buffer_used_percent(s);
+ if(used_percent >= 50) return -1; // signal the traversal to stop
+
+ worker_is_busy(WORKER_SENDER_JOB_REPLAY_RESPONSE);
+
+ struct replication_request *rr = value;
+ const char *name = dictionary_acquired_item_name(item);
+
+ // delete it from the dictionary
+ // the current item is referenced - it will not go away until the next iteration of the dfe loop
+ dictionary_del(s->replication_requests, name);
+
+ // find the chart
+ RRDSET *st = rrdset_find(s->host, name);
+ if(unlikely(!st)) {
+ internal_error(true,
+ "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
+ , rrdhost_hostname(s->host), s->connected_to, name);
+ return 0;
+ }
+
+ if(rr->after < s->replication_first_time || !s->replication_first_time)
+ s->replication_first_time = rr->after;
+
+ if(rr->before < s->replication_min_time || !s->replication_min_time)
+ s->replication_min_time = rr->before;
+
+ netdata_thread_disable_cancelability();
+
+ // send the replication data
+ bool start_streaming = replicate_chart_response(st->rrdhost, st,
+ rr->start_streaming, rr->after, rr->before);
+
+ netdata_thread_enable_cancelability();
+
+ // enable normal streaming if we have to
+ if (start_streaming) {
+ debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
+ rrdhost_hostname(s->host), rrdset_id(st));
+
+ rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ }
+
+ return 1;
+}
+
static void process_replication_requests(struct sender_state *s) {
- if(dictionary_entries(s->replication_requests) == 0)
- return;
+ size_t entries = dictionary_entries(s->replication_requests);
- struct replication_request *rr;
- dfe_start_write(s->replication_requests, rr) {
- size_t used_percent = sender_buffer_used_percent(s);
- if(used_percent > 50) break;
-
- // delete it from the dictionary
- // the current item is referenced - it will not go away until the next iteration of the dfe loop
- dictionary_del(s->replication_requests, rr_dfe.name);
-
- // find the chart
- RRDSET *st = rrdset_find(s->host, rr_dfe.name);
- if(unlikely(!st)) {
- internal_error(true,
- "STREAM %s [send to %s]: cannot find chart '%s' to satisfy pending replication command."
- , rrdhost_hostname(s->host), s->connected_to, rr_dfe.name);
- continue;
- }
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, (NETDATA_DOUBLE)entries);
- netdata_thread_disable_cancelability();
+ if(!entries) {
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
+ return;
+ }
- // send the replication data
- bool start_streaming = replicate_chart_response(st->rrdhost, st,
- rr->start_streaming, rr->after, rr->before);
+ s->replication_min_time = 0;
- netdata_thread_enable_cancelability();
+ int count = dictionary_sorted_walkthrough_rw(s->replication_requests, DICTIONARY_LOCK_WRITE,
+ process_one_replication_request, s,
+ replication_request_compar);
- // enable normal streaming if we have to
- if (start_streaming) {
- debug(D_REPLICATION, "Enabling metric streaming for chart %s.%s",
- rrdhost_hostname(s->host), rrdset_id(st));
+ if(count != 0 && s->replication_min_time && s->replication_first_time) {
+ time_t now = now_realtime_sec();
+ if(now > s->replication_first_time && now >= s->replication_min_time) {
+ time_t completed = s->replication_min_time - s->replication_first_time;
+ time_t all_duration = now - s->replication_first_time;
- rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED);
+ NETDATA_DOUBLE percent = (NETDATA_DOUBLE) completed * 100.0 / (NETDATA_DOUBLE) all_duration;
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, percent);
}
}
- dfe_done(rr);
+
+ worker_is_idle();
}
void *rrdpush_sender_thread(void *ptr) {
@@ -1171,9 +1233,17 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
+ worker_register_job_name(WORKER_SENDER_JOB_REPLAY_RESPONSE, "replay response");
+ worker_register_job_name(WORKER_SENDER_JOB_FUNCTION, "function");
+
worker_register_job_custom_metric(WORKER_SENDER_JOB_BUFFER_RATIO, "used buffer ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_RECEIVED, "bytes received", "bytes/s", WORKER_METRIC_INCREMENTAL);
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_SENT, "bytes sent", "bytes/s", WORKER_METRIC_INCREMENTAL);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_QUEUE_SIZE, "replications pending", "commands", WORKER_METRIC_ABSOLUTE);
+
+ worker_set_metric(WORKER_SENDER_JOB_REPLAY_COMPLETION, 100.0);
struct sender_state *s = ptr;
s->tid = gettid();