summaryrefslogtreecommitdiffstats
path: root/streaming
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /streaming
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'streaming')
-rw-r--r--streaming/receiver.c611
-rw-r--r--streaming/replication.c733
-rw-r--r--streaming/rrdpush.c572
-rw-r--r--streaming/rrdpush.h48
-rw-r--r--streaming/sender.c174
5 files changed, 1300 insertions, 838 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index c1bb291523..012124edd9 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -16,7 +16,8 @@
extern struct config stream_config;
-void destroy_receiver_state(struct receiver_state *rpt) {
+void receiver_state_free(struct receiver_state *rpt) {
+
freez(rpt->key);
freez(rpt->hostname);
freez(rpt->registry_hostname);
@@ -29,43 +30,21 @@ void destroy_receiver_state(struct receiver_state *rpt) {
freez(rpt->client_port);
freez(rpt->program_name);
freez(rpt->program_version);
+
#ifdef ENABLE_HTTPS
- if(rpt->ssl.conn){
+ if(rpt->ssl.conn)
SSL_free(rpt->ssl.conn);
- }
#endif
+
#ifdef ENABLE_COMPRESSION
if (rpt->decompressor)
rpt->decompressor->destroy(&rpt->decompressor);
#endif
- freez(rpt);
-}
-
-static void rrdpush_receiver_thread_cleanup(void *ptr) {
- worker_unregister();
- static __thread int executed = 0;
- if(!executed) {
- executed = 1;
- struct receiver_state *rpt = (struct receiver_state *) ptr;
- // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch
- // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free().
- if (netdata_exit && rpt->host) {
- rpt->exited = 1;
- return;
- }
+ if(rpt->system_info)
+ rrdhost_system_info_free(rpt->system_info);
- // Make sure that we detach this thread and don't kill a freshly arriving receiver
- if (!netdata_exit && rpt->host) {
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == rpt)
- rpt->host->receiver = NULL;
- netdata_mutex_unlock(&rpt->host->receiver_lock);
- }
-
- info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid());
- destroy_receiver_state(rpt);
- }
+ freez(rpt);
}
#include "collectors/plugins.d/pluginsd_parser.h"
@@ -105,11 +84,10 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user)
if (host->aclk_state.claimed_id)
freez(host->aclk_state.claimed_id);
host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL;
-
- metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL);
-
rrdhost_aclk_state_unlock(host);
+ rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE);
+
rrdpush_claimed_id(host);
return PARSER_RC_OK;
@@ -390,39 +368,50 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
size_t read_buffer_start = 0;
char buffer[PLUGINSD_LINE_MAX + 2] = "";
- while(!netdata_exit) {
+ while(service_running(SERVICE_STREAMING)) {
+ netdata_thread_testcancel();
+
if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) {
bool have_new_data;
- if(compressed_connection)
+ if(likely(compressed_connection))
have_new_data = receiver_read_compressed(rpt);
else
have_new_data = receiver_read_uncompressed(rpt);
- if(!have_new_data)
+ if(unlikely(!have_new_data)) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "SOCKET READ ERROR";
+
break;
+ }
rpt->last_msg_t = now_realtime_sec();
continue;
}
- if(unlikely(netdata_exit)) {
- internal_error(true, "exiting...");
+ if(unlikely(!service_running(SERVICE_STREAMING))) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "NETDATA EXIT";
goto done;
}
- if(unlikely(rpt->shutdown)) {
- internal_error(true, "parser shutdown...");
+ if(unlikely(rpt->exit.shutdown)) {
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "SHUTDOWN REQUESTED";
+
goto done;
}
if (unlikely(parser_action(parser, buffer))) {
internal_error(true, "parser_action() failed on keyword '%s'.", buffer);
+
+ if(!rpt->exit.reason)
+ rpt->exit.reason = "PARSER FAILED";
+
break;
}
}
done:
- internal_error(true, "Streaming receiver thread stopping...");
-
result = user.count;
// free parser with the pop function
@@ -431,103 +420,236 @@ done:
return result;
}
-static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) {
+static void rrdpush_receiver_replication_reset(RRDHOST *host) {
RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
+ rrdset_foreach_read(st, host) {
rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
}
rrdset_foreach_done(st);
- rrdhost_receiver_replicating_charts_zero(rpt->host);
+ rrdhost_receiver_replicating_charts_zero(host);
+}
+
+bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) {
+ bool signal_rrdcontext = false;
+ bool set_this = false;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if (!host->receiver || host->receiver == rpt) {
+ rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN);
+
+ host->receiver = rpt;
+ rpt->host = host;
+
+ host->child_connect_time = now_realtime_sec();
+ host->child_disconnected_time = 0;
+ host->child_last_chart_command = 0;
+ host->trigger_chart_obsoletion_check = 1;
+
+ if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) {
+ if (rpt->config.alarms_delay > 0) {
+ host->health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay;
+ log_health(
+ "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.",
+ rrdhost_hostname(host),
+ (int64_t) rpt->config.alarms_delay);
+ }
+ }
+
+ signal_rrdcontext = true;
+ rrdpush_receiver_replication_reset(host);
+
+ rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
+
+ set_this = true;
+ }
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(signal_rrdcontext)
+ rrdcontext_host_child_connected(host);
+
+ return set_this;
+}
+
+static void rrdhost_clear_receiver(struct receiver_state *rpt) {
+ bool signal_rrdcontext = false;
+
+ RRDHOST *host = rpt->host;
+ if(host) {
+ netdata_mutex_lock(&host->receiver_lock);
+
+ // Make sure that we detach this thread and don't kill a freshly arriving receiver
+ if(host->receiver == rpt) {
+ host->trigger_chart_obsoletion_check = 0;
+ host->child_connect_time = 0;
+ host->child_disconnected_time = now_realtime_sec();
+
+ if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO)
+ host->health_enabled = 0;
+
+ rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false);
+
+ signal_rrdcontext = true;
+ rrdpush_receiver_replication_reset(host);
+
+ if (host->receiver == rpt)
+ host->receiver = NULL;
+
+ rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN);
+ }
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ if(signal_rrdcontext)
+ rrdcontext_host_child_disconnected(host);
+ }
+}
+
+bool stop_streaming_receiver(RRDHOST *host, const char *reason) {
+ bool ret = false;
+
+ netdata_mutex_lock(&host->receiver_lock);
+
+ if(host->receiver) {
+ if(!host->receiver->exit.shutdown) {
+ host->receiver->exit.shutdown = true;
+ host->receiver->exit.reason = reason;
+ shutdown(host->receiver->fd, SHUT_RDWR);
+ }
+
+ netdata_thread_cancel(host->receiver->thread);
+ }
+
+ int count = 2000;
+ while (host->receiver && count-- > 0) {
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ // let the lock for the receiver thread to exit
+ sleep_usec(1 * USEC_PER_MS);
+
+ netdata_mutex_lock(&host->receiver_lock);
+ }
+
+ if(host->receiver)
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "thread %d takes too long to stop, giving up..."
+ , rrdhost_hostname(host)
+ , host->receiver->client_ip, host->receiver->client_port
+ , gettid());
+ else
+ ret = true;
+
+ netdata_mutex_unlock(&host->receiver_lock);
+
+ return ret;
+}
+
+void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) {
+
+ log_stream_connection(rpt->client_ip, rpt->client_port,
+ (rpt->key && *rpt->key)? rpt->key : "-",
+ (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-",
+ (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-",
+ status);
+
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "%s. "
+ "STATUS: %s%s%s%s"
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ , msg
+ , status
+ , rpt->exit.reason?" (":""
+ , rpt->exit.reason?rpt->exit.reason:""
+ , rpt->exit.reason?")":""
+ );
+
+}
+
+static void rrdhost_reset_destinations(RRDHOST *host) {
+ for (struct rrdpush_destinations *d = host->destinations; d; d = d->next)
+ d->postpone_reconnection_until = 0;
}
static int rrdpush_receive(struct receiver_state *rpt)
{
- int history = default_rrd_history_entries;
- RRD_MEMORY_MODE mode = default_rrd_memory_mode;
- int health_enabled = default_health_enabled;
- int rrdpush_enabled = default_rrdpush_enabled;
- char *rrdpush_destination = default_rrdpush_destination;
- char *rrdpush_api_key = default_rrdpush_api_key;
- char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
- bool rrdpush_enable_replication = default_rrdpush_enable_replication;
- time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
- time_t rrdpush_replication_step = default_rrdpush_replication_step;
- time_t alarms_delay = 60;
-
- rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every);
- if(rpt->update_every < 0) rpt->update_every = 1;
-
- history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history);
- history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history);
- if(history < 5) history = 5;
-
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode)));
- mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode)));
-
- if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
- error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port);
- mode = default_rrd_memory_mode;
+ rpt->config.mode = default_rrd_memory_mode;
+ rpt->config.history = default_rrd_history_entries;
+
+ rpt->config.health_enabled = (int)default_health_enabled;
+ rpt->config.alarms_delay = 60;
+
+ rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled;
+ rpt->config.rrdpush_destination = default_rrdpush_destination;
+ rpt->config.rrdpush_api_key = default_rrdpush_api_key;
+ rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching;
+
+ rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication;
+ rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate;
+ rpt->config.rrdpush_replication_step = default_rrdpush_replication_step;
+
+ rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every);
+ if(rpt->config.update_every < 0) rpt->config.update_every = 1;
+
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history);
+ rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history);
+ if(rpt->config.history < 5) rpt->config.history = 5;
+
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode)));
+ rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode)));
+
+ if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) {
+ error("STREAM '%s' [receive from %s:%s]: "
+ "dbengine is not enabled, falling back to default."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+
+ rpt->config.mode = default_rrd_memory_mode;
}
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled);
- health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled);
+ rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled);
+ rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled);
- alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay);
- alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay);
+ rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay);
- rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled);
- rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled);
+ rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled);
+ rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled);
- rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination);
- rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination);
+ rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination);
+ rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination);
- rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key);
- rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key);
+ rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key);
+ rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key);
- rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching);
- rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching);
+ rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
+ rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching);
- rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication);
- rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication);
+ rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication);
+ rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication);
- rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate);
- rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
+ rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate);
- rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step);
- rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step);
+ rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step);
#ifdef ENABLE_COMPRESSION
- unsigned int rrdpush_compression = default_compression_enabled;
- rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression);
- rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression);
- rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled);
+ rpt->config.rrdpush_compression = default_compression_enabled;
+ rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression);
+ rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression);
+ rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled);
#endif //ENABLE_COMPRESSION
(void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:"");
- if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT");
- error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid);
- char initial_response[HTTP_HEADER_SIZE + 1];
- snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
-#ifdef ENABLE_HTTPS
- if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
-#else
- if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) {
-#endif
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY");
- error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port);
- close(rpt->fd);
- return 0;
- }
- close(rpt->fd);
- return 0;
- }
-
- if (rpt->host==NULL) {
-
- rpt->host = rrdhost_find_or_create(
+ // find the host for this receiver
+ {
+ // this will also update the host with our system_info
+ RRDHOST *host = rrdhost_find_or_create(
rpt->hostname
, rpt->registry_hostname
, rpt->machine_guid
@@ -538,76 +660,41 @@ static int rrdpush_receive(struct receiver_state *rpt)
, rpt->tags
, rpt->program_name
, rpt->program_version
- , rpt->update_every
- , history
- , mode
- , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO)
- , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key)
- , rrdpush_destination
- , rrdpush_api_key
- , rrdpush_send_charts_matching
- , rrdpush_enable_replication
- , rrdpush_seconds_to_replicate
- , rrdpush_replication_step
+ , rpt->config.update_every
+ , rpt->config.history
+ , rpt->config.mode
+ , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO)
+ , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key)
+ , rpt->config.rrdpush_destination
+ , rpt->config.rrdpush_api_key
+ , rpt->config.rrdpush_send_charts_matching
+ , rpt->config.rrdpush_enable_replication
+ , rpt->config.rrdpush_seconds_to_replicate
+ , rpt->config.rrdpush_replication_step
, rpt->system_info
, 0
);
- if(!rpt->host) {
+ if(!host) {
+ rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION");
close(rpt->fd);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST");
- error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port);
return 1;
}
- netdata_mutex_lock(&rpt->host->receiver_lock);
- if (rpt->host->receiver == NULL)
- rpt->host->receiver = rpt;
- else {
- error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid);
- netdata_mutex_unlock(&rpt->host->receiver_lock);
+ // system_info has been consumed by the host structure
+ rpt->system_info = NULL;
+
+ if(!rrdhost_set_receiver(host, rpt)) {
+ rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION");
close(rpt->fd);
- log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION");
return 1;
}
- netdata_mutex_unlock(&rpt->host->receiver_lock);
- }
- else {
- rrd_wrlock();
- rrdhost_update(
- rpt->host,
- rpt->hostname,
- rpt->registry_hostname,
- rpt->machine_guid,
- rpt->os,
- rpt->timezone,
- rpt->abbrev_timezone,
- rpt->utc_offset,
- rpt->tags,
- rpt->program_name,
- rpt->program_version,
- rpt->update_every,
- history,
- mode,
- (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO),
- (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key),
- rrdpush_destination,
- rrdpush_api_key,
- rrdpush_send_charts_matching,
- rrdpush_enable_replication,
- rrdpush_seconds_to_replicate,
- rrdpush_replication_step,
- rpt->system_info);
- rrd_unlock();
}
#ifdef NETDATA_INTERNAL_CHECKS
- int ssl = 0;
-#ifdef ENABLE_HTTPS
- if (rpt->ssl.conn != NULL)
- ssl = 1;
-#endif
- info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "client willing to stream metrics for host '%s' with machine_guid '%s': "
+ "update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'"
, rpt->hostname