diff options
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r-- | streaming/rrdpush.c | 1099 |
1 files changed, 74 insertions, 1025 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c index fa3cc2d478..5dc3f832dc 100644 --- a/streaming/rrdpush.c +++ b/streaming/rrdpush.c @@ -26,25 +26,7 @@ * */ -#define STREAMING_PROTOCOL_VERSION "1.1" -#define START_STREAMING_PROMPT "Hit me baby, push them over..." -#define START_STREAMING_PROMPT_V2 "Hit me baby, push them over and bring the host labels..." -#define START_STREAMING_PROMPT_VN "Hit me baby, push them over with the version=" - -typedef enum { - RRDPUSH_MULTIPLE_CONNECTIONS_ALLOW, - RRDPUSH_MULTIPLE_CONNECTIONS_DENY_NEW -} RRDPUSH_MULTIPLE_CONNECTIONS_STRATEGY; - -typedef struct { - char *os_name; - char *os_id; - char *os_version; - char *kernel_name; - char *kernel_version; -} stream_encoded_t; - -static struct config stream_config = { +struct config stream_config = { .first_section = NULL, .last_section = NULL, .mutex = NETDATA_MUTEX_INITIALIZER, @@ -125,8 +107,6 @@ int rrdpush_init() { return default_rrdpush_enabled; } -#define CONNECTED_TO_SIZE 100 - // data collection happens from multiple threads // each of these threads calls rrdset_done() // which in turn calls rrdset_done_push() @@ -141,8 +121,6 @@ int rrdpush_init() { // this is for the first iterations of each chart unsigned int remote_clock_resync_iterations = 60; -#define rrdpush_buffer_lock(host) netdata_mutex_lock(&((host)->rrdpush_sender_buffer_mutex)) -#define rrdpush_buffer_unlock(host) netdata_mutex_unlock(&((host)->rrdpush_sender_buffer_mutex)) static inline int should_send_chart_matching(RRDSET *st) { if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED))) { @@ -205,7 +183,8 @@ static inline int need_to_send_chart_definition(RRDSET *st) { return 0; } -// sends the current chart definition +// Send the current chart definition. +// Assumes that collector thread has already called sender_start for mutex / buffer state. static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { RRDHOST *host = st->rrdhost; @@ -224,11 +203,9 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { } } - // info("CHART '%s' '%s'", st->id, name); - // send the chart buffer_sprintf( - host->rrdpush_sender_buffer + host->sender->build , "CHART \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" \"%s\" %ld %d \"%s %s %s %s\" \"%s\" \"%s\"\n" , st->id , name @@ -251,7 +228,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { RRDDIM *rd; rrddim_foreach_read(rd, st) { buffer_sprintf( - host->rrdpush_sender_buffer + host->sender->build , "DIMENSION \"%s\" \"%s\" \"%s\" " COLLECTED_NUMBER_FORMAT " " COLLECTED_NUMBER_FORMAT " \"%s %s %s\"\n" , rd->id , rd->name @@ -272,7 +249,7 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { calculated_number *value = (calculated_number *) rs->value; buffer_sprintf( - host->rrdpush_sender_buffer + host->sender->build , "VARIABLE CHART %s = " CALCULATED_NUMBER_FORMAT "\n" , rs->variable , *value @@ -284,25 +261,29 @@ static inline void rrdpush_send_chart_definition_nolock(RRDSET *st) { } // sends the current chart dimensions -static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st) { +static inline void rrdpush_send_chart_metrics_nolock(RRDSET *st, struct sender_state *s) { RRDHOST *host = st->rrdhost; - buffer_sprintf(host->rrdpush_sender_buffer, "BEGIN \"%s\" %llu\n", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); + buffer_sprintf(host->sender->build, "BEGIN \"%s\" %llu", st->id, (st->last_collected_time.tv_sec > st->upstream_resync_time)?st->usec_since_last_update:0); + if (s->version >= VERSION_GAP_FILLING) + buffer_sprintf(host->sender->build, " %ld\n", st->last_collected_time.tv_sec); + else + buffer_strcat(host->sender->build, "\n"); RRDDIM *rd; rrddim_foreach_read(rd, st) { if(rd->updated && rd->exposed) - buffer_sprintf(host->rrdpush_sender_buffer + buffer_sprintf(host->sender->build , "SET \"%s\" = " COLLECTED_NUMBER_FORMAT "\n" , rd->id , rd->collected_value ); } - - buffer_strcat(host->rrdpush_sender_buffer, "END\n"); + buffer_strcat(host->sender->build, "END\n"); } static void rrdpush_sender_thread_spawn(RRDHOST *host); +// Called from the internal collectors to mark a chart obsolete. void rrdset_push_chart_definition_now(RRDSET *st) { RRDHOST *host = st->rrdhost; @@ -310,9 +291,9 @@ void rrdset_push_chart_definition_now(RRDSET *st) { return; rrdset_rdlock(st); - rrdpush_buffer_lock(host); + sender_start(host->sender); rrdpush_send_chart_definition_nolock(st); - rrdpush_buffer_unlock(host); + sender_commit(host->sender); rrdset_unlock(st); } @@ -322,18 +303,14 @@ void rrdset_done_push(RRDSET *st) { RRDHOST *host = st->rrdhost; - rrdpush_buffer_lock(host); - if(unlikely(host->rrdpush_send_enabled && !host->rrdpush_sender_spawn)) rrdpush_sender_thread_spawn(host); - if(unlikely(!host->rrdpush_sender_buffer || !host->rrdpush_sender_connected)) { + // Handle non-connected case + if(unlikely(!host->rrdpush_sender_connected)) { if(unlikely(!host->rrdpush_sender_error_shown)) error("STREAM %s [send]: not ready - discarding collected metrics.", host->hostname); - host->rrdpush_sender_error_shown = 1; - - rrdpush_buffer_unlock(host); return; } else if(unlikely(host->rrdpush_sender_error_shown)) { @@ -341,16 +318,18 @@ void rrdset_done_push(RRDSET *st) { host->rrdpush_sender_error_shown = 0; } + sender_start(host->sender); + if(need_to_send_chart_definition(st)) rrdpush_send_chart_definition_nolock(st); - rrdpush_send_chart_metrics_nolock(st); + rrdpush_send_chart_metrics_nolock(st, host->sender); // signal the sender there are more data if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); - rrdpush_buffer_unlock(host); + sender_commit(host->sender); } // labels @@ -358,13 +337,13 @@ void rrdpush_send_labels(RRDHOST *host) { if (!host->labels || !(host->labels_flag & LABEL_FLAG_UPDATE_STREAM) || (host->labels_flag & LABEL_FLAG_STOP_STREAM)) return; - rrdpush_buffer_lock(host); + sender_start(host->sender); rrdhost_rdlock(host); netdata_rwlock_rdlock(&host->labels_rwlock); struct label *labels = host->labels; while(labels) { - buffer_sprintf(host->rrdpush_sender_buffer + buffer_sprintf(host->sender->build , "LABEL \"%s\" = %d %s\n" , labels->key , (int)labels->label_source @@ -373,103 +352,26 @@ void rrdpush_send_labels(RRDHOST *host) { labels = labels->next; } - buffer_sprintf(host->rrdpush_sender_buffer + buffer_sprintf(host->sender->build , "OVERWRITE %s\n", "labels"); netdata_rwlock_unlock(&host->labels_rwlock); rrdhost_unlock(host); + sender_commit(host->sender); if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1 && write(host->rrdpush_sender_pipe[PIPE_WRITE], " ", 1) == -1) error("STREAM %s [send]: cannot write to internal pipe", host->hostname); - rrdpush_buffer_unlock(host); host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; } // ---------------------------------------------------------------------------- // rrdpush sender thread -static inline void rrdpush_sender_add_host_variable_to_buffer_nolock(RRDHOST *host, RRDVAR *rv) { - calculated_number *value = (calculated_number *)rv->value; - - buffer_sprintf( - host->rrdpush_sender_buffer - , "VARIABLE HOST %s = " CALCULATED_NUMBER_FORMAT "\n" - , rv->name - , *value - ); - - debug(D_STREAM, "RRDVAR pushed HOST VARIABLE %s = " CALCULATED_NUMBER_FORMAT, rv->name, *value); -} - -void rrdpush_sender_send_this_host_variable_now(RRDHOST *host, RRDVAR *rv) { - if(host->rrdpush_send_enabled && host->rrdpush_sender_spawn && host->rrdpush_sender_connected) { - rrdpush_buffer_lock(host); - rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); - rrdpush_buffer_unlock(host); - } -} - -static int rrdpush_sender_thread_custom_host_variables_callback(void *rrdvar_ptr, void *host_ptr) { - RRDVAR *rv = (RRDVAR *)rrdvar_ptr; - RRDHOST *host = (RRDHOST *)host_ptr; - - if(unlikely(rv->options & RRDVAR_OPTION_CUSTOM_HOST_VAR && rv->type == RRDVAR_TYPE_CALCULATED)) { - rrdpush_sender_add_host_variable_to_buffer_nolock(host, rv); - - // return 1, so that the traversal will return the number of variables sent - return 1; - } - - // returning a negative number will break the traversal - return 0; -} - -static void rrdpush_sender_thread_send_custom_host_variables(RRDHOST *host) { - int ret = rrdvar_callback_for_all_host_variables(host, rrdpush_sender_thread_custom_host_variables_callback, host); - (void)ret; - - debug(D_STREAM, "RRDVAR sent %d VARIABLES", ret); -} - -// resets all the chart, so that their definitions -// will be resent to the central netdata -static void rrdpush_sender_thread_reset_all_charts(RRDHOST *host) { - rrdhost_rdlock(host); - - RRDSET *st; - rrdset_foreach_read(st, host) { - rrdset_flag_clear(st, RRDSET_FLAG_UPSTREAM_EXPOSED); - - st->upstream_resync_time = 0; - - rrdset_rdlock(st); - - RRDDIM *rd; - rrddim_foreach_read(rd, st) - rd->exposed = 0; - - rrdset_unlock(st); - } - - rrdhost_unlock(host); -} - -static inline void rrdpush_sender_thread_data_flush(RRDHOST *host) { - rrdpush_buffer_lock(host); - - if(buffer_strlen(host->rrdpush_sender_buffer)) - error("STREAM %s [send]: discarding %zu bytes of metrics already in the buffer.", host->hostname, buffer_strlen(host->rrdpush_sender_buffer)); - - buffer_flush(host->rrdpush_sender_buffer); - - rrdpush_sender_thread_reset_all_charts(host); - rrdpush_sender_thread_send_custom_host_variables(host); - - rrdpush_buffer_unlock(host); -} - +// Either the receiver lost the connection or the host is being destroyed. +// Don't lock the sender buffer - doesn't affect consistency in either case. +// TODO-GAPS During the host destruction sequence we should make sure the disconnect happens early enough to lock +// out collectors hitting the sender. Locking the mutex means there may be waiting threads when we free. void rrdpush_sender_thread_stop(RRDHOST *host) { - rrdpush_buffer_lock(host); rrdhost_wrlock(host); netdata_thread_t thr = 0; @@ -489,7 +391,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { } rrdhost_unlock(host); - rrdpush_buffer_unlock(host); if(thr != 0) { info("STREAM %s [send]: waiting for the sending thread to stop...", host->hostname); @@ -499,903 +400,14 @@ void rrdpush_sender_thread_stop(RRDHOST *host) { } } -static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { - host->rrdpush_sender_connected = 0; - - if(host->rrdpush_sender_socket != -1) { - close(host->rrdpush_sender_socket); - host->rrdpush_sender_socket = -1; - } -} - -static inline void rrdpush_set_flags_to_newest_stream(RRDHOST *host) { - host->labels_flag |= LABEL_FLAG_UPDATE_STREAM; - host->labels_flag &= ~LABEL_FLAG_STOP_STREAM; -} - -void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) -{ - se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):""; - se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):""; - se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):""; - se->kernel_name = (host->system_info->kernel_name)?url_encode(host->system_info->kernel_name):""; - se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):""; -} - -void rrdpush_clean_encoded(stream_encoded_t *se) -{ - if (se->os_name) - freez(se->os_name); - - if (se->os_id) - freez(se->os_id); - - if (se->os_version) - freez(se->os_version); - - if (se->kernel_name) - freez(se->kernel_name); - - if (se->kernel_version) - freez(se->kernel_version); -} - -//called from client side -static int rrdpush_sender_thread_connect_to_master(RRDHOST *host, int default_port, int timeout, size_t *reconnects_counter, char *connected_to, size_t connected_to_size) { - struct timeval tv = { - .tv_sec = timeout, - .tv_usec = 0 - }; - - // make sure the socket is closed - rrdpush_sender_thread_close_socket(host); - - debug(D_STREAM, "STREAM: Attempting to connect..."); - info("STREAM %s [send to %s]: connecting...", host->hostname, host->rrdpush_send_destination); - - host->rrdpush_sender_socket = connect_to_one_of( - host->rrdpush_send_destination - , default_port - , &tv - , reconnects_counter - , connected_to - , connected_to_size - ); - - if(unlikely(host->rrdpush_sender_socket == -1)) { - error("STREAM %s [send to %s]: failed to connect", host->hostname, host->rrdpush_send_destination); - return 0; - } - - info("STREAM %s [send to %s]: initializing communication...", host->hostname, connected_to); - -#ifdef ENABLE_HTTPS - if( netdata_client_ctx ){ - host->ssl.flags = NETDATA_SSL_START; - if (!host->ssl.conn){ - host->ssl.conn = SSL_new(netdata_client_ctx); - if(!host->ssl.conn){ - error("Failed to allocate SSL structure."); - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else{ - SSL_clear(host->ssl.conn); - } - - if (host->ssl.conn) - { - if (SSL_set_fd(host->ssl.conn, host->rrdpush_sender_socket) != 1) { - error("Failed to set the socket to the SSL on socket fd %d.", host->rrdpush_sender_socket); - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } else{ - host->ssl.flags = NETDATA_SSL_HANDSHAKE_COMPLETE; - } - } - } - else { - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } -#endif - - /* TODO: During the implementation of #7265 switch the set of variables to HOST_* and CONTAINER_* if the - version negotiation resulted in a high enough version. - */ - stream_encoded_t se; - rrdpush_encode_variable(&se, host); - - #define HTTP_HEADER_SIZE 8192 - char http[HTTP_HEADER_SIZE + 1]; - int eol = snprintfz(http, HTTP_HEADER_SIZE, - "STREAM key=%s&hostname=%s®istry_hostname=%s&machine_guid=%s&update_every=%d&os=%s&timezone=%s&tags=%s&ver=%u" - "&NETDATA_SYSTEM_OS_NAME=%s" - "&NETDATA_SYSTEM_OS_ID=%s" - "&NETDATA_SYSTEM_OS_ID_LIKE=%s" - "&NETDATA_SYSTEM_OS_VERSION=%s" - "&NETDATA_SYSTEM_OS_VERSION_ID=%s" - "&NETDATA_SYSTEM_OS_DETECTION=%s" - "&NETDATA_SYSTEM_KERNEL_NAME=%s" - "&NETDATA_SYSTEM_KERNEL_VERSION=%s" - "&NETDATA_SYSTEM_ARCHITECTURE=%s" - "&NETDATA_SYSTEM_VIRTUALIZATION=%s" - "&NETDATA_SYSTEM_VIRT_DETECTION=%s" - "&NETDATA_SYSTEM_CONTAINER=%s" - "&NETDATA_SYSTEM_CONTAINER_DETECTION=%s" - "&NETDATA_CONTAINER_OS_NAME=%s" - "&NETDATA_CONTAINER_OS_ID=%s" - "&NETDATA_CONTAINER_OS_ID_LIKE=%s" - "&NETDATA_CONTAINER_OS_VERSION=%s" - "&NETDATA_CONTAINER_OS_VERSION_ID=%s" - "&NETDATA_CONTAINER_OS_DETECTION=%s" - "&NETDATA_SYSTEM_CPU_LOGICAL_CPU_COUNT=%s" - "&NETDATA_SYSTEM_CPU_FREQ=%s" - "&NETDATA_SYSTEM_TOTAL_RAM=%s" - "&NETDATA_SYSTEM_TOTAL_DISK_SIZE=%s" - "&NETDATA_PROTOCOL_VERSION=%s" - " HTTP/1.1\r\n" - "User-Agent: %s/%s\r\n" - "Accept: */*\r\n\r\n" - , host->rrdpush_send_api_key - , host->hostname - , host->registry_hostname - , host->machine_guid - , default_rrd_update_every - , host->os - , host->timezone - , (host->tags) ? host->tags : "" - , STREAMING_PROTOCOL_CURRENT_VERSION - , se.os_name - , se.os_id - , (host->system_info->host_os_id_like) ? host->system_info->host_os_id_like : "" - , se.os_version - , (host->system_info->host_os_version_id) ? host->system_info->host_os_version_id : "" - , (host->system_info->host_os_detection) ? host->system_info->host_os_detection : "" - , se.kernel_name - , se.kernel_version - , (host->system_info->architecture) ? host->system_info->architecture : "" - , (host->system_info->virtualization) ? host->system_info->virtualization : "" - , (host->system_info->virt_detection) ? host->system_info->virt_detection : "" - , (host->system_info->container) ? host->system_info->container : "" - , (host->system_info->container_detection) ? host->system_info->container_detection : "" - , (host->system_info->container_os_name) ? host->system_info->container_os_name : "" - , (host->system_info->container_os_id) ? host->system_info->container_os_id : "" - , (host->system_info->container_os_id_like) ? host->system_info->container_os_id_like : "" - , (host->system_info->container_os_version) ? host->system_info->container_os_version : "" - , (host->system_info->container_os_version_id) ? host->system_info->container_os_version_id : "" - , (host->system_info->container_os_detection) ? host->system_info->container_os_detection : "" - , (host->system_info->host_cores) ? host->system_info->host_cores : "" - , (host->system_info->host_cpu_freq) ? host->system_info->host_cpu_freq : "" - , (host->system_info->host_ram_total) ? host->system_info->host_ram_total : "" - , (host->system_info->host_disk_space) ? host->system_info->host_disk_space : "" - , STREAMING_PROTOCOL_VERSION - , host->program_name - , host->program_version - ); - http[eol] = 0x00; - rrdpush_clean_encoded(&se); - -#ifdef ENABLE_HTTPS - if (!host->ssl.flags) { - ERR_clear_error(); - SSL_set_connect_state(host->ssl.conn); - int err = SSL_connect(host->ssl.conn); - if (err != 1){ - err = SSL_get_error(host->ssl.conn, err); - error("SSL cannot connect with the server: %s ",ERR_error_string((long)SSL_get_error(host->ssl.conn,err),NULL)); - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - rrdpush_sender_thread_close_socket(host); - return 0; - }else { - host->ssl.flags = NETDATA_SSL_NO_HANDSHAKE; - } - } - else { - if (netdata_use_ssl_on_stream == NETDATA_SSL_FORCE) { - if (netdata_validate_server == NETDATA_SSL_VALID_CERTIFICATE) { - if ( security_test_certificate(host->ssl.conn)) { - error("Closing the stream connection, because the server SSL certificate is not valid."); - rrdpush_sender_thread_close_socket(host); - return 0; - } - } - } - } - } - if(send_timeout(&host->ssl,host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) { -#else - if(send_timeout(host->rrdpush_sender_socket, http, strlen(http), 0, timeout) == -1) { -#endif - error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", host->hostname, connected_to); - rrdpush_sender_thread_close_socket(host); - return 0; - } - - info("STREAM %s [send to %s]: waiting response from remote netdata...", host->hostname, connected_to); - - ssize_t received; -#ifdef ENABLE_HTTPS - received = recv_timeout(&host->ssl,host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); - if(received == -1) { -#else - received = recv_timeout(host->rrdpush_sender_socket, http, HTTP_HEADER_SIZE, 0, timeout); - if(received == -1) { -#endif - error("STREAM %s [send to %s]: remote netdata does not respond.", host->hostname, connected_to); - rrdpush_sender_thread_close_socket(host); - return 0; - } - - http[received] = '\0'; - int answer = -1; - char *version_start = strchr(http, '='); - uint32_t version; - if(version_start) { - version_start++; - version = (uint32_t)strtol(version_start, NULL, 10); - answer = memcmp(http, START_STREAMING_PROMPT_VN, (size_t)(version_start - http)); - if(!answer) { - rrdpush_set_flags_to_newest_stream(host); - host->stream_version = version; - } - } else { - answer = memcmp(http, START_STREAMING_PROMPT_V2, strlen(START_STREAMING_PROMPT_V2)); - if(!answer) { - version = 1; - rrdpush_set_flags_to_newest_stream(host); - } - else { - answer = memcmp(http, START_STREAMING_PROMPT, strlen(START_STREAMING_PROMPT)); - if(!answer) { - version = 0; - host->labels_flag |= LABEL_FLAG_STOP_STREAM; - host->labels_flag &= ~LABEL_FLAG_UPDATE_STREAM; - } - } - } - - if(answer != 0) { - error("STREAM %s [send to %s]: server is not replying properly (is it a netdata?).", host->hostname, connected_to); - rrdpush_sender_thread_close_socket(host); - return 0; - } - - info("STREAM %s [send to %s]: established communication with a master using protocol version %u - ready to send metrics..." - , host->hostname - , connected_to - , version); - - if(sock_setnonblock(host->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", host->hostname, connected_to); - - if(sock_enlarge_out(host->rrdpush_sender_socket) < 0) - error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", host->hostname, connected_to); - - debug(D_STREAM, "STREAM: Connected on fd %d...", host->rrdpush_sender_socket); - - return 1; -} - -static void rrdpush_sender_thread_cleanup_callback(void *ptr) { - RRDHOST *host = (RRDHOST *)ptr; - - rrdpush_buffer_lock(host); - rrdhost_wrlock(host); - - info("STREAM %s [send]: sending thread cleans up...", host->hostname); - - rrdpush_sender_thread_close_socket(host); - - // close the pipe - if(host->rrdpush_sender_pipe[PIPE_READ] != -1) { - close(host->rrdpush_sender_pipe[PIPE_READ]); - host->rrdpush_sender_pipe[PIPE_READ] = -1; - } - - if(host->rrdpush_sender_pipe[PIPE_WRITE] != -1) { - close(host->rrdpush_sender_pipe[PIPE_WRITE]); - host->rrdpush_sender_pipe[PIPE_WRITE] = -1; - } - - buffer_free(host->rrdpush_sender_buffer); - host->rrdpush_sender_buffer = NULL; - - if(!host->rrdpush_sender_join) { - info("STREAM %s [send]: sending thread detaches itself.", host->hostname); - netdata_thread_detach(netdata_thread_self()); - } - - host->rrdpush_sender_spawn = 0; - - info("STREAM %s [send]: sending thread now exits.", host->hostname); - - rrdhost_unlock(host); - rrdpush_buffer_unlock(host); -} - -void *rrdpush_sender_thread(void *ptr) { - RRDHOST *host = (RRDHOST *)ptr; - - if(!host->rrdpush_send_enabled || !host->rrdpush_send_destination || !*host->rrdpush_send_destination || !host->rrdpush_send_api_key || !*host->rrdpush_send_api_key) { - error("STREAM %s [send]: thread created (task id %d), but host has streaming disabled.", host->hostname, gettid()); - return NULL; - } - -#ifdef ENABLE_HTTPS - if (netdata_use_ssl_on_stream & NETDATA_SSL_FORCE ){ - security_start_ssl(NETDATA_SSL_CONTEXT_STREAMING); - security_location_for_context(netdata_client_ctx, netdata_ssl_ca_file, netdata_ssl_ca_path); - } -#endif - - info("STREAM %s [send]: thread created (task id %d)", host->hostname, gettid()); - - int timeout = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "timeout seconds", 60); - int default_port = (int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "default port", 19999); - size_t max_size = (size_t)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "buffer size bytes", 1024 * 1024); - unsigned int reconnect_delay = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "reconnect delay seconds", 5); - remote_clock_resync_iterations = (unsigned int)appconfig_get_number(&stream_config, CONFIG_SECTION_STREAM, "initial clock resync iterations", remote_clock_resync_iterations); - char connected_to[CONNECTED_TO_SIZE + 1] = ""; - - // initialize rrdpush globals - host->rrdpush_sender_buffer = buffer_create(1); - host->rrdpush_sender_connected = 0; - if(pipe(host->rrdpush_sender_pipe) == -1) fatal("STREAM %s [send]: cannot create required pipe.", host->hostname); - - // initialize local variables - size_t begin = 0; - size_t reconnects_counter = 0; - size_t sent_bytes = 0; - size_t sent_bytes_on_this_connection = 0; - size_t send_attempts = 0; - - - time_t last_sent_t = 0; - struct pollfd fds[2], *ifd, *ofd; - nfds_t fdmax; - - ifd = &fds[0]; - ofd = &fds[1]; - - size_t not_connected_loops = 0; - - netdata_thread_cleanup_push(rrdpush_sender_thread_cleanup_callback, host); - - for(; host->rrdpush_send_enabled && !netdata_exit ;) { - // check for outstanding cancellation requests - netdata_thread_testcancel(); - - // if we don't have socket open, lets wait a bit - if(unlikely(host->rrdpush_sender_socket == -1)) { - send_attempts = 0; - - if(not_connected_loops == 0 && sent_bytes_on_this_connection > 0) { - // fast re-connection on first disconnect - sleep_usec(USEC_PER_MS * 500); // milliseconds - } - else { - // slow re-connection on repeating errors - sleep_usec(USEC_PER_SEC * reconnect_delay); // seconds - } - - if(rrdpush_sender_thread_connect_to_master(host, default_port, timeout, &reconnects_counter, connected_to, CONNECTED_TO_SIZE)) { - last_sent_t = now_monotonic_sec(); - - // reset the buffer, to properly send charts and metrics - rrdpush_sender_thread_data_flush(host); - - // send from the beginning - begin = 0; - - // make sure the next reconnection will be immediate - not_connected_loops = 0; - - // reset the bytes we have sent for this session - sent_bytes_on_this_connection = 0; - - // let the data collection threads know we are ready - host->rrdpush_sender_connected = 1; - } - else { - // increase the failed connections counter - not_connected_loops++; - - // reset the number of bytes sent - sent_bytes_on_this_connection = 0; - } - - // loop through - continue; - } - else if(unlikely(now_monotonic_sec() - last_sent_t > timeout)) { - error("STREAM %s [send to %s]: could not send metrics for %d seconds - closing connection - we have sent %zu bytes on this connection via %zu send attempts.", host->hostname, connected_to, timeout, sent_bytes_on_this_connection, send_attempts); - rrdpush_sender_thread_close_socket(host); - } - - ifd->fd = host->rrdpush_sender_pipe[PIPE_READ]; - ifd->events = POLLIN; - ifd->revents = 0; - - ofd->fd = host->rrdpush_sender_socket; - ofd->revents = 0; - if(ofd->fd != -1 && begin < buffer_strlen(host->rrdpush_sender_buffer)) { - debug(D_STREAM, "STREAM: Requesting data output on streaming socket %d...", ofd->fd); - ofd->events = POLLOUT; - fdmax = 2; - send_attempts++; - } - else { - debug(D_STREAM, "STREAM: Not requesting data output on streaming socket %d (nothing to send now)...", ofd->fd); - ofd->events = 0; - fdmax = 1; - } - - debug(D_STREAM, "STREAM: Waiting for poll() events (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer)); - if(unlikely(netdata_exit)) break; - int retval = poll(fds, fdmax, 1000); - if(unlikely(netdata_exit)) break; - - if(unlikely(retval == -1)) { - debug(D_STREAM, "STREAM: poll() failed (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer)); - - if(errno == EAGAIN || errno == EINTR) { - debug(D_STREAM, "STREAM: poll() failed with EAGAIN or EINTR..."); - } - else { - error("STREAM %s [send to %s]: failed to poll(). Closing socket.", host->hostname, connected_to); - rrdpush_sender_thread_close_socket(host); - } - - continue; - } - else if(likely(retval)) { - if (ifd->revents & POLLIN || ifd->revents & POLLPRI) { - debug(D_STREAM, "STREAM: Data added to send buffer (current buffer length %zu bytes)...", buffer_strlen(host->rrdpush_sender_buffer)); - - char buffer[1000 + 1]; - if (read(host->rrdpush_sender_pipe[PIPE_READ], buffer, 1000) == -1) - error("STREAM %s [send to %s]: cannot read from internal pipe.", host->hostname, connected_to); - } - - if (ofd->revents & POLLOUT) { - rrdpush_send_labels(host); - - if (begin < buffer_strlen(host->rrdpush_sender_buffer)) { - debug(D_STREAM, "STREAM: Sending data (current buffer length %zu bytes, begin = %zu)...", buffer_strlen(host->rrdpush_sender_buffer), begin); - - // BEGIN RRDPUSH LOCKED SESSION - - // during this session, data collectors - // will not be able to append data to our buffer - // but the socket is in non-blocking mode - // so, we will not block at send() - - netdata_thread_disable_cancelability(); - - debug(D_STREAM, "STREAM: Getting exclusive lock on host..."); - rrdpush_buffer_lock(host); - - debug(D_STREAM, "STREAM: Sending data, starting from %zu, size %zu...", begin, buffer_strlen(host->rrdpush_sender_buffer)); - ssize_t ret; -#ifdef ENABLE_HTTPS - SSL *conn = host->ssl.conn ; - if(conn && !host->ssl.flags) { - ret = SSL_write(conn,&host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin); - } else { - ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT); - } -#else - ret = send(host->rrdpush_sender_socket, &host->rrdpush_sender_buffer->buffer[begin], buffer_strlen(host->rrdpush_sender_buffer) - begin, MSG_DONTWAIT); -#endif - if (unlikely(ret == -1)) { - if (errno != EAGAIN && errno != EINTR && errno != EWOULDBLOCK) { - debug(D_STREAM, "STREAM: Send failed - closing socket..."); - error("STREAM %s [send to %s]: failed to send metrics - closing connection - we have sent %zu bytes on this connection.", host->hostname, connected_to, sent_bytes_on_this_connection); - rrdpush_sender_thread_close_socket(host); - } - else { - debug(D_STREAM, "STREAM: Send failed - will retry..."); - } - } - else if (likely(ret > 0)) { - // DEBUG - dump the string to see it - //char c = host->rrdpush_sender_buffer->buffer[begin + ret]; - //host->rrdpush_sender_buffer->buffer[begin + ret] = '\0'; - //debug(D_STREAM, "STREAM: sent from %zu to %zd:\n%s\n", begin, ret, &host->rrdpush_sender_buffer->buffer[begin]); - //host->rrdpush_sender_buffer->buffer[begin + ret] = c; - - sent_bytes_on_this_connection += ret; - sent_bytes += ret; - begin += ret; - - if (begin == buffer_strlen(host->rrdpush_sender_buffer)) { - // we send it all - - debug(D_STREAM, "STREAM: Sent %zd bytes (the whole buffer)...", ret); - buffer_flush(host->rrdpush_sender_buffer); - begin = 0; - } - else { - debug(D_STREAM, "STREAM: Sent %zd bytes (part of the data buffer)...", ret); - } - - last_sent_t = now_monotonic_sec(); - } - else { - debug(D_STREAM, "STREAM: send() returned %zd - closing the socket...", ret); - error("STREAM %s [send to %s]: failed to send metrics (send() returned %zd) - closing connection - we have sent %zu bytes on this connection.", - host->hostname, connected_to, ret, sent_bytes_on_this_connection); - rrdpush_sender_thread_close_socket(host); - } - - debug(D_STREAM, "STREAM: Releasing exclusive lock on host..."); - rrdpush_buffer_unlock(host); - - netdata_thread_enable_cancelability(); - - // END RRDPUSH LOCKED SESSION - } - else { - debug(D_STREAM, "STREAM: we have sent the entire buffer, but we received POLLOUT..."); - } - } - - if(host->rrdpush_sender_socket != -1) { - char *error = NULL; - - if (unlikely(ofd->revents & POLLERR)) - error = "socket reports errors (POLLERR)"; - - else if (unlikely(ofd->revents & POLLHUP)) - error = |