summaryrefslogtreecommitdiffstats
path: root/streaming/rrdpush.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /streaming/rrdpush.c
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'streaming/rrdpush.c')
-rw-r--r--streaming/rrdpush.c572
1 files changed, 327 insertions, 245 deletions
diff --git a/streaming/rrdpush.c b/streaming/rrdpush.c
index 82264c12ec..80addd5566 100644
--- a/streaming/rrdpush.c
+++ b/streaming/rrdpush.c
@@ -108,7 +108,7 @@ int rrdpush_init() {
default_rrdpush_seconds_to_replicate = config_get_number(CONFIG_SECTION_DB, "seconds to replicate", default_rrdpush_seconds_to_replicate);
default_rrdpush_replication_step = config_get_number(CONFIG_SECTION_DB, "seconds per replication step", default_rrdpush_replication_step);
- rrdhost_free_orphan_time = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time);
+ rrdhost_free_orphan_time_s = config_get_number(CONFIG_SECTION_DB, "cleanup orphan hosts after secs", rrdhost_free_orphan_time_s);
#ifdef ENABLE_COMPRESSION
default_compression_enabled = (unsigned int)appconfig_get_boolean(&stream_config, CONFIG_SECTION_STREAM,
@@ -295,40 +295,14 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
rrdsetvar_print_to_streaming_custom_chart_variables(st, wb);
if (stream_has_capability(host->sender, STREAM_CAP_REPLICATION)) {
- time_t first_entry_local = rrdset_first_entry_t_of_tier(st, 0);
- time_t last_entry_local = st->last_updated.tv_sec;
-
- if(unlikely(!last_entry_local))
- last_entry_local = rrdset_last_entry_t(st);
+ time_t db_first_time_t, db_last_time_t;
time_t now = now_realtime_sec();
- if(unlikely(last_entry_local > now)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' last updated time %ld is in the future, adjusting it to now %ld",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- last_entry_local, now);
- last_entry_local = now;
- }
-
- if(unlikely(first_entry_local && last_entry_local && first_entry_local >= last_entry_local)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first updated time %ld is equal or bigger than last updated time %ld, adjusting it last updated time - update every",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- first_entry_local, last_entry_local);
- first_entry_local = last_entry_local - st->update_every;
- }
-
- if(unlikely(!first_entry_local && last_entry_local)) {
- internal_error(true,
- "RRDSET REPLAY ERROR: 'host:%s/chart:%s' first time %ld, last time %ld, setting both to last time",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- first_entry_local, last_entry_local);
- first_entry_local = last_entry_local;
- }
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_time_t, &db_last_time_t, now, 0);
buffer_sprintf(wb, PLUGINSD_KEYWORD_CHART_DEFINITION_END " %llu %llu %llu\n",
- (unsigned long long)first_entry_local,
- (unsigned long long)last_entry_local,
+ (unsigned long long)db_first_time_t,
+ (unsigned long long)db_last_time_t,
(unsigned long long)now);
rrdset_flag_set(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS);
@@ -342,7 +316,7 @@ static inline bool rrdpush_send_chart_definition(BUFFER *wb, RRDSET *st) {
#endif
}
- st->upstream_resync_time = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
+ st->upstream_resync_time_s = st->last_collected_time.tv_sec + (remote_clock_resync_iterations * st->update_every);
return replication_progress;
}
@@ -352,7 +326,7 @@ static void rrdpush_send_chart_metrics(BUFFER *wb, RRDSET *st, struct sender_sta
buffer_fast_strcat(wb, rrdset_id(st), string_strlen(st->id));
buffer_fast_strcat(wb, "\" ", 2);
- if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time)
+ if(stream_has_capability(s, STREAM_CAP_REPLICATION) || st->last_collected_time.tv_sec > st->upstream_resync_time_s)
buffer_print_llu(wb, st->usec_since_last_update);
else
buffer_fast_strcat(wb, "0", 1);
@@ -581,25 +555,16 @@ void rrdpush_destinations_free(RRDHOST *host) {
// Either the receiver lost the connection or the host is being destroyed.
// The sender mutex guards thread creation, any spurious data is wiped on reconnection.
-void rrdpush_sender_thread_stop(RRDHOST *host) {
-
+void rrdpush_sender_thread_stop(RRDHOST *host, const char *reason, bool wait) {
if (!host->sender)
return;
netdata_mutex_lock(&host->sender->mutex);
- netdata_thread_t thr = 0;
if(rrdhost_flag_check(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN)) {
- rrdhost_flag_clear(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
-
- info("STREAM %s [send]: signaling sending thread to stop...", rrdhost_hostname(host));
-
- // signal the thread that we want to join it
- rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_JOIN);
- // copy the thread id, so that we will be waiting for the right one
- // even if a new one has been spawn
- thr = host->rrdpush_sender_thread;
+ host->sender->exit.shutdown = true;
+ host->sender->exit.reason = reason;
// signal it to cancel
netdata_thread_cancel(host->rrdpush_sender_thread);
@@ -607,11 +572,14 @@ void rrdpush_sender_thread_stop(RRDHOST *host) {
netdata_mutex_unlock(&host->sender->mutex);
- if(thr != 0) {
- info("STREAM %s [send]: waiting for the sending thread to stop...", rrdhost_hostname(host));
- void *result;
- netdata_thread_join(thr, &result);
- info("STREAM %s [send]: sending thread has exited.", rrdhost_hostname(host));
+ if(wait) {
+ netdata_mutex_lock(&host->sender->mutex);
+ while(host->sender->tid) {
+ netdata_mutex_unlock(&host->sender->mutex);
+ sleep_usec(10 * USEC_PER_MS);
+ netdata_mutex_lock(&host->sender->mutex);
+ }
+ netdata_mutex_unlock(&host->sender->mutex);
}
}
@@ -631,7 +599,7 @@ static void rrdpush_sender_thread_spawn(RRDHOST *host) {
char tag[NETDATA_THREAD_TAG_MAX + 1];
snprintfz(tag, NETDATA_THREAD_TAG_MAX, "STREAM_SENDER[%s]", rrdhost_hostname(host));
- if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_JOINABLE, rrdpush_sender_thread, (void *) host->sender))
+ if(netdata_thread_create(&host->rrdpush_sender_thread, tag, NETDATA_THREAD_OPTION_DEFAULT, rrdpush_sender_thread, (void *) host->sender))
error("STREAM %s [send]: failed to create new thread for client.", rrdhost_hostname(host));
else
rrdhost_flag_set(host, RRDHOST_FLAG_RRDPUSH_SENDER_SPAWN);
@@ -645,7 +613,7 @@ int rrdpush_receiver_permission_denied(struct web_client *w) {
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "You are not permitted to access this. Check the logs for more info.");
- return 401;
+ return HTTP_RESP_UNAUTHORIZED;
}
int rrdpush_receiver_too_busy_now(struct web_client *w) {
@@ -653,21 +621,38 @@ int rrdpush_receiver_too_busy_now(struct web_client *w) {
// to prevent an attacker from gaining info about the error
buffer_flush(w->response.data);
buffer_sprintf(w->response.data, "The server is too busy now to accept this request. Try later.");
- return 503;
+ return HTTP_RESP_SERVICE_UNAVAILABLE;
}
void *rrdpush_receiver_thread(void *ptr);
int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
- // info("clients wants to STREAM metrics.");
- char *key = NULL, *hostname = NULL, *registry_hostname = NULL, *machine_guid = NULL, *os = "unknown", *timezone = "unknown", *abbrev_timezone = "UTC", *tags = NULL;
- int32_t utc_offset = 0;
- int update_every = default_rrd_update_every;
- uint32_t stream_version = UINT_MAX;
- char buf[GUID_LEN + 1];
+ if(!service_running(ABILITY_STREAMING_CONNECTIONS))
+ return rrdpush_receiver_too_busy_now(w);
+
+ struct receiver_state *rpt = callocz(1, sizeof(*rpt));
+ rpt->last_msg_t = now_realtime_sec();
+ rpt->capabilities = STREAM_CAP_INVALID;
+
+ rpt->system_info = callocz(1, sizeof(struct rrdhost_system_info));
+ rpt->system_info->hops = 1;
+
+ rpt->fd = w->ifd;
+ rpt->client_ip = strdupz(w->client_ip);
+ rpt->client_port = strdupz(w->client_port);
+
+ rpt->config.update_every = default_rrd_update_every;
+
+#ifdef ENABLE_HTTPS
+ rpt->ssl.conn = w->ssl.conn;
+ rpt->ssl.flags = w->ssl.flags;
+
+ w->ssl.conn = NULL;
+ w->ssl.flags = NETDATA_SSL_START;
+#endif
+
+ // parse the parameters and fill rpt and rpt->system_info
- struct rrdhost_system_info *system_info = callocz(1, sizeof(struct rrdhost_system_info));
- system_info->hops = 1;
while(url) {
char *value = mystrsep(&url, "&");
if(!value || !*value) continue;
@@ -676,178 +661,307 @@ int rrdpush_receiver_thread_spawn(struct web_client *w, char *url) {
if(!name || !*name) continue;
if(!value || !*value) continue;
- if(!strcmp(name, "key"))
- key = value;
- else if(!strcmp(name, "hostname"))
- hostname = value;
- else if(!strcmp(name, "registry_hostname"))
- registry_hostname = value;
- else if(!strcmp(name, "machine_guid"))
- machine_guid = value;
+ if(!strcmp(name, "key") && !rpt->key)
+ rpt->key = strdupz(value);
+
+ else if(!strcmp(name, "hostname") && !rpt->hostname)
+ rpt->hostname = strdupz(value);
+
+ else if(!strcmp(name, "registry_hostname") && !rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(value);
+
+ else if(!strcmp(name, "machine_guid") && !rpt->machine_guid)
+ rpt->machine_guid = strdupz(value);
+
else if(!strcmp(name, "update_every"))
- update_every = (int)strtoul(value, NULL, 0);
- else if(!strcmp(name, "os"))
- os = value;
- else if(!strcmp(name, "timezone"))
- timezone = value;
- else if(!strcmp(name, "abbrev_timezone"))
- abbrev_timezone = value;
+ rpt->config.update_every = (int)strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "os") && !rpt->os)
+ rpt->os = strdupz(value);
+
+ else if(!strcmp(name, "timezone") && !rpt->timezone)
+ rpt->timezone = strdupz(value);
+
+ else if(!strcmp(name, "abbrev_timezone") && !rpt->abbrev_timezone)
+ rpt->abbrev_timezone = strdupz(value);
+
else if(!strcmp(name, "utc_offset"))
- utc_offset = (int32_t)strtol(value, NULL, 0);
+ rpt->utc_offset = (int32_t)strtol(value, NULL, 0);
+
else if(!strcmp(name, "hops"))
- system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+ rpt->system_info->hops = (uint16_t) strtoul(value, NULL, 0);
+
else if(!strcmp(name, "ml_capable"))
- system_info->ml_capable = strtoul(value, NULL, 0);
+ rpt->system_info->ml_capable = strtoul(value, NULL, 0);
+
else if(!strcmp(name, "ml_enabled"))
- system_info->ml_enabled = strtoul(value, NULL, 0);
+ rpt->system_info->ml_enabled = strtoul(value, NULL, 0);
+
else if(!strcmp(name, "mc_version"))
- system_info->mc_version = strtoul(value, NULL, 0);
- else if(!strcmp(name, "tags"))
- tags = value;
- else if(!strcmp(name, "ver"))
- stream_version = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+ rpt->system_info->mc_version = strtoul(value, NULL, 0);
+
+ else if(!strcmp(name, "tags") && !rpt->tags)
+ rpt->tags = strdupz(value);
+
+ else if(!strcmp(name, "ver") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(strtoul(value, NULL, 0));
+
else {
// An old Netdata child does not have a compatible streaming protocol, map to something sane.
if (!strcmp(name, "NETDATA_SYSTEM_OS_NAME"))
name = "NETDATA_HOST_OS_NAME";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID"))
name = "NETDATA_HOST_OS_ID";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_ID_LIKE"))
name = "NETDATA_HOST_OS_ID_LIKE";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION"))
name = "NETDATA_HOST_OS_VERSION";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_VERSION_ID"))
name = "NETDATA_HOST_OS_VERSION_ID";
+
else if (!strcmp(name, "NETDATA_SYSTEM_OS_DETECTION"))
name = "NETDATA_HOST_OS_DETECTION";
- else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && stream_version == UINT_MAX) {
- stream_version = convert_stream_version_to_capabilities(1);
- }
- if (unlikely(rrdhost_set_system_info_variable(system_info, name, value))) {
- info("STREAM [receive from [%s]:%s]: request has parameter '%s' = '%s', which is not used.",
- w->client_ip, w->client_port, name, value);
+ else if(!strcmp(name, "NETDATA_PROTOCOL_VERSION") && (rpt->capabilities & STREAM_CAP_INVALID))
+ rpt->capabilities = convert_stream_version_to_capabilities(1);
+
+ if (unlikely(rrdhost_set_system_info_variable(rpt->system_info, name, value))) {
+ info("STREAM '%s' [receive from [%s]:%s]: "
+ "request has parameter '%s' = '%s', which is not used."
+ , (rpt->hostname && *rpt->hostname) ? rpt->hostname : "-"
+ , rpt->client_ip, rpt->client_port
+ , name, value);
}
}
}
- if (stream_version == UINT_MAX)
- stream_version = convert_stream_version_to_capabilities(0);
+ if (rpt->capabilities & STREAM_CAP_INVALID)
+ // no version is supplied, assume version 0;
+ rpt->capabilities = convert_stream_version_to_capabilities(0);
- if(!key || !*key) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO KEY");
- error("STREAM [receive from [%s]:%s]: request without an API key. Forbidding access.", w->client_ip, w->client_port);
- return rrdpush_receiver_permission_denied(w);
+ // find the program name and version
+ if(w->user_agent && w->user_agent[0]) {
+ char *t = strchr(w->user_agent, '/');
+ if(t && *t) {
+ *t = '\0';
+ t++;
+ }
+
+ rpt->program_name = strdupz(w->user_agent);
+ if(t && *t) rpt->program_version = strdupz(t);
}
- if(!hostname || !*hostname) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO HOSTNAME");
- error("STREAM [receive from [%s]:%s]: request without a hostname. Forbidding access.", w->client_ip, w->client_port);
+ // check if we should accept this connection
+
+ if(!rpt->key || !*rpt->key) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without an API key",
+ "NO API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(!machine_guid || !*machine_guid) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, (key && *key)?key:"-", (machine_guid && *machine_guid)?machine_guid:"-", (hostname && *hostname)?hostname:"-", "ACCESS DENIED - NO MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: request without a machine GUID. Forbidding access.", w->client_ip, w->client_port);
+ if(!rpt->hostname || !*rpt->hostname) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without a hostname",
+ "NO HOSTNAME PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(regenerate_guid(key, buf) == -1) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID KEY");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not valid GUID (use the command uuidgen to generate one). Forbidding access.", w->client_ip, w->client_port, key);
+ if(!rpt->registry_hostname)
+ rpt->registry_hostname = strdupz(rpt->hostname);
+
+ if(!rpt->machine_guid || !*rpt->machine_guid) {
+ rrdpush_receive_log_status(
+ rpt,
+ "request without a machine GUID",
+ "NO MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(regenerate_guid(machine_guid, buf) == -1) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - INVALID MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid);
- return rrdpush_receiver_permission_denied(w);
+ {
+ char buf[GUID_LEN + 1];
+
+ if (regenerate_guid(rpt->key, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not a valid UUID (use the command uuidgen to generate one)",
+ "INVALID API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
+
+ if (regenerate_guid(rpt->machine_guid, buf) == -1) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not a valid UUID",
+ "INVALID MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
}
- const char *api_key_type = appconfig_get(&stream_config, key, "type", "api");
+ const char *api_key_type = appconfig_get(&stream_config, rpt->key, "type", "api");
if(!api_key_type || !*api_key_type) api_key_type = "unknown";
if(strcmp(api_key_type, "api") != 0) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - API KEY GIVEN IS NOT API KEY");
- error("STREAM [receive from [%s]:%s]: API key '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, key, api_key_type);
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is a machine GUID",
+ "INVALID API KEY PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
- if(!appconfig_get_boolean(&stream_config, key, "enabled", 0)) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ENABLED");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, key);
+ if(!appconfig_get_boolean(&stream_config, rpt->key, "enabled", 0)) {
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not enabled",
+ "API KEY DISABLED PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
{
- SIMPLE_PATTERN *key_allow_from = simple_pattern_create(appconfig_get(&stream_config, key, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *key_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->key, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT);
+
if(key_allow_from) {
if(!simple_pattern_matches(key_allow_from, w->client_ip)) {
simple_pattern_free(key_allow_from);
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - KEY NOT ALLOWED FROM THIS IP");
- error("STREAM [receive from [%s]:%s]: API key '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, key);
+
+ rrdpush_receive_log_status(
+ rpt,
+ "API key is not allowed from this IP",
+ "NOT ALLOWED IP PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
+
simple_pattern_free(key_allow_from);
}
}
- const char *machine_guid_type = appconfig_get(&stream_config, machine_guid, "type", "machine");
- if(!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
- if(strcmp(machine_guid_type, "machine") != 0) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID GIVEN IS NOT A MACHINE GUID");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is a %s GUID. Forbidding access.", w->client_ip, w->client_port, machine_guid, machine_guid_type);
- return rrdpush_receiver_permission_denied(w);
+ {
+ const char *machine_guid_type = appconfig_get(&stream_config, rpt->machine_guid, "type", "machine");
+ if (!machine_guid_type || !*machine_guid_type) machine_guid_type = "unknown";
+
+ if (strcmp(machine_guid_type, "machine") != 0) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is an API key",
+ "INVALID MACHINE GUID PERMISSION DENIED");
+
+ receiver_state_free(rpt);
+ return rrdpush_receiver_permission_denied(w);
+ }
}
- if(!appconfig_get_boolean(&stream_config, machine_guid, "enabled", 1)) {
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ENABLED");
- error("STREAM [receive from [%s]:%s]: machine GUID '%s' is not allowed. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+ if(!appconfig_get_boolean(&stream_config, rpt->machine_guid, "enabled", 1)) {
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not enabled",
+ "MACHINE GUID DISABLED PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
{
- SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(appconfig_get(&stream_config, machine_guid, "allow from", "*"), NULL, SIMPLE_PATTERN_EXACT);
+ SIMPLE_PATTERN *machine_allow_from = simple_pattern_create(
+ appconfig_get(&stream_config, rpt->machine_guid, "allow from", "*"),
+ NULL, SIMPLE_PATTERN_EXACT);
+
if(machine_allow_from) {
if(!simple_pattern_matches(machine_allow_from, w->client_ip)) {
simple_pattern_free(machine_allow_from);
- rrdhost_system_info_free(system_info);
- log_stream_connection(w->client_ip, w->client_port, key, machine_guid, hostname, "ACCESS DENIED - MACHINE GUID NOT ALLOWED FROM THIS IP");
- error("STREAM [receive from [%s]:%s]: Machine GUID '%s' is not permitted from this IP. Forbidding access.", w->client_ip, w->client_port, machine_guid);
+
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is not allowed from this IP",
+ "NOT ALLOWED IP PERMISSION DENIED");
+
+ receiver_state_free(rpt);
return rrdpush_receiver_permission_denied(w);
}
+
simple_pattern_free(machine_allow_from);
}
}
+ if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) {
+
+ rrdpush_receive_log_status(
+ rpt,
+ "machine GUID is my own",
+ "LOCALHOST PERMISSION DENIED");
+
+ char initial_response[HTTP_HEADER_SIZE + 1];
+ snprintfz(initial_response, HTTP_HEADER_SIZE, "%s", START_STREAMING_ERROR_SAME_LOCALHOST);
+
+ if(send_timeout(
+#ifdef ENABLE_HTTPS
+ &rpt->ssl,
+#endif
+ rpt->fd, initial_response, strlen(initial_response), 0, 60) != (ssize_t)strlen(initial_response)) {
+
+ error("STREAM '%s' [receive from [%s]:%s]: "
+ "failed to reply."
+ , rpt->hostname
+ , rpt->client_ip, rpt->client_port
+ );
+ }
+
+ close(rpt->fd);
+ receiver_state_free(rpt);
+ return web_client_socket_is_now_used_for_streaming(w);
+ }
+
if(unlikely(web_client_streaming_rate_t > 0)) {
- static netdata_mutex_t stream_rate_mutex = NETDATA_MUTEX_INITIALIZER;
- static volatile time_t last_stream_accepted_t = 0;
+ static SPINLOCK spinlock = NETDATA_SPINLOCK_INITIALIZER;
+ static time_t last_stream_accepted_t = 0;
- netdata_mutex_lock(&stream_rate_mutex);
time_t now = now_realtime_sec();
+ netdata_spinlock_lock(&spinlock);
if(unlikely(last_stream_accepted_t == 0))
last_stream_accepted_t = now;
if(now - last_stream_accepted_t < web_client_streaming_rate_t) {
- netdata_mutex_unlock(&stream_rate_mutex);
- rrdhost_system_info_free(system_info);
- error("STREAM [receive from [%s]:%s]: too busy to accept new streaming request. Will be allowed in %ld secs.", w->client_ip, w->client_port, (long)(web_client_streaming_rate_t - (now - last_stream_accepted_t)));
+ netdata_spinlock_unlock(&spinlock);
+
+ char msg[100 + 1];
+ snprintfz(