// SPDX-License-Identifier: GPL-3.0-or-later
#include "rrdpush.h"
#include "parser/parser.h"
#define WORKER_RECEIVER_JOB_BYTES_READ (WORKER_PARSER_FIRST_JOB - 1)
#if WORKER_PARSER_FIRST_JOB < 1
#error The define WORKER_PARSER_FIRST_JOB needs to be at least 1
#endif
extern struct config stream_config;
void destroy_receiver_state(struct receiver_state *rpt) {
freez(rpt->key);
freez(rpt->hostname);
freez(rpt->registry_hostname);
freez(rpt->machine_guid);
freez(rpt->os);
freez(rpt->timezone);
freez(rpt->abbrev_timezone);
freez(rpt->tags);
freez(rpt->client_ip);
freez(rpt->client_port);
freez(rpt->program_name);
freez(rpt->program_version);
#ifdef ENABLE_HTTPS
if(rpt->ssl.conn){
SSL_free(rpt->ssl.conn);
}
#endif
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->destroy(&rpt->decompressor);
#endif
freez(rpt);
}
static void rrdpush_receiver_thread_cleanup(void *ptr) {
worker_unregister();
static __thread int executed = 0;
if(!executed) {
executed = 1;
struct receiver_state *rpt = (struct receiver_state *) ptr;
// If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
// the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
if (netdata_exit && rpt->host) {
rpt->exited = 1;
return;
}
// Make sure that we detach this thread and don't kill a freshly arriving receiver
if (!netdata_exit && rpt->host) {
netdata_mutex_lock(&rpt->host->receiver_lock);
if (rpt->host->receiver == rpt)
rpt->host->receiver = NULL;
netdata_mutex_unlock(&rpt->host->receiver_lock);
}
info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
destroy_receiver_state(rpt);
}
}
#include "collectors/plugins.d/pluginsd_parser.h"
PARSER_RC streaming_timestamp(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(plugins_action);
char *remote_time_txt = words[1];
time_t remote_time = 0;
RRDHOST *host = ((PARSER_USER_OBJECT *)user)->host;
struct plugind *cd = ((PARSER_USER_OBJECT *)user)->cd;
if (!(cd->capabilities & STREAM_CAP_GAP_FILLING)) {
error("STREAM %s from %s: Child negotiated version %u but sent TIMESTAMP!", rrdhost_hostname(host), cd->cmd, cd->capabilities);
return PARSER_RC_OK; // Ignore error and continue stream
}
if (remote_time_txt && *remote_time_txt) {
remote_time = str2ull(remote_time_txt);
time_t now = now_realtime_sec(), prev = rrdhost_last_entry_t(host);
time_t gap = 0;
if (prev == 0)
info(
"STREAM %s from %s: Initial connection (no gap to check), "
"remote=%"PRId64" local=%"PRId64" slew=%"PRId64"",
rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)now,
(int64_t)now - remote_time);
else {
gap = now - prev;
info(
"STREAM %s from %s: Checking for gaps... "
"remote=%"PRId64" local=%"PRId64"..%"PRId64" slew=%"PRId64" %"PRId64"-sec gap",
rrdhost_hostname(host),
cd->cmd,
(int64_t)remote_time,
(int64_t)prev,
(int64_t)now,
(int64_t)(remote_time - now),
(int64_t)gap);
}
char message[128];
sprintf(
message,
"REPLICATE %"PRId64" %"PRId64"\n",
(int64_t)(remote_time - gap),
(int64_t)remote_time);
int ret;
#ifdef ENABLE_HTTPS
SSL *conn = host->receiver->ssl.conn ;
if(conn && !host->receiver->ssl.flags) {
ret = SSL_write(conn, message, strlen(message));
} else {
ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
}
#else
ret = send(host->receiver->fd, message, strlen(message), MSG_DONTWAIT);
#endif
if (ret != (int)strlen(message))
error("Failed to send initial timestamp - gaps may appear in charts");
return PARSER_RC_OK;
}
return PARSER_RC_ERROR;
}
#define CLAIMED_ID_MIN_WORDS 3
PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugins_action)
{
UNUSED(plugins_action);
int i;
uuid_t uuid;
RRDHOST *