summaryrefslogtreecommitdiffstats
path: root/streaming/replication.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-01-10 19:59:21 +0200
committerGitHub <noreply@github.com>2023-01-10 19:59:21 +0200
commit368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
treeb57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /streaming/replication.c
parentb513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages refering to datafile * eliminate waste flush attempts * remove eliminated variable * journal v2 scanning split functions * avoid locking open cache for a long time while migrating to journal v2 * dont acquire datafile for the loop; disable thread cancelability while a query is running * work on datafile acquiring * work on datafile deletion * work on datafile deletion again * logs of dbengine should start with DBENGINE * thread specific key for queries to check if a query finishes without a finalize * page_uuid is not used anymore * Cleanup judy traversal when building new v2 Remove not needed calls to metric registry * metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent * disable checks for invalid time-ranges * Remove type from page details * report scanning time * remove infinite loop from datafile acquire for deletion * remove infinite loop from datafile acquire for deletion again * trace query handles * properly allocate array of dimensions in replication * metrics cleanup * metrics registry uses arrayalloc * arrayalloc free should be protected by lock * use array alloc in page cache * journal v2 scanning fix * datafile reference leaking hunding * do not load metrics of future timestamps * initialize reasons * fix datafile reference leak * do not load pages that are entirely overlapped by others * expand metric retention atomically * split replication logic in initialization and execution * replication prepare ahead queries * replication prepare ahead queries fixed * fix replication workers accounting * add router active queries chart * restore accounting of pages metadata sources; cleanup replication * dont count skipped pages as unroutable * notes on services shutdown * do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file * do not add pages we dont need to pdc * time in range re-work to provide info about past and future matches * finner control on the pages selected for processing; accounting of page related issues * fix invalid reference to handle->page * eliminate data collection handle of pg_lookup_next * accounting for queries with gaps * query preprocessing the same way the processing is done; cache now supports all operations on Judy * dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3) * get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query * finner gaps calculation; accounting of overlapping pages in queries * fix gaps accounting * move datafile deletion to worker thread * tune libuv workers and thread stack size * stop netdata threads gradually * run indexing together with cache flush/evict * more work on clean shutdown * limit the number of pages to evict per run * do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction * economies on flags for smaller page footprint; cleanup and renames * eviction moves referenced pages to the end of the queue * use murmur hash for indexing partition * murmur should be static * use more indexing partitions * revert number of partitions to number of cpus * cancel threads first, then stop services * revert default thread stack size * dont execute replication requests of disconnected senders * wait more time for services that are exiting gradually * fixed last commit * finer control on page selection algorithm * default stacksize of 1MB * fix formatting * fix worker utilization going crazy when the number is rotating * avoid buffer full due to replication preprocessing of requests * support query priorities * add count of spins in spinlock when compiled with netdata internal checks * remove prioritization from dbengine queries; cache now uses mutexes for the queues * hot pages are now in sections judy arrays, like dirty * align replication queries to optimal page size * during flushing add to clean and evict in batches * Revert "during flushing add to clean and evict in batches" This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7. * dont lock clean while evicting pages during flushing * Revert "dont lock clean while evicting pages during flushing" This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3. * Revert "Revert "during flushing add to clean and evict in batches"" This reverts commit ca7a187537fb8f743992700427e13042561211ec. * dont cross locks during flushing, for the fastest flushes possible * low-priority queries load pages synchronously * Revert "low-priority queries load pages synchronously" This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7. * cache uses spinlock again * during flushing, dont lock the clean queue at all; each item is added atomically * do smaller eviction runs * evict one page at a time to minimize lock contention on the clean queue * fix eviction statistics * fix last commit * plain should be main cache * event loop cleanup; evictions and flushes can now happen concurrently * run flush and evictions from tier0 only * remove not needed variables * flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time * added worker jobs in timer to find the slow part of it * support fast eviction of pages when all_of_them is set * revert default thread stack size * bypass event loop for dispatching read extent commands to workers - send them directly * Revert "bypass event loop for dispatching read extent commands to workers - send them directly" This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce. * cache work requests * minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors * publish flushed pages to open cache in the thread pool * prevent eventloop requests from getting stacked in the event loop * single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c * more rrdengine.c cleanup * enable db rotation * do not log when there is a filter * do not run multiple migration to journal v2 * load all extents async * fix wrong paste * report opcodes waiting, works dispatched, works executing * cleanup event loop memory every 10 minutes * dont dispatch more work requests than the number of threads available * use the dispatched counter instead of the executing counter to check if the worker thread pool is full * remove UV_RUN_NOWAIT * replication to fill the queues * caching of extent buffers; code cleanup * caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation * single transaction wal * synchronous flushing * first cancel the threads, then signal them to exit * caching of rrdeng query handles; added priority to query target; health is now low prio * add priority to the missing points; do not allow critical priority in queries * offload query preparation and routing to libuv thread pool * updated timing charts for the offloaded query preparation * caching of WALs * accounting for struct caches (buffers); do not load extents with invalid sizes * protection against memory booming during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset * also check if the expanded before is not the chart later updated time * also check if the expanded before is not after the wall clock time of when the query started * Remove unused variable * replication to queue less queries; cleanup of internal fatals * Mark dimension to be updated async * caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol) * disable pgc stress test, under an ifdef * disable mrg stress test under an ifdef * Mark chart and host labels, host info for async check and store in the database * dictionary items use arrayalloc * cache section pages structure is allocated with arrayalloc * Add function to wakeup the aclk query threads and check for exit Register function to be called during shutdown after signaling the service to exit * parallel preparation of all dimensions of queries * be more sensitive to enable streaming after replication * atomically finish chart replication * fix last commit * fix last commit again * fix last commit again again * fix last commit again again again * unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication * do not cancel start streaming; use high priority queries when we have locked chart data collection * prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered * opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue * Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore * Fix bad memory allocation / cleanup * Cleanup ACLK sync initialization (part 1) * Don't update metric registry during shutdown (part 1) * Prevent crash when dashboard is refreshed and host goes away * Mark ctx that is shutting down. Test not adding flushed pages to open cache as hot if we are shutting down * make ML work * Fix compile without NETDATA_INTERNAL_CHECKS * shutdown each ctx independently * fix completion of quiesce * do not update shared ML charts * Create ML charts on child hosts. When a parent runs a ML for a child, the relevant-ML charts should be created on the child host. These charts should use the parent's hostname to differentiate multiple parents that might run ML for a child. The only exception to this rule is the training/prediction resource usage charts. These are created on the localhost of the parent host, because they provide information specific to said host. * check new ml code * first save the database, then free all memory * dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db * Cleanup metadata thread (part 2) * increase refcount before dispatching prep command * Do not try to stop anomaly detection threads twice. A separate function call has been added to stop anomaly detection threads. This commit removes the left over function calls that were made internally when a host was being created/destroyed. * Remove allocations when smoothing samples buffer The number of dims per sample is always 1, ie. we are training and predicting only individual dimensions. * set the orphan flag when loading archived hosts * track worker dispatch callbacks and threadpool worker init * make ML threads joinable; mark ctx having flushing in progress as early as possible * fix allocation counter * Cleanup metadata thread (part 3) * Cleanup metadata thread (part 4) * Skip metadata host scan when running unittest * unittest support during init * dont use all the libuv threads for queries * break an infinite loop when sleep_usec() is interrupted * ml prediction is a collector for several charts * sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit * worker_unregister() in netdata threads cleanup * moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h * fixed ML issues * removed engine2 directory * added dbengine2 files in CMakeLists.txt * move query plan data to query target, so that they can be exposed by in jsonwrap * uniform definition of query plan according to the other query target members * event_loop should be in daemon, not libnetdata * metric_retention_by_uuid() is now part of the storage engine abstraction * unify time_t variables to have the suffix _s (meaning: seconds) * old dbengine statistics become "dbengine io" * do not enable ML resource usage charts by default * unify ml chart families, plugins and modules * cleanup query plans from query target * cleanup all extent buffers * added debug info for rrddim slot to time * rrddim now does proper gap management * full rewrite of the mem modes * use library functions for madvise * use CHECKSUM_SZ for the checksum size * fix coverity warning about the impossible case of returning a page that is entirely in the past of the query * fix dbengine shutdown * keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently * fine tune cache evictions * dont initialize health if the health service is not running - prevent crash on shutdown while children get connected * rename AS threads to ACLK[hostname] * prevent re-use of uninitialized memory in queries * use JulyL instead of JudyL for PDC operations - to test it first * add also JulyL files * fix July memory accounting * disable July for PDC (use Judy) * use the function to remove datafiles from linked list * fix july and event_loop * add july to libnetdata subdirs * rename time_t variables that end in _t to end in _s * replicate when there is a gap at the beginning of the replication period * reset postponing of sender connections when a receiver is connected * Adjust update every properly * fix replication infinite loop due to last change * packed enums in rrd.h and cleanup of obsolete rrd structure members * prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests() * void unused variable * void unused variables * fix indentation * entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps * macros to caclulate page entries by time and size * prevent statsd cleanup crash on exit * cleanup health thread related variables Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com> Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--streaming/replication.c733
1 files changed, 480 insertions, 253 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index d659d701d5..5bfcb7540d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -11,20 +11,21 @@
#define WORKER_JOB_QUERYING 2
#define WORKER_JOB_DELETE_ENTRY 3
#define WORKER_JOB_FIND_CHART 4
-#define WORKER_JOB_CHECK_CONSISTENCY 5
-#define WORKER_JOB_BUFFER_COMMIT 6
-#define WORKER_JOB_CLEANUP 7
-#define WORKER_JOB_WAIT 8
+#define WORKER_JOB_PREPARE_QUERY 5
+#define WORKER_JOB_CHECK_CONSISTENCY 6
+#define WORKER_JOB_BUFFER_COMMIT 7
+#define WORKER_JOB_CLEANUP 8
+#define WORKER_JOB_WAIT 9
// master thread worker jobs
-#define WORKER_JOB_STATISTICS 9
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
-#define WORKER_JOB_CUSTOM_METRIC_DONE 14
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
+#define WORKER_JOB_STATISTICS 10
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
+#define WORKER_JOB_CUSTOM_METRIC_DONE 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17
#define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
#define SECONDS_TO_RESET_POINT_IN_TIME 10
@@ -51,137 +52,313 @@ struct replication_dimension {
STORAGE_POINT sp;
struct storage_engine_query_handle handle;
bool enabled;
+ bool skip;
DICTIONARY *dict;
const DICTIONARY_ITEM *rda;
RRDDIM *rd;
};
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
+struct replication_query {
+ RRDSET *st;
+
+ struct {
+ time_t first_entry_t;
+ time_t last_entry_t;
+ } db;
+
+ struct { // what the parent requested
+ time_t after;
+ time_t before;
+ bool enable_streaming;
+ } request;
+
+ struct { // what the child will do
+ time_t after;
+ time_t before;
+ bool enable_streaming;
+
+ bool locked_data_collection;
+ bool execute;
+ } query;
+
+ time_t wall_clock_time;
+
+ size_t points_read;
+ size_t points_generated;
+
+ struct storage_engine_query_ops *ops;
+ struct replication_request *rq;
+
+ size_t dimensions;
+ struct replication_dimension data[];
+};
+
+static struct replication_query *replication_query_prepare(
+ RRDSET *st,
+ time_t db_first_entry,
+ time_t db_last_entry,
+ time_t requested_after,
+ time_t requested_before,
+ time_t query_after,
+ time_t query_before,
+ bool enable_streaming,
+ time_t wall_clock_time
+) {
size_t dimensions = rrdset_number_of_dimensions(st);
- size_t points_read = 0, points_generated = 0;
+ struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+ q->dimensions = dimensions;
+ q->st = st;
- struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
- struct replication_dimension data[dimensions];
- memset(data, 0, sizeof(data));
+ q->db.first_entry_t = db_first_entry;
+ q->db.last_entry_t = db_last_entry;
- if(enable_streaming && st->last_updated.tv_sec > before) {
- internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)before,
- (unsigned long long)st->last_updated.tv_sec
- );
- before = st->last_updated.tv_sec;
+ q->request.after = requested_after,
+ q->request.before = requested_before,
+ q->request.enable_streaming = enable_streaming,
+
+ q->query.after = query_after;
+ q->query.before = query_before;
+ q->query.enable_streaming = enable_streaming;
+
+ q->wall_clock_time = wall_clock_time;
+
+ if (!q->dimensions || !q->query.after || !q->query.before) {
+ q->query.execute = false;
+ q->dimensions = 0;
+ return q;
+ }
+
+ if(q->query.enable_streaming) {
+ netdata_spinlock_lock(&st->data_collection_lock);
+ q->query.locked_data_collection = true;
+
+ if (st->last_updated.tv_sec > q->query.before) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
+ "has start_streaming = true, "
+ "adjusting replication before timestamp from %llu to %llu",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st),
+ (unsigned long long) q->query.before,
+ (unsigned long long) st->last_updated.tv_sec
+ );
+ q->query.before = st->last_updated.tv_sec;
+ }
}
+ q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+
// prepare our array of dimensions
- {
- RRDDIM *rd;
- rrddim_foreach_read(rd, st) {
- if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
- continue;
+ size_t count = 0;
+ RRDDIM *rd;
+ rrddim_foreach_read(rd, st) {
+ if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+ continue;
- if (unlikely(rd_dfe.counter >= dimensions)) {
- internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- break;
- }
+ if (unlikely(rd_dfe.counter >= q->dimensions)) {
+ internal_error(true,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+ rrdhost_hostname(st->rrdhost), rrdset_id(st));
+ break;
+ }
- struct replication_dimension *d = &data[rd_dfe.counter];
+ struct replication_dimension *d = &q->data[rd_dfe.counter];
- d->dict = rd_dfe.dict;
- d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
- d->rd = rd;
+ d->dict = rd_dfe.dict;
+ d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+ d->rd = rd;
- ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
- d->enabled = true;
+ q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+ q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
+ d->enabled = true;
+ d->skip = false;
+ count++;
+ }
+ rrddim_foreach_done(rd);
+
+ if(!count) {
+ // no data for this chart
+
+ q->query.execute = false;
+
+ if(q->query.locked_data_collection) {
+ netdata_spinlock_unlock(&st->data_collection_lock);
+ q->query.locked_data_collection = false;
}
- rrddim_foreach_done(rd);
+
+ }
+ else {
+ // we have data for this chart
+
+ q->query.execute = true;
+ }
+
+ return q;
+}
+
+static time_t replication_query_finalize(struct replication_query *q, bool executed) {
+ time_t query_before = q->query.before;
+ size_t dimensions = q->dimensions;
+
+ // release all the dictionary items acquired
+ // finalize the queries
+ size_t queries = 0;
+
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if (unlikely(!d->enabled)) continue;
+
+ q->ops->finalize(&d->handle);
+
+ dictionary_acquired_item_release(d->dict, d->rda);
+
+ // update global statistics
+ queries++;
+ }
+
+ if(q->query.locked_data_collection) {
+ netdata_spinlock_unlock(&q->st->data_collection_lock);
+ q->query.locked_data_collection = false;
+ }
+
+ if(executed) {
+ netdata_spinlock_lock(&replication_queries.spinlock);
+ replication_queries.queries_started += queries;
+ replication_queries.queries_finished += queries;
+ replication_queries.points_read += q->points_read;
+ replication_queries.points_generated += q->points_generated;
+ netdata_spinlock_unlock(&replication_queries.spinlock);
}
- time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
+ freez(q);
+
+ return query_before;
+}
+
+static void replication_query_align_to_optimal_before(struct replication_query *q) {
+ if(!q->query.execute || q->query.enable_streaming)
+ return;
+
+ size_t dimensions = q->dimensions;
+ time_t expanded_before = 0;
+
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if(unlikely(!d->enabled)) continue;
+
+ time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+ if (!expanded_before || new_before < expanded_before)
+ expanded_before = new_before;
+ }
+
+ if(expanded_before > q->query.before && // it is later than the original
+ (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
+ expanded_before < q->st->last_updated.tv_sec && // it is not the chart's last updated time
+ expanded_before < q->wall_clock_time) // it is not later than the wall clock time
+ q->query.before = expanded_before;
+}
+
+static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
+ if(!q->query.execute)
+ return replication_query_finalize(q, false);
+
+ replication_query_align_to_optimal_before(q);
+
+ time_t after = q->query.after;
+ time_t before = q->query.before;
+ size_t dimensions = q->dimensions;
+ struct storage_engine_query_ops *ops = q->ops;
+ time_t wall_clock_time = q->wall_clock_time;
+
+ size_t points_read = q->points_read, points_generated = q->points_generated;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ time_t actual_after = 0, actual_before = 0;
+#endif
+
+ time_t now = after + 1;
while(now <= before) {
time_t min_start_time = 0, min_end_time = 0;
for (size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
+ struct replication_dimension *d = &q->data[i];
+ if(unlikely(!d->enabled || d->skip)) continue;
// fetch the first valid point for the dimension
- int max_skip = 100;
- while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+ int max_skip = 1000;
+ while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
d->sp = ops->next_metric(&d->handle);
points_read++;
}
- internal_error(max_skip <= 0,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
+ if(max_skip <= 0) {
+ d->skip = true;
- if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
- continue;
+ error_limit_static_global_var(erl, 1, 0);
+ error_limit(&erl,
+ "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+ rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+ (unsigned long long) now);
- if(unlikely(!min_start_time)) {
- min_start_time = d->sp.start_time;
- min_end_time = d->sp.end_time;
- }
- else {
- min_start_time = MIN(min_start_time, d->sp.start_time);
- min_end_time = MIN(min_end_time, d->sp.end_time);
+ continue;
}
- }
- if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)min_start_time,
- (unsigned long long)min_end_time,
- (unsigned long long)wall_clock_time);
- break;
+ if(d->sp.end_time_s < now)
+ // this dimension does not have any more data
+ continue;
+
+ if(unlikely(!min_start_time))
+ min_start_time = d->sp.start_time_s;
+
+ if(unlikely(!min_end_time))
+ min_end_time = d->sp.end_time_s;
+
+ min_start_time = MIN(min_start_time, d->sp.start_time_s);
+ min_end_time = MIN(min_end_time, d->sp.end_time_s);
}
- if(unlikely(min_end_time < now)) {
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
- internal_error(true,
- "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
- rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
-#endif // NETDATA_LOG_REPLICATION_REQUESTS
+ if(unlikely(min_end_time < now))
break;
- }
- if(unlikely(min_end_time <= min_start_time))
- min_start_time = min_end_time - st->update_every;
+ if(likely(min_start_time <= now)) {
+ // we have a valid point
+
+ if (unlikely(min_end_time <= min_start_time))
+ min_start_time = min_end_time - q->st->update_every;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+ if (unlikely(!actual_after))
+ actual_after = min_end_time;
- if(unlikely(!actual_after)) {
- actual_after = min_end_time;
- actual_before = min_end_time;
- }
- else
actual_before = min_end_time;
+#endif
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
- , (unsigned long long)min_start_time
- , (unsigned long long)min_end_time
- , (unsigned long long)wall_clock_time
- );
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
+ (unsigned long long) min_start_time,
+ (unsigned long long) min_end_time,
+ (unsigned long long) wall_clock_time
+ );
- // output the replay values for this time
- for (size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
+ // output the replay values for this time
+ for (size_t i = 0; i < dimensions; i++) {
+ struct replication_dimension *d = &q->data[i];
+ if (unlikely(!d->enabled)) continue;
- if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
- rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+ if (likely( d->sp.start_time_s <= min_end_time &&
+ d->sp.end_time_s >= min_end_time &&
+ !storage_point_is_unset(d->sp) &&
+ !storage_point_is_empty(d->sp))) {
- else
- buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
- rrddim_id(d->rd));
+ buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
+ rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
- points_generated++;
- }
+ points_generated++;
+ }
+ }
- now = min_end_time + 1;
+ now = min_end_time + 1;
+ }
+ else
+ now = min_start_time;
}
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -202,32 +379,12 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
(unsigned long long)after, (unsigned long long)before);
#endif // NETDATA_LOG_REPLICATION_REQUESTS
- // release all the dictionary items acquired
- // finalize the queries
- size_t queries = 0;
- for(size_t i = 0; i < dimensions ;i++) {
- struct replication_dimension *d = &data[i];
- if(unlikely(!d->enabled)) continue;
-
- ops->finalize(&d->handle);
-
- dictionary_acquired_item_release(d->dict, d->rda);
-
- // update global statistics
- queries++;
- }
-
- netdata_spinlock_lock(&replication_queries.spinlock);
- replication_queries.queries_started += queries;
- replication_queries.queries_finished += queries;
- replication_queries.points_read += points_read;
- replication_queries.points_generated += points_generated;
- netdata_spinlock_unlock(&replication_queries.spinlock);
-
- return before;
+ q->points_read = points_read;
+ q->points_generated = points_generated;
+ return replication_query_finalize(q, true);
}
-static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
RRDDIM *rd;
rrddim_foreach_read(rd, st) {
if(!rd->exposed) continue;
@@ -248,56 +405,23 @@ static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
);
}
-bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
- time_t query_after = after;
- time_t query_before = before;
- time_t now = now_realtime_sec();
- time_t tolerance = 2; // sometimes from the time we get this value, to the time we check,
- // a data collection has been made
- // so, we give this tolerance to detect invalid timestamps
-
- // find the first entry we have
- time_t first_entry_local = rrdset_first_entry_t(st);
- if(first_entry_local > now + tolerance) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)first_entry_local, (unsigned long long)now);
- first_entry_local = now;
- }
+static struct replication_query *replication_response_prepare(RRDSET *st, bool start_streaming, time_t requested_after, time_t requested_before) {
+ time_t query_after = requested_after;
+ time_t query_before = requested_before;
+ time_t wall_clock_time = now_realtime_sec();
- if (query_after < first_entry_local)
- query_after = first_entry_local;
-
- // find the latest entry we have
- time_t last_entry_local = st->last_updated.tv_sec;
- if(!last_entry_local) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- last_entry_local = rrdset_last_entry_t(st);
- if(!last_entry_local) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
- rrdhost_hostname(st->rrdhost), rrdset_id(st));
- last_entry_local = now;
- }
- }
+ time_t db_first_entry, db_last_entry;
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
- if(last_entry_local > now + tolerance) {
- internal_error(true,
- "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
- rrdhost_hostname(st->rrdhost), rrdset_id(st),
- (unsigned long long)last_entry_local, (unsigned long long)now);
- last_entry_local = now;
- }
+ if (query_after < db_first_entry)
+ query_after = db_first_entry;
- if (query_before > last_entry_local)
- query_before = last_entry_local;
+ if (query_before > db_last_entry)
+ query_before = db_last_entry;
// if the parent asked us to start streaming, then fill the rest with the data that we have
if (start_streaming)
- query_before = last_entry_local;
+ query_before = db_last_entry;
if (query_after > query_before) {
time_t tmp = query_before;
@@ -305,7 +429,29 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
query_after = tmp;
}
- bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
+ bool enable_streaming = (start_streaming || query_before == db_last_entry || !requested_after || !requested_before) ? true : false;
+
+ return replication_query_prepare(
+ st,
+ db_first_entry, db_last_entry,
+ requested_after, requested_before,
+ query_after, query_before, enable_streaming,
+ wall_clock_time);
+}
+
+void replication_response_cancel_and_finalize(struct replication_query *q) {
+ replication_query_finalize(q, false);
+}
+
+static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
+
+bool replication_response_execute_and_finalize(struct replication_query *q) {
+ struct replication_request *rq = q->rq;
+ RRDSET *st = q->st;
+ RRDHOST *host = st->rrdhost;
+ time_t after = q->request.after;
+ time_t before; // the query will report this
+ bool enable_streaming = q->query.enable_streaming;
// we might want to optimize this by filling a temporary buffer
// and copying the result to the host's buffer in order to avoid
@@ -314,25 +460,22 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
- if(after != 0 && before != 0)
- before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
- else {
- after = 0;
- before = 0;
- enable_streaming = true;
- }
+ bool locked_data_collection = q->query.locked_data_collection;
+ q->query.locked_data_collection = false;
+
+ before = replication_query_execute_and_finalize(wb, q);
+
+ // IMPORTANT: q is invalid now
+ q = NULL;
// get again the world clock time
- time_t world_clock_time = now_realtime_sec();
- if(enable_streaming) {
- if(now < world_clock_time) {
- // we needed time to execute this request
- // so, the parent will need to replicate more data
- enable_streaming = false;
- }
- else
- replicate_chart_collection_state(wb, st);
- }
+ if(enable_streaming)
+ replication_send_chart_collection_state(wb, st);
+
+ // get a fresh retention to send to the parent
+ time_t wall_clock_time = now_realtime_sec();
+ time_t db_first_entry, db_last_entry;
+ rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
// end with first/last entries we have, and the first start time and
// last end time of the data we sent
@@ -342,7 +485,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
(int)st->update_every
// child first db time, child end db time
- , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+ , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
// start streaming boolean
, enable_streaming ? "true" : "false"
@@ -351,13 +494,37 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool s