summary | refs | log | tree | commit | diff | stats
path: root/streaming/sender.c
diff options
context:
space:
mode:
Diffstat (limited to 'streaming/sender.c')
-rw-r--r-- streaming/sender.c 338
1 files changed, 247 insertions, 91 deletions
diff --git a/streaming/sender.c b/streaming/sender.c
index 945807ec07..19bc219200 100644
--- a/streaming/sender.c
+++ b/streaming/sender.c
@@ -4,32 +4,33 @@
#include "common.h"
#include "aclk/https_client.h"
-#define WORKER_SENDER_JOB_CONNECT 0
-#define WORKER_SENDER_JOB_PIPE_READ 1
-#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
-#define WORKER_SENDER_JOB_EXECUTE 3
-#define WORKER_SENDER_JOB_SOCKET_SEND 4
-#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
-#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
-#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
-#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
-#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
-#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
-#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
-#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
-#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
-#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
-#define WORKER_SENDER_JOB_BUFFER_RATIO 15
-#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
-#define WORKER_SENDER_JOB_BYTES_SENT 17
-#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
-#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
-#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
-#define WORKER_SENDER_JOB_REPLAY_REQUEST 21
-#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
-#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
-
-#if WORKER_UTILIZATION_MAX_JOB_TYPES < 21
+#define WORKER_SENDER_JOB_CONNECT 0
+#define WORKER_SENDER_JOB_PIPE_READ 1
+#define WORKER_SENDER_JOB_SOCKET_RECEIVE 2
+#define WORKER_SENDER_JOB_EXECUTE 3
+#define WORKER_SENDER_JOB_SOCKET_SEND 4
+#define WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE 5
+#define WORKER_SENDER_JOB_DISCONNECT_OVERFLOW 6
+#define WORKER_SENDER_JOB_DISCONNECT_TIMEOUT 7
+#define WORKER_SENDER_JOB_DISCONNECT_POLL_ERROR 8
+#define WORKER_SENDER_JOB_DISCONNECT_SOCKET_ERROR 9
+#define WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR 10
+#define WORKER_SENDER_JOB_DISCONNECT_PARENT_CLOSED 11
+#define WORKER_SENDER_JOB_DISCONNECT_RECEIVE_ERROR 12
+#define WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR 13
+#define WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION 14
+#define WORKER_SENDER_JOB_BUFFER_RATIO 15
+#define WORKER_SENDER_JOB_BYTES_RECEIVED 16
+#define WORKER_SENDER_JOB_BYTES_SENT 17
+#define WORKER_SENDER_JOB_BYTES_COMPRESSED 18
+#define WORKER_SENDER_JOB_BYTES_UNCOMPRESSED 19
+#define WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO 20
+#define WORKER_SENDER_JOB_REPLAY_REQUEST 21
+#define WORKER_SENDER_JOB_FUNCTION_REQUEST 22
+#define WORKER_SENDER_JOB_REPLAY_DICT_SIZE 23
+#define WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION 24
+
+#if WORKER_UTILIZATION_MAX_JOB_TYPES < 25
#error WORKER_UTILIZATION_MAX_JOB_TYPES has to be at least 25
#endif
@@ -353,8 +354,7 @@ static inline void rrdpush_sender_thread_close_socket(RRDHOST *host) {
rrdpush_sender_charts_and_replication_reset(host);
}
-void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
-{
+void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host) {
se->os_name = (host->system_info->host_os_name)?url_encode(host->system_info->host_os_name):strdupz("");
se->os_id = (host->system_info->host_os_id)?url_encode(host->system_info->host_os_id):strdupz("");
se->os_version = (host->system_info->host_os_version)?url_encode(host->system_info->host_os_version):strdupz("");
@@ -362,128 +362,155 @@ void rrdpush_encode_variable(stream_encoded_t *se, RRDHOST *host)
se->kernel_version = (host->system_info->kernel_version)?url_encode(host->system_info->kernel_version):strdupz("");
}
-void rrdpush_clean_encoded(stream_encoded_t *se)
-{
- if (se->os_name)
+void rrdpush_clean_encoded(stream_encoded_t *se) {
+ if (se->os_name) {
freez(se->os_name);
+ se->os_name = NULL;
+ }
- if (se->os_id)
+ if (se->os_id) {
freez(se->os_id);
+ se->os_id = NULL;
+ }
- if (se->os_version)
+ if (se->os_version) {
freez(se->os_version);
+ se->os_version = NULL;
+ }
- if (se->kernel_name)
+ if (se->kernel_name) {
freez(se->kernel_name);
+ se->kernel_name = NULL;
+ }
- if (se->kernel_version)
+ if (se->kernel_version) {
freez(se->kernel_version);
+ se->kernel_version = NULL;
+ }
}
struct {
const char *response;
+ const char *status;
size_t length;
int32_t version;
bool dynamic;
const char *error;
int worker_job_id;
int postpone_reconnect_seconds;
- bool prevent_log;
+ ND_LOG_FIELD_PRIORITY priority;
} stream_responses[] = {
{
.response = START_STREAMING_PROMPT_VN,
.length = sizeof(START_STREAMING_PROMPT_VN) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V3, // and above
.dynamic = true, // dynamic = we will parse the version / capabilities
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V2,
.length = sizeof(START_STREAMING_PROMPT_V2) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V2,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_PROMPT_V1,
.length = sizeof(START_STREAMING_PROMPT_V1) - 1,
+ .status = RRDPUSH_STATUS_CONNECTED,
.version = STREAM_HANDSHAKE_OK_V1,
.dynamic = false,
.error = NULL,
.worker_job_id = 0,
.postpone_reconnect_seconds = 0,
+ .priority = NDLP_INFO,
},
{
.response = START_STREAMING_ERROR_SAME_LOCALHOST,
.length = sizeof(START_STREAMING_ERROR_SAME_LOCALHOST) - 1,
+ .status = RRDPUSH_STATUS_LOCALHOST,
.version = STREAM_HANDSHAKE_ERROR_LOCALHOST,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is its localhost",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 60 * 60, // the IP may change, try it every hour
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_ALREADY_STREAMING,
.length = sizeof(START_STREAMING_ERROR_ALREADY_STREAMING) - 1,
+ .status = RRDPUSH_STATUS_ALREADY_CONNECTED,
.version = STREAM_HANDSHAKE_ERROR_ALREADY_CONNECTED,
.dynamic = false,
.error = "remote server rejected this stream, the host we are trying to stream is already streamed to it",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
- .prevent_log = true,
+ .priority = NDLP_DEBUG,
},
{
.response = START_STREAMING_ERROR_NOT_PERMITTED,
.length = sizeof(START_STREAMING_ERROR_NOT_PERMITTED) - 1,
+ .status = RRDPUSH_STATUS_PERMISSION_DENIED,
.version = STREAM_HANDSHAKE_ERROR_DENIED,
.dynamic = false,
.error = "remote server denied access, probably we don't have the right API key?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
+ .priority = NDLP_ERR,
},
{
.response = START_STREAMING_ERROR_BUSY_TRY_LATER,
.length = sizeof(START_STREAMING_ERROR_BUSY_TRY_LATER) - 1,
+ .status = RRDPUSH_STATUS_RATE_LIMIT,
.version = STREAM_HANDSHAKE_BUSY_TRY_LATER,
.dynamic = false,
.error = "remote server is currently busy, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .priority = NDLP_NOTICE,
},
{
.response = START_STREAMING_ERROR_INTERNAL_ERROR,
.length = sizeof(START_STREAMING_ERROR_INTERNAL_ERROR) - 1,
+ .status = RRDPUSH_STATUS_INTERNAL_SERVER_ERROR,
.version = STREAM_HANDSHAKE_INTERNAL_ERROR,
.dynamic = false,
.error = "remote server encountered an internal error, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 5 * 60, // 5 minutes
+ .priority = NDLP_CRIT,
},
{
.response = START_STREAMING_ERROR_INITIALIZATION,
.length = sizeof(START_STREAMING_ERROR_INITIALIZATION) - 1,
+ .status = RRDPUSH_STATUS_INITIALIZATION_IN_PROGRESS,
.version = STREAM_HANDSHAKE_INITIALIZATION,
.dynamic = false,
.error = "remote server is initializing, we should try later",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 2 * 60, // 2 minutes
+ .priority = NDLP_NOTICE,
},
// terminator
{
.response = NULL,
.length = 0,
+ .status = RRDPUSH_STATUS_BAD_HANDSHAKE,
.version = STREAM_HANDSHAKE_ERROR_BAD_HANDSHAKE,
.dynamic = false,
.error = "remote node response is not understood, is it Netdata?",
.worker_job_id = WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE,
.postpone_reconnect_seconds = 1 * 60, // 1 minute
- .prevent_log = false,
+ .priority = NDLP_ERR,
}
};
@@ -513,8 +540,9 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
return true;
}
- bool prevent_log = stream_responses[i].prevent_log;
+ ND_LOG_FIELD_PRIORITY priority = stream_responses[i].priority;
const char *error = stream_responses[i].error;
+ const char *status = stream_responses[i].status;
int worker_job_id = stream_responses[i].worker_job_id;
int delay = stream_responses[i].postpone_reconnect_seconds;
@@ -523,15 +551,18 @@ static inline bool rrdpush_sender_validate_response(RRDHOST *host, struct sender
host->destination->reason = version;
host->destination->postpone_reconnection_until = now_realtime_sec() + delay;
- char buf[LOG_DATE_LENGTH];
- log_date(buf, LOG_DATE_LENGTH, host->destination->postpone_reconnection_until);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, status),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ char buf[ISO8601_MAX_LENGTH];
+ iso8601_datetime_ut(buf, sizeof(buf), host->destination->postpone_reconnection_until * USEC_PER_SEC, 0);
- if(prevent_log)
- internal_error(true, "STREAM %s [send to %s]: %s - will retry in %ld secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
- else
- netdata_log_error("STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
- rrdhost_hostname(host), s->connected_to, error, delay, buf);
+ nd_log(NDLS_DAEMON, priority,
+ "STREAM %s [send to %s]: %s - will retry in %d secs, at %s",
+ rrdhost_hostname(host), s->connected_to, error, delay, buf);
return false;
}
@@ -557,6 +588,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
if(!netdata_ssl_connect(&host->sender->ssl)) {
// couldn't connect
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_SSL_ERROR),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
rrdpush_sender_thread_close_socket(host);
host->destination->reason = STREAM_HANDSHAKE_ERROR_SSL_ERROR;
@@ -568,6 +605,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
security_test_certificate(host->sender->ssl.conn)) {
// certificate is not valid
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_INVALID_SSL_CERTIFICATE),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_SSL_ERROR);
netdata_log_error("SSL: closing the stream connection, because the server SSL certificate is not valid.");
rrdpush_sender_thread_close_socket(host);
@@ -579,6 +622,12 @@ static bool rrdpush_sender_connect_ssl(struct sender_state *s __maybe_unused) {
return true;
}
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_ESTABLISH_SSL_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
netdata_log_error("SSL: failed to establish connection.");
return false;
@@ -826,6 +875,13 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
return false;
if (s->parent_using_h2o && rrdpush_http_upgrade_prelude(host, s)) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CANT_UPGRADE_CONNECTION),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION);
rrdpush_sender_thread_close_socket(host);
host->destination->reason = STREAM_HANDSHAKE_ERROR_HTTP_UPGRADE;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
@@ -845,9 +901,19 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: failed to send HTTP header to remote netdata.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: failed to send HTTP header to remote netdata.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_SEND_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 1 * 60;
return false;
@@ -864,22 +930,35 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
timeout);
if(bytes <= 0) { // timeout is 0
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_TIMEOUT),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_is_busy(WORKER_SENDER_JOB_DISCONNECT_TIMEOUT);
rrdpush_sender_thread_close_socket(host);
- netdata_log_error("STREAM %s [send to %s]: remote netdata does not respond.", rrdhost_hostname(host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_ERR,
+ "STREAM %s [send to %s]: remote netdata does not respond.",
+ rrdhost_hostname(host), s->connected_to);
+
host->destination->reason = STREAM_HANDSHAKE_ERROR_RECEIVE_TIMEOUT;
host->destination->postpone_reconnection_until = now_realtime_sec() + 30;
return false;
}
if(sock_setnonblock(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot set non-blocking mode for socket.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot set non-blocking mode for socket.",
+ rrdhost_hostname(host), s->connected_to);
if(sock_enlarge_out(s->rrdpush_sender_socket) < 0)
- netdata_log_error("STREAM %s [send to %s]: cannot enlarge the socket buffer.", rrdhost_hostname(host), s->connected_to);
+ nd_log(NDLS_DAEMON, NDLP_WARNING,
+ "STREAM %s [send to %s]: cannot enlarge the socket buffer.",
+ rrdhost_hostname(host), s->connected_to);
http[bytes] = '\0';
- netdata_log_debug(D_STREAM, "Response to sender from far end: %s", http);
if(!rrdpush_sender_validate_response(host, s, http, bytes))
return false;
@@ -887,13 +966,26 @@ static bool rrdpush_sender_thread_connect_to_parent(RRDHOST *host, int default_p
log_sender_capabilities(s);
- netdata_log_debug(D_STREAM, "STREAM: Connected on fd %d...", s->rrdpush_sender_socket);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_TXT(NDF_RESPONSE_CODE, RRDPUSH_STATUS_CONNECTED),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s: connected to %s...",
+ rrdhost_hostname(host), s->connected_to);
return true;
}
-static bool attempt_to_connect(struct sender_state *state)
-{
+static bool attempt_to_connect(struct sender_state *state) {
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_UUID(NDF_MESSAGE_ID, &streaming_to_parent_msgid),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
state->send_attempts = 0;
// reset the bytes we have sent for this session
@@ -1062,6 +1154,12 @@ void stream_execute_function_callback(BUFFER *func_wb, int code, void *data) {
void execute_commands(struct sender_state *s) {
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_CB(NDF_REQUEST, line_splitter_reconstruct_line, &s->line),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
char *start = s->read_buffer, *end = &s->read_buffer[s->read_len], *newline;
*end = 0;
while( start < end && (newline = strchr(start, '\n')) ) {
@@ -1075,27 +1173,22 @@ void execute_commands(struct sender_state *s) {
continue;
}
- netdata_log_access("STREAM: %d from '%s' for host '%s': %s",
- gettid(), s->connected_to, rrdhost_hostname(s->host), start);
+ s->line.count++;
+ s->line.num_words = quoted_strings_splitter_pluginsd(start, s->line.words, PLUGINSD_MAX_WORDS);
+ const char *command = get_word(s->line.words, s->line.num_words, 0);
- // internal_error(true, "STREAM %s [send to %s] received command over connection: %s", rrdhost_hostname(s->host), s->connected_to, start);
-
- char *words[PLUGINSD_MAX_WORDS] = { NULL };
- size_t num_words = quoted_strings_splitter_pluginsd(start, words, PLUGINSD_MAX_WORDS);
-
- const char *keyword = get_word(words, num_words, 0);
-
- if(keyword && (strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
+ if(command && (strcmp(command, PLUGINSD_KEYWORD_FUNCTION) == 0 || strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD_END) == 0)) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
- char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(words, num_words, 1);
- char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(words, num_words, 2);
- char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(words, num_words, 3);
+ char *transaction = s->receiving_function_payload ? s->function_payload.txid : get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = s->receiving_function_payload ? s->function_payload.timeout : get_word(s->line.words, s->line.num_words, 2);
+ char *function = s->receiving_function_payload ? s->function_payload.fn_name : get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1133,9 +1226,12 @@ void execute_commands(struct sender_state *s) {
memset(&s->function_payload, 0, sizeof(struct function_payload_state));
}
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_PAYLOAD) == 0) {
+ nd_log(NDLS_ACCESS, NDLP_INFO, NULL);
+
if (s->receiving_function_payload) {
- netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload", rrdhost_hostname(s->host), s->connected_to, keyword);
+ netdata_log_error("STREAM %s [send to %s] received %s command while already receiving function payload",
+ rrdhost_hostname(s->host), s->connected_to, command);
s->receiving_function_payload = false;
buffer_free(s->function_payload.payload);
s->function_payload.payload = NULL;
@@ -1143,14 +1239,14 @@ void execute_commands(struct sender_state *s) {
// TODO send error response
}
- char *transaction = get_word(words, num_words, 1);
- char *timeout_s = get_word(words, num_words, 2);
- char *function = get_word(words, num_words, 3);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
+ char *timeout_s = get_word(s->line.words, s->line.num_words, 2);
+ char *function = get_word(s->line.words, s->line.num_words, 3);
if(!transaction || !*transaction || !timeout_s || !*timeout_s || !function || !*function) {
netdata_log_error("STREAM %s [send to %s] %s execution command is incomplete (transaction = '%s', timeout = '%s', function = '%s'). Ignoring it.",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
transaction?transaction:"(unset)",
timeout_s?timeout_s:"(unset)",
function?function:"(unset)");
@@ -1159,30 +1255,32 @@ void execute_commands(struct sender_state *s) {
s->receiving_function_payload = true;
s->function_payload.payload = buffer_create(4096, &netdata_buffers_statistics.buffers_functions);
- s->function_payload.txid = strdupz(get_word(words, num_words, 1));
- s->function_payload.timeout = strdupz(get_word(words, num_words, 2));
- s->function_payload.fn_name = strdupz(get_word(words, num_words, 3));
+ s->function_payload.txid = strdupz(get_word(s->line.words, s->line.num_words, 1));
+ s->function_payload.timeout = strdupz(get_word(s->line.words, s->line.num_words, 2));
+ s->function_payload.fn_name = strdupz(get_word(s->line.words, s->line.num_words, 3));
}
- else if(keyword && strcmp(keyword, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
+ else if(command && strcmp(command, PLUGINSD_KEYWORD_FUNCTION_CANCEL) == 0) {
worker_is_busy(WORKER_SENDER_JOB_FUNCTION_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- char *transaction = get_word(words, num_words, 1);
+ char *transaction = get_word(s->line.words, s->line.num_words, 1);
if(transaction && *transaction)
rrd_function_cancel(transaction);
}
- else if (keyword && strcmp(keyword, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
+ else if (command && strcmp(command, PLUGINSD_KEYWORD_REPLAY_CHART) == 0) {
worker_is_busy(WORKER_SENDER_JOB_REPLAY_REQUEST);
+ nd_log(NDLS_ACCESS, NDLP_DEBUG, NULL);
- const char *chart_id = get_word(words, num_words, 1);
- const char *start_streaming = get_word(words, num_words, 2);
- const char *after = get_word(words, num_words, 3);
- const char *before = get_word(words, num_words, 4);
+ const char *chart_id = get_word(s->line.words, s->line.num_words, 1);
+ const char *start_streaming = get_word(s->line.words, s->line.num_words, 2);
+ const char *after = get_word(s->line.words, s->line.num_words, 3);
+ const char *before = get_word(s->line.words, s->line.num_words, 4);
if (!chart_id || !start_streaming || !after || !before) {
netdata_log_error("STREAM %s [send to %s] %s command is incomplete"
" (chart=%s, start_streaming=%s, after=%s, before=%s)",
rrdhost_hostname(s->host), s->connected_to,
- keyword,
+ command,
chart_id ? chart_id : "(unset)",
start_streaming ? start_streaming : "(unset)",
after ? after : "(unset)",
@@ -1197,12 +1295,14 @@ void execute_commands(struct sender_state *s) {
}
}
else {
- netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, words[0]?words[0]:"(unset)");
+ netdata_log_error("STREAM %s [send to %s] received unknown command over connection: %s", rrdhost_hostname(s->host), s->connected_to, s->line.words[0]?s->line.words[0]:"(unset)");
}
+ line_splitter_reset(&s->line);
worker_is_busy(WORKER_SENDER_JOB_EXECUTE);
start = newline + 1;
}
+
if (start < end) {
memmove(s->read_buffer, start, end-start);
s->read_len = end - start;
@@ -1397,7 +1497,61 @@ void rrdpush_initialize_ssl_ctx(RRDHOST *host __maybe_unused) {
#endif
}
+static bool stream_sender_log_capabilities(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+ stream_capabilities_to_string(wb, state->capabilities);
+ return true;
+}
+
+static bool stream_sender_log_transport(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state)
+ return false;
+
+#ifdef ENABLE_HTTPS
+ buffer_strcat(wb, SSL_connection(&state->ssl) ? "https" : "http");
+#else
+ buffer_strcat(wb, "http");
+#endif
+ return true;
+}
+
+static bool stream_sender_log_dst_ip(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_strcat(wb, peers.peer.ip);
+ return true;
+}
+
+static bool stream_sender_log_dst_port(BUFFER *wb, void *ptr) {
+ struct sender_state *state = ptr;
+ if(!state || state->rrdpush_sender_socket == -1)
+ return false;
+
+ SOCKET_PEERS peers = socket_peers(state->rrdpush_sender_socket);
+ buffer_print_uint64(wb, peers.peer.port);
+ return true;
+}
+
void *rrdpush_sender_thread(void *ptr) {
+ struct sender_state *s = ptr;
+
+ ND_LOG_STACK lgs[] = {
+ ND_LOG_FIELD_STR(NDF_NIDL_NODE, s->host->hostname),
+ ND_LOG_FIELD_CB(NDF_DST_IP, stream_sender_log_dst_ip, s),
+ ND_LOG_FIELD_CB(NDF_DST_PORT, stream_sender_log_dst_port, s),
+ ND_LOG_FIELD_CB(NDF_DST_TRANSPORT, stream_sender_log_transport, s),
+ ND_LOG_FIELD_CB(NDF_SRC_CAPABILITIES, stream_sender_log_capabilities, s),
+ ND_LOG_FIELD_END(),
+ };
+ ND_LOG_STACK_PUSH(lgs);
+
worker_register("STREAMSND");
worker_register_job_name(WORKER_SENDER_JOB_CONNECT, "connect");
worker_register_job_name(WORKER_SENDER_JOB_PIPE_READ, "pipe read");
@@ -1416,6 +1570,7 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_SEND_ERROR, "disconnect send error");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_NO_COMPRESSION, "disconnect no compression");
worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_BAD_HANDSHAKE, "disconnect bad handshake");
+ worker_register_job_name(WORKER_SENDER_JOB_DISCONNECT_CANT_UPGRADE_CONNECTION, "disconnect cant upgrade");
worker_register_job_name(WORKER_SENDER_JOB_REPLAY_REQUEST, "replay request");
worker_register_job_name(WORKER_SENDER_JOB_FUNCTION_REQUEST, "function");
@@ -1428,8 +1583,6 @@ void *rrdpush_sender_thread(void *ptr) {
worker_register_job_custom_metric(WORKER_SENDER_JOB_BYTES_COMPRESSION_RATIO, "cumulative compression savings ratio", "%", WORKER_METRIC_ABSOLUTE);
worker_register_job_custom_metric(WORKER_SENDER_JOB_REPLAY_DICT_SIZE, "replication dict entries", "entries", WORKER_METRIC_ABSOLUTE);
- struct sender_state *s = ptr;
-
if(!rrdhost_has_rrdpush_sender_enabled(s->host) || !s->host->rrdpush_send_destination ||
!*s->host->rrdpush_send_destination || !s->host->rrdpush_send_api_key ||
!*s->host->rrdpush_send_api_key) {
@@ -1523,7 +1676,10 @@ void *rrdpush_sender_thread(void *ptr) {
s->replication.oldest_request_after_t = 0;
rrdhost_flag_set(s->host, RRDHOST_FLAG_RRDPUSH_SENDER_READY_4_METRICS);
- netdata_log_info("STREAM %s [send to %s]: enabling metrics streaming...", rrdhost_hostname(s->host), s->connected_to);
+
+ nd_log(NDLS_DAEMON, NDLP_DEBUG,
+ "STREAM %s [send to %s]: enabling metrics streaming...",
+ rrdhost_hostname(s->host), s->connected_to);
continue;
}