summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-18 21:32:50 +0200
committerGitHub <noreply@github.com>2023-01-18 21:32:50 +0200
commitc1908d3163185cf65a139edeb11a165a10eca1e9 (patch)
tree6fe69ef8cd5f5797253e0d583fe60d56c69ec3da /streaming
parent51565d55e92b6f814a09ca87d3c77a40acba576c (diff)
DBENGINE v2 - improvements part 5 (#14289)
* cleanup journal v2 mounts periodically * fix for last commit * re-enable loading page from disk when the arrangement of pages requires it * Remove unused statistics * Estimate diskspace when the current datafile is full and queue a rotate command (Currently it will not attempt to estimate end size for journals) Queue a command to check quota on startup per tier * apps.plugin now exposes RSS chart * shorter thread names to make debugging easier, since thread names can only be 15 characters * more thread names fixes * allow an apps_groups.conf target to be pid 0 or 1 Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/replication.c8
-rw-r--r--streaming/rrdpush.c4
-rw-r--r--streaming/rrdpush.h3
3 files changed, 10 insertions, 5 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index 96859723a9..c639016349 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -1553,13 +1553,15 @@ void *replication_thread_main(void *ptr __maybe_unused) {
threads = 1;
}
- if(--threads) {
+ if(threads > 1) {
replication_globals.main_thread.threads = threads;
replication_globals.main_thread.threads_ptrs = mallocz(threads * sizeof(netdata_thread_t *));
- for(int i = 0; i < threads ;i++) {
+ for(int i = 1; i < threads ;i++) {
+ char tag[NETDATA_THREAD_TAG_MAX + 1];
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, "REPLAY[%d]", i + 1);
replication_globals.main_thread.threads_ptrs[i] = mallocz(sizeof(netdata_thread_t));
- netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], "REPLICATION",
+ netdata_thread_create(replication_globals.main_thread.threads_ptrs[i], tag,
NETDATA_THREAD_OPTION_JOINABLE, replication_worker_thread, NULL);
}
}
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 9fa32b9df5..7767c371c1 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -597,7 +597,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
if(!rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
- snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
+ snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
@@ -1036,7 +1036,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
debug(D_SYSTEM, "starting STREAM receive thread.");
char tag[FILENAME_MAX + 1];
- snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
+ snprintfz(tag, FILENAME_MAX, THREAD_TAG_STREAM_RECEIVER "[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index 406fdcdcb2..c7f07ac7e5 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -306,6 +306,9 @@ void *rrdpush_sender_thread(void *ptr);
void rrdpush_send_host_labels(RRDHOST *host);
void rrdpush_claimed_id(RRDHOST *host);
+#define THREAD_TAG_STREAM_RECEIVER "RCVR" // "[host]" is appended
+#define THREAD_TAG_STREAM_SENDER "SNDR" // "[host]" is appended
+
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url);
void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait);