diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 257 |
1 files changed, 145 insertions, 112 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index c8e53cc349..1dd3167ce4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -252,52 +252,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HAN return true; } -/* Produce a full line if one exists, statefully return where we start next time. - * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. - */ -inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) { - buffer_need_bytes(dst, reader->read_len - reader->pos + 2); - - size_t start = reader->pos; - - char *ss = &reader->read_buffer[start]; - char *se = &reader->read_buffer[reader->read_len]; - char *ds = &dst->buffer[dst->len]; - char *de = &ds[dst->size - dst->len - 2]; - - if(ss >= se) { - *ds = '\0'; - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; - } - - // copy all bytes to buffer - while(ss < se && ds < de && *ss != '\n') { - *ds++ = *ss++; - dst->len++; - } - - // if we have a newline, return the buffer - if(ss < se && ds < de && *ss == '\n') { - // newline found in the r->read_buffer - - *ds++ = *ss++; // copy the newline too - dst->len++; - - *ds = '\0'; - - reader->pos = ss - reader->read_buffer; - return true; - } - - reader->pos = 0; - reader->read_len = 0; - reader->read_buffer[reader->read_len] = '\0'; - return false; -} - bool plugin_is_enabled(struct plugind *cd); static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) { @@ -356,45 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i // so, parser needs to be allocated before pushing it netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser); - bool compressed_connection = rrdpush_decompression_initialize(rpt); + { + bool compressed_connection = rrdpush_decompression_initialize(rpt); - buffered_reader_init(&rpt->reader); + buffered_reader_init(&rpt->reader); #ifdef NETDATA_LOG_STREAM_RECEIVE - { - char filename[FILENAME_MAX + 1]; - snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(rpt->host) : "unknown"); - parser->user.stream_log_fp = fopen(filename, "w"); - parser->user.stream_log_repertoire = PARSER_REP_METADATA; - } + { + char filename[FILENAME_MAX + 1]; + snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname( + rpt->host) : "unknown" + ); + parser->user.stream_log_fp = fopen(filename, "w"); + parser->user.stream_log_repertoire = PARSER_REP_METADATA; + } #endif - BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - while(!receiver_should_stop(rpt)) { + CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL); - if(!buffered_reader_next_line(&rpt->reader, buffer)) { - STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line), + ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser), + ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser), + ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); - bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) : receiver_read_uncompressed(rpt, &reason); + while(!receiver_should_stop(rpt)) { - if(unlikely(!have_new_data)) { - receiver_set_exit_reason(rpt, reason, false); - break; - } + if(!buffered_reader_next_line(&rpt->reader, buffer)) { + STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR; - continue; - } + bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) + : receiver_read_uncompressed(rpt, &reason); - if (unlikely(parser_action(parser, buffer->buffer))) { - receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); - break; - } + if(unlikely(!have_new_data)) { + receiver_set_exit_reason(rpt, reason, false); + break; + } - buffer->len = 0; - buffer->buffer[0] = '\0'; - } - buffer_free(buffer); - result = parser->user.data_collections_count; + continue; + } + + if(unlikely(parser_action(parser, buffer->buffer))) { + receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false); + break; + } + + buffer->len = 0; + buffer->buffer[0] = '\0'; + } + result = parser->user.data_collections_count; + } // free parser with the pop function netdata_thread_cleanup_pop(1); @@ -435,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { if (rpt->config.alarms_delay > 0) { host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; - netdata_log_health( - "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", - rrdhost_hostname(host), - (int64_t) rpt->config.alarms_delay); + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); } } @@ -556,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r 5); } -void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { - - log_stream_connection(rpt->client_ip, rpt->client_port, - (rpt->key && *rpt->key)? rpt->key : "-", - (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", - (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", - status); - - netdata_log_info("STREAM '%s' [receive from [%s]:%s]: " - "%s. " - "STATUS: %s%s%s%s" - , rpt->hostname - , rpt->client_ip, rpt->client_port - , msg - , status - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" - , stream_handshake_error_to_string(rpt->exit.reason) - , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" - ); - +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) { + // this function may be called BEFORE we spawn the receiver thread + // so, we need to add the fields again (it does not harm) + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""), + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'" + , (rpt->key && *rpt->key)? rpt->key : "" + , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "" + , msg); + + nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s" + , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "" + , msg + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":"" + , stream_handshake_error_to_string(rpt->exit.reason) + , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":"" + ); } static void rrdpush_receive(struct receiver_state *rpt) @@ -688,13 +661,19 @@ static void rrdpush_receive(struct receiver_state *rpt) ); if(!host) { - rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt,"failed to find/create host structure, rejecting connection", + RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR); goto cleanup; } if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) { - rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER"); + rrdpush_receive_log_status( + rpt, "host is initializing, retry later", + RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION); goto cleanup; } @@ -703,7 +682,10 @@ static void rrdpush_receive(struct receiver_state *rpt) rpt->system_info = NULL; if(!rrdhost_set_receiver(host, rpt)) { - rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt, "host is already served by another receiver", + RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO); + rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING); goto cleanup; } @@ -784,7 +766,9 @@ static void rrdpush_receive(struct receiver_state *rpt) if(bytes_sent != (ssize_t)strlen(initial_response)) { internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response)); - rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION"); + rrdpush_receive_log_status( + rpt, "cannot reply back, dropping connection", + RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR); goto cleanup; } #ifdef ENABLE_H2O @@ -815,7 +799,9 @@ static void rrdpush_receive(struct receiver_state *rpt) , rpt->fd); } - rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED"); + rrdpush_receive_log_status( + rpt, "connected and ready to receive data", + RRDPUSH_STATUS_CONNECTED, NDLP_INFO); #ifdef ENABLE_ACLK // in case we have cloud connection we inform cloud @@ -842,7 +828,9 @@ static void rrdpush_receive(struct receiver_state *rpt) { char msg[100 + 1]; snprintfz(msg, 100, "disconnected (completed %zu updates)", count); - rrdpush_receive_log_status(rpt, msg, "DISCONNECTED"); + rrdpush_receive_log_status( + rpt, msg, + RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING); } #ifdef ENABLE_ACLK @@ -873,19 +861,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) { rrdhost_set_is_parent_label(); } +static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + + stream_capabilities_to_string(wb, rpt->capabilities); + return true; +} + +static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) { + struct receiver_state *rpt = ptr; + if(!rpt) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + void *rrdpush_receiver_thread(void *ptr) { netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr); - worker_register("STREAMRCV"); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT); - worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE); + { + worker_register("STREAMRCV"); - struct receiver_state *rpt = (struct receiver_state *)ptr; - rpt->tid = gettid(); - netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, + "received bytes", "bytes/s", + WORKER_METRIC_INCREMENT); - rrdpush_receive(rpt); + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, + "uncompressed bytes", "bytes/s", + WORKER_METRIC_INCREMENT); + + worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, + "replication completion", "%", + WORKER_METRIC_ABSOLUTE); + + struct receiver_state *rpt = (struct receiver_state *) ptr; + rpt->tid = gettid(); + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip), + ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port), + ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname), + ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip + , rpt->client_port); + + rrdpush_receive(rpt); + } netdata_thread_cleanup_pop(1); return NULL; |