summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorAndrew Moss <1043609+amoss@users.noreply.github.com>2020-06-19 19:37:47 +0200
committerGitHub <noreply@github.com>2020-06-19 19:37:47 +0200
commit85752833adac111cf01ae558ee0c46b3a76aed69 (patch)
tree1594cb6ddbe49ad98b43b4ffb25c25fc760d60d9 /streaming
parent51cff0660b0eb671dbec42e54d7567db8b6ba712 (diff)
Fixes the race-hazard in streaming during the shutdown sequence (#9370)
The streaming component detects when a receiver stream has closed, and stops an attached sender on the same host. This is to support proxy configurations where the stream is passed through. During the shutdown sequence, once netdata_exit has been set no thread should touch any RRDHOST structure as the non-static threads are not joined before the database shuts down. The destruction of the thread state has been separated from the cleanup and can be called from two points. If the thread can detach itself from the host (i.e. it is not during the shutdown sequence) then it does so and destroys the state. During shutdown the thread leaves the state intact so that it can be destroyed during the host destruction, and the host destruction now cancels the thread to ensure a consistent sequence of events.
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c83
-rw-r--r--streaming/rrdpush.c3
-rw-r--r--streaming/rrdpush.h4
3 files changed, 51 insertions, 39 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 2bfe28dfd6..c96e9732a1 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -4,14 +4,40 @@
extern struct config stream_config;
+void destroy_receiver_state(struct receiver_state *rpt) {
+ freez(rpt->key);
+ freez(rpt->hostname);
+ freez(rpt->registry_hostname);
+ freez(rpt->machine_guid);
+ freez(rpt->os);
+ freez(rpt->timezone);
+ freez(rpt->tags);
+ freez(rpt->client_ip);
+ freez(rpt->client_port);
+ freez(rpt->program_name);
+ freez(rpt->program_version);
+#ifdef ENABLE_HTTPS
+ if(rpt->ssl.conn){
+ SSL_free(rpt->ssl.conn);
+ }
+#endif
+ freez(rpt);
+}
+
static void rrdpush_receiver_thread_cleanup(void *ptr) {
static __thread int executed = 0;
if(!executed) {
executed = 1;
struct receiver_state *rpt = (struct receiver_state *) ptr;
+ // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
+ // the host pointer as it is unpredicable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
+ if (netdata_exit && rpt->host) {
+ rpt->exited = 1;
+ return;
+ }
// Make sure that we detach this thread and don't kill a freshly arriving receiver
- if (rpt->host) {
+ if (!netdata_exit && rpt->host) {
netdata_mutex_lock(&rpt->host->receiver_lock);
if (rpt->host->receiver == rpt)
rpt->host->receiver = NULL;
@@ -19,25 +45,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
}
info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
-
- freez(rpt->key);
- freez(rpt->hostname);
- freez(rpt->registry_hostname);
- freez(rpt->machine_guid);
- freez(rpt->os);
- freez(rpt->timezone);
- freez(rpt->tags);
- freez(rpt->client_ip);
- freez(rpt->client_port);
- freez(rpt->program_name);
- freez(rpt->program_version);
-#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn){
- SSL_free(rpt->ssl.conn);
- }
-#endif
- freez(rpt);
-
+ destroy_receiver_state(rpt);
}
}
@@ -413,26 +421,29 @@ static int rrdpush_receive(struct receiver_state *rpt)
size_t count = streaming_parser(rpt, &cd, fp);
- //size_t count = pluginsd_process(host, &cd, fp, 1);
-
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "DISCONNECTED");
- error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->host->hostname, rpt->client_ip, rpt->client_port, count);
-
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == rpt) {
- rrdhost_wrlock(rpt->host);
- rpt->host->senders_disconnected_time = now_realtime_sec();
- rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
- if(health_enabled == CONFIG_BOOLEAN_AUTO)
- rpt->host->health_enabled = 0;
- rrdhost_unlock(rpt->host);
- rrdpush_sender_thread_stop(rpt->host);
+
+ log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname,
+ "DISCONNECTED");
+ error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip,
+ rpt->client_port, count);
+
+ // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread
+ if (!netdata_exit && rpt->host) {
+ netdata_mutex_lock(&rpt->host->receiver_lock);
+ if (rpt->host->receiver == rpt) {
+ rrdhost_wrlock(rpt->host);
+ rpt->host->senders_disconnected_time = now_realtime_sec();
+ rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN);
+ if(health_enabled == CONFIG_BOOLEAN_AUTO)
+ rpt->host->health_enabled = 0;
+ rrdhost_unlock(rpt->host);
+ rrdpush_sender_thread_stop(rpt->host);
+ }
+ netdata_mutex_unlock(&rpt->host->receiver_lock);
}
- netdata_mutex_unlock(&rpt->host->receiver_lock);
// cleanup
fclose(fp);
-
return (int)count;
}
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index ebd76b9397..a9f06f10ca 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -673,14 +673,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
}
- netdata_thread_t thread;
debug(D_SYSTEM, "starting STREAM receive thread.");
char tag[FILENAME_MAX + 1];
snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port);
- if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
+ if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt))
error("Failed to create new STREAM receive thread for client.");
// prevent the caller from closing the streaming socket
diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h
index efbf2f1e6f..f747b69b2d 100644
--- a/streaming/rrdpush.h
+++ b/streaming/rrdpush.h
@@ -64,6 +64,7 @@ struct sender_state {
struct receiver_state {
RRDHOST *host;
+ netdata_thread_t thread;
int fd;
char *key;
char *hostname;
@@ -85,7 +86,8 @@ struct receiver_state {
#ifdef ENABLE_HTTPS
struct netdata_ssl ssl;
#endif
- unsigned int shutdown:1;
+ unsigned int shutdown:1; // Tell the thread to exit
+ unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!)
};