summaryrefslogtreecommitdiffstats
path: root/database
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-24 00:24:21 +0200
committerGitHub <noreply@github.com>2022-11-24 00:24:21 +0200
commit8e1a99ad79a1394cbb0ffcaa24bdde85c7b14d81 (patch)
tree61020aea9c8c6e48032d5721aec4b9ddbfe7c35b /database
parent0fe7b1c8a84b7836b5bb13c37924d9fd851c6233 (diff)
replication fixes #5 (#14038)
* pluginsd cleanup; replication logic cleanup; fix bug in replication begin * log replication start/stop and change the keyword of NETDATA_LOG_REPLICATION_REQUESTS logs to REPLAY * dont ask for data the child does not have; log fixes * more pluginsd cleanup * count sender dictionary entries * fix dictionary_flush()
Diffstat (limited to 'database')
-rw-r--r--database/rrd.h28
-rw-r--r--database/rrdset.c30
2 files changed, 48 insertions, 10 deletions
diff --git a/database/rrd.h b/database/rrd.h
index e697c3e3ee..12bfb8660e 100644
--- a/database/rrd.h
+++ b/database/rrd.h
@@ -314,6 +314,12 @@ struct rrddim {
collected_number collected_value; // the current value, as collected - resets to 0 after being used
collected_number last_collected_value; // the last value that was collected, after being processed
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+ usec_t rrddim_store_metric_last_ut; // the timestamp we last called rrddim_store_metric()
+ size_t rrddim_store_metric_count; // the rrddim_store_metric() counter
+ const char *rrddim_store_metric_last_caller; // the name of the function that last called rrddim_store_metric()
+#endif
+
// ------------------------------------------------------------------------
// db mode RAM, SAVE, MAP, ALLOC, NONE specifics
// TODO - they should be managed by storage engine
@@ -532,20 +538,19 @@ typedef enum rrdset_flags {
RRDSET_FLAG_PENDING_HEALTH_INITIALIZATION = (1 << 21),
- RRDSET_FLAG_SENDER_REPLICATION_QUEUED = (1 << 22), // the sending side has replication in progress
- RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 23), // the sending side has replication in progress
- RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 24), // the sending side has completed replication
- RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 25), // the receiving side has replication in progress
- RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 26), // the receiving side has completed replication
+ RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS = (1 << 22), // the sending side has replication in progress
+ RRDSET_FLAG_SENDER_REPLICATION_FINISHED = (1 << 23), // the sending side has completed replication
+ RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS = (1 << 24), // the receiving side has replication in progress
+ RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED = (1 << 25), // the receiving side has completed replication
- RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 27), // a custom variable has been updated and needs to be exposed to parent
+ RRDSET_FLAG_UPSTREAM_SEND_VARIABLES = (1 << 26), // a custom variable has been updated and needs to be exposed to parent
} RRDSET_FLAGS;
#define rrdset_flag_check(st, flag) (__atomic_load_n(&((st)->flags), __ATOMIC_SEQ_CST) & (flag))
#define rrdset_flag_set(st, flag) __atomic_or_fetch(&((st)->flags), flag, __ATOMIC_SEQ_CST)
#define rrdset_flag_clear(st, flag) __atomic_and_fetch(&((st)->flags), ~(flag), __ATOMIC_SEQ_CST)
-#define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_SENDER_REPLICATION_QUEUED) \
+#define rrdset_is_replicating(st) (rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_IN_PROGRESS|RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS) \
&& !rrdset_flag_check(st, RRDSET_FLAG_SENDER_REPLICATION_FINISHED|RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED))
struct rrdset {
@@ -666,13 +671,13 @@ struct rrdset {
RRDCALC *base; // double linked list of RRDCALC related to this RRDSET
} alerts;
-#ifdef NETDATA_INTERNAL_CHECKS
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
struct {
bool start_streaming;
time_t after;
time_t before;
} replay;
-#endif
+#endif // NETDATA_LOG_REPLICATION_REQUESTS
};
#define rrdset_plugin_name(st) string2str((st)->plugin_name)
@@ -1330,7 +1335,12 @@ time_t calc_dimension_liveness(RRDDIM *rd, time_t now);
#endif
long align_entries_to_pagesize(RRD_MEMORY_MODE mode, long entries);
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+#define rrddim_store_metric(rd, point_end_time_ut, n, flags) rrddim_store_metric_with_trace(rd, point_end_time_ut, n, flags, __FUNCTION__)
+void rrddim_store_metric_with_trace(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags, const char *function);
+#else
void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags);
+#endif
// ----------------------------------------------------------------------------
// Miscellaneous functions
diff --git a/database/rrdset.c b/database/rrdset.c
index ed9d547891..053655817c 100644
--- a/database/rrdset.c
+++ b/database/rrdset.c
@@ -1109,8 +1109,36 @@ void store_metric_at_tier(RRDDIM *rd, struct rrddim_tier *t, STORAGE_POINT sp, u
}
}
}
-
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+void rrddim_store_metric_with_trace(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags, const char *function) {
+#else // !NETDATA_LOG_COLLECTION_ERRORS
void rrddim_store_metric(RRDDIM *rd, usec_t point_end_time_ut, NETDATA_DOUBLE n, SN_FLAGS flags) {
+#endif // !NETDATA_LOG_COLLECTION_ERRORS
+#ifdef NETDATA_LOG_COLLECTION_ERRORS
+ rd->rrddim_store_metric_count++;
+
+ if(likely(rd->rrddim_store_metric_count > 1)) {
+ usec_t expected = rd->rrddim_store_metric_last_ut + rd->update_every * USEC_PER_SEC;
+
+ if(point_end_time_ut != rd->rrddim_store_metric_last_ut) {
+ internal_error(true,
+ "%s COLLECTION: 'host:%s/chart:%s/dim:%s' granularity %d, collection %zu, expected to store at tier 0 a value at %llu, but it gave %llu [%s%llu usec] (called from %s(), previously by %s())",
+ (point_end_time_ut < rd->rrddim_store_metric_last_ut) ? "**PAST**" : "GAP",
+ rrdhost_hostname(rd->rrdset->rrdhost), rrdset_id(rd->rrdset), rrddim_id(rd),
+ rd->update_every,
+ rd->rrddim_store_metric_count,
+ expected, point_end_time_ut,
+ (point_end_time_ut < rd->rrddim_store_metric_last_ut)?"by -" : "gap ",
+ expected - point_end_time_ut,
+ function,
+ rd->rrddim_store_metric_last_caller?rd->rrddim_store_metric_last_caller:"none");
+ }
+ }
+
+ rd->rrddim_store_metric_last_ut = point_end_time_ut;
+ rd->rrddim_store_metric_last_caller = function;
+#endif // NETDATA_LOG_COLLECTION_ERRORS
+
// store the metric on tier 0
rd->tiers[0]->collect_ops->store_metric(rd->tiers[0]->db_collection_handle, point_end_time_ut, n, 0, 0, 1, 0, flags);