author | vkalintiris <vasilis@netdata.cloud> | 2022-10-31 19:53:20 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-10-31 19:53:20 +0200 |
commit | 282e0dfaa97289cc6542742e9e389bd76b7e4164 (patch) | |
tree | b23e108b35adc8ed322e8167d0f1fe607c2cfa4c /database | |
parent | df87a538cfaba5014a752937714756b7c5d30c93 (diff) |
Replication of metrics (gaps filling) during streaming (#13873)
* Revert "Use llvm's ar and ranlib when compiling with clang (#13854)"
This reverts commit a9135f47bbb36e9cb437b18a7109607569580db7.
* Profile plugin
* Fix macos static thread
* Add support for replication
- Add a new capability for replication; when it is not supported, the agent
should behave as before.
- When replication is supported, the text protocol supports the
following new commands:
- CHART_DEFINITION_END: send the first/last entry of the child
- REPLAY_RRDSET_BEGIN: sends the name of the chart we are
replicating
- REPLAY_RRDSET_HEADER: sends a line describing the columns of the
following command (i.e. start-time, end-time, dim1-name, ...)
- REPLAY_RRDSET_DONE: sends values to push for a specific start/end
time
- REPLAY_RRDSET_END: sends (a) the update every of the chart, (b)
the first/last entries in the DB, (c) whether the child has been told to
start streaming, (d) the original after/before period to replicate.
- REPLAY_CHART: sent from a parent to a child, specifying (a)
the chart name we want data for, (b) whether the child should
start streaming once it has fulfilled the request with the
aforementioned commands, (c) the after/before of the data the parent
wants.
- As a consequence of the new protocol, streaming is disabled for all
charts on a new connection. It's enabled once replication is finished.
- The configuration parameters are specified from within stream.conf:
- "enable replication = yes|no"
- "seconds to replicate = 3600"
- "replication step = 600" (i.e. how many seconds to fill per
roundtrip request).
* Minor fixes
- quote set and dim ids
- start streaming after writing replicated data to the buffer
- write replicated data only when buffer is less than 50% full.
- use reentrant iteration for charts
* Do not send chart definitions on connection.
* Track replication status through rrdset flags.
* Add debug flag for noisy log messages.
* Add license notice.
* Iterate charts with reentrant loop
* Set replication finished flag when streaming is disabled.
* Revert "Profile plugin"
This reverts commit 468fc9386e5283e0865fae56e9989b8ec83de14d.
Used only for testing purposes.
* Revert "Revert "Use llvm's ar and ranlib when compiling with clang (#13854)""
This reverts commit 27c955c58d95aed6c44d42e8b675f0cf3ca45c6d.
Reapply commit that I had to revert in order to be able to build the
agent on MacOS.
* Build replication source files with CMake.
* Pass number of words in plugind functions.
* Use get_word instead of indexing words.
* Use size_t instead of int.
* Pay only what we use when splitting words.
* no need to redefine PLUGINSD_MAX_WORDS
* fix formatting warning
* all usages of pluginsd_split_words() should use the return value to ensure non-cached results reuse; no need to lock the host to find a chart
* keep a sender dictionary with all the replication commands received and remove replication commands from charts
* do not replicate future data
* use last_updated to find the end of the db
* uniformity of replication logs
* rewrite of the query logic
* replication.c in C; debug info in human readable dates
* update the chart on every replication row
* update all chart members so that rrdset_done() can continue
* update the protocol to push one dimension per line and transfer data collection state to parent
* fix formatting
* remove replication object from pluginsd
* shorter communication
* fix typo
* support for replication proxies
* proper use of flags
* set receiver replication finished flag on charts created after the sender has been connected
* clear RRDSET_FLAG_SYNC_CLOCK on replicated charts
* log storing of nulls
* log first store
* log update every switches
* test ignoring timestamps but sending a point just after replication end
* replication should work on end_time
* use replicated timestamps
* at the final replication step, replicate all the remaining points
* cleanup code from tests
* print timestamps as unsigned long long
* more formatting changes; fix conflicting type of replicate_chart_response()
* updated stream.conf
* always respond to replication requests
* in non-dbengine db modes, do not replicate more than the database size
* advance the db pointer of legacy db modes
* should be multiplied by update_every
* fix buggy label parsing - identified by codacy
* don't log error on history mismatches for db mode dbengine
* allow SSL requests to streaming children
* don't use ssl variable
Co-authored-by: Costa Tsaousis <costa@netdata.cloud>
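The "seconds to replicate" and "replication step" settings described in the commit message suggest that a gap is filled over several roundtrip requests. A minimal C sketch of that arithmetic follows, assuming each request covers at most one step and never extends past the current time. `replay_window_t` and `replay_windows()` are hypothetical names used only for illustration; they do not appear in this commit, and the real request logic may differ.

```c
#include <assert.h>
#include <stddef.h>
#include <time.h>

// Hypothetical type (not from the commit): one after/before pair per roundtrip.
typedef struct {
    time_t after;
    time_t before;
} replay_window_t;

// Split [now - seconds_to_replicate, now] into windows of at most `step` seconds.
// Writes up to `max` windows into `out` and returns how many were written.
static size_t replay_windows(time_t now, time_t seconds_to_replicate, time_t step,
                             replay_window_t *out, size_t max) {
    assert(step > 0 && seconds_to_replicate >= 0);

    size_t n = 0;
    time_t after = now - seconds_to_replicate;

    while (after < now && n < max) {
        time_t before = after + step;
        if (before > now)
            before = now;   // assumption: never request data from the future

        out[n].after = after;
        out[n].before = before;
        n++;

        after = before;     // the next window starts where this one ended
    }

    return n;
}
```

With the values quoted in the commit message (3600 and 600), this sketch produces six windows; a final partial window is shortened so that it ends at `now`.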
Diffstat (limited to 'database')
-rw-r--r-- | database/rrd.h | 21 |
-rw-r--r-- | database/rrdcontext.c | 4 |
-rw-r--r-- | database/rrdhost.c | 51 |
-rw-r--r-- | database/rrdset.c | 207 |
-rw-r--r-- | database/sqlite/sqlite_aclk.c | 3 |
5 files changed, 167 insertions, 119 deletions
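The rrdhost.c hunk in the diff below caps `rrdpush_seconds_to_replicate` at `rrd_history_entries * rrd_update_every` for every memory mode except dbengine. The same rule can be sketched as a standalone C function; `clamp_seconds_to_replicate()` and its boolean parameter are hypothetical simplifications for illustration, not functions from the Netdata source.

```c
#include <assert.h>
#include <stdbool.h>
#include <time.h>

// Hypothetical helper: `is_dbengine` stands in for the
// RRD_MEMORY_MODE_DBENGINE case of the switch in rrdhost_create().
static time_t clamp_seconds_to_replicate(time_t wanted, bool is_dbengine,
                                         long history_entries, int update_every) {
    assert(history_entries >= 0 && update_every > 0);

    // dbengine retention is not bounded by a fixed number of entries,
    // so the requested period is kept as-is.
    if (is_dbengine)
        return wanted;

    // Fixed-size modes hold history_entries points, one per update_every seconds.
    time_t retention = (time_t)history_entries * update_every;
    return (wanted > retention) ? retention : wanted;
}
```

For example, a host with 1800 history entries at a 1-second update every would replicate at most 1800 seconds, even when 3600 seconds are configured.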
diff --git a/database/rrd.h b/database/rrd.h index 686d7b7ba4..ac19fc7402 100644 --- a/database/rrd.h +++ b/database/rrd.h @@ -523,13 +523,15 @@ typedef enum rrdset_flags { // least rrdset_free_obsolete_time seconds ago. RRDSET_FLAG_ARCHIVED = (1 << 15), RRDSET_FLAG_METADATA_UPDATE = (1 << 16), // Mark that metadata needs to be stored - RRDSET_FLAG_PENDING_FOREACH_ALARMS = (1 << 17), // contains dims with uninitialized foreach alarms RRDSET_FLAG_ANOMALY_DETECTION = (1 << 18), // flag to identify anomaly detection charts. RRDSET_FLAG_INDEXED_ID = (1 << 19), // the rrdset is indexed by its id RRDSET_FLAG_INDEXED_NAME = (1 << 20), // the rrdset is indexed by its name RRDSET_FLAG_ANOMALY_RATE_CHART = (1 << 21), // the rrdset is for storing anomaly rates for all dimensions RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 22), + + RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication + RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 24), // the receiving side has completed replication } RRDSET_FLAGS; #define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag)) @@ -935,6 +937,10 @@ struct rrdhost { struct rrdpush_destinations *destination; // the current destination from the above list SIMPLE_PATTERN *rrdpush_send_charts_matching; // pattern to match the charts to be sent + bool rrdpush_enable_replication; // enable replication + time_t rrdpush_seconds_to_replicate; // max time we want to replicate from the child + time_t rrdpush_replication_step; // seconds per replication step + // the following are state information for the threading // streaming metrics from this netdata to an upstream netdata struct sender_state *sender; @@ -1098,6 +1104,9 @@ RRDHOST *rrdhost_find_or_create( , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct 
rrdhost_system_info *system_info , bool is_archived ); @@ -1121,6 +1130,9 @@ void rrdhost_update(RRDHOST *host , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct rrdhost_system_info *system_info ); @@ -1190,6 +1202,8 @@ int rrdhost_should_be_removed(RRDHOST *host, RRDHOST *protected_host, time_t now void rrdset_update_heterogeneous_flag(RRDSET *st); +time_t rrdset_set_update_every(RRDSET *st, time_t update_every); + RRDSET *rrdset_find(RRDHOST *host, const char *id); #define rrdset_find_localhost(id) rrdset_find(localhost, id) /* This will not return charts that are archived */ @@ -1310,8 +1324,9 @@ RRDHOST *rrdhost_create( const char *hostname, const char *registry_hostname, const char *guid, const char *os, const char *timezone, const char *abbrev_timezone, int32_t utc_offset,const char *tags, const char *program_name, const char *program_version, int update_every, long entries, RRD_MEMORY_MODE memory_mode, unsigned int health_enabled, unsigned int rrdpush_enabled, - char *rrdpush_destination, char *rrdpush_api_key, char *rrdpush_send_charts_matching, struct rrdhost_system_info *system_info, - int is_localhost, bool is_archived); + char *rrdpush_destination, char *rrdpush_api_key, char *rrdpush_send_charts_matching, + bool rrdpush_enable_replication, time_t rrdpush_seconds_to_replicate, time_t rrdpush_replication_step, + struct rrdhost_system_info *system_info, int is_localhost, bool is_archived); #endif /* NETDATA_RRD_INTERNALS */ diff --git a/database/rrdcontext.c b/database/rrdcontext.c index 937cda481e..7719671c23 100644 --- a/database/rrdcontext.c +++ b/database/rrdcontext.c @@ -2417,7 +2417,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED if(!common_first_time_t) common_first_time_t = tier_retention[tier].db_first_time_t; - else + else 
if(tier_retention[tier].db_first_time_t) common_first_time_t = MIN(common_first_time_t, tier_retention[tier].db_first_time_t); if(!common_last_time_t) @@ -2427,7 +2427,7 @@ static void query_target_add_metric(QUERY_TARGET_LOCALS *qtl, RRDMETRIC_ACQUIRED if(!common_update_every) common_update_every = tier_retention[tier].db_update_every; - else + else if(tier_retention[tier].db_update_every) common_update_every = MIN(common_update_every, tier_retention[tier].db_update_every); tiers_added++; diff --git a/database/rrdhost.c b/database/rrdhost.c index 5fc1021388..9a368385ca 100644 --- a/database/rrdhost.c +++ b/database/rrdhost.c @@ -248,6 +248,9 @@ RRDHOST *rrdhost_create(const char *hostname, char *rrdpush_destination, char *rrdpush_api_key, char *rrdpush_send_charts_matching, + bool rrdpush_enable_replication, + time_t rrdpush_seconds_to_replicate, + time_t rrdpush_replication_step, struct rrdhost_system_info *system_info, int is_localhost, bool archived @@ -287,6 +290,24 @@ int is_legacy = 1; host, rrdpush_enabled, rrdpush_destination, rrdpush_api_key, rrdpush_send_charts_matching); } + host->rrdpush_enable_replication = rrdpush_enable_replication; + host->rrdpush_seconds_to_replicate = rrdpush_seconds_to_replicate; + host->rrdpush_replication_step = rrdpush_replication_step; + + switch(memory_mode) { + default: + case RRD_MEMORY_MODE_ALLOC: + case RRD_MEMORY_MODE_MAP: + case RRD_MEMORY_MODE_SAVE: + case RRD_MEMORY_MODE_RAM: + if(host->rrdpush_seconds_to_replicate > host->rrd_history_entries * host->rrd_update_every) + host->rrdpush_seconds_to_replicate = host->rrd_history_entries * host->rrd_update_every; + break; + + case RRD_MEMORY_MODE_DBENGINE: + break; + } + netdata_rwlock_init(&host->rrdhost_rwlock); netdata_mutex_init(&host->aclk_state_lock); netdata_mutex_init(&host->receiver_lock); @@ -520,13 +541,15 @@ void rrdhost_update(RRDHOST *host , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool 
rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct rrdhost_system_info *system_info ) { UNUSED(guid); host->health_enabled = (mode == RRD_MEMORY_MODE_NONE) ? 0 : health_enabled; - //host->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; Unused? rrdhost_system_info_free(host->system_info); host->system_info = system_info; @@ -560,12 +583,12 @@ void rrdhost_update(RRDHOST *host if(host->rrd_update_every != update_every) error("Host '%s' has an update frequency of %d seconds, but the wanted one is %d seconds. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_update_every, update_every); - if(host->rrd_history_entries < history) - error("Host '%s' has history of %ld entries, but the wanted one is %ld entries. Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_history_entries, history); - if(host->rrd_memory_mode != mode) error("Host '%s' has memory mode '%s', but the wanted one is '%s'. Restart netdata here to apply the new settings.", rrdhost_hostname(host), rrd_memory_mode_name(host->rrd_memory_mode), rrd_memory_mode_name(mode)); + else if(host->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && host->rrd_history_entries < history) + error("Host '%s' has history of %ld entries, but the wanted one is %ld entries. 
Restart netdata here to apply the new settings.", rrdhost_hostname(host), host->rrd_history_entries, history); + // update host tags rrdhost_init_tags(host, tags); @@ -593,6 +616,11 @@ void rrdhost_update(RRDHOST *host rrdcalctemplate_index_init(host); rrdcalc_rrdhost_index_init(host); + host->rrdpush_enable_replication = rrdpush_enable_replication; + host->rrdpush_seconds_to_replicate = rrdpush_seconds_to_replicate; + host->rrdpush_replication_step = rrdpush_replication_step; + + rrd_hosts_available++; ml_new_host(host); rrdhost_load_rrdcontext_data(host); @@ -601,8 +629,6 @@ void rrdhost_update(RRDHOST *host if (health_enabled) health_thread_spawn(host); - - return; } RRDHOST *rrdhost_find_or_create( @@ -624,6 +650,9 @@ RRDHOST *rrdhost_find_or_create( , char *rrdpush_destination , char *rrdpush_api_key , char *rrdpush_send_charts_matching + , bool rrdpush_enable_replication + , time_t rrdpush_seconds_to_replicate + , time_t rrdpush_replication_step , struct rrdhost_system_info *system_info , bool archived ) { @@ -658,6 +687,9 @@ RRDHOST *rrdhost_find_or_create( , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , system_info , 0 , archived @@ -683,6 +715,9 @@ RRDHOST *rrdhost_find_or_create( , rrdpush_destination , rrdpush_api_key , rrdpush_send_charts_matching + , rrdpush_enable_replication + , rrdpush_seconds_to_replicate + , rrdpush_replication_step , system_info); } if (host) { @@ -894,6 +929,9 @@ unittest: , default_rrdpush_destination , default_rrdpush_api_key , default_rrdpush_send_charts_matching + , default_rrdpush_enable_replication + , default_rrdpush_seconds_to_replicate + , default_rrdpush_replication_step , system_info , 1 , 0 @@ -1003,6 +1041,7 @@ void stop_streaming_sender(RRDHOST *host) if (host->sender->compressor) host->sender->compressor->destroy(&host->sender->compressor); #endif + 
dictionary_destroy(host->sender->replication_requests); freez(host->sender); host->sender = NULL; } diff --git a/database/rrdset.c b/database/rrdset.c index a071860def..e7ea7f6b80 100644 --- a/database/rrdset.c +++ b/database/rrdset.c @@ -141,6 +141,10 @@ static void rrdset_insert_callback(const DICTIONARY_ITEM *item __maybe_unused, v st->rrdhost = host; st->flags = RRDSET_FLAG_SYNC_CLOCK | RRDSET_FLAG_INDEXED_ID; + + if(host == localhost || !host->receiver || !stream_has_capability(host->receiver, STREAM_CAP_REPLICATION)) + st->flags |= RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED; + if(unlikely(st->id == anomaly_rates_chart)) st->flags |= RRDSET_FLAG_ANOMALY_RATE_CHART; @@ -285,18 +289,7 @@ static bool rrdset_conflict_callback(const DICTIONARY_ITEM *item __maybe_unused, } if (unlikely(st->update_every != ctr->update_every)) { - st->update_every = ctr->update_every; - - // switch update every to the storage engine - RRDDIM *rd; - rrddim_foreach_read(rd, st) { - for (size_t tier = 0; tier < storage_tiers; tier++) { - if (rd->tiers[tier] && rd->tiers[tier]->db_collection_handle) - rd->tiers[tier]->collect_ops->change_collection_frequency(rd->tiers[tier]->db_collection_handle, (int)(st->rrdhost->db[tier].tier_grouping * st->update_every)); - } - } - rrddim_foreach_done(rd); - + rrdset_set_update_every(st, ctr->update_every); ctr->react_action |= RRDSET_REACT_UPDATED; } @@ -876,7 +869,13 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las // oops! the database is in the future #ifdef NETDATA_INTERNAL_CHECKS info("RRD database for chart '%s' on host '%s' is %0.5" NETDATA_DOUBLE_MODIFIER - " secs in the future (counter #%zu, update #%zu). Adjusting it to current time.", rrdset_id(st), rrdhost_hostname(st->rrdhost), (NETDATA_DOUBLE)-since_last_usec / USEC_PER_SEC, st->counter, st->counter_done); + " secs in the future (counter #%zu, update #%zu). Adjusting it to current time." 
+ , rrdset_id(st) + , rrdhost_hostname(st->rrdhost) + , (NETDATA_DOUBLE)-since_last_usec / USEC_PER_SEC + , st->counter + , st->counter_done + ); #endif st->last_collected_time.tv_sec = now.tv_sec - st->update_every; @@ -930,14 +929,18 @@ void rrdset_timed_next(RRDSET *st, struct timeval now, usec_t duration_since_las #endif } - #ifdef NETDATA_INTERNAL_CHECKS debug(D_RRD_CALLS, "rrdset_timed_next() for chart %s with duration since last update %llu usec", rrdset_name(st), duration_since_last_update); rrdset_debug(st, "NEXT: %llu microseconds", duration_since_last_update); - if(discarded && discarded != duration_since_last_update) - info("host '%s', chart '%s': discarded data collection time of %llu usec, replaced with %llu usec, reason: '%s'", rrdhost_hostname(st->rrdhost), rrdset_id(st), discarded, duration_since_last_update, discard_reason?discard_reason:"UNDEFINED"); - - #endif + internal_error(discarded && discarded != duration_since_last_update, + "host '%s', chart '%s': discarded data collection time of %llu usec, " + "replaced with %llu usec, reason: '%s'" + , rrdhost_hostname(st->rrdhost) + , rrdset_id(st) + , discarded + , duration_since_last_update + , discard_reason?discard_reason:"UNDEFINED" + ); st->usec_since_last_update = duration_since_last_update; } @@ -968,9 +971,7 @@ static inline usec_t rrdset_init_last_collected_time(RRDSET *st, struct timeval usec_t last_collect_ut = st->last_collected_time.tv_sec * USEC_PER_SEC + st->last_collected_time.tv_usec; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "initialized last collected time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC); - #endif return last_collect_ut; } @@ -981,9 +982,7 @@ static inline usec_t rrdset_update_last_collected_time(RRDSET *st) { st->last_collected_time.tv_sec = (time_t) (ut / USEC_PER_SEC); st->last_collected_time.tv_usec = (suseconds_t) (ut % USEC_PER_SEC); - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "updated last collected time to 
%0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_collect_ut / USEC_PER_SEC); - #endif return last_collect_ut; } @@ -1000,9 +999,7 @@ static inline usec_t rrdset_init_last_updated_time(RRDSET *st) { usec_t last_updated_ut = st->last_updated.tv_sec * USEC_PER_SEC + st->last_updated.tv_usec; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "initialized last updated time to %0.3" NETDATA_DOUBLE_MODIFIER, (NETDATA_DOUBLE)last_updated_ut / USEC_PER_SEC); - #endif return last_updated_ut; } @@ -1165,17 +1162,24 @@ static inline size_t rrdset_done_interpolate( for( ; next_store_ut <= now_collect_ut ; last_collect_ut = next_store_ut, next_store_ut += update_every_ut, iterations-- ) { - #ifdef NETDATA_INTERNAL_CHECKS - if(iterations < 0) { error("INTERNAL CHECK: %s: iterations calculation wrapped! first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu", rrdset_name(st), first_ut, last_stored_ut, next_store_ut, now_collect_ut); } + internal_error(iterations < 0, + "RRDSET: '%s': iterations calculation wrapped! 
" + "first_ut = %llu, last_stored_ut = %llu, next_store_ut = %llu, now_collect_ut = %llu" + , rrdset_id(st) + , first_ut + , last_stored_ut + , next_store_ut + , now_collect_ut + ); + rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC); rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC); - #endif last_ut = next_store_ut; struct rda_item *rda; size_t dim_id; - for(dim_id = 0, rda = rda_base, rd = rda->rd ; dim_id < rda_slots ; ++dim_id, ++rda) { + for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) { rd = rda->rd; if(unlikely(!rd)) continue; @@ -1189,7 +1193,6 @@ static inline size_t rrdset_done_interpolate( / (NETDATA_DOUBLE)(now_collect_ut - last_collect_ut) ); - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC2 INC " NETDATA_DOUBLE_FORMAT " = " NETDATA_DOUBLE_FORMAT " * (%llu - %llu)" @@ -1200,7 +1203,6 @@ static inline size_t rrdset_done_interpolate( , next_store_ut, last_collect_ut , now_collect_ut, last_collect_ut ); - #endif rd->calculated_value -= new_value; new_value += rd->last_calculated_value; @@ -1209,12 +1211,10 @@ static inline size_t rrdset_done_interpolate( if(unlikely(next_store_ut - last_stored_ut < update_every_ut)) { - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: COLLECTION POINT IS SHORT " NETDATA_DOUBLE_FORMAT " - EXTRAPOLATING", rrddim_name(rd) , (NETDATA_DOUBLE)(next_store_ut - last_stored_ut) ); - #endif new_value = new_value * (NETDATA_DOUBLE)(st->update_every * USEC_PER_SEC) / (NETDATA_DOUBLE)(next_store_ut - last_stored_ut); } @@ -1243,7 +1243,6 @@ static inline size_t rrdset_done_interpolate( + rd->last_calculated_value ); - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC2 DEF " NETDATA_DOUBLE_FORMAT " = (((" "(" NETDATA_DOUBLE_FORMAT " - " NETDATA_DOUBLE_FORMAT ")" " * %llu" @@ -1253,7 +1252,6 @@ static inline size_t 
rrdset_done_interpolate( , (next_store_ut - first_ut) , (now_collect_ut - first_ut), rd->last_calculated_value ); - #endif } break; } @@ -1278,9 +1276,7 @@ static inline size_t rrdset_done_interpolate( else { (void) ml_is_anomalous(rd, 0, false); - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING ", rrddim_name(rd), current_entry); - #endif rrddim_store_metric(rd, next_store_ut, NAN, SN_FLAG_NONE); rd->last_stored_value = NAN; @@ -1328,9 +1324,7 @@ static inline void rrdset_done_fill_the_gap(RRDSET *st) { rd->db[current_entry] = pack_storage_number(NAN, SN_FLAG_NONE); current_entry = ((current_entry + 1) >= entries) ? 0 : current_entry + 1; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: STORE[%ld] = NON EXISTING (FILLED THE GAP)", rrddim_name(rd), current_entry); - #endif } } rrddim_foreach_done(rd); @@ -1356,7 +1350,7 @@ void rrdset_done(RRDSET *st) { void rrdset_timed_done(RRDSET *st, struct timeval now) { if(unlikely(netdata_exit)) return; - debug(D_RRD_CALLS, "rrdset_done() for chart %s", rrdset_name(st)); + debug(D_RRD_CALLS, "rrdset_done() for chart '%s'", rrdset_name(st)); RRDDIM *rd; @@ -1381,17 +1375,15 @@ void rrdset_timed_done(RRDSET *st, struct timeval now) { // check if the chart has a long time to be updated if(unlikely(st->usec_since_last_update > st->entries * update_every_ut && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE && st->rrd_memory_mode != RRD_MEMORY_MODE_NONE)) { - info("host '%s', chart %s: took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER - " secs). Resetting it.", rrdhost_hostname(st->rrdhost), rrdset_name(st), st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC); + info("host '%s', chart '%s': took too long to be updated (counter #%zu, update #%zu, %0.3" NETDATA_DOUBLE_MODIFIER + " secs). 
Resetting it.", rrdhost_hostname(st->rrdhost), rrdset_id(st), st->counter, st->counter_done, (NETDATA_DOUBLE)st->usec_since_last_update / USEC_PER_SEC); rrdset_reset(st); st->usec_since_last_update = update_every_ut; store_this_entry = 0; first_entry = 1; } - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "microseconds since last update: %llu", st->usec_since_last_update); - #endif // set last_collected_time if(unlikely(!st->last_collected_time.tv_sec)) { @@ -1428,9 +1420,9 @@ void rrdset_timed_done(RRDSET *st, struct timeval now) { if(unlikely(dt_usec(&st->last_collected_time, &st->last_updated) > st->entries * update_every_ut && st->rrd_memory_mode != RRD_MEMORY_MODE_DBENGINE)) { info( - "%s: too old data (last updated at %"PRId64".%"PRId64", last collected at %"PRId64".%"PRId64"). " + "'%s': too old data (last updated at %"PRId64".%"PRId64", last collected at %"PRId64".%"PRId64"). " "Resetting it. Will not store the next entry.", - rrdset_name(st), + rrdset_id(st), (int64_t)st->last_updated.tv_sec, (int64_t)st->last_updated.tv_usec, (int64_t)st->last_collected_time.tv_sec, @@ -1450,9 +1442,9 @@ void rrdset_timed_done(RRDSET *st, struct timeval now) { if(unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_DBENGINE && dt_usec(&st->last_collected_time, &st->last_updated) > (RRDENG_BLOCK_SIZE / sizeof(storage_number)) * update_every_ut)) { info( - "%s: too old data (last updated at %" PRId64 ".%" PRId64 ", last collected at %" PRId64 ".%" PRId64 "). " + "'%s': too old data (last updated at %" PRId64 ".%" PRId64 ", last collected at %" PRId64 ".%" PRId64 "). " "Resetting it. 
Will not store the next entry.", - rrdset_name(st), + rrdset_id(st), (int64_t)st->last_updated.tv_sec, (int64_t)st->last_updated.tv_usec, (int64_t)st->last_collected_time.tv_sec, @@ -1497,16 +1489,12 @@ void rrdset_timed_done(RRDSET *st, struct timeval now) { store_this_entry = 1; last_collect_ut = next_store_ut - update_every_ut; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "Fixed first entry."); - #endif } else { store_this_entry = 0; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "Will not store the next entry."); - #endif } } @@ -1555,19 +1543,17 @@ after_first_database_work: if (unlikely(st->rrd_memory_mode == RRD_MEMORY_MODE_NONE)) goto after_second_database_work; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "last_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last collection time)", (NETDATA_DOUBLE)last_collect_ut/USEC_PER_SEC); rrdset_debug(st, "now_collect_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (current collection time)", (NETDATA_DOUBLE)now_collect_ut/USEC_PER_SEC); rrdset_debug(st, "last_stored_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (last updated time)", (NETDATA_DOUBLE)last_stored_ut/USEC_PER_SEC); rrdset_debug(st, "next_store_ut = %0.3" NETDATA_DOUBLE_MODIFIER " (next interpolation point)", (NETDATA_DOUBLE)next_store_ut/USEC_PER_SEC); - #endif uint32_t has_reset_value = 0; // process all dimensions to calculate their values // based on the collected figures only // at this stage we do not interpolate anything - for(dim_id = 0, rda = rda_base, rd = rda->rd ; dim_id < rda_slots ; ++dim_id, ++rda) { + for(dim_id = 0, rda = rda_base ; dim_id < rda_slots ; ++dim_id, ++rda) { rd = rda->rd; if(unlikely(!rd)) continue; @@ -1576,7 +1562,6 @@ after_first_database_work: continue; } - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: START " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT @@ -1588,7 +1573,6 @@ after_first_database_work: , rd->last_calculated_value , rd->calculated_value ); - #endif 
switch(rd->algorithm) { case RRD_ALGORITHM_ABSOLUTE: @@ -1596,18 +1580,16 @@ after_first_database_work: * (NETDATA_DOUBLE)rd->multiplier / (NETDATA_DOUBLE)rd->divisor; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC ABS/ABS-NO-IN " NETDATA_DOUBLE_FORMAT " = " COLLECTED_NUMBER_FORMAT " * " NETDATA_DOUBLE_FORMAT - " / " NETDATA_DOUBLE_FORMAT, rrddim_name(rd) + " / " NETDATA_DOUBLE_FORMAT + , rrddim_name(rd) , rd->calculated_value , rd->collected_value , (NETDATA_DOUBLE)rd->multiplier , (NETDATA_DOUBLE)rd->divisor ); - #endif - break; case RRD_ALGORITHM_PCENT_OVER_ROW_TOTAL: @@ -1621,7 +1603,6 @@ after_first_database_work: * (NETDATA_DOUBLE)rd->collected_value / (NETDATA_DOUBLE)st->collected_total; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC PCENT-ROW " NETDATA_DOUBLE_FORMAT " = 100" " * " COLLECTED_NUMBER_FORMAT " / " COLLECTED_NUMBER_FORMAT @@ -1630,8 +1611,6 @@ after_first_database_work: , rd->collected_value , st->collected_total ); - #endif - break; case RRD_ALGORITHM_INCREMENTAL: @@ -1645,8 +1624,9 @@ after_first_database_work: // It is imperative to set the comparison to uint64_t since type collected_number is signed and // produces wrong results as far as incremental counters are concerned. if(unlikely((uint64_t)rd->last_collected_value > (uint64_t)rd->collected_value)) { - debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT - , rrdset_name(st), rrddim_name(rd) + debug(D_RRD_STATS, "'%s' / '%s': RESET or OVERFLOW. 
Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT + , rrdset_id(st) + , rrddim_name(rd) , rd->last_collected_value , rd->collected_value); @@ -1689,19 +1669,17 @@ after_first_database_work: / (NETDATA_DOUBLE) rd->divisor; } - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC INC PRE " NETDATA_DOUBLE_FORMAT " = (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" " * " NETDATA_DOUBLE_FORMAT - " / " NETDATA_DOUBLE_FORMAT, rrddim_name(rd) + " / " NETDATA_DOUBLE_FORMAT + , rrddim_name(rd) , rd->calculated_value , rd->collected_value, rd->last_collected_value , (NETDATA_DOUBLE)rd->multiplier , (NETDATA_DOUBLE)rd->divisor ); - #endif - break; case RRD_ALGORITHM_PCENT_OVER_DIFF_TOTAL: @@ -1713,8 +1691,9 @@ after_first_database_work: // if the new is smaller than the old (an overflow, or reset), set the old equal to the new // to reset the calculation (it will give zero as the calculation for this second) if(unlikely(rd->last_collected_value > rd->collected_value)) { - debug(D_RRD_STATS, "%s.%s: RESET or OVERFLOW. Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT - , rrdset_name(st), rrddim_name(rd) + debug(D_RRD_STATS, "'%s' / '%s': RESET or OVERFLOW. 
Last collected value = " COLLECTED_NUMBER_FORMAT ", current = " COLLECTED_NUMBER_FORMAT + , rrdset_id(st) + , rrddim_name(rd) , rd->last_collected_value , rd->collected_value ); @@ -1735,7 +1714,6 @@ after_first_database_work: * (NETDATA_DOUBLE)(rd->collected_value - rd->last_collected_value) / (NETDATA_DOUBLE)(st->collected_total - st->last_collected_total); - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC PCENT-DIFF " NETDATA_DOUBLE_FORMAT " = 100" " * (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" " / (" COLLECTED_NUMBER_FORMAT " - " COLLECTED_NUMBER_FORMAT ")" @@ -1744,8 +1722,6 @@ after_first_database_work: , rd->collected_value, rd->last_collected_value , st->collected_total, st->last_collected_total ); - #endif - break; default: @@ -1753,29 +1729,24 @@ after_first_database_work: // it gets noticed when we add new types rd->calculated_value = 0; - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: CALC " NETDATA_DOUBLE_FORMAT " = 0" , rrddim_name(rd) , rd->calculated_value ); - #endif - break; } - #ifdef NETDATA_INTERNAL_CHECKS rrdset_debug(st, "%s: PHASE2 " " last_collected_value = " COLLECTED_NUMBER_FORMAT " collected_value = " COLLECTED_NUMBER_FORMAT " last_calculated_value = " NETDATA_DOUBLE_FORMAT - " calculated_value = " NETDATA_DOUBLE_FORMAT, rrddim_name(rd) - , rd->last_collected_value - , rd->collected_value - , rd->last_calculated_value - , rd->calculated_value + " calculated_value = " NETDATA_DOUBLE_FORMAT + , rrddim_name(rd) + , rd->last_collected_value + , rd->collected_value + , rd->last_calculated_value + , rd->calculated_value ); - #endif - } // at this point we have all the calculated values ready @@ -1805,43 +1776,41 @@ after_first_database_work: after_ |