// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
// IMPORTANT: to add workers, you have to edit WORKER_PARSER_FIRST_JOB accordingly
// The receiver job ids are derived by counting downwards from WORKER_PARSER_FIRST_JOB.
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
#define WORKER_RECEIVER_JOB_BYTES_UNCOMPRESSED (WORKER_PARSER_FIRST_JOB - 2)
// this has to be the same as in parser.h
#define WORKER_RECEIVER_JOB_REPLICATION_COMPLETION (WORKER_PARSER_FIRST_JOB - 3)
// The lowest id derived above is WORKER_PARSER_FIRST_JOB - 3, so any value
// below 3 would make at least one of these ids negative.
#if WORKER_PARSER_FIRST_JOB < 3
#error The define WORKER_PARSER_FIRST_JOB needs to be at least 3
#endif
extern struct config stream_config;
// Tear down a receiver state and release everything it owns.
//
// In order: frees the heap-allocated members, closes the SSL session
// (ENABLE_HTTPS builds only), closes the socket if one is open, destroys the
// decompressor (ENABLE_COMPRESSION builds only), frees the system info,
// subtracts the structure size from the receivers memory statistic and,
// last, frees the structure itself. rpt is invalid once this returns.
void receiver_state_free(struct receiver_state *rpt) {
    // members released with freez(), in the order they are freed
    void *owned_members[] = {
        rpt->key,
        rpt->hostname,
        rpt->registry_hostname,
        rpt->machine_guid,
        rpt->os,
        rpt->timezone,
        rpt->abbrev_timezone,
        rpt->tags,
        rpt->client_ip,
        rpt->client_port,
        rpt->program_name,
        rpt->program_version,
    };
    const size_t owned_count = sizeof(owned_members) / sizeof(owned_members[0]);

    for (size_t idx = 0; idx < owned_count; idx++) {
        freez(owned_members[idx]);
    }

#ifdef ENABLE_HTTPS
    netdata_ssl_close(&rpt->ssl);
#endif

    // -1 marks "no socket"; anything else is an open descriptor we must close
    if (rpt->fd != -1) {
        internal_error(true, "closing socket...");
        close(rpt->fd);
    }

#ifdef ENABLE_COMPRESSION
    if (rpt->decompressor) {
        rpt->decompressor->destroy(&rpt->decompressor);
    }
#endif

    if (rpt->system_info) {
        rrdhost_system_info_free(rpt->system_info);
    }

    // keep the memory accounting in sync before the structure goes away
    __atomic_sub_fetch(&netdata_buffers_statistics.rrdhost_receivers, sizeof(*rpt), __ATOMIC_RELAXED);

    freez(rpt);
}
#include "collectors/plugins.d/pluginsd_parser.h"
PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
{
const char *host_uuid_str = get_word(words, num_words, 1);
const char *claim_id_str = get_word(words, num_words, 2);
if (!host_uuid_str || !claim_id_str) {
error("Command CLAIMED_ID came malformed, uuid = '%s', claim_id = '%s'",
host_uuid_str ? host_uuid_str : "[unset]",
claim_id_str ? claim_id_str : "[unset]");
return PARSER_RC_ERROR;
}
uuid_t uuid;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
// We don't need the parsed UUID
// just do it to check the format
if(uuid_parse(host_uuid_str, uuid)) {
error("1st parameter (host GUID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", host_uuid_str);
return PARSER_RC_ERROR;
}
if(uuid_parse(claim_id_str, uuid) && strcmp(claim_id_str, "NULL")) {
error("2nd parameter (Claim ID) to CLAIMED_ID command is not valid GUID. Received: \"%s\".", claim_id_str);
return PARSER_RC_ERROR;
}
if(strcmp(host_uuid_str, host->machine_guid)) {
error("Claim ID is for host \"%s\" but it came over connection for \"%s\"", host_uuid_str, host->machine_guid);
return PARSER_RC_OK; //the message is OK problem must be somewhere else
}
rrdhost_aclk_state_lock(host);
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
rrdhost_aclk_state_unlock(host);
rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
rrdpush_claimed_id(host);
return PARSER_RC_OK;
}
static int read_stream(struct receiver_state *r, char* buffer, size_t size) {
if(unlikely(!size)) {
internal_error(true, "%s() asked to read zero bytes", __FUNCTION__);
return 0;
}
ssize_t bytes_read;
#ifdef ENABLE_HTTPS
if (SSL_connection(&r->ssl))
bytes_read = netdata_ssl_read(&r->ssl, buffer, size);
else
bytes_read = read(r->fd, buffer, size);
#else
bytes_read = read(r->fd, buffer, size);
#endif
if((bytes_read == 0 || bytes_read == -1) && (errno == EAGAIN || errno == EWOULDBLOCK || errno == EINPROGRESS)) {
error("STREAM: %s(): timeout while waiting for data on socket!", __FUNCTION__);
bytes_read = -3;
}
else if (bytes_read == 0) {
error("STREAM: %s(): EOF while reading data from socket!", __FUNCTION__);
bytes_read = -1;
}