diff options
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r-- | streaming/rrdpush.h | 13 |
1 files changed, 9 insertions, 4 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index c7f07ac7e5..94c1320e76 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -10,7 +10,7 @@ #define CONNECTED_TO_SIZE 100 #define CBUFFER_INITIAL_SIZE (16 * 1024) -#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE * 4) +#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2) // ---------------------------------------------------------------------------- // obsolete versions - do not use anymore @@ -131,8 +131,8 @@ struct decompressor_state { // Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it. typedef enum { - SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown - SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression + SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown + SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression } SENDER_FLAGS; struct sender_state { @@ -189,9 +189,13 @@ struct sender_state { struct { size_t buffer_used_percentage; // the current utilization of the sending buffer usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC + time_t last_buffer_recreate_s; // true when the sender buffer should be re-created } atomic; }; +#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED) +#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED) + #define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST) #define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST) @@ -296,7 +300,6 @@ void rrdpush_destinations_free(RRDHOST *host); BUFFER *sender_start(struct sender_state *s); void sender_commit(struct sender_state *s, BUFFER *wb); -void sender_cancel(struct sender_state *s); int rrdpush_init(); bool rrdpush_receiver_needs_dbengine(); int configured_as_parent(); @@ -339,6 +342,8 @@ int32_t stream_capabilities_to_vn(uint32_t caps); void receiver_state_free(struct receiver_state *rpt); bool stop_streaming_receiver(RRDHOST *host, const char *reason); +void sender_thread_buffer_free(void); + #include "replication.h" #endif //NETDATA_RRDPUSH_H |