diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-22 22:42:16 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-22 22:42:16 +0200 |
commit | 4e61a4244e2ab45c29de0ddd84bfec8d9339f388 (patch) | |
tree | 4882db8ee7bd9de3251960320c7f553edc52b6c9 /streaming/rrdpush.h | |
parent | 77a304f52e4c6aadef0eac06b4869b7e1c829175 (diff) |
Replication fixes #3 (#14035)
* cleanup and additional information about replication
* fix deadlock on sender mutex
* do not ignore start streaming empty requests; when there duplicate requests, merge them
* flipped the flag
* final touch
* added queued flag on the charts to prevent them from being obsoleted by the service thread
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 34 |
1 files changed, 25 insertions, 9 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index ed3b30bc5c..079c63acc5 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -159,20 +159,36 @@ struct sender_state { struct compressor_state *compressor; #endif #ifdef ENABLE_HTTPS - struct netdata_ssl ssl; // Structure used to encrypt the connection + struct netdata_ssl ssl; // structure used to encrypt the connection #endif - DICTIONARY *replication_requests; - size_t replication_pending_requests; - time_t replication_first_time; - time_t replication_min_time; - size_t replication_sender_buffer_percent_used; - bool replication_reached_max; + DICTIONARY *replication_requests; // de-duplication of replication requests, per chart - usec_t last_flush_time_ut; - size_t receiving_metrics; + size_t replication_pending_requests; // the currently outstanding replication requests + size_t replication_charts_replicating; // the number of unique charts having pending replication requests (on every request one is added and is removed when we finish it - it does not track completion of the replication for this chart) + + time_t replication_first_time; // the oldest time that has been requested to be replicated + time_t replication_current_time; // the minimum(before) of the executed replication requests + + bool replication_reached_max; // used to avoid resetting the replication thread too frequently + + size_t buffer_used_percentage; // the current utilization of the sending buffer + usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC }; +#define rrdpush_sender_set_flush_time(sender) __atomic_store_n(&((sender)->last_flush_time_ut), now_realtime_usec(), __ATOMIC_RELAXED); +#define rrdpush_sender_get_flush_time(sender) __atomic_load_n(&((sender)->last_flush_time_ut), __ATOMIC_RELAXED) + +#define rrdpush_sender_replicating_charts(sender) __atomic_load_n(&((sender)->replication_charts_replicating), __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_plus_one(sender) __atomic_add_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_charts_replicating), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_replicating_charts_zero(sender) __atomic_store_n(&((sender)->replication_charts_replicating), 0, __ATOMIC_RELAXED) + +#define rrdpush_sender_pending_replication_requests(sender) __atomic_load_n(&((sender)->replication_pending_requests), __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_plus_one(sender) __atomic_add_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_minus_one(sender) __atomic_sub_fetch(&((sender)->replication_pending_requests), 1, __ATOMIC_RELAXED) +#define rrdpush_sender_pending_replication_requests_zero(sender) __atomic_store_n(&((sender)->replication_pending_requests), 0, __ATOMIC_RELAXED) + struct receiver_state { RRDHOST *host; netdata_thread_t thread; |