summary | refs | log | tree | commit | diff | stats
path: root/src
diff options
context:
space:
mode:
author: Costa Tsaousis <costa@tsaousis.gr> 2017-02-21 14:28:27 +0200
committer: Costa Tsaousis (ktsaou) <costa@tsaousis.gr> 2017-02-22 01:00:27 +0200
commit: 0e4b7907f2bf29a86c0e5245812394d2f0fc048a (patch)
tree: ddaeaeb32357cdb590f062e04fc844be9fd926f6 /src
parent: 4613856eafe1e928f81887886565662d92d38430 (diff)
do not try to reconnect too soon
Diffstat (limited to 'src')
-rw-r--r-- src/log.c 2
-rw-r--r-- src/rrdpush.c 73
-rw-r--r-- src/web_client.c 17
3 files changed, 53 insertions, 39 deletions
diff --git a/src/log.c b/src/log.c
index 02cbca40e2..01beaf7ec4 100644
--- a/src/log.c
+++ b/src/log.c
@@ -257,7 +257,7 @@ void info_int( const char *file, const char *function, const unsigned long line,
log_date(stderr);
va_start( args, fmt );
- if(debug_flags) fprintf(stderr, "%s: INFO: (%04lu@%-10.10s:%-15.15s):", program_name, line, file, function);
+ if(debug_flags) fprintf(stderr, "%s: INFO: (%04lu@%-10.10s:%-15.15s): ", program_name, line, file, function);
else fprintf(stderr, "%s: INFO: ", program_name);
vfprintf( stderr, fmt, args );
va_end( args );
diff --git a/src/rrdpush.c b/src/rrdpush.c
index 9d2ad79a9d..de2a455763 100644
--- a/src/rrdpush.c
+++ b/src/rrdpush.c
@@ -104,40 +104,43 @@ void rrdset_done_push(RRDSET *st) {
if(unlikely(!rrdset_flag_check(st, RRDSET_FLAG_ENABLED)))
return;
+
+ rrdpush_lock();
+
if(unlikely(!rrdpush_buffer || !rrdpush_connected)) {
if(!error_shown)
- error("PUSH: not ready - discarding collected metrics.");
+ error("STREAM: not ready - discarding collected metrics.");
error_shown = 1;
+
+ rrdpush_unlock();
return;
}
error_shown = 0;
- rrdpush_lock();
- rrdset_rdlock(st);
-
if(st->rrdhost != last_host) {
buffer_sprintf(rrdpush_buffer, "HOST '%s' '%s'\n", st->rrdhost->machine_guid, st->rrdhost->hostname);
last_host = st->rrdhost;
}
+ rrdset_rdlock(st);
if(need_to_send_chart_definition(st))
send_chart_definition(st);
send_chart_metrics(st);
+ rrdset_unlock(st);
// signal the sender there are more data
if(write(rrdpush_pipe[PIPE_WRITE], " ", 1) == -1)
- error("Cannot write to internal pipe");
+ error("STREAM: cannot write to internal pipe");
- rrdset_unlock(st);
rrdpush_unlock();
}
static inline void rrdpush_flush(void) {
rrdpush_lock();
if(buffer_strlen(rrdpush_buffer))
- error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
+ error("STREAM: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
buffer_flush(rrdpush_buffer);
reset_all_charts();
@@ -148,19 +151,19 @@ static inline void rrdpush_flush(void) {
void *central_netdata_push_thread(void *ptr) {
struct netdata_static_thread *static_thread = (struct netdata_static_thread *)ptr;
- info("Central netdata push thread created with task id %d", gettid());
+ info("STREAM: central netdata push thread created with task id %d", gettid());
if(pthread_setcanceltype(PTHREAD_CANCEL_DEFERRED, NULL) != 0)
- error("Cannot set pthread cancel type to DEFERRED.");
+ error("STREAM: cannot set pthread cancel type to DEFERRED.");
if(pthread_setcancelstate(PTHREAD_CANCEL_ENABLE, NULL) != 0)
- error("Cannot set pthread cancel state to ENABLE.");
+ error("STREAM: cannot set pthread cancel state to ENABLE.");
rrdpush_buffer = buffer_create(1);
if(pipe(rrdpush_pipe) == -1)
- fatal("Cannot create required pipe.");
+ fatal("STREAM: cannot create required pipe.");
struct timeval tv = {
.tv_sec = 60,
@@ -176,6 +179,7 @@ void *central_netdata_push_thread(void *ptr) {
int sock = -1;
struct pollfd fds[2], *ifd, *ofd;
+ nfds_t fdmax;
ifd = &fds[0];
ofd = &fds[1];
@@ -184,32 +188,37 @@ void *central_netdata_push_thread(void *ptr) {
if(netdata_exit) break;
if(unlikely(sock == -1)) {
+ // stop appending data into rrdpush_buffer
+ // they will be lost, so there is no point to do it
rrdpush_connected = 0;
- info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
if(unlikely(sock == -1)) {
- error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
continue;
}
- info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+ info("STREAM: initializing communication to central netdata at: %s", central_netdata_to_push_data);
char http[1000 + 1];
snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
close(sock);
sock = -1;
- error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to send http header to netdata at: %s", central_netdata_to_push_data);
sleep(5);
continue;
}
+ info("STREAM: Waiting for STREAM from central netdata at: %s", central_netdata_to_push_data);
+
if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
close(sock);
sock = -1;
- error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+ error("STREAM: failed to receive STREAM from netdata at: %s", central_netdata_to_push_data);
sleep(5);
continue;
}
@@ -217,16 +226,20 @@ void *central_netdata_push_thread(void *ptr) {
if(strncmp(http, "STREAM", 6)) {
close(sock);
sock = -1;
- error("PUSH: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
+ error("STREAM: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
sleep(5);
continue;
}
+ info("STREAM: Established STREAM with central netdata at: %s - sending metrics...", central_netdata_to_push_data);
+
if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
- error("PUSH: cannot set non-blocking mode for socket.");
+ error("STREAM: cannot set non-blocking mode for socket.");
rrdpush_flush();
sent_connection = 0;
+
+ // allow appending data into rrdpush_buffer
rrdpush_connected = 1;
}
@@ -235,15 +248,15 @@ void *central_netdata_push_thread(void *ptr) {
ifd->revents = 0;
ofd->fd = sock;
- ofd->events = POLLOUT;
ofd->revents = 0;
-
- nfds_t fdmax = 2;
-
- if(begin < buffer_strlen(rrdpush_buffer))
+ if(begin < buffer_strlen(rrdpush_buffer)) {
ofd->events = POLLOUT;
- else
+ fdmax = 2;
+ }
+ else {
ofd->events = 0;
+ fdmax = 1;
+ }
if(netdata_exit) break;
int retval = poll(fds, fdmax, 60 * 1000);
@@ -253,7 +266,7 @@ void *central_netdata_push_thread(void *ptr) {
if(errno == EAGAIN || errno == EINTR)
continue;
- error("PUSH: Failed to poll().");
+ error("STREAM: Failed to poll().");
close(sock);
sock = -1;
break;
@@ -266,11 +279,11 @@ void *central_netdata_push_thread(void *ptr) {
if(ifd->revents & POLLIN) {
char buffer[1000 + 1];
if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
- error("PUSH: Cannot read from internal pipe.");
+ error("STREAM: Cannot read from internal pipe.");
}
if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
- // info("PUSH: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
+ // info("STREAM: send buffer is ready, sending %zu bytes starting at %zu", buffer_strlen(rrdpush_buffer) - begin, begin);
// fprintf(stderr, "PUSH BEGIN\n");
// fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
@@ -280,7 +293,7 @@ void *central_netdata_push_thread(void *ptr) {
ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
if(errno != EAGAIN && errno != EINTR) {
- error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
+ error("STREAM: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
close(sock);
sock = -1;
}
@@ -300,7 +313,7 @@ void *central_netdata_push_thread(void *ptr) {
// protection from overflow
if(rrdpush_buffer->len > max_size) {
errno = 0;
- error("PUSH: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
+ error("STREAM: too many data pending. Buffer is %zu bytes long, %zu unsent. We have sent %zu bytes in total, %zu on this connection. Closing connection to flush the data.", rrdpush_buffer->len, rrdpush_buffer->len - begin, sent_bytes, sent_connection);
if(sock != -1) {
close(sock);
sock = -1;
@@ -308,7 +321,7 @@ void *central_netdata_push_thread(void *ptr) {
}
}
- debug(D_WEB_CLIENT, "Central netdata push thread exits.");
+ debug(D_WEB_CLIENT, "STREAM: central netdata push thread exits.");
if(sock != -1) {
close(sock);
}
diff --git a/src/web_client.c b/src/web_client.c
index db578f9c24..784a28f37b 100644
--- a/src/web_client.c
+++ b/src/web_client.c
@@ -1689,14 +1689,14 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
}
if(!key || !*key) {
- error("STREAM request from client '%s:%s', without an API key. Forbidding access.", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: request without an API key. Forbidding access.", w->client_ip, w->client_port);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "You need an API key for this request.");
return 401;
}
if(!validate_stream_api_key(key)) {
- error("STREAM request from client '%s:%s': API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ error("STREAM [%s]:%s: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Your API key is not permitted access.");
return 401;
@@ -1719,16 +1719,17 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
snprintfz(cd.fullfilename, FILENAME_MAX, "%s:%s", w->client_ip, w->client_port);
snprintfz(cd.cmd, PLUGINSD_CMD_MAX, "%s:%s", w->client_ip, w->client_port);
+ info("STREAM [%s]:%s: sending STREAM to initiate streaming...", w->client_ip, w->client_port);
if(send_timeout(w->ifd, "STREAM", 6, 0, 60) != 6) {
- error("Cannot send STREAM to netdata at %s:%s", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: cannot send STREAM.", w->client_ip, w->client_port);
buffer_flush(w->response.data);
- buffer_sprintf(w->response.data, "STREAM failed to reply back with STREAM");
+ buffer_sprintf(w->response.data, "Failed to reply back with STREAM");
return 400;
}
// remove the non-blocking flag from the socket
if(fcntl(w->ifd, F_SETFL, fcntl(w->ifd, F_GETFL, 0) & ~O_NONBLOCK) == -1)
- error("STREAM from '%s:%s': cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
+ error("STREAM [%s]:%s: cannot remove the non-blocking flag from socket %d", w->client_ip, w->client_port, w->ifd);
/*
char buffer[1000 + 1];
@@ -1742,16 +1743,16 @@ int web_client_stream_request(RRDHOST *host, struct web_client *w, char *url) {
// convert the socket to a FILE *
FILE *fp = fdopen(w->ifd, "r");
if(!fp) {
- error("STREAM from '%s:%s': failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
+ error("STREAM [%s]:%s: failed to get a FILE for FD %d.", w->client_ip, w->client_port, w->ifd);
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "Failed to get a FILE for an FD.");
return 500;
}
// call the plugins.d processor to receive the metrics
- info("STREAM connecting client '%s:%s' to plugins.d.", w->client_ip, w->client_port);
+ info("STREAM [%s]:%s: connecting client to plugins.d.", w->client_ip, w->client_port);
size_t count = pluginsd_process(host, &cd, fp, 1);
- error("STREAM from '%s:%s': client disconnected.", w->client_ip, w->client_port);
+ error("STREAM [%s]:%s: client disconnected.", w->client_ip, w->client_port);
// close all sockets, to let the socket worker we are done
fclose(fp);