diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-06-29 15:02:13 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-06-29 15:02:13 +0300 |
commit | c62dcb2a9bd8ca6ec0c483bb3506c733d96648c6 (patch) | |
tree | 15f9e27879cfb951fa853fcb4f570066cb7f54a6 /streaming | |
parent | a1503807bb57399eae83606146281036da25e610 (diff) |
Optimizations part 2 (#15280)
* make all pluginsd functions inline, instead of function pointers
* dynamic MRG partitions based on the number of CPUs
* report the right size of the MRG
* prevent invalid read on pluginsd exit
* faster service_running() check; fix compiler warnings; shutdown replication after streaming to prevent crash on shutdown
* sender is now using a spinlock
* rrdcontext uses spinlock
* replace select() with poll()
* signed calculation of threads
* disable read-ahead on jnfv2 files during scan
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 23 | ||||
-rw-r--r-- | streaming/replication.c | 4 | ||||
-rw-r--r-- | streaming/rrdpush.c | 17 | ||||
-rw-r--r-- | streaming/rrdpush.h | 5 | ||||
-rw-r--r-- | streaming/sender.c | 30 |
5 files changed, 40 insertions, 39 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index ce48019683..37b14b5f9d 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -282,31 +282,30 @@ static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAK rpt->exit.reason = reason; } -static inline bool receiver_should_continue(struct receiver_state *rpt) { +static inline bool receiver_should_stop(struct receiver_state *rpt) { static __thread size_t counter = 0; if(unlikely(rpt->exit.shutdown)) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false); - return false; + return true; } - // check every 1000 lines read - if((counter++ % 1000) != 0) return true; - if(unlikely(!service_running(SERVICE_STREAMING))) { receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false); - return false; + return true; } - netdata_thread_testcancel(); - - rpt->last_msg_t = now_monotonic_sec(); + if(unlikely((counter++ % 1000) == 0)) { + // check every 1000 lines read + netdata_thread_testcancel(); + rpt->last_msg_t = now_monotonic_sec(); + } - return true; + return false; } static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) { - size_t result; + size_t result = 0; PARSER *parser = NULL; { @@ -346,7 +345,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(receiver_should_continue(rpt)) { + while(!receiver_should_stop(rpt)) { if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt); diff --git a/streaming/replication.c b/streaming/replication.c index d7909e7eb0..51b1747f0d 100644 --- a/streaming/replication.c +++ b/streaming/replication.c @@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) { // ---------------------------------------------------------------------------- // replication sort entry management -static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { +static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) { struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse); __atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED); @@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) { } static void replication_sort_entry_add(struct replication_request *rq) { - if(rrdpush_sender_replication_buffer_full_get(rq->sender)) { + if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) { rq->indexed_in_judy = false; rq->not_indexed_buffer_full = true; rq->not_indexed_preprocessing = false; diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index b2da305787..bc09bf3fbc 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -687,7 +687,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai if (!host->sender) return; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { @@ -698,20 +698,19 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai netdata_thread_cancel(host->rrdpush_sender_thread); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); if(wait) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); while(host->sender->tid) { - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); sleep_usec(10 * USEC_PER_MS); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } } - // ---------------------------------------------------------------------------- // rrdpush receiver thread @@ -721,7 +720,7 @@ void log_stream_connection(const char *client_ip, const char *client_port, const static void rrdpush_sender_thread_spawn(RRDHOST *host) { - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) { char tag[NETDATA_THREAD_TAG_MAX + 1]; @@ -733,7 +732,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) { rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } int rrdpush_receiver_permission_denied(struct web_client *w) { diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index 35347aaa0a..7ee2e1bf85 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -249,7 +249,7 @@ struct sender_state { size_t not_connected_loops; // Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger // the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here. - netdata_mutex_t mutex; + SPINLOCK spinlock; struct circular_buffer *buffer; char read_buffer[PLUGINSD_LINE_MAX + 1]; ssize_t read_len; @@ -296,6 +296,9 @@ struct sender_state { } atomic; }; +#define sender_lock(sender) spinlock_lock(&(sender)->spinlock) +#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock) + #define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED) #define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED) #define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED) diff --git a/streaming/sender.c b/streaming/sender.c index cd1f75b12b..ac0a522505 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) if(unlikely(!src || !src_len)) return; - netdata_mutex_lock(&s->mutex); + sender_lock(s); // FILE *fp = fopen("/tmp/stream.txt", "a"); // fprintf(fp, @@ -156,7 +156,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) rrdhost_hostname(s->host), s->connected_to); deactivate_compression(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return; } } @@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type) signal_sender = true; } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); if(signal_sender) rrdpush_signal_sender_to_wake_up(s); @@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t return; if(!have_mutex) - netdata_mutex_lock(&s->mutex); + sender_lock(s); rrdpush_sender_last_buffer_recreate_set(s, now_s); last_reset_time_s = now_s; @@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t sender_thread_buffer_free(); if(!have_mutex) - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); } static void rrdpush_sender_cbuffer_flush(RRDHOST *host) { rrdpush_sender_set_flush_time(host->sender); - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); // flush the output buffer from any data it may have cbuffer_flush(host->sender->buffer); rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true); replication_recalculate_buffer_used_ratio_unsafe(host->sender); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); } static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) { @@ -821,7 +821,7 @@ static ssize_t attempt_to_send(struct sender_state *s) { struct circular_buffer *cb = s->buffer; #endif - netdata_mutex_lock(&s->mutex); + sender_lock(s); char *chunk; size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk); debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding); @@ -853,7 +853,7 @@ static ssize_t attempt_to_send(struct sender_state *s) { debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission"); replication_recalculate_buffer_used_ratio_unsafe(s); - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); return ret; } @@ -1093,7 +1093,7 @@ static bool rrdhost_set_sender(RRDHOST *host) { if(unlikely(!host->sender)) return false; bool ret = false; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); if(!host->sender->tid) { rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN); @@ -1103,7 +1103,7 @@ static bool rrdhost_set_sender(RRDHOST *host) { host->sender->exit.reason = STREAM_HANDSHAKE_NEVER; ret = true; } - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); rrdpush_reset_destinations_postpone_time(host); @@ -1164,7 +1164,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { RRDHOST *host = s->host; - netdata_mutex_lock(&host->sender->mutex); + sender_lock(host->sender); info("STREAM %s [send]: sending thread exits %s", rrdhost_hostname(host), host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : ""); @@ -1173,7 +1173,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) { rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false); rrdhost_clear_sender___while_having_sender_mutex(host); - netdata_mutex_unlock(&host->sender->mutex); + sender_unlock(host->sender); freez(s->pipe_buffer); freez(s); @@ -1343,14 +1343,14 @@ void *rrdpush_sender_thread(void *ptr) { continue; } - netdata_mutex_lock(&s->mutex); + sender_lock(s); size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL); size_t available = cbuffer_available_size_unsafe(s->buffer); if (unlikely(!outstanding)) { rrdpush_sender_pipe_clear_pending_data(s); rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false); } - netdata_mutex_unlock(&s->mutex); + sender_unlock(s); worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size); |