diff options
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 83 | ||||
-rw-r--r-- | streaming/rrdpush.c | 3 | ||||
-rw-r--r-- | streaming/rrdpush.h | 4 |
3 files changed, 51 insertions, 39 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index 2bfe28dfd6..c96e9732a1 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -4,14 +4,40 @@ extern struct config stream_config; +void destroy_receiver_state(struct receiver_state *rpt) { + freez(rpt->key); + freez(rpt->hostname); + freez(rpt->registry_hostname); + freez(rpt->machine_guid); + freez(rpt->os); + freez(rpt->timezone); + freez(rpt->tags); + freez(rpt->client_ip); + freez(rpt->client_port); + freez(rpt->program_name); + freez(rpt->program_version); +#ifdef ENABLE_HTTPS + if(rpt->ssl.conn){ + SSL_free(rpt->ssl.conn); + } +#endif + freez(rpt); +} + static void rrdpush_receiver_thread_cleanup(void *ptr) { static __thread int executed = 0; if(!executed) { executed = 1; struct receiver_state *rpt = (struct receiver_state *) ptr; + // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch + // the host pointer as it is unpredicable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). + if (netdata_exit && rpt->host) { + rpt->exited = 1; + return; + } // Make sure that we detach this thread and don't kill a freshly arriving receiver - if (rpt->host) { + if (!netdata_exit && rpt->host) { netdata_mutex_lock(&rpt->host->receiver_lock); if (rpt->host->receiver == rpt) rpt->host->receiver = NULL; @@ -19,25 +45,7 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { } info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - - freez(rpt->key); - freez(rpt->hostname); - freez(rpt->registry_hostname); - freez(rpt->machine_guid); - freez(rpt->os); - freez(rpt->timezone); - freez(rpt->tags); - freez(rpt->client_ip); - freez(rpt->client_port); - freez(rpt->program_name); - freez(rpt->program_version); -#ifdef ENABLE_HTTPS - if(rpt->ssl.conn){ - SSL_free(rpt->ssl.conn); - } -#endif - freez(rpt); - + destroy_receiver_state(rpt); } } @@ -413,26 +421,29 @@ static int rrdpush_receive(struct receiver_state *rpt) size_t count = streaming_parser(rpt, &cd, fp); - //size_t count = pluginsd_process(host, &cd, fp, 1); - - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->host->hostname, "DISCONNECTED"); - error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->host->hostname, rpt->client_ip, rpt->client_port, count); - - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) { - rrdhost_wrlock(rpt->host); - rpt->host->senders_disconnected_time = now_realtime_sec(); - rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); - if(health_enabled == CONFIG_BOOLEAN_AUTO) - rpt->host->health_enabled = 0; - rrdhost_unlock(rpt->host); - rrdpush_sender_thread_stop(rpt->host); + + log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rpt->hostname, + "DISCONNECTED"); + error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, + rpt->client_port, count); + + // During a shutdown there is cleanup code in rrdhost that will cancel the sender thread + if (!netdata_exit && rpt->host) { + netdata_mutex_lock(&rpt->host->receiver_lock); + if (rpt->host->receiver == rpt) { + rrdhost_wrlock(rpt->host); + rpt->host->senders_disconnected_time = now_realtime_sec(); + rrdhost_flag_set(rpt->host, RRDHOST_FLAG_ORPHAN); + if(health_enabled == CONFIG_BOOLEAN_AUTO) + rpt->host->health_enabled = 0; + rrdhost_unlock(rpt->host); + rrdpush_sender_thread_stop(rpt->host); + } + netdata_mutex_unlock(&rpt->host->receiver_lock); } - netdata_mutex_unlock(&rpt->host->receiver_lock); // cleanup fclose(fp); - return (int)count; } diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index ebd76b9397..a9f06f10ca 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -673,14 +673,13 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) { } - netdata_thread_t thread; debug(D_SYSTEM, "starting STREAM receive thread."); char tag[FILENAME_MAX + 1]; snprintfz(tag, FILENAME_MAX, "STREAM_RECEIVER[%s,[%s]:%s]", rpt->hostname, w->client_ip, w->client_port); - if(netdata_thread_create(&thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) + if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) error("Failed to create new STREAM receive thread for client."); // prevent the caller from closing the streaming socket diff --git a/streaming/rrdpush.h b/streaming/rrdpush.h index efbf2f1e6f..f747b69b2d 100644 --- a/streaming/rrdpush.h +++ b/streaming/rrdpush.h @@ -64,6 +64,7 @@ struct sender_state { struct receiver_state { RRDHOST *host; + netdata_thread_t thread; int fd; char *key; char *hostname; @@ -85,7 +86,8 @@ struct receiver_state { #ifdef ENABLE_HTTPS struct netdata_ssl ssl; #endif - unsigned int shutdown:1; + unsigned int shutdown:1; // Tell the thread to exit + unsigned int exited; // Indicates that the thread has exited (NOT A BITFIELD!) }; |