diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-29 16:03:40 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-29 16:03:40 +0200 |
commit | 462988dac901e95e765cd6be2dc24a5c33595526 (patch) | |
tree | 601474721ffe5cbe7885faa1b78947ce467d2e89 /streaming/rrdpush.h | |
parent | 009029052f54224b2387e652a6a81a9887008b15 (diff) |
replication fixes No 8 (#14061)
* replication requests with start_streaming=true are executed immediately upon reception, instead of being placed in the queue
* disable thread cancelability while workers cleanup
* remove obsolete worker from replication
* multi-threaded replication with netdata.conf option to set number of replication threads
* revert spinlock to mutex
* separate worker and main thread worker jobs
* restart the queue every 10 seconds only
* use atomic for sender buffer percentage
* reset the queue position after sleeping
* use sender resets to sleep properly
* fix condition
* cleanup sender members related to replication
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 45 |
1 files changed, 27 insertions, 18 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 079c63acc5..c5f7618c13 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -162,32 +162,41 @@ struct sender_state { struct netdata_ssl ssl; // structure used to encrypt the connection #endif - DICTIONARY *replication_requests; // de-duplication of replication requests, per chart + struct { + DICTIONARY *requests; // de-duplication of replication requests, per chart - size_t replication_pending_requests; // the currently outstanding replication requests - size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + struct { + size_t pending_requests; // the currently outstanding replication requests + size_t charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + } atomic; - time_t replication_first_time; // the oldest time that has been requested to be replicated - time_t replication_current_time; // the minimum(before) of the executed replication requests + struct { + bool reached_max; // used to avoid resetting the replication thread too frequently + } unsafe; // protected by sender mutex - bool replication_reached_max; // used to avoid resetting the replication thread too frequently + } replication; - size_t buffer_used_percentage; // the current utilization of the sending buffer - usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + struct { + size_t buffer_used_percentage; // the current utilization of the sending buffer + usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + } atomic; }; -#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); -#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED) +#define rrdpush_sender_set_buffer_used_percent(sender, value) __atomic_store_n(&((sender)->atomic.buffer_used_percentage), value, __ATOMIC_RELAXED); +#define rrdpush_sender_get_buffer_used_percent(sender) __atomic_load_n(&((sender)->atomic.buffer_used_percentage), __ATOMIC_RELAXED) -#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED) -#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) -#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) -#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED) +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->atomic.last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->atomic.last_flush_time_ut), __ATOMIC_RELAXED) -#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED) -#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) -#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) -#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication.atomic.charts_replicating), __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication.atomic.charts_replicating), 0, __ATOMIC_RELAXED) + +#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication.atomic.pending_requests), __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication.atomic.pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication.atomic.pending_requests), 0, __ATOMIC_RELAXED) struct receiver_state { RRDHOST *host; |