// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
extern struct config stream_config;
/*
 * Tear down a receiver_state and release the structure itself.
 *
 * Order of operations:
 *   1. release every pointer member that is handed to freez()
 *   2. close the SSL session (HTTPS builds only), then the socket if one is open
 *   3. destroy the decompressor (compression builds only)
 *   4. free the attached system info, when present
 *   5. subtract this structure's size from the receivers memory statistic
 *   6. free the structure
 *
 * rpt must not be NULL; it is dereferenced unconditionally.
 */
void receiver_state_free(struct receiver_state *rpt) {
    // pointer members released with freez(), in their original order
    void *freez_members[] = {
        rpt->key,
        rpt->hostname,
        rpt->registry_hostname,
        rpt->machine_guid,
        rpt->os,
        rpt->timezone,
        rpt->abbrev_timezone,
        rpt->tags,
        rpt->client_ip,
        rpt->client_port,
        rpt->program_name,
        rpt->program_version,
    };
    const size_t members = sizeof(freez_members) / sizeof(freez_members[0]);

    for(size_t idx = 0; idx < members; idx++)
        freez(freez_members[idx]);

#ifdef ENABLE_HTTPS
    netdata_ssl_close(&rpt->ssl);
#endif

    // -1 marks "no socket attached"
    if(rpt->fd != -1) {
        internal_error(true, "closing socket...");
        close(rpt->fd);
    }

#ifdef ENABLE_RRDPUSH_COMPRESSION
    rrdpush_decompressor_destroy(&rpt->decompressor);
#endif

    if(rpt->system_info) {
        rrdhost_system_info_free(rpt->system_info);
    }

    // keep the receivers memory accounting in sync before the structure goes away
    __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);

    freez(rpt);
}
#include "collectors/plugins.d/pluginsd_parser.h"
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly.
// The receiver's worker IDs are carved out directly below WORKER_PARSER_FIRST_JOB,
// one slot per define, so every additional receiver job needs one more free slot.
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
// this has to be the same as the definition in parser.h
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)

// Three IDs are subtracted from WORKER_PARSER_FIRST_JOB above; any value below 3
// would make the lowest of them negative, so reject it at compile time.
#if WORKER_PARSER_FIRST_JOB < 3
#error The define WORKER_PARSER_FIRST_JOB needs to be at least 3
#endif
/*
 * Read up to `size` bytes from the receiver's connection into `buffer`.
 *
 * On HTTPS builds the SSL session is used when one is established on r->ssl,
 * otherwise the plain socket r->fd is read. A read interrupted by a signal
 * (EINTR) is retried a bounded number of times.
 *
 * Returns:
 *   > 0  number of bytes read
 *     0  `size` was zero, nothing was attempted
 *    -1  EOF on the socket
 *    -2  read error
 *    -3  timeout (EAGAIN / EWOULDBLOCK / EINPROGRESS)
 */
static inline int read_stream(struct receiver_state *r, char* buffer, size_t size) {
    if(unlikely(size == 0)) {
        internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
        return 0;
    }

    ssize_t rc;
    int retries_left = 100;

    for(;;) {
        // reset errno so the classification below only sees this attempt's error
        errno = 0;

#ifdef ENABLE_HTTPS
        if (SSL_connection(&r->ssl))
            rc = netdata_ssl_read(&r->ssl, buffer, size);
        else
            rc = read(r->fd, buffer, size);
#else
        rc = read(r->fd, buffer, size);
#endif

        // retry only on EINTR, and only while the retry budget lasts
        if(!(rc < 0 && errno == EINTR && retries_left--))
            break;
    }

    const bool would_block = (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS);

    if((rc == 0 || rc == -1) && would_block) {
        netdata_log_error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
        return -3;
    }

    if(rc == 0) {
        netdata_log_error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
        return -1;
    }

    if(rc < 0) {
        netdata_log_error("STREAM: %s() failed to read from socket!", __FUNCTION__);
        return -2;
    }

    return (int)rc;
}
static inline bool receiver_read_uncompressed(struct receiver_state *r) {
#ifdef NETDATA_INTERNAL_CHECKS
if(r->reader.read_buffer[r->reader.read_len] != '\0')
fatal("%s(): read_buffer does not start with zero", __FUNCTION__ );
#endif
int bytes_read = read_stream(r, r->reader.read_buffer + r->reader.read_len, sizeof(r->reader.read_buffer) - r->reader.read_len - 1);
if(unlikely(bytes_read <= 0))
return false;
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_READ, (NETDATA_DOUBLE)bytes_read);
worker_set_metric(WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED, (NETDATA_DOUBLE)bytes_read);
r->reader.read_len += bytes_read;
r->reader.read_buffer[r->reader.read_len] = '\0';
return true;
}
#ifdef ENABLE_RRDPUSH_COMPRESSION
static inline bool receiver_read_compressed(struct receiver_state *r) {
internal_fatal(r->reader.read_buffer[r->reader.read_len] != '\0',
"%s: read_buffer does not start with zero #2", __FUNCTION__ );
// first use any available uncompressed data
if (likely(rrdpush_decompressed_bytes_in_buffer(&r->decompressor))) {
size_t available = sizeof(r->reader.read_buffer) - r->reader.read_len - 1;
if (likely(available)) {
size_t len = rrdpush_decompressor_get(&r->decompressor, r->reader.read_buffer + r->reader.read_len, available);
if (unlikely(!len)) {
internal_error(true, "decompressor returned zero length #1");
return false;
}
r->reader.read_len += (int)len;
r->reader.read_buffer[r->reader.read_len] = '\0';
}
else
internal_fatal(true, "The line to read is too big! Already have %zd bytes in read_buffer.", r->