diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2023-01-10 19:59:21 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2023-01-10 19:59:21 +0200 |
commit | 368a26cfee6887ca0cb2301d93138f63b75e353a (patch) | |
tree | b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /streaming | |
parent | b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff) |
DBENGINE v2 (#14125)
* count open cache pages refering to datafile
* eliminate waste flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* dont acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove not needed calls to metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leaking hunding
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* dont count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we dont need to pdc
* time in range re-work to provide info about past and future matches
* finner control on the pages selected for processing; accounting of page related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3)
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finner gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
* cancel threads first, then stop services
* revert default thread stack size
* dont execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* dont lock clean while evicting pages during flushing
* Revert "dont lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* dont cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, dont lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove not needed variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migration to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, works dispatched, works executing
* cleanup event loop memory every 10 minutes
* dont dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset
* also check if the expanded before is not the chart later updated time
* also check if the expanded before is not after the wall clock time of when the query started
* Remove unused variable
* replication to queue less queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wakeup the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered
* opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs a ML for a child, the relevant-ML charts
should be created on the child host. These charts should use
the parent's hostname to differentiate multiple parents that might
run ML for a child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, ie. we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* dont use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed by in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* dont initialize health if the health service is not running - prevent crash on shutdown while children get connected
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to caclulate page entries by time and size
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'streaming')
-rw-r--r-- | streaming/receiver.c | 611 | ||||
-rw-r--r-- | streaming/replication.c | 733 | ||||
-rw-r--r-- | streaming/rrdpush.c | 572 | ||||
-rw-r--r-- | streaming/rrdpush.h | 48 | ||||
-rw-r--r-- | streaming/sender.c | 174 |
5 files changed, 1300 insertions, 838 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index c1bb291523..012124edd9 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -16,7 +16,8 @@ extern struct config stream_config; -void destroy_receiver_state(struct receiver_state *rpt) { +void receiver_state_free(struct receiver_state *rpt) { + freez(rpt->key); freez(rpt->hostname); freez(rpt->registry_hostname); @@ -29,43 +30,21 @@ void destroy_receiver_state(struct receiver_state *rpt) { freez(rpt->client_port); freez(rpt->program_name); freez(rpt->program_version); + #ifdef ENABLE_HTTPS - if(rpt->ssl.conn){ + if(rpt->ssl.conn) SSL_free(rpt->ssl.conn); - } #endif + #ifdef ENABLE_COMPRESSION if (rpt->decompressor) rpt->decompressor->destroy(&rpt->decompressor); #endif - freez(rpt); -} - -static void rrdpush_receiver_thread_cleanup(void *ptr) { - worker_unregister(); - static __thread int executed = 0; - if(!executed) { - executed = 1; - struct receiver_state *rpt = (struct receiver_state *) ptr; - // If the shutdown sequence has started, and this receiver is still attached to the host then we cannot touch - // the host pointer as it is unpredictable when the RRDHOST is deleted. Do the cleanup from rrdhost_free(). - if (netdata_exit && rpt->host) { - rpt->exited = 1; - return; - } + if(rpt->system_info) + rrdhost_system_info_free(rpt->system_info); - // Make sure that we detach this thread and don't kill a freshly arriving receiver - if (!netdata_exit && rpt->host) { - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == rpt) - rpt->host->receiver = NULL; - netdata_mutex_unlock(&rpt->host->receiver_lock); - } - - info("STREAM %s [receive from [%s]:%s]: receive thread ended (task id %d)", rpt->hostname, rpt->client_ip, rpt->client_port, gettid()); - destroy_receiver_state(rpt); - } + freez(rpt); } #include "collectors/plugins.d/pluginsd_parser.h" @@ -105,11 +84,10 @@ PARSER_RC streaming_claimed_id(char **words, size_t num_words, void *user) if (host->aclk_state.claimed_id) freez(host->aclk_state.claimed_id); host->aclk_state.claimed_id = strcmp(claim_id_str, "NULL") ? strdupz(claim_id_str) : NULL; - - metaqueue_store_claim_id(&host->host_uuid, host->aclk_state.claimed_id ? &uuid : NULL); - rrdhost_aclk_state_unlock(host); + rrdhost_flag_set(host, RRDHOST_FLAG_METADATA_CLAIMID |RRDHOST_FLAG_METADATA_UPDATE); + rrdpush_claimed_id(host); return PARSER_RC_OK; @@ -390,39 +368,50 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i size_t read_buffer_start = 0; char buffer[PLUGINSD_LINE_MAX + 2] = ""; - while(!netdata_exit) { + while(service_running(SERVICE_STREAMING)) { + netdata_thread_testcancel(); + if(!receiver_next_line(rpt, buffer, PLUGINSD_LINE_MAX + 2, &read_buffer_start)) { bool have_new_data; - if(compressed_connection) + if(likely(compressed_connection)) have_new_data = receiver_read_compressed(rpt); else have_new_data = receiver_read_uncompressed(rpt); - if(!have_new_data) + if(unlikely(!have_new_data)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SOCKET READ ERROR"; + break; + } rpt->last_msg_t = now_realtime_sec(); continue; } - if(unlikely(netdata_exit)) { - internal_error(true, "exiting..."); + if(unlikely(!service_running(SERVICE_STREAMING))) { + if(!rpt->exit.reason) + rpt->exit.reason = "NETDATA EXIT"; goto done; } - if(unlikely(rpt->shutdown)) { - internal_error(true, "parser shutdown..."); + if(unlikely(rpt->exit.shutdown)) { + if(!rpt->exit.reason) + rpt->exit.reason = "SHUTDOWN REQUESTED"; + goto done; } if (unlikely(parser_action(parser, buffer))) { internal_error(true, "parser_action() failed on keyword '%s'.", buffer); + + if(!rpt->exit.reason) + rpt->exit.reason = "PARSER FAILED"; + break; } } done: - internal_error(true, "Streaming receiver thread stopping..."); - result = user.count; // free parser with the pop function @@ -431,103 +420,236 @@ done: return result; } -static void rrdpush_receiver_replication_reset(struct receiver_state *rpt) { +static void rrdpush_receiver_replication_reset(RRDHOST *host) { RRDSET *st; - rrdset_foreach_read(st, rpt->host) { + rrdset_foreach_read(st, host) { rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); } rrdset_foreach_done(st); - rrdhost_receiver_replicating_charts_zero(rpt->host); + rrdhost_receiver_replicating_charts_zero(host); +} + +bool rrdhost_set_receiver(RRDHOST *host, struct receiver_state *rpt) { + bool signal_rrdcontext = false; + bool set_this = false; + + netdata_mutex_lock(&host->receiver_lock); + + if (!host->receiver || host->receiver == rpt) { + rrdhost_flag_clear(host, RRDHOST_FLAG_ORPHAN); + + host->receiver = rpt; + rpt->host = host; + + host->child_connect_time = now_realtime_sec(); + host->child_disconnected_time = 0; + host->child_last_chart_command = 0; + host->trigger_chart_obsoletion_check = 1; + + if (rpt->config.health_enabled != CONFIG_BOOLEAN_NO) { + if (rpt->config.alarms_delay > 0) { + host->health_delay_up_to = now_realtime_sec() + rpt->config.alarms_delay; + log_health( + "[%s]: Postponing health checks for %" PRId64 " seconds, because it was just connected.", + rrdhost_hostname(host), + (int64_t) rpt->config.alarms_delay); + } + } + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); + + set_this = true; + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_connected(host); + + return set_this; +} + +static void rrdhost_clear_receiver(struct receiver_state *rpt) { + bool signal_rrdcontext = false; + + RRDHOST *host = rpt->host; + if(host) { + netdata_mutex_lock(&host->receiver_lock); + + // Make sure that we detach this thread and don't kill a freshly arriving receiver + if(host->receiver == rpt) { + host->trigger_chart_obsoletion_check = 0; + host->child_connect_time = 0; + host->child_disconnected_time = now_realtime_sec(); + + if (rpt->config.health_enabled == CONFIG_BOOLEAN_AUTO) + host->health_enabled = 0; + + rrdpush_sender_thread_stop(host, "RECEIVER LEFT", false); + + signal_rrdcontext = true; + rrdpush_receiver_replication_reset(host); + + if (host->receiver == rpt) + host->receiver = NULL; + + rrdhost_flag_set(host, RRDHOST_FLAG_ORPHAN); + } + + netdata_mutex_unlock(&host->receiver_lock); + + if(signal_rrdcontext) + rrdcontext_host_child_disconnected(host); + } +} + +bool stop_streaming_receiver(RRDHOST *host, const char *reason) { + bool ret = false; + + netdata_mutex_lock(&host->receiver_lock); + + if(host->receiver) { + if(!host->receiver->exit.shutdown) { + host->receiver->exit.shutdown = true; + host->receiver->exit.reason = reason; + shutdown(host->receiver->fd, SHUT_RDWR); + } + + netdata_thread_cancel(host->receiver->thread); + } + + int count = 2000; + while (host->receiver && count-- > 0) { + netdata_mutex_unlock(&host->receiver_lock); + + // let the lock for the receiver thread to exit + sleep_usec(1 * USEC_PER_MS); + + netdata_mutex_lock(&host->receiver_lock); + } + + if(host->receiver) + error("STREAM '%s' [receive from [%s]:%s]: " + "thread %d takes too long to stop, giving up..." + , rrdhost_hostname(host) + , host->receiver->client_ip, host->receiver->client_port + , gettid()); + else + ret = true; + + netdata_mutex_unlock(&host->receiver_lock); + + return ret; +} + +void rrdpush_receive_log_status(struct receiver_state *rpt, const char *msg, const char *status) { + + log_stream_connection(rpt->client_ip, rpt->client_port, + (rpt->key && *rpt->key)? rpt->key : "-", + (rpt->machine_guid && *rpt->machine_guid) ? rpt->machine_guid : "-", + (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-", + status); + + info("STREAM '%s' [receive from [%s]:%s]: " + "%s. " + "STATUS: %s%s%s%s" + , rpt->hostname + , rpt->client_ip, rpt->client_port + , msg + , status + , rpt->exit.reason?" (":"" + , rpt->exit.reason?rpt->exit.reason:"" + , rpt->exit.reason?")":"" + ); + +} + +static void rrdhost_reset_destinations(RRDHOST *host) { + for (struct rrdpush_destinations *d = host->destinations; d; d = d->next) + d->postpone_reconnection_until = 0; } static int rrdpush_receive(struct receiver_state *rpt) { - int history = default_rrd_history_entries; - RRD_MEMORY_MODE mode = default_rrd_memory_mode; - int health_enabled = default_health_enabled; - int rrdpush_enabled = default_rrdpush_enabled; - char *rrdpush_destination = default_rrdpush_destination; - char *rrdpush_api_key = default_rrdpush_api_key; - char *rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; - bool rrdpush_enable_replication = default_rrdpush_enable_replication; - time_t rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; - time_t rrdpush_replication_step = default_rrdpush_replication_step; - time_t alarms_delay = 60; - - rpt->update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->update_every); - if(rpt->update_every < 0) rpt->update_every = 1; - - history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", history); - history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", history); - if(history < 5) history = 5; - - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(mode))); - mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(mode))); - - if (unlikely(mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { - error("STREAM %s [receive from %s:%s]: dbengine is not enabled, falling back to default.", rpt->hostname, rpt->client_ip, rpt->client_port); - mode = default_rrd_memory_mode; + rpt->config.mode = default_rrd_memory_mode; + rpt->config.history = default_rrd_history_entries; + + rpt->config.health_enabled = (int)default_health_enabled; + rpt->config.alarms_delay = 60; + + rpt->config.rrdpush_enabled = (int)default_rrdpush_enabled; + rpt->config.rrdpush_destination = default_rrdpush_destination; + rpt->config.rrdpush_api_key = default_rrdpush_api_key; + rpt->config.rrdpush_send_charts_matching = default_rrdpush_send_charts_matching; + + rpt->config.rrdpush_enable_replication = default_rrdpush_enable_replication; + rpt->config.rrdpush_seconds_to_replicate = default_rrdpush_seconds_to_replicate; + rpt->config.rrdpush_replication_step = default_rrdpush_replication_step; + + rpt->config.update_every = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "update every", rpt->config.update_every); + if(rpt->config.update_every < 0) rpt->config.update_every = 1; + + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->key, "default history", rpt->config.history); + rpt->config.history = (int)appconfig_get_number(&stream_config, rpt->machine_guid, "history", rpt->config.history); + if(rpt->config.history < 5) rpt->config.history = 5; + + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->key, "default memory mode", rrd_memory_mode_name(rpt->config.mode))); + rpt->config.mode = rrd_memory_mode_id(appconfig_get(&stream_config, rpt->machine_guid, "memory mode", rrd_memory_mode_name(rpt->config.mode))); + + if (unlikely(rpt->config.mode == RRD_MEMORY_MODE_DBENGINE && !dbengine_enabled)) { + error("STREAM '%s' [receive from %s:%s]: " + "dbengine is not enabled, falling back to default." + , rpt->hostname + , rpt->client_ip, rpt->client_port + ); + + rpt->config.mode = default_rrd_memory_mode; } - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", health_enabled); - health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->key, "health enabled by default", rpt->config.health_enabled); + rpt->config.health_enabled = appconfig_get_boolean_ondemand(&stream_config, rpt->machine_guid, "health enabled", rpt->config.health_enabled); - alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", alarms_delay); - alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->key, "default postpone alarms on connect seconds", rpt->config.alarms_delay); + rpt->config.alarms_delay = appconfig_get_number(&stream_config, rpt->machine_guid, "postpone alarms on connect seconds", rpt->config.alarms_delay); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rrdpush_enabled); - rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->key, "default proxy enabled", rpt->config.rrdpush_enabled); + rpt->config.rrdpush_enabled = appconfig_get_boolean(&stream_config, rpt->machine_guid, "proxy enabled", rpt->config.rrdpush_enabled); - rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rrdpush_destination); - rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->key, "default proxy destination", rpt->config.rrdpush_destination); + rpt->config.rrdpush_destination = appconfig_get(&stream_config, rpt->machine_guid, "proxy destination", rpt->config.rrdpush_destination); - rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rrdpush_api_key); - rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->key, "default proxy api key", rpt->config.rrdpush_api_key); + rpt->config.rrdpush_api_key = appconfig_get(&stream_config, rpt->machine_guid, "proxy api key", rpt->config.rrdpush_api_key); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); - rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rpt->config.rrdpush_send_charts_matching); + rpt->config.rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rpt->config.rrdpush_send_charts_matching); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rrdpush_enable_replication); - rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->key, "enable replication", rpt->config.rrdpush_enable_replication); + rpt->config.rrdpush_enable_replication = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable replication", rpt->config.rrdpush_enable_replication); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rrdpush_seconds_to_replicate); - rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->key, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); + rpt->config.rrdpush_seconds_to_replicate = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds to replicate", rpt->config.rrdpush_seconds_to_replicate); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rrdpush_replication_step); - rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->key, "seconds per replication step", rpt->config.rrdpush_replication_step); + rpt->config.rrdpush_replication_step = appconfig_get_number(&stream_config, rpt->machine_guid, "seconds per replication step", rpt->config.rrdpush_replication_step); #ifdef ENABLE_COMPRESSION - unsigned int rrdpush_compression = default_compression_enabled; - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); - rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); - rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); + rpt->config.rrdpush_compression = default_compression_enabled; + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rpt->config.rrdpush_compression); + rpt->config.rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rpt->config.rrdpush_compression); + rpt->rrdpush_compression = (rpt->config.rrdpush_compression && default_compression_enabled); #endif //ENABLE_COMPRESSION (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); - if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "DENIED - ATTEMPT TO RECEIVE METRICS FROM MACHINE_GUID IDENTICAL TO PARENT"); - error("STREAM %s [receive from %s:%s]: denied to receive metrics, machine GUID [%s] is my own. Did you copy the parent/proxy machine GUID to a child, or is this an inter-agent loop?", rpt->hostname, rpt->client_ip, rpt->client_port, rpt->machine_guid); - char initial_response[HTTP_HEADER_SIZE + 1]; - snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST); -#ifdef ENABLE_HTTPS - if(send_timeout(&rpt->ssl, rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) { -#else - if(send_timeout(rpt->fd, initial_response, strlen(initial_response), 0, 60) != strlen(initial_response)) { -#endif - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->host->machine_guid, rrdhost_hostname(rpt->host), "FAILED - CANNOT REPLY"); - error("STREAM %s [receive from [%s]:%s]: cannot send command.", rrdhost_hostname(rpt->host), rpt->client_ip, rpt->client_port); - close(rpt->fd); - return 0; - } - close(rpt->fd); - return 0; - } - - if (rpt->host==NULL) { - - rpt->host = rrdhost_find_or_create( + // find the host for this receiver + { + // this will also update the host with our system_info + RRDHOST *host = rrdhost_find_or_create( rpt->hostname , rpt->registry_hostname , rpt->machine_guid @@ -538,76 +660,41 @@ static int rrdpush_receive(struct receiver_state *rpt) , rpt->tags , rpt->program_name , rpt->program_version - , rpt->update_every - , history - , mode - , (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO) - , (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key) - , rrdpush_destination - , rrdpush_api_key - , rrdpush_send_charts_matching - , rrdpush_enable_replication - , rrdpush_seconds_to_replicate - , rrdpush_replication_step + , rpt->config.update_every + , rpt->config.history + , rpt->config.mode + , (unsigned int)(rpt->config.health_enabled != CONFIG_BOOLEAN_NO) + , (unsigned int)(rpt->config.rrdpush_enabled && rpt->config.rrdpush_destination && *rpt->config.rrdpush_destination && rpt->config.rrdpush_api_key && *rpt->config.rrdpush_api_key) + , rpt->config.rrdpush_destination + , rpt->config.rrdpush_api_key + , rpt->config.rrdpush_send_charts_matching + , rpt->config.rrdpush_enable_replication + , rpt->config.rrdpush_seconds_to_replicate + , rpt->config.rrdpush_replication_step , rpt->system_info , 0 ); - if(!rpt->host) { + if(!host) { + rrdpush_receive_log_status(rpt, "failed to find/create host structure", "INTERNAL ERROR DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - CANNOT ACQUIRE HOST"); - error("STREAM %s [receive from [%s]:%s]: failed to find/create host structure.", rpt->hostname, rpt->client_ip, rpt->client_port); return 1; } - netdata_mutex_lock(&rpt->host->receiver_lock); - if (rpt->host->receiver == NULL) - rpt->host->receiver = rpt; - else { - error("Multiple receivers connected for %s concurrently, cancelling this one...", rpt->machine_guid); - netdata_mutex_unlock(&rpt->host->receiver_lock); + // system_info has been consumed by the host structure + rpt->system_info = NULL; + + if(!rrdhost_set_receiver(host, rpt)) { + rrdpush_receive_log_status(rpt, "host is already served by another receiver", "DUPLICATE RECEIVER DROPPING CONNECTION"); close(rpt->fd); - log_stream_connection(rpt->client_ip, rpt->client_port, rpt->key, rpt->machine_guid, rpt->hostname, "FAILED - BEATEN TO HOST CREATION"); return 1; } - netdata_mutex_unlock(&rpt->host->receiver_lock); - } - else { - rrd_wrlock(); - rrdhost_update( - rpt->host, - rpt->hostname, - rpt->registry_hostname, - rpt->machine_guid, - rpt->os, - rpt->timezone, - rpt->abbrev_timezone, - rpt->utc_offset, - rpt->tags, - rpt->program_name, - rpt->program_version, - rpt->update_every, - history, - mode, - (unsigned int)(health_enabled != CONFIG_BOOLEAN_NO), - (unsigned int)(rrdpush_enabled && rrdpush_destination && *rrdpush_destination && rrdpush_api_key && *rrdpush_api_key), - rrdpush_destination, - rrdpush_api_key, - rrdpush_send_charts_matching, - rrdpush_enable_replication, - rrdpush_seconds_to_replicate, - rrdpush_replication_step, - rpt->system_info); - rrd_unlock(); } #ifdef NETDATA_INTERNAL_CHECKS - int ssl = 0; -#ifdef ENABLE_HTTPS - if (rpt->ssl.conn != NULL) - ssl = 1; -#endif - info("STREAM %s [receive from [%s]:%s]: client willing to stream metrics for host '%s' with machine_guid '%s': update every = %d, history = %ld, memory mode = %s, health %s,%s tags '%s'" + info("STREAM '%s' [receive from [%s]:%s]: " + "client willing to stream metrics for host '%s' with machine_guid '%s': " + "update every = %d, history = %ld, memory mode = %s, health %s,%s tags |