summary | refs | log | tree | commit | diff | stats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-06-29 15:02:13 +0300
committerGitHub <noreply@github.com>2023-06-29 15:02:13 +0300
commitc62dcb2a9bd8ca6ec0c483bb3506c733d96648c6 (patch)
tree15f9e27879cfb951fa853fcb4f570066cb7f54a6 /streaming
parenta1503807bb57399eae83606146281036da25e610 (diff)
Optimizations part 2 (#15280)
* make all pluginsd functions inline, instead of function pointers
* dynamic MRG partitions based on the number of CPUs
* report the right size of the MRG
* prevent invalid read on pluginsd exit
* faster service_running() check; fix compiler warnings; shutdown replication after streaming to prevent crash on shutdown
* sender is now using a spinlock
* rrdcontext uses spinlock
* replace select() with poll()
* signed calculation of threads
* disable read-ahead on jnfv2 files during scan
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c23
-rw-r--r--streaming/replication.c4
-rw-r--r--streaming/rrdpush.c17
-rw-r--r--streaming/rrdpush.h5
-rw-r--r--streaming/sender.c30
5 files changed, 40 insertions, 39 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index ce48019683..37b14b5f9d 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -282,31 +282,30 @@ static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAK
rpt->exit.reason = reason;
}
-static inline bool receiver_should_continue(struct receiver_state *rpt) {
+static inline bool receiver_should_stop(struct receiver_state *rpt) {
static __thread size_t counter = 0;
if(unlikely(rpt->exit.shutdown)) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_SHUTDOWN, false);
- return false;
+ return true;
}
- // check every 1000 lines read
- if((counter++ % 1000) != 0) return true;
-
if(unlikely(!service_running(SERVICE_STREAMING))) {
receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_NETDATA_EXIT, false);
- return false;
+ return true;
}
- netdata_thread_testcancel();
-
- rpt->last_msg_t = now_monotonic_sec();
+ if(unlikely((counter++ % 1000) == 0)) {
+ // check every 1000 lines read
+ netdata_thread_testcancel();
+ rpt->last_msg_t = now_monotonic_sec();
+ }
- return true;
+ return false;
}
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
- size_t result;
+ size_t result = 0;
PARSER *parser = NULL;
{
@@ -346,7 +345,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
size_t read_buffer_start = 0;
char buffer[PLUGINSD_LINE_MAX + 2] = "";
- while(receiver_should_continue(rpt)) {
+ while(!receiver_should_stop(rpt)) {
if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
bool have_new_data = compressed_connection ? receiver_read_compressed(rpt) : receiver_read_uncompressed(rpt);
diff --git a/streaming/replication.c b/streaming/replication.c
index d7909e7eb0..51b1747f0d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1096,7 +1096,7 @@ void replication_set_next_point_in_time(time_t after, size_t unique_id) {
// ----------------------------------------------------------------------------
// replication sort entry management
-static struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
+static inline struct replication_sort_entry *replication_sort_entry_create(struct replication_request *rq) {
struct replication_sort_entry *rse = aral_mallocz(replication_globals.aral_rse);
__atomic_add_fetch(&replication_globals.atomic.memory, sizeof(struct replication_sort_entry), __ATOMIC_RELAXED);
@@ -1120,7 +1120,7 @@ static void replication_sort_entry_destroy(struct replication_sort_entry *rse) {
}
static void replication_sort_entry_add(struct replication_request *rq) {
- if(rrdpush_sender_replication_buffer_full_get(rq->sender)) {
+ if(unlikely(rrdpush_sender_replication_buffer_full_get(rq->sender))) {
rq->indexed_in_judy = false;
rq->not_indexed_buffer_full = true;
rq->not_indexed_preprocessing = false;
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index b2da305787..bc09bf3fbc 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -687,7 +687,7 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
if (!host->sender)
return;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
@@ -698,20 +698,19 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
netdata_thread_cancel(host->rrdpush_sender_thread);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
if(wait) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
while(host->sender->tid) {
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
sleep_usec(10 * USEC_PER_MS);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
}
-
// ----------------------------------------------------------------------------
// rrdpush receiver thread
@@ -721,7 +720,7 @@ void log_stream_connection(const char *client_ip, const char *client_port, const
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
@@ -733,7 +732,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
int rrdpush_receiver_permission_denied(struct web_client *w) {
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 35347aaa0a..7ee2e1bf85 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -249,7 +249,7 @@ struct sender_state {
size_t not_connected_loops;
// Metrics are collected asynchronously by collector threads calling rrdset_done_push(). This can also trigger
// the lazy creation of the sender thread - both cases (buffer access and thread creation) are guarded here.
- netdata_mutex_t mutex;
+ SPINLOCK spinlock;
struct circular_buffer *buffer;
char read_buffer[PLUGINSD_LINE_MAX + 1];
ssize_t read_len;
@@ -296,6 +296,9 @@ struct sender_state {
} atomic;
};
+#define sender_lock(sender) spinlock_lock(&(sender)->spinlock)
+#define sender_unlock(sender) spinlock_unlock(&(sender)->spinlock)
+
#define rrdpush_sender_pipe_has_pending_data(sender) __atomic_load_n(&(sender)->atomic.pending_data, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_set_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, true, __ATOMIC_RELAXED)
#define rrdpush_sender_pipe_clear_pending_data(sender) __atomic_store_n(&(sender)->atomic.pending_data, false, __ATOMIC_RELAXED)
diff --git a/streaming/sender.c b/streaming/sender.c
index cd1f75b12b..ac0a522505 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -100,7 +100,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
if(unlikely(!src || !src_len))
return;
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
// FILE *fp = fopen("/tmp/stream.txt", "a");
// fprintf(fp,
@@ -156,7 +156,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
rrdhost_hostname(s->host), s->connected_to);
deactivate_compression(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return;
}
}
@@ -189,7 +189,7 @@ void sender_commit(struct sender_state *s, BUFFER *wb, STREAM_TRAFFIC_TYPE type)
signal_sender = true;
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
if(signal_sender)
rrdpush_signal_sender_to_wake_up(s);
@@ -273,7 +273,7 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
return;
if(!have_mutex)
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
rrdpush_sender_last_buffer_recreate_set(s, now_s);
last_reset_time_s = now_s;
@@ -287,20 +287,20 @@ static void rrdpush_sender_cbuffer_recreate_timed(struct sender_state *s, time_t
sender_thread_buffer_free();
if(!have_mutex)
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
}
static void rrdpush_sender_cbuffer_flush(RRDHOST *host) {
rrdpush_sender_set_flush_time(host->sender);
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
// flush the output buffer from any data it may have
cbuffer_flush(host->sender->buffer);
rrdpush_sender_cbuffer_recreate_timed(host->sender, now_monotonic_sec(), true, true);
replication_recalculate_buffer_used_ratio_unsafe(host->sender);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
}
static void rrdpush_sender_charts_and_replication_reset(RRDHOST *host) {
@@ -821,7 +821,7 @@ static ssize_t attempt_to_send(struct sender_state *s) {
struct circular_buffer *cb = s->buffer;
#endif
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
char *chunk;
size_t outstanding = cbuffer_next_unsafe(s->buffer, &chunk);
debug(D_STREAM, "STREAM: Sending data. Buffer r=%zu w=%zu s=%zu, next chunk=%zu", cb->read, cb->write, cb->size, outstanding);
@@ -853,7 +853,7 @@ static ssize_t attempt_to_send(struct sender_state *s) {
debug(D_STREAM, "STREAM: send() returned 0 -> no error but no transmission");
replication_recalculate_buffer_used_ratio_unsafe(s);
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
return ret;
}
@@ -1093,7 +1093,7 @@ static bool rrdhost_set_sender(RRDHOST *host) {
if(unlikely(!host->sender)) return false;
bool ret = false;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
if(!host->sender->tid) {
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_CONNECTED | RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
@@ -1103,7 +1103,7 @@ static bool rrdhost_set_sender(RRDHOST *host) {
host->sender->exit.reason = STREAM_HANDSHAKE_NEVER;
ret = true;
}
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
rrdpush_reset_destinations_postpone_time(host);
@@ -1164,7 +1164,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
RRDHOST *host = s->host;
- netdata_mutex_lock(&host->sender->mutex);
+ sender_lock(host->sender);
info("STREAM %s [send]: sending thread exits %s",
rrdhost_hostname(host),
host->sender->exit.reason != STREAM_HANDSHAKE_NEVER ? stream_handshake_error_to_string(host->sender->exit.reason) : "");
@@ -1173,7 +1173,7 @@ static void rrdpush_sender_thread_cleanup_callback(void *ptr) {
rrdpush_sender_pipe_close(host, host->sender->rrdpush_sender_pipe, false);
rrdhost_clear_sender___while_having_sender_mutex(host);
- netdata_mutex_unlock(&host->sender->mutex);
+ sender_unlock(host->sender);
freez(s->pipe_buffer);
freez(s);
@@ -1343,14 +1343,14 @@ void *rrdpush_sender_thread(void *ptr) {
continue;
}
- netdata_mutex_lock(&s->mutex);
+ sender_lock(s);
size_t outstanding = cbuffer_next_unsafe(s->buffer, NULL);
size_t available = cbuffer_available_size_unsafe(s->buffer);
if (unlikely(!outstanding)) {
rrdpush_sender_pipe_clear_pending_data(s);
rrdpush_sender_cbuffer_recreate_timed(s, now_s, true, false);
}
- netdata_mutex_unlock(&s->mutex);
+ sender_unlock(s);
worker_set_metric(WORKER_SENDER_JOB_BUFFER_RATIO, (NETDATA_DOUBLE)(s->buffer->max_size - available) * 100.0 / (NETDATA_DOUBLE)s->buffer->max_size);