summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.h
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-20 00:50:42 +0200
committerGitHub <noreply@github.com>2023-01-20 00:50:42 +0200
commit9232bfb6a072155388578dc4e1338c6002afb515 (patch)
treedc85f9bfe3ed97394e6f2a92a2f710796d6d8979 /streaming/rrdpush.h
parent86538b005de50f23c9ff66542abab11683b85c06 (diff)
track memory footprint of Netdata (#14294)
* track memory footprint of Netdata * track db modes alloc/ram/save/map * track system info; track sender and receiver * fixes * more fixes * track workers memory, onewayalloc memory; unify judyhs size estimation * track replication structures and buffers * Properly clear host RRDHOST_FLAG_METADATA_UPDATE flag * flush the replication buffer every 1000 times the circular buffer is found empty * dont take timestamp too frequently in sender loop * sender buffers are not used by the same thread as the sender, so they were never recreated - fixed it * free sender thread buffer on replication threads when replication is idle * use the last sender flag as a timestamp of the last buffer recreation * free cbuffer before reconnecting * recreate cbuffer on every flush * timings for journal v2 loading * inlining of metric and cache functions * aral likely/unlikely * free left-over thread buffers * fix NULL pointer dereference in replication * free sender thread buffer on sender thread too * mark ctx as used before flushing * better logging on ctx datafiles closing Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/rrdpush.h')
-rw-r--r--streaming/rrdpush.h13
1 files changed, 9 insertions, 4 deletions
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index c7f07ac7e5..94c1320e76 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -10,7 +10,7 @@
#define CONNECTED_TO_SIZE 100
#define CBUFFER_INITIAL_SIZE (16 * 1024)
-#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE * 4)
+#define THREAD_BUFFER_INITIAL_SIZE (CBUFFER_INITIAL_SIZE / 2)
// ----------------------------------------------------------------------------
// obsolete versions - do not use anymore
@@ -131,8 +131,8 @@ struct decompressor_state {
// Metric transmission: collector threads asynchronously fill the buffer, sender thread uses it.
typedef enum {
- SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
- SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
+ SENDER_FLAG_OVERFLOW = (1 << 0), // The buffer has been overflown
+ SENDER_FLAG_COMPRESSION = (1 << 1), // The stream needs to have and has compression
} SENDER_FLAGS;
struct sender_state {
@@ -189,9 +189,13 @@ struct sender_state {
struct {
size_t buffer_used_percentage; // the current utilization of the sending buffer
usec_t last_flush_time_ut; // the last time the sender flushed the sending buffer in USEC
+ time_t last_buffer_recreate_s; // true when the sender buffer should be re-created
} atomic;
};
+#define rrdpush_sender_last_buffer_recreate_get(sender) __atomic_load_n(&(sender)->atomic.last_buffer_recreate_s, __ATOMIC_RELAXED)
+#define rrdpush_sender_last_buffer_recreate_set(sender, value) __atomic_store_n(&(sender)->atomic.last_buffer_recreate_s, value, __ATOMIC_RELAXED)
+
#define rrdpush_sender_replication_buffer_full_set(sender, value) __atomic_store_n(&((sender)->replication.atomic.reached_max), value, __ATOMIC_SEQ_CST)
#define rrdpush_sender_replication_buffer_full_get(sender) __atomic_load_n(&((sender)->replication.atomic.reached_max), __ATOMIC_SEQ_CST)
@@ -296,7 +300,6 @@ void rrdpush_destinations_free(RRDHOST *host);
BUFFER *sender_start(struct sender_state *s);
void sender_commit(struct sender_state *s, BUFFER *wb);
-void sender_cancel(struct sender_state *s);
int rrdpush_init();
bool rrdpush_receiver_needs_dbengine();
int configured_as_parent();
@@ -339,6 +342,8 @@ int32_t stream_capabilities_to_vn(uint32_t caps);
void receiver_state_free(struct receiver_state *rpt);
bool stop_streaming_receiver(RRDHOST *host, const char *reason);
+void sender_thread_buffer_free(void);
+
#include "replication.h"
#endif //NETDATA_RRDPUSH_H