summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c257
-rw-r--r--streaming/replication.c25
-rw-r--r--streaming/rrdpush.c140
-rw-r--r--streaming/rrdpush.h43
-rw-r--r--streaming/sender.c338
5 files changed, 490 insertions, 313 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index c8e53cc349..1dd3167ce4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -252,52 +252,6 @@ static inline bool receiver_read_compressed(struct receiver_state *r, STREAM_HAN
return true;
}
-/* Produce a full line if one exists, statefully return where we start next time.
- * When we hit the end of the buffer with a partial line move it to the beginning for the next fill.
- */
-inline bool buffered_reader_next_line(struct buffered_reader *reader, BUFFER *dst) {
- buffer_need_bytes(dst, reader->read_len - reader->pos + 2);
-
- size_t start = reader->pos;
-
- char *ss = &reader->read_buffer[start];
- char *se = &reader->read_buffer[reader->read_len];
- char *ds = &dst->buffer[dst->len];
- char *de = &ds[dst->size - dst->len - 2];
-
- if(ss >= se) {
- *ds = '\0';
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
- }
-
- // copy all bytes to buffer
- while(ss < se && ds < de && *ss != '\n') {
- *ds++ = *ss++;
- dst->len++;
- }
-
- // if we have a newline, return the buffer
- if(ss < se && ds < de && *ss == '\n') {
- // newline found in the r->read_buffer
-
- *ds++ = *ss++; // copy the newline too
- dst->len++;
-
- *ds = '\0';
-
- reader->pos = ss - reader->read_buffer;
- return true;
- }
-
- reader->pos = 0;
- reader->read_len = 0;
- reader->read_buffer[reader->read_len] = '\0';
- return false;
-}
-
bool plugin_is_enabled(struct plugind *cd);
static void receiver_set_exit_reason(struct receiver_state *rpt, STREAM_HANDSHAKE reason, bool force) {
@@ -356,45 +310,59 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
// so, parser needs to be allocated before pushing it
netdata_thread_cleanup_push(pluginsd_process_thread_cleanup, parser);
- bool compressed_connection = rrdpush_decompression_initialize(rpt);
+ {
+ bool compressed_connection = rrdpush_decompression_initialize(rpt);
- buffered_reader_init(&rpt->reader);
+ buffered_reader_init(&rpt->reader);
#ifdef NETDATA_LOG_STREAM_RECEIVE
- {
- char filename[FILENAME_MAX + 1];
- snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(rpt->host) : "unknown");
- parser->user.stream_log_fp = fopen(filename, "w");
- parser->user.stream_log_repertoire = PARSER_REP_METADATA;
- }
+ {
+ char filename[FILENAME_MAX + 1];
+ snprintfz(filename, FILENAME_MAX, "/tmp/stream-receiver-%s.txt", rpt->host ? rrdhost_hostname(
+ rpt->host) : "unknown"
+ );
+ parser->user.stream_log_fp = fopen(filename, "w");
+ parser->user.stream_log_repertoire = PARSER_REP_METADATA;
+ }
#endif
- BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- while(!receiver_should_stop(rpt)) {
+ CLEAN_BUFFER *buffer = buffer_create(sizeof(rpt->reader.read_buffer), NULL);
- if(!buffered_reader_next_line(&rpt->reader, buffer)) {
- STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &parser->line),
+ ND_LOG_FIELD_CB(NDF_NIDL_NODE, parser_reconstruct_node, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_INSTANCE, parser_reconstruct_instance, parser),
+ ND_LOG_FIELD_CB(NDF_NIDL_CONTEXT, parser_reconstruct_context, parser),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
- bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason) : receiver_read_uncompressed(rpt, &reason);
+ while(!receiver_should_stop(rpt)) {
- if(unlikely(!have_new_data)) {
- receiver_set_exit_reason(rpt, reason, false);
- break;
- }
+ if(!buffered_reader_next_line(&rpt->reader, buffer)) {
+ STREAM_HANDSHAKE reason = STREAM_HANDSHAKE_DISCONNECT_UNKNOWN_SOCKET_READ_ERROR;
- continue;
- }
+ bool have_new_data = compressed_connection ? receiver_read_compressed(rpt, &reason)
+ : receiver_read_uncompressed(rpt, &reason);
- if (unlikely(parser_action(parser, buffer->buffer))) {
- receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
- break;
- }
+ if(unlikely(!have_new_data)) {
+ receiver_set_exit_reason(rpt, reason, false);
+ break;
+ }
- buffer->len = 0;
- buffer->buffer[0] = '\0';
- }
- buffer_free(buffer);
- result = parser->user.data_collections_count;
+ continue;
+ }
+
+ if(unlikely(parser_action(parser, buffer->buffer))) {
+ receiver_set_exit_reason(rpt, STREAM_HANDSHAKE_DISCONNECT_PARSER_FAILED, false);
+ break;
+ }
+
+ buffer->len = 0;
+ buffer->buffer[0] = '\0';
+ }
+ result = parser->user.data_collections_count;
+ }
// free parser with the pop function
netdata_thread_cleanup_pop(1);
@@ -435,10 +403,10 @@ static bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
if (rpt->config.alarms_delay > 0) {
host->health.health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
- netdata_log_health(
- "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
- rrdhost_hostname(host),
- (int64_t) rpt->config.alarms_delay);
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
}
}
@@ -556,26 +524,31 @@ static void rrdpush_send_error_on_taken_over_connection(struct receiver_state *r
5);
}
-void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
-
- log_stream_connection(rpt->client_ip, rpt->client_port,
- (rpt->key && *rpt->key)? rpt->key : "-",
- (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
- (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
- status);
-
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "%s. "
- "STATUS: %s%s%s%s"
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- , msg
- , status
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
- , stream_handshake_error_to_string(rpt->exit.reason)
- , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
- );
-
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status, ND_LOG_FIELD_PRIORITY priority) {
+ // this function may be called BEFORE we spawn the receiver thread
+ // so, we need to add the fields again (it does not harm)
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""),
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_from_child_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_ACCESS, priority, "api_key:'%s' machine_guid:'%s' msg:'%s'"
+ , (rpt->key && *rpt->key)? rpt->key : ""
+ , (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : ""
+ , msg);
+
+ nd_log(NDLS_DAEMON, priority, "STREAM_RECEIVER for '%s': %s %s%s%s"
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : ""
+ , msg
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?" (":""
+ , stream_handshake_error_to_string(rpt->exit.reason)
+ , rpt->exit.reason != STREAM_HANDSHAKE_NEVER?")":""
+ );
}
static void rrdpush_receive(struct receiver_state *rpt)
@@ -688,13 +661,19 @@ static void rrdpush_receive(struct receiver_state *rpt)
);
if(!host) {
- rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt,"failed to find/create host structure, rejecting connection",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INTERNAL_ERROR);
goto cleanup;
}
if (unlikely(rrdhost_flag_check(host, RRDHOST_FLAG_PENDING_CONTEXT_LOAD))) {
- rrdpush_receive_log_status(rpt, "host is initializing", "INITIALIZATION IN PROGRESS RETRY LATER");
+ rrdpush_receive_log_status(
+ rpt, "host is initializing, retry later",
+ RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, NDLP_NOTICE);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_INITIALIZATION);
goto cleanup;
}
@@ -703,7 +682,10 @@ static void rrdpush_receive(struct receiver_state *rpt)
rpt->system_info = NULL;
if(!rrdhost_set_receiver(host, rpt)) {
- rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "host is already served by another receiver",
+ RRDPUSH_STATUS_DUPLICATE_RECEIVER, NDLP_INFO);
+
rrdpush_send_error_on_taken_over_connection(rpt, START_STREAMING_ERROR_ALREADY_STREAMING);
goto cleanup;
}
@@ -784,7 +766,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
if(bytes_sent != (ssize_t)strlen(initial_response)) {
internal_error(true, "Cannot send response, got %zd bytes, expecting %zu bytes", bytes_sent, strlen(initial_response));
- rrdpush_receive_log_status(rpt, "cannot reply back", "CANT REPLY DROPPING CONNECTION");
+ rrdpush_receive_log_status(
+ rpt, "cannot reply back, dropping connection",
+ RRDPUSH_STATUS_CANT_REPLY, NDLP_ERR);
goto cleanup;
}
#ifdef ENABLE_H2O
@@ -815,7 +799,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
, rpt->fd);
}
- rrdpush_receive_log_status(rpt, "ready to receive data", "CONNECTED");
+ rrdpush_receive_log_status(
+ rpt, "connected and ready to receive data",
+ RRDPUSH_STATUS_CONNECTED, NDLP_INFO);
#ifdef ENABLE_ACLK
// in case we have cloud connection we inform cloud
@@ -842,7 +828,9 @@ static void rrdpush_receive(struct receiver_state *rpt)
{
char msg[100 + 1];
snprintfz(msg, 100, "disconnected (completed %zu updates)", count);
- rrdpush_receive_log_status(rpt, msg, "DISCONNECTED");
+ rrdpush_receive_log_status(
+ rpt, msg,
+ RRDPUSH_STATUS_DISCONNECTED, NDLP_WARNING);
}
#ifdef ENABLE_ACLK
@@ -873,19 +861,64 @@ static void rrdpush_receiver_thread_cleanup(void *ptr) {
rrdhost_set_is_parent_label();
}
+static bool stream_receiver_log_capabilities(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+ stream_capabilities_to_string(wb, rpt->capabilities);
+ return true;
+}
+
+static bool stream_receiver_log_transport(BUFFER *wb, void *ptr) {
+ struct receiver_state *rpt = ptr;
+ if(!rpt)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&rpt->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
void *rrdpush_receiver_thread(void *ptr) {
netdata_thread_cleanup_push(rrdpush_receiver_thread_cleanup, ptr);
- worker_register("STREAMRCV");
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ, "received bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, "uncompressed bytes", "bytes/s", WORKER_METRIC_INCREMENT);
- worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION, "replication completion", "%", WORKER_METRIC_ABSOLUTE);
+ {
+ worker_register("STREAMRCV");
- struct receiver_state *rpt = (struct receiver_state *)ptr;
- rpt->tid = gettid();
- netdata_log_info("STREAM %s [%s]:%s: receive thread created (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->tid);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_READ,
+ "received bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
- rrdpush_receive(rpt);
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED,
+ "uncompressed bytes", "bytes/s",
+ WORKER_METRIC_INCREMENT);
+
+ worker_register_job_custom_metric(WORKER_RECEIVER_JOB_REPLICATION_COMPLETION,
+ "replication completion", "%",
+ WORKER_METRIC_ABSOLUTE);
+
+ struct receiver_state *rpt = (struct receiver_state *) ptr;
+ rpt->tid = gettid();
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_SRC_IP, rpt->client_ip),
+ ND_LOG_FIELD_TXT(NDF_SRC_PORT, rpt->client_port),
+ ND_LOG_FIELD_TXT(NDF_NIDL_NODE, rpt->hostname),
+ ND_LOG_FIELD_CB(NDF_SRC_TRANSPORT, stream_receiver_log_transport, rpt),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_receiver_log_capabilities, rpt),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ netdata_log_info("STREAM %s [%s]:%s: receive thread started", rpt->hostname, rpt->client_ip
+ , rpt->client_port);
+
+ rrdpush_receive(rpt);
+ }
netdata_thread_cleanup_pop(1);
return NULL;
diff --git a/streaming/replication.c b/streaming/replication.c
index 607348afd9..b722d47371 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -352,8 +352,8 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
if(max_skip <= 0) {
d->skip = true;
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
"STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query "
"beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
@@ -402,14 +402,15 @@ static bool replication_query_execute(BUFFER *wb, struct replication_query *q, s
fix_min_start_time = min_end_time - min_update_every;
#ifdef NETDATA_INTERNAL_CHECKS
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl, "REPLAY WARNING: 'host:%s/chart:%s' "
- "misaligned dimensions, "
- "update every (min: %ld, max: %ld), "
- "start time (min: %ld, max: %ld), "
- "end time (min %ld, max %ld), "
- "now %ld, last end time sent %ld, "
- "min start time is fixed to %ld",
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_WARNING,
+ "REPLAY WARNING: 'host:%s/chart:%s' "
+ "misaligned dimensions, "
+ "update every (min: %ld, max: %ld), "
+ "start time (min: %ld, max: %ld), "
+ "end time (min %ld, max %ld), "
+ "now %ld, last end time sent %ld, "
+ "min start time is fixed to %ld",
rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st),
min_update_every, max_update_every,
min_start_time, max_start_time,
@@ -757,8 +758,8 @@ static void replicate_log_request(struct replication_request_details *r, const c
#ifdef NETDATA_INTERNAL_CHECKS
internal_error(true,
#else
- error_limit_static_global_var(erl, 1, 0);
- error_limit(&erl,
+ nd_log_limit_static_global_var(erl, 1, 0);
+ nd_log_limit(&erl, NDLS_DAEMON, NDLP_ERR,
#endif
"REPLAY ERROR: 'host:%s/chart:%s' child sent: "
"db from %ld to %ld%s, wall clock time %ld, "
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 1a9dedb4fd..e9f12689f9 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -57,12 +57,12 @@ static void load_stream_conf() {
errno = 0;
char *filename = strdupz_path_subpath(netdata_configured_user_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL)) {
- netdata_log_info("CONFIG: cannot load user config '%s'. Will try stock config.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load user config '%s'. Will try stock config.", filename);
freez(filename);
filename = strdupz_path_subpath(netdata_configured_stock_config_dir, "stream.conf");
if(!appconfig_load(&stream_config, filename, 0, NULL))
- netdata_log_info("CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
+ nd_log_daemon(NDLP_NOTICE, "CONFIG: cannot load stock config '%s'. Running with internal defaults.", filename);
}
freez(filename);
}
@@ -128,7 +128,7 @@ int rrdpush_init() {
rrdpush_compression_levels[COMPRESSION_ALGORITHM_GZIP]);
if(default_rrdpush_enabled && (!default_rrdpush_destination || !*default_rrdpush_destination || !default_rrdpush_api_key || !*default_rrdpush_api_key)) {
- netdata_log_error("STREAM [send]: cannot enable sending thread - information is missing.");
+ nd_log_daemon(NDLP_WARNING, "STREAM [send]: cannot enable sending thread - information is missing.");
default_rrdpush_enabled = 0;
}
@@ -136,7 +136,7 @@ int rrdpush_init() {
netdata_ssl_validate_certificate_sender = !appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM, "ssl skip certificate verification", !netdata_ssl_validate_certificate);
if(!netdata_ssl_validate_certificate_sender)
- netdata_log_info("SSL: streaming senders will skip SSL certificates verification.");
+ nd_log_daemon(NDLP_NOTICE, "SSL: streaming senders will skip SSL certificates verification.");
netdata_ssl_ca_path = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CApath", NULL);
netdata_ssl_ca_file = appconfig_get(&stream_config, CONFIG_SECTION_STREAM, "CAfile", NULL);
@@ -542,13 +542,13 @@ RRDSET_STREAM_BUFFER rrdset_push_metric_initialize(RRDSET *st, time_t wall_clock
if(unlikely(!(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS))) {
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
- netdata_log_error("STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_NOTICE, "STREAM %s [send]: not ready - collected metrics are not sent to parent.", rrdhost_hostname(host));
}
return (RRDSET_STREAM_BUFFER) { .wb = NULL, };
}
else if(unlikely(host_flags & RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS)) {
- netdata_log_info("STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send]: sending metrics to parent...", rrdhost_hostname(host));
rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_LOGGED_STATUS);
}
@@ -741,11 +741,9 @@ int connect_to_one_of_destinations(
if(d->postpone_reconnection_until > now)
continue;
- internal_error(true,
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
"STREAM %s: connecting to '%s' (default port: %d)...",
- rrdhost_hostname(host),
- string2str(d->destination),
- default_port);
+ rrdhost_hostname(host), string2str(d->destination), default_port);
if (reconnects_counter)
*reconnects_counter += 1;
@@ -798,7 +796,7 @@ bool destinations_init_add_one(char *entry, void *data) {
DOUBLE_LINKED_LIST_APPEND_ITEM_UNSAFE(t->list, d, prev, next);
t->count++;
- netdata_log_info("STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
+ nd_log_daemon(NDLP_INFO, "STREAM: added streaming destination No %d: '%s' to host '%s'", t->count, string2str(d->destination), rrdhost_hostname(t->host));
return false; // we return false, so that we will get all defined destinations
}
@@ -867,11 +865,6 @@ void rrdpush_sender_thread_stop(RRDHOST *host, STREAM_HANDSHAKE reason, bool wai
// ----------------------------------------------------------------------------
// rrdpush receiver thread
-void log_stream_connection(const char *client_ip, const char *client_port, const char *api_key, const char *machine_guid, const char *host, const char *msg) {
- netdata_log_access("STREAM: %d '[%s]:%s' '%s' host '%s' api key '%s' machine guid '%s'", gettid(), client_ip, client_port, msg, host, api_key, machine_guid);
-}
-
-
static void rrdpush_sender_thread_spawn(RRDHOST *host) {
sender_lock(host->sender);
@@ -880,7 +873,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
snprintfz(tag, NETDATA_THREAD_TAG_MAX, THREAD_TAG_STREAM_SENDER "[%s]", rrdhost_hostname(host));
if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
- netdata_log_error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
+ nd_log_daemon(NDLP_ERR, "STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
}
@@ -1040,7 +1033,7 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rpt->capabilities = convert_stream_version_to_capabilities(1, NULL, false);
if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
"request has parameter '%s' = '%s', which is not used."
, (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
, rpt->client_ip, rpt->client_port
@@ -1069,9 +1062,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->key || !*rpt->key) {
rrdpush_receive_log_status(
- rpt,
- "request without an API key",
- "NO API KEY PERMISSION DENIED");
+ rpt, "request without an API key, rejecting connection",
+ RRDPUSH_STATUS_NO_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1079,9 +1071,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->hostname || !*rpt->hostname) {
rrdpush_receive_log_status(
- rpt,
- "request without a hostname",
- "NO HOSTNAME PERMISSION DENIED");
+ rpt, "request without a hostname, rejecting connection",
+ RRDPUSH_STATUS_NO_HOSTNAME, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1092,9 +1083,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!rpt->machine_guid || !*rpt->machine_guid) {
rrdpush_receive_log_status(
- rpt,
- "request without a machine GUID",
- "NO MACHINE GUID PERMISSION DENIED");
+ rpt, "request without a machine GUID, rejecting connection",
+ RRDPUSH_STATUS_NO_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1105,9 +1095,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->key, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "API key is not a valid UUID (use the command uuidgen to generate one)",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is not a valid UUID (use the command uuidgen to generate one)",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1115,9 +1104,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (regenerate_guid(rpt->machine_guid, buf) == -1) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not a valid UUID",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is not a valid UUID",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1128,9 +1116,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!api_key_type || !*api_key_type) api_key_type = "unknown";
if(strcmp(api_key_type, "api") != 0) {
rrdpush_receive_log_status(
- rpt,
- "API key is a machine GUID",
- "INVALID API KEY PERMISSION DENIED");
+ rpt, "API key is a machine GUID",
+ RRDPUSH_STATUS_INVALID_API_KEY, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1138,9 +1125,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
rrdpush_receive_log_status(
- rpt,
- "API key is not enabled",
- "API KEY DISABLED PERMISSION DENIED");
+ rpt, "API key is not enabled",
+ RRDPUSH_STATUS_API_KEY_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1156,9 +1142,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(key_allow_from);
rrdpush_receive_log_status(
- rpt,
- "API key is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "API key is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1174,9 +1159,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if (strcmp(machine_guid_type, "machine") != 0) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is an API key",
- "INVALID MACHINE GUID PERMISSION DENIED");
+ rpt, "machine GUID is an API key",
+ RRDPUSH_STATUS_INVALID_MACHINE_GUID, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1185,9 +1169,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not enabled",
- "MACHINE GUID DISABLED PERMISSION DENIED");
+ rpt, "machine GUID is not enabled",
+ RRDPUSH_STATUS_MACHINE_GUID_DISABLED, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1203,9 +1186,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
simple_pattern_free(machine_allow_from);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is not allowed from this IP",
- "NOT ALLOWED IP PERMISSION DENIED");
+ rpt, "machine GUID is not allowed from this IP",
+ RRDPUSH_STATUS_NOT_ALLOWED_IP, NDLP_WARNING);
receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
@@ -1220,9 +1202,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
rrdpush_receiver_takeover_web_connection(w, rpt);
rrdpush_receive_log_status(
- rpt,
- "machine GUID is my own",
- "LOCALHOST PERMISSION DENIED");
+ rpt, "machine GUID is my own",
+ RRDPUSH_STATUS_LOCALHOST, NDLP_DEBUG);
char initial_response[HTTP_HEADER_SIZE + 1];
snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
@@ -1233,11 +1214,11 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
#endif
rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
- netdata_log_error("STREAM '%s' [receive from [%s]:%s]: "
- "failed to reply."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_ERR, "STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
receiver_state_free(rpt);
@@ -1263,9 +1244,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
(long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
rrdpush_receive_log_status(
- rpt,
- msg,
- "RATE LIMIT TRY LATER");
+ rpt, msg,
+ RRDPUSH_STATUS_RATE_LIMIT, NDLP_NOTICE);
receiver_state_free(rpt);
return rrdpush_receiver_too_busy_now(w);
@@ -1313,29 +1293,26 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
// we can proceed with this connection
receiver_stale = false;
- netdata_log_info("STREAM '%s' [receive from [%s]:%s]: "
- "stopped previous stale receiver to accept this one."
- , rpt->hostname
- , rpt->client_ip, rpt->client_port
- );
+ nd_log_daemon(NDLP_NOTICE, "STREAM '%s' [receive from [%s]:%s]: "
+ "stopped previous stale receiver to accept this one."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
}
if (receiver_working || receiver_stale) {
// another receiver is already connected
// try again later
-#ifdef NETDATA_INTERNAL_CHECKS
char msg[200 + 1];
snprintfz(msg, 200,
"multiple connections for same host, "
- "old connection was used %ld secs ago%s",
+ "old connection was last used %ld secs ago%s",
age, receiver_stale ? " (signaled old receiver to stop)" : " (new connection not accepted)");
rrdpush_receive_log_status(
- rpt,
- msg,
- "ALREADY CONNECTED");
-#endif
+ rpt, msg,
+ RRDPUSH_STATUS_ALREADY_CONNECTED, NDLP_DEBUG);
// Have not set WEB_CLIENT_FLAG_DONT_CLOSE_SOCKET - caller should clean up
buffer_flush(w->response.data);
@@ -1345,8 +1322,6 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
}
}
- netdata_log_debug(D_SYSTEM, "starting STREAM receive thread.");
-
rrdpush_receiver_takeover_web_connection(w, rpt);
char tag[NETDATA_THREAD_TAG_MAX + 1];
@@ -1355,9 +1330,8 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *decoded_query_stri
if(netdata_thread_create(&rpt->thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_receiver_thread, (void *)rpt)) {
rrdpush_receive_log_status(
- rpt,
- "can't create receiver thread",
- "INTERNAL SERVER ERROR");
+ rpt, "can't create receiver thread",
+ RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, NDLP_ERR);
buffer_flush(w->response.data);
buffer_strcat(w->response.data, "Can't handle this request");
@@ -1452,7 +1426,7 @@ static struct {
{0 , NULL },
};
-static void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
+void stream_capabilities_to_string(BUFFER *wb, STREAM_CAPABILITIES caps) {
for(size_t i = 0; capability_names[i].str ; i++) {
if(caps & capability_names[i].cap) {
buffer_strcat(wb, capability_names[i].str);
@@ -1479,8 +1453,8 @@ void log_receiver_capabilities(struct receiver_state *rpt) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, rpt->capabilities);
- netdata_log_info("STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [receive from [%s]:%s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port, buffer_tostring(wb));
buffer_free(wb);
}
@@ -1489,8 +1463,8 @@ void log_sender_capabilities(struct sender_state *s) {
BUFFER *wb = buffer_create(100, NULL);
stream_capabilities_to_string(wb, s->capabilities);
- netdata_log_info("STREAM %s [send to %s]: established link with negotiated capabilities: %s",
- rrdhost_hostname(s->host), s->connected_to, buffer_tostring(wb));
+ nd_log_daemon(NDLP_INFO, "STREAM %s [send to %s]: established link with negotiated capabilities: %s",
+ rrdhost_hostname(s->host), s->connected_to, buffer