summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-22 22:42:16 +0200
committerGitHub <noreply@github.com>2022-11-22 22:42:16 +0200
commit4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch)
tree4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming/rrdpush.h
parent77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff)
Replication fixes #3 (#14035)
* cleanup and additional information about replication * fix deadlock on sender mutex * do not ignore start streaming empty requests; when there duplicate requests, merge them * flipped the flag * final touch * added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h34
1 files changed, 25 insertions, 9 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index ed3b30bc5c..079c63acc5 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -159,20 +159,36 @@ struct sender_state {
struct compressor_state *compressor;
#endif
#ifdef ENABLE_HTTPS
- struct netdata_ssl ssl; // Structure used to encrypt the connection
+ struct netdata_ssl ssl; // structure used to encrypt the connection
#endif
- DICTIONARY *replication_requests;
- size_t replication_pending_requests;
- time_t replication_first_time;
- time_t replication_min_time;
- size_t replication_sender_buffer_percent_used;
- bool replication_reached_max;
+ DICTIONARY *replication_requests; // de-duplication of replication requests, per chart
- usec_t last_flush_time_ut;
- size_t receiving_metrics;
+ size_t replication_pending_requests; // the currently outstanding replication requests
+ size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+
+ time_t replication_first_time; // the oldest time that has been requested to be replicated
+ time_t replication_current_time; // the minimum(before) of the executed replication requests
+
+ bool replication_reached_max; // used to avoid resetting the replication thread too frequently
+
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
};
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED)
+
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED)
+
struct receiver_state {
RRDHOST *host;
netdata_thread_t thread;