summaryrefslogtreecommitdiffstats
path: root/src/rrdpush.c
diff options
context:
space:
mode:
authorCosta Tsaousis (ktsaou) <costa@tsaousis.gr>2017-02-21 01:47:38 +0200
committerCosta Tsaousis (ktsaou) <costa@tsaousis.gr>2017-02-22 01:00:26 +0200
commit4f6bb2e96f622233e130b249b38c99a47c3b766f (patch)
treed415fbf4e47ee8bf378a8cad7657baee8b7b8911 /src/rrdpush.c
parenteb13ee695866a42e1504799775c3bff5a8dab471 (diff)
switch from HTTP to stream data
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r--src/rrdpush.c100
1 files changed, 84 insertions, 16 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c
index ec780b2b3d..c68d0e4f34 100644
--- a/src/rrdpush.c
+++ b/src/rrdpush.c
@@ -147,47 +147,115 @@ void *central_netdata_push_thread(void *ptr) {
size_t sent_bytes = 0;
size_t sent_connection = 0;
int sock = -1;
- char buffer[1];
+ char buffer[1000 + 1];
+
+ struct pollfd fds[2], *ifd, *ofd;
+
+ ifd = &fds[0];
+ ofd = &fds[1];
+
+ ifd->fd = rrdpush_pipe[PIPE_READ];
+ ifd->events = POLLIN;
+ ofd->events = POLLOUT;
+
+ nfds_t fdmax = 2;
for(;;) {
+ if(netdata_exit) break;
+
if(unlikely(sock == -1)) {
info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data);
sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter);
- if(unlikely(sock != -1)) {
- info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
+ if(unlikely(sock == -1)) {
+ error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data);
- if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
- error("PUSH: cannot set non-blocking mode for socket.");
+ char http[1000 + 1];
+ snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version);
+ if(send_timeout(sock, http, strlen(http), 0, 60) == -1) {
+ close(sock);
+ sock = -1;
+ error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
}
- else
- error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data);
+
+ if(recv_timeout(sock, http, 1000, 0, 60) == -1) {
+ close(sock);
+ sock = -1;
+ error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ if(strncmp(http, "STREAM", 6)) {
+ close(sock);
+ sock = -1;
+ error("PUSH: netdata servers at %s, did not send STREAM", central_netdata_to_push_data);
+ sleep(5);
+ continue;
+ }
+
+ if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0)
+ error("PUSH: cannot set non-blocking mode for socket.");
rrdpush_lock();
if(buffer_strlen(rrdpush_buffer))
error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer));
buffer_flush(rrdpush_buffer);
- buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION);
reset_all_charts();
+ last_host = NULL;
rrdpush_unlock();
sent_connection = 0;
}
- if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) {
- error("PUSH: Cannot read from internal pipe.");
- sleep(1);
+ ifd->revents = 0;
+ ofd->revents = 0;
+ ofd->fd = sock;
+
+ if(begin < buffer_strlen(rrdpush_buffer))
+ ofd->events = POLLOUT;
+ else
+ ofd->events = 0;
+
+ if(netdata_exit) break;
+ int retval = poll(fds, fdmax, 60 * 1000);
+ if(netdata_exit) break;
+
+ if(unlikely(retval == -1)) {
+ if(errno == EAGAIN || errno == EINTR)
+ continue;
+
+ error("PUSH: Failed to poll().");
+ close(sock);
+ sock = -1;
+ break;
+ }
+ else if(unlikely(!retval)) {
+ // timeout
+ continue;
+ }
+
+ if(ifd->revents & POLLIN) {
+ if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1)
+ error("PUSH: Cannot read from internal pipe.");
}
- if(likely(sock != -1 && begin < rrdpush_buffer->len)) {
+ if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) {
// fprintf(stderr, "PUSH BEGIN\n");
- // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr);
+ // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr);
// fprintf(stderr, "\nPUSH END\n");
rrdpush_lock();
- ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT);
+ ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT);
if(ret == -1) {
- if(errno != EAGAIN) {
+ if(errno != EAGAIN && errno != EINTR) {
error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection);
close(sock);
sock = -1;
@@ -197,7 +265,7 @@ void *central_netdata_push_thread(void *ptr) {
sent_connection += ret;
sent_bytes += ret;
begin += ret;
- if(begin == rrdpush_buffer->len) {
+ if(begin == buffer_strlen(rrdpush_buffer)) {
buffer_flush(rrdpush_buffer);
begin = 0;
}