diff options
author | Costa Tsaousis (ktsaou) <costa@tsaousis.gr> | 2017-02-21 01:47:38 +0200 |
---|---|---|
committer | Costa Tsaousis (ktsaou) <costa@tsaousis.gr> | 2017-02-22 01:00:26 +0200 |
commit | 4f6bb2e96f622233e130b249b38c99a47c3b766f (patch) | |
tree | d415fbf4e47ee8bf378a8cad7657baee8b7b8911 /src/rrdpush.c | |
parent | eb13ee695866a42e1504799775c3bff5a8dab471 (diff) |
switch from HTTP to stream data
Diffstat (limited to 'src/rrdpush.c')
-rw-r--r-- | src/rrdpush.c | 100 |
1 files changed, 84 insertions, 16 deletions
diff --git a/src/rrdpush.c b/src/rrdpush.c index ec780b2b3d..c68d0e4f34 100644 --- a/src/rrdpush.c +++ b/src/rrdpush.c @@ -147,47 +147,115 @@ void *central_netdata_push_thread(void *ptr) { size_t sent_bytes = 0; size_t sent_connection = 0; int sock = -1; - char buffer[1]; + char buffer[1000 + 1]; + + struct pollfd fds[2], *ifd, *ofd; + + ifd = &fds[0]; + ofd = &fds[1]; + + ifd->fd = rrdpush_pipe[PIPE_READ]; + ifd->events = POLLIN; + ofd->events = POLLOUT; + + nfds_t fdmax = 2; for(;;) { + if(netdata_exit) break; + if(unlikely(sock == -1)) { info("PUSH: connecting to central netdata at: %s", central_netdata_to_push_data); sock = connect_to_one_of(central_netdata_to_push_data, 19999, &tv, &reconnects_counter); - if(unlikely(sock != -1)) { - info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data); + if(unlikely(sock == -1)) { + error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data); + sleep(5); + continue; + } + + info("PUSH: connected to central netdata at: %s", central_netdata_to_push_data); - if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0) - error("PUSH: cannot set non-blocking mode for socket."); + char http[1000 + 1]; + snprintfz(http, 1000, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), program_version); + if(send_timeout(sock, http, strlen(http), 0, 60) == -1) { + close(sock); + sock = -1; + error("PUSH: failed to send http header to netdata at: %s", central_netdata_to_push_data); + sleep(5); + continue; } - else - error("PUSH: failed to connect to central netdata at: %s", central_netdata_to_push_data); + + if(recv_timeout(sock, http, 1000, 0, 60) == -1) { + close(sock); + sock = -1; + error("PUSH: failed to receive OK from netdata at: %s", central_netdata_to_push_data); + sleep(5); + continue; + } + + if(strncmp(http, "STREAM", 6)) { + close(sock); + sock = -1; + error("PUSH: netdata servers at %s, did not send STREAM", central_netdata_to_push_data); + sleep(5); + continue; + } + + if(fcntl(sock, F_SETFL, O_NONBLOCK) < 0) + error("PUSH: cannot set non-blocking mode for socket."); rrdpush_lock(); if(buffer_strlen(rrdpush_buffer)) error("PUSH: discarding %zu bytes of metrics data already in the buffer.", buffer_strlen(rrdpush_buffer)); buffer_flush(rrdpush_buffer); - buffer_sprintf(rrdpush_buffer, "GET /stream?key=%s HTTP/1.1\r\nUser-Agent: netdata-push-service/%s\r\nAccept: */*\r\n\r\n", config_get("global", "central netdata api key", ""), VERSION); reset_all_charts(); + last_host = NULL; rrdpush_unlock(); sent_connection = 0; } - if(read(rrdpush_pipe[PIPE_READ], buffer, 1) == -1) { - error("PUSH: Cannot read from internal pipe."); - sleep(1); + ifd->revents = 0; + ofd->revents = 0; + ofd->fd = sock; + + if(begin < buffer_strlen(rrdpush_buffer)) + ofd->events = POLLOUT; + else + ofd->events = 0; + + if(netdata_exit) break; + int retval = poll(fds, fdmax, 60 * 1000); + if(netdata_exit) break; + + if(unlikely(retval == -1)) { + if(errno == EAGAIN || errno == EINTR) + continue; + + error("PUSH: Failed to poll()."); + close(sock); + sock = -1; + break; + } + else if(unlikely(!retval)) { + // timeout + continue; + } + + if(ifd->revents & POLLIN) { + if(read(rrdpush_pipe[PIPE_READ], buffer, 1000) == -1) + error("PUSH: Cannot read from internal pipe."); } - if(likely(sock != -1 && begin < rrdpush_buffer->len)) { + if(ofd->revents & POLLOUT && begin < buffer_strlen(rrdpush_buffer)) { // fprintf(stderr, "PUSH BEGIN\n"); - // fwrite(&rrdpush_buffer->buffer[begin], 1, rrdpush_buffer->len - begin, stderr); + // fwrite(&rrdpush_buffer->buffer[begin], 1, buffer_strlen(rrdpush_buffer) - begin, stderr); // fprintf(stderr, "\nPUSH END\n"); rrdpush_lock(); - ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], rrdpush_buffer->len - begin, MSG_DONTWAIT); + ssize_t ret = send(sock, &rrdpush_buffer->buffer[begin], buffer_strlen(rrdpush_buffer) - begin, MSG_DONTWAIT); if(ret == -1) { - if(errno != EAGAIN) { + if(errno != EAGAIN && errno != EINTR) { error("PUSH: failed to send metrics to central netdata at %s. We have sent %zu bytes on this connection.", central_netdata_to_push_data, sent_connection); close(sock); sock = -1; @@ -197,7 +265,7 @@ void *central_netdata_push_thread(void *ptr) { sent_connection += ret; sent_bytes += ret; begin += ret; - if(begin == rrdpush_buffer->len) { + if(begin == buffer_strlen(rrdpush_buffer)) { buffer_flush(rrdpush_buffer); begin = 0; } |