summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-29 16:03:40 +0200
committerGitHub <noreply@github.com>2022-11-29 16:03:40 +0200
commit462988dac901e95e765cd6be2dc24a5c33595526 (patch)
tree601474721ffe5cbe7885faa1b78947ce467d2e89 /streaming/rrdpush.h
parent009029052f54224b2387e652a6a81a9887008b15 (diff)
replication fixes No 8 (#14061)
* replication requests with start_streaming=true are executed immediately upon reception, instead of being placed in the queue * disable thread cancelability while workers cleanup * remove obsolete worker from replication * multi-threaded replication with netdata.conf option to set number of replication threads * revert spinlock to mutex * separate worker and main thread worker jobs * restart the queue every 10 seconds only * use atomic for sender buffer percentage * reset the queue position after sleeping * use sender resets to sleep properly * fix condition * cleanup sender members related to replication
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h45
1 files changed, 27 insertions, 18 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 079c63acc5..c5f7618c13 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -162,32 +162,41 @@ struct sender_state {
struct netdata_ssl ssl; // structure used to encrypt the connection
#endif
- DICTIONARY *replication_requests; // de-duplication of replication requests, per chart
+ struct {
+ DICTIONARY *requests; // de-duplication of replication requests, per chart
- size_t replication_pending_requests; // the currently outstanding replication requests
- size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+ struct {
+ size_t pending_requests; // the currently outstanding replication requests
+ size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart)
+ } atomic;
- time_t replication_first_time; // the oldest time that has been requested to be replicated
- time_t replication_current_time; // the minimum(before) of the executed replication requests
+ struct {
+ bool reached_max; // used to avoid resetting the replication thread too frequently
+ } unsafe; // protected by sender mutex
- bool replication_reached_max; // used to avoid resetting the replication thread too frequently
+ } replication;
- size_t buffer_used_percentage; // the current utilization of the sending buffer
- usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ struct {
+ size_t buffer_used_percentage; // the current utilization of the sending buffer
+ usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ } atomic;
};
-#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
-#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED)
+#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED);
+#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED)
-#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED)
-#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
-#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED)
-#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED)
+#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED);
+#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED)
-#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED)
-#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
-#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED)
-#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED)
+
+#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED)
+#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED)
struct receiver_state {
RRDHOST *host;