// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
// Receiver job ids, allocated downwards from WORKER_PARSER_FIRST_JOB
// (first - 1, first - 2, ...).
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
// and raise the minimum enforced by the #if check below.
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
// this has to be kept the same as in parser.h
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// The lowest id defined above is WORKER_PARSER_FIRST_JOB - 3, so any value
// below 3 would produce a negative job id.
#if WORKER_PARSER_FIRST_JOB < 3
#error The define WORKER_PARSER_FIRST_JOB needs to be at least 3
#endif
extern struct config stream_config;
/*
 * Tear down a receiver state object.
 *
 * Hands every individually allocated member to freez(), releases the
 * optional TLS connection, decompressor and system info (each only when
 * set), subtracts sizeof(*rpt) from the rrdhost_receivers statistic and
 * finally passes rpt itself to freez().
 *
 * rpt is dereferenced unconditionally, so it must not be NULL.
 */
void receiver_state_free(struct receiver_state *rpt) {
    // members released with freez(), listed in release order
    void *const owned_members[] = {
        rpt->key,
        rpt->hostname,
        rpt->registry_hostname,
        rpt->machine_guid,
        rpt->os,
        rpt->timezone,
        rpt->abbrev_timezone,
        rpt->tags,
        rpt->client_ip,
        rpt->client_port,
        rpt->program_name,
        rpt->program_version,
    };
    const size_t owned_count = sizeof(owned_members) / sizeof(owned_members[0]);

    for(size_t i = 0; i < owned_count; i++) {
        freez(owned_members[i]);
    }

#ifdef ENABLE_HTTPS
    if(rpt->ssl.conn) {
        SSL_free(rpt->ssl.conn);
    }
#endif

#ifdef ENABLE_COMPRESSION
    if(rpt->decompressor) {
        rpt->decompressor->destroy(&rpt->decompressor);
    }
#endif

    if(rpt->system_info) {
        rrdhost_system_info_free(rpt->system_info);
    }

    // keep the receivers memory accounting in step with the release below
    __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);

    freez(rpt);
}
#include "collectors/plugins.d/pluginsd_parser.h"
PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
{
const char *host_uuid_str = get_word(words, num_words, 1);
const char *claim_id_str = get_word(words, num_words, 2);
if (!host_uuid_str || !claim_id_str) {
error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
host_uuid_str ? host_uuid_str : "[unset]",
claim_id_str ? claim_id_str : "[unset]");
return PARSER_RC_ERROR;
}
uuid_t uuid;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
// We don't need the parsed UUID
// just do it to check the format
if(uuid_parse(host_uuid_str, uuid)) {
error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
return PARSER_RC_ERROR;
}
if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
return PARSER_RC_ERROR;
}
if(strcmp(host_uuid_str, host->machine_guid)) {
error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
return PARSER_RC_OK; //the message is OK problem must be somewhere else
}
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
rrdhost_aclk_state_unlock(host);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
rrdpush_claimed_id(host);
return PARSER_RC_OK;
}
/*
 * Read up to `size` bytes from the receiver's connection into `buffer`.
 *
 * When built with ENABLE_HTTPS and the TLS handshake has completed, the
 * read goes through netdata_ssl_read() and its result is returned as is.
 * Otherwise the data is read directly from r->fd with read().
 *
 * Return value for the plain socket path:
 *   > 0  number of bytes read
 *     0  `size` was zero, nothing was attempted
 *    -1  EOF (read() returned 0)
 *    -2  read() failed
 *    -3  read() failed with EAGAIN / EWOULDBLOCK / EINPROGRESS (timeout)
 */
static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
    if(unlikely(!size)) {
        internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
        return 0;
    }

#ifdef ENABLE_HTTPS
    if (r->ssl.conn && r->ssl.flags == NETDATA_SSL_HANDSHAKE_COMPLETE)
        return (int)netdata_ssl_read(r->ssl.conn, buffer, size);
#endif

    ssize_t bytes_read = read(r->fd, buffer, size);

    if(bytes_read < 0) {
        // errno is only meaningful when read() reported a failure;
        // capture it before any other call can overwrite it
        int read_errno = errno;

        if(read_errno == EAGAIN || read_errno == EWOULDBLOCK || read_errno == EINPROGRESS) {
            error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
            bytes_read = -3;
        }
        else {
            error("STREAM: %s() failed to read from socket!", __FUNCTION__);
            bytes_read = -2;
        }
    }
    else if(bytes_read == 0) {
        // read() returning 0 for a non-zero request means end of file,
        // whatever value errno was left with by earlier calls
        error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
        bytes_read = -1;
    }

    return (int)bytes_read;
}
static bool receiver_read_uncompressed(struct receiver_state