summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c257
1 files changed, 145 insertions, 112 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index c8e53cc349..1dd3167ce4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -252,52 +252,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HAN
return true;
}
-/* Produce a full line if one exists, statefully return where we start next time.
- * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
- */
-inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
- buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
-
- size_t start = reader->pos;
-
- char *ss = &reader->read_buffer[start];
- char *se = &reader->read_buffer[reader->read_len];
- char *ds = &dst->buffer[dst->len];
- char *de = &ds[dst->size - dst->len - 2];
-
- if(ss >= se) {
- *ds = '\0';
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
- }
-
- // copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n') {
- *ds++ = *ss++;
- dst->len++;
- }
-
- // if we have a newline, return the buffer
- if(ss < se && ds < de && *ss == '\n') {
- // newline found in the r->read_buffer
-
- *ds++ = *ss++; // copy the newline too
- dst->len++;
-
- *ds = '\0';
-
- reader->pos = ss - reader->read_buffer;
- return true;
- }
-
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
-}
-
bool plugin_is_enabled(struct plugind *cd);
static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) {
@@ -356,45 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- bool compressed_connection = rrdpush_decompression_initialize(rpt);
+ {
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
- buffered_reader_init(&rpt->reader);
+ buffered_reader_init(&rpt->reader);
#ifdef NETDATA_LOG_STREAM_RECEIVE
- {
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(rpt->host) : "unknown");
- parser->user.stream_log_fp = fopen(filename, "w");
- parser->user.stream_log_repertoire = PARSER_REP_METADATA;
- }
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- while(!receiver_should_stop(rpt)) {
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) : receiver_read_uncompressed(rpt, &reason);
+ while(!receiver_should_stop(rpt)) {
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, reason, false);
- break;
- }
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- continue;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- if (unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
+ }
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
- result = parser->user.data_collections_count;
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
+ }
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -435,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
if (rpt->config.alarms_delay > 0) {
host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
- netdata_log_health(
- "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
- rrdhost_hostname(host),
- (int64_t) rpt->config.alarms_delay);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
}
}
@@ -556,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r
5);
}
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
-
- log_stream_connection(rpt->client_ip, rpt->client_port,
- (rpt->key && *rpt->key)? rpt->key : "-",
- (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
- (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
- status);
-
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "%s. "
- "STATUS: %s%s%s%s"
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- , msg
- , status
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
- , stream_handshake_error_to_string(rpt->exit.reason)
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
- );
-
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
+ // this function may be called BEFORE we spawn the receiver thread
+ // so, we need to add the fields again (it does not harm)
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""),
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'"
+ , (rpt->key && *rpt->key)? rpt->key : ""
+ , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : ""
+ , msg);
+
+ nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s"
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""
+ , msg
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
+ , stream_handshake_error_to_string(rpt->exit.reason)
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
+ );
}
static void rrdpush_receive(struct receiver_state *rpt)
@@ -688,13 +661,19 @@ static void rrdpush_receive(struct receiver_state *rpt)
);
if(!host) {
- rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt,"failed to find/create host structure, rejecting connection",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
- rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ rrdpush_receive_log_status(
+ rpt, "host is initializing, retry later",
+ RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
goto cleanup;
}
@@ -703,7 +682,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->system_info = NULL;
if(!rrdhost_set_receiver(host, rpt)) {
- rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "host is already served by another receiver",
+ RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
goto cleanup;
}
@@ -784,7 +766,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
if(bytes_sent != (ssize_t)strlen(initial_response)) {
internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
- rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "cannot reply back, dropping connection",
+ RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR);
goto cleanup;
}
#ifdef ENABLE_H2O
@@ -815,7 +799,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->fd);
}
- rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
+ rrdpush_receive_log_status(
+ rpt, "connected and ready to receive data",
+ RRDPUSH_STATUS_CONNECTED, NDLP_INFO);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -842,7 +828,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
char msg[100 + 1];
snprintfz(msg, 100, "disconnected (completed %zu updates)", count);
- rrdpush_receive_log_status(rpt, msg, "DISCONNECTED");
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
}
#ifdef ENABLE_ACLK
@@ -873,19 +861,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
rrdhost_set_is_parent_label();
}
+static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+ stream_capabilities_to_string(wb, rpt->capabilities);
+ return true;
+}
+
+static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
void *rrdpush_receiver_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
- worker_register("STREAMRCV");
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ {
+ worker_register("STREAMRCV");
- struct receiver_state *rpt = (struct receiver_state *)ptr;
- rpt->tid = gettid();
- netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
+ "received bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- rrdpush_receive(rpt);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
+ "uncompressed bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ "replication completion", "%",
+ WORKER_METRIC_ABSOLUTE);
+
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ rpt->tid = gettid();
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
+ ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
+ , rpt->client_port);
+
+ rrdpush_receive(rpt);
+ }
netdata_thread_cleanup_pop(1);
return NULL;