diff options
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- | streaming/sender.c | 338 |
1 files changed, 247 insertions, 91 deletions
diff --git a/streaming/sender.c b/streaming/sender.c index 945807ec07..19bc219200 100644 --- a/streaming/sender.c +++ b/streaming/sender.c @@ -4,32 +4,33 @@ #include "common.h" #include "aclk/https_client.h" -#define WORKER_SENDER_JOB_CONNECT 0 -#define WORKER_SENDER_JOB_PIPE_READ 1 -#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 -#define WORKER_SENDER_JOB_EXECUTE 3 -#define WORKER_SENDER_JOB_SOCKET_SEND 4 -#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 -#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 -#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 -#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 -#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 -#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 -#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 -#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 -#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 -#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 -#define WORKER_SENDER_JOB_BUFFER_RATIO 15 -#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 -#define WORKER_SENDER_JOB_BYTES_SENT 17 -#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18 -#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19 -#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20 -#define WORKER_SENDER_JOB_REPLAY_REQUEST 21 -#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22 -#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23 - -#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21 +#define WORKER_SENDER_JOB_CONNECT 0 +#define WORKER_SENDER_JOB_PIPE_READ 1 +#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2 +#define WORKER_SENDER_JOB_EXECUTE 3 +#define WORKER_SENDER_JOB_SOCKET_SEND 4 +#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5 +#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6 +#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7 +#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8 +#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9 +#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10 +#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11 +#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12 +#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13 +#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14 +#define WORKER_SENDER_JOB_BUFFER_RATIO 15 +#define WORKER_SENDER_JOB_BYTES_RECEIVED 16 +#define WORKER_SENDER_JOB_BYTES_SENT 17 +#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18 +#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19 +#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20 +#define WORKER_SENDER_JOB_REPLAY_REQUEST 21 +#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22 +#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23 +#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24 + +#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25 #error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 21 #endif @@ -353,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) { rrdpush_sender_charts_and_replication_reset(host); } -void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) -{ +void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) { se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz(""); se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz(""); se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz(""); @@ -362,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz(""); } -void rrdpush_clean_encoded(stream_encoded_t *se) -{ - if (se->os_name) +void rrdpush_clean_encoded(stream_encoded_t *se) { + if (se->os_name) { freez(se->os_name); + se->os_name = NULL; + } - if (se->os_id) + if (se->os_id) { freez(se->os_id); + se->os_id = NULL; + } - if (se->os_version) + if (se->os_version) { freez(se->os_version); + se->os_version = NULL; + } - if (se->kernel_name) + if (se->kernel_name) { freez(se->kernel_name); + se->kernel_name = NULL; + } - if (se->kernel_version) + if (se->kernel_version) { freez(se->kernel_version); + se->kernel_version = NULL; + } } struct { const char *response; + const char *status; size_t length; int32_t version; bool dynamic; const char *error; int worker_job_id; int postpone_reconnect_seconds; - bool prevent_log; + ND_LOG_FIELD_PRIORITY priority; } stream_responses[] = { { .response = START_STREAMING_PROMPT_VN, .length = sizeof(START_STREAMING_PROMPT_VN) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V3, // and above .dynamic = true, // dynamic = we will parse the version / capabilities .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V2, .length = sizeof(START_STREAMING_PROMPT_V2) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V2, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_PROMPT_V1, .length = sizeof(START_STREAMING_PROMPT_V1) - 1, + .status = RRDPUSH_STATUS_CONNECTED, .version = STREAM_HANDSHAKE_OK_V1, .dynamic = false, .error = NULL, .worker_job_id = 0, .postpone_reconnect_seconds = 0, + .priority = NDLP_INFO, }, { .response = START_STREAMING_ERROR_SAME_LOCALHOST, .length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1, + .status = RRDPUSH_STATUS_LOCALHOST, .version = STREAM_HANDSHAKE_ERROR_LOCALHOST, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is its localhost", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_ALREADY_STREAMING, .length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1, + .status = RRDPUSH_STATUS_ALREADY_CONNECTED, .version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED, .dynamic = false, .error = "remote server rejected this stream, the host we are trying to stream is already streamed to it", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes - .prevent_log = true, + .priority = NDLP_DEBUG, }, { .response = START_STREAMING_ERROR_NOT_PERMITTED, .length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1, + .status = RRDPUSH_STATUS_PERMISSION_DENIED, .version = STREAM_HANDSHAKE_ERROR_DENIED, .dynamic = false, .error = "remote server denied access, probably we don't have the right API key?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute + .priority = NDLP_ERR, }, { .response = START_STREAMING_ERROR_BUSY_TRY_LATER, .length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1, + .status = RRDPUSH_STATUS_RATE_LIMIT, .version = STREAM_HANDSHAKE_BUSY_TRY_LATER, .dynamic = false, .error = "remote server is currently busy, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minutes + .priority = NDLP_NOTICE, }, { .response = START_STREAMING_ERROR_INTERNAL_ERROR, .length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1, + .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR, .version = STREAM_HANDSHAKE_INTERNAL_ERROR, .dynamic = false, .error = "remote server is encountered an internal error, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 5 * 60, // 5 minutes + .priority = NDLP_CRIT, }, { .response = START_STREAMING_ERROR_INITIALIZATION, .length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1, + .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS, .version = STREAM_HANDSHAKE_INITIALIZATION, .dynamic = false, .error = "remote server is initializing, we should try later", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 2 * 60, // 2 minute + .priority = NDLP_NOTICE, }, // terminator { .response = NULL, .length = 0, + .status = RRDPUSH_STATUS_BAD_HANDSHAKE, .version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE, .dynamic = false, .error = "remote node response is not understood, is it Netdata?", .worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, .postpone_reconnect_seconds = 1 * 60, // 1 minute - .prevent_log = false, + .priority = NDLP_ERR, } }; @@ -513,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender return true; } - bool prevent_log = stream_responses[i].prevent_log; + ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority; const char *error = stream_responses[i].error; + const char *status = stream_responses[i].status; int worker_job_id = stream_responses[i].worker_job_id; int delay = stream_responses[i].postpone_reconnect_seconds; @@ -523,15 +551,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender host->destination->reason = version; host->destination->postpone_reconnection_until = now_realtime_sec() + delay; - char buf[LOG_DATE_LENGTH]; - log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + char buf[ISO8601_MAX_LENGTH]; + iso8601_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0); - if(prevent_log) - internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); - else - netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s", - rrdhost_hostname(host), s->connected_to, error, delay, buf); + nd_log(NDLS_DAEMON, priority, + "STREAM %s [send to %s]: %s - will retry in %d secs, at %s", + rrdhost_hostname(host), s->connected_to, error, delay, buf); return false; } @@ -557,6 +588,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { if(!netdata_ssl_connect(&host->sender->ssl)) { // couldn't connect + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); rrdpush_sender_thread_close_socket(host); host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR; @@ -568,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { security_test_certificate(host->sender->ssl.conn)) { // certificate is not valid + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR); netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid."); rrdpush_sender_thread_close_socket(host); @@ -579,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) { return true; } + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + netdata_log_error("SSL: failed to establish connection."); return false; @@ -826,6 +875,13 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p return false; if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION); rrdpush_sender_thread_close_socket(host); host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; @@ -845,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60; return false; @@ -864,22 +930,35 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p timeout); if(bytes <= 0) { // timeout is 0 + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT); rrdpush_sender_thread_close_socket(host); - netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_ERR, + "STREAM %s [send to %s]: remote netdata does not respond.", + rrdhost_hostname(host), s->connected_to); + host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT; host->destination->postpone_reconnection_until = now_realtime_sec() + 30; return false; } if(sock_setnonblock(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot set non-blocking mode for socket.", + rrdhost_hostname(host), s->connected_to); if(sock_enlarge_out(s->rrdpush_sender_socket) < 0) - netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to); + nd_log(NDLS_DAEMON, NDLP_WARNING, + "STREAM %s [send to %s]: cannot enlarge the socket buffer.", + rrdhost_hostname(host), s->connected_to); http[bytes] = '\0'; - netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http); if(!rrdpush_sender_validate_response(host, s, http, bytes)) return false; @@ -887,13 +966,26 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p log_sender_capabilities(s); - netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s: connected to %s...", + rrdhost_hostname(host), s->connected_to); return true; } -static bool attempt_to_connect(struct sender_state *state) -{ +static bool attempt_to_connect(struct sender_state *state) { + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + state->send_attempts = 0; // reset the bytes we have sent for this session @@ -1062,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) { void execute_commands(struct sender_state *s) { worker_is_busy(WORKER_SENDER_JOB_EXECUTE); + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline; *end = 0; while( start < end && (newline = strchr(start, '\n')) ) { @@ -1075,27 +1173,22 @@ void execute_commands(struct sender_state *s) { continue; } - netdata_log_access("STREAM: %d from '%s' for host '%s': %s", - gettid(), s->connected_to, rrdhost_hostname(s->host), start); + s->line.count++; + s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS); + const char *command = get_word(s->line.words, s->line.num_words, 0); - // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start); - - char *words[PLUGINSD_MAX_WORDS] = { NULL }; - size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS); - - const char *keyword = get_word(words, num_words, 0); - - if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { + if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); - char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1); - char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2); - char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3); + char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2); + char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1133,9 +1226,12 @@ void execute_commands(struct sender_state *s) { memset(&s->function_payload, 0, sizeof(struct function_payload_state)); } } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) { + nd_log(NDLS_ACCESS, NDLP_INFO, NULL); + if (s->receiving_function_payload) { - netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword); + netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", + rrdhost_hostname(s->host), s->connected_to, command); s->receiving_function_payload = false; buffer_free(s->function_payload.payload); s->function_payload.payload = NULL; @@ -1143,14 +1239,14 @@ void execute_commands(struct sender_state *s) { // TODO send error response } - char *transaction = get_word(words, num_words, 1); - char *timeout_s = get_word(words, num_words, 2); - char *function = get_word(words, num_words, 3); + char *transaction = get_word(s->line.words, s->line.num_words, 1); + char *timeout_s = get_word(s->line.words, s->line.num_words, 2); + char *function = get_word(s->line.words, s->line.num_words, 3); if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) { netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, transaction?transaction:"(unset)", timeout_s?timeout_s:"(unset)", function?function:"(unset)"); @@ -1159,30 +1255,32 @@ void execute_commands(struct sender_state *s) { s->receiving_function_payload = true; s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions); - s->function_payload.txid = strdupz(get_word(words, num_words, 1)); - s->function_payload.timeout = strdupz(get_word(words, num_words, 2)); - s->function_payload.fn_name = strdupz(get_word(words, num_words, 3)); + s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1)); + s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2)); + s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3)); } - else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { + else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) { worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - char *transaction = get_word(words, num_words, 1); + char *transaction = get_word(s->line.words, s->line.num_words, 1); if(transaction && *transaction) rrd_function_cancel(transaction); } - else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { + else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) { worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST); + nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL); - const char *chart_id = get_word(words, num_words, 1); - const char *start_streaming = get_word(words, num_words, 2); - const char *after = get_word(words, num_words, 3); - const char *before = get_word(words, num_words, 4); + const char *chart_id = get_word(s->line.words, s->line.num_words, 1); + const char *start_streaming = get_word(s->line.words, s->line.num_words, 2); + const char *after = get_word(s->line.words, s->line.num_words, 3); + const char *before = get_word(s->line.words, s->line.num_words, 4); if (!chart_id || !start_streaming || !after || !before) { netdata_log_error("STREAM %s [send to %s] %s command is incomplete" " (chart=%s, start_streaming=%s, after=%s, before=%s)", rrdhost_hostname(s->host), s->connected_to, - keyword, + command, chart_id ? chart_id : "(unset)", start_streaming ? start_streaming : "(unset)", after ? after : "(unset)", @@ -1197,12 +1295,14 @@ void execute_commands(struct sender_state *s) { } } else { - netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)"); + netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)"); } + line_splitter_reset(&s->line); worker_is_busy(WORKER_SENDER_JOB_EXECUTE); start = newline + 1; } + if (start < end) { memmove(s->read_buffer, start, end-start); s->read_len = end - start; @@ -1397,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) { #endif } +static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + + stream_capabilities_to_string(wb, state->capabilities); + return true; +} + +static bool stream_sender_log_transport(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state) + return false; + +#ifdef ENABLE_HTTPS + buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http"); +#else + buffer_strcat(wb, "http"); +#endif + return true; +} + +static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_strcat(wb, peers.peer.ip); + return true; +} + +static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) { + struct sender_state *state = ptr; + if(!state || state->rrdpush_sender_socket == -1) + return false; + + SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket); + buffer_print_uint64(wb, peers.peer.port); + return true; +} + void *rrdpush_sender_thread(void *ptr) { + struct sender_state *s = ptr; + + ND_LOG_STACK lgs[] = { + ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname), + ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s), + ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s), + ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s), + ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s), + ND_LOG_FIELD_END(), + }; + ND_LOG_STACK_PUSH(lgs); + worker_register("STREAMSND"); worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect"); worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read"); @@ -1416,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression"); worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake"); + worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade"); worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request"); worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function"); @@ -1428,8 +1583,6 @@ void *rrdpush_sender_thread(void *ptr) { worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE); worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE); - struct sender_state *s = ptr; - if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination || !*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key || !*s->host->rrdpush_send_api_key) { @@ -1523,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) { s->replication.oldest_request_after_t = 0; rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS); - netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to); + + nd_log(NDLS_DAEMON, NDLP_DEBUG, + "STREAM %s [send to %s]: enabling metrics streaming...", + rrdhost_hostname(s->host), s->connected_to); continue; } |