author     Costa Tsaousis <costa@netdata.cloud>       2023-01-10 19:59:21 +0200
committer  GitHub <noreply@github.com>                2023-01-10 19:59:21 +0200
commit     368a26cfee6887ca0cb2301d93138f63b75e353a (patch)
tree       b57e39fdb78dc57f7a2c1fcc3d9b6bf3c2a2a113 /streaming/replication.c
parent     b513888be389f92b2323d1bb3fdf55c22d4e4bad (diff)
DBENGINE v2 (#14125)
* count open cache pages referring to datafile
* eliminate wasted flush attempts
* remove eliminated variable
* journal v2 scanning split functions
* avoid locking open cache for a long time while migrating to journal v2
* don't acquire datafile for the loop; disable thread cancelability while a query is running
* work on datafile acquiring
* work on datafile deletion
* work on datafile deletion again
* logs of dbengine should start with DBENGINE
* thread specific key for queries to check if a query finishes without a finalize
* page_uuid is not used anymore
* Cleanup judy traversal when building new v2
Remove unneeded calls to the metric registry
* metric is 8 bytes smaller; timestamps are protected with a spinlock; timestamps in metric are now always coherent
* disable checks for invalid time-ranges
* Remove type from page details
* report scanning time
* remove infinite loop from datafile acquire for deletion
* remove infinite loop from datafile acquire for deletion again
* trace query handles
* properly allocate array of dimensions in replication
* metrics cleanup
* metrics registry uses arrayalloc
* arrayalloc free should be protected by lock
* use array alloc in page cache
* journal v2 scanning fix
* datafile reference leak hunting
* do not load metrics of future timestamps
* initialize reasons
* fix datafile reference leak
* do not load pages that are entirely overlapped by others
* expand metric retention atomically
* split replication logic in initialization and execution
* replication prepare ahead queries
* replication prepare ahead queries fixed
* fix replication workers accounting
* add router active queries chart
* restore accounting of pages metadata sources; cleanup replication
* don't count skipped pages as unroutable
* notes on services shutdown
* do not migrate to journal v2 too early, while it has pending dirty pages in the main cache for the specific journal file
* do not add pages we don't need to pdc
* time in range re-work to provide info about past and future matches
* finer control on the pages selected for processing; accounting of page-related issues
* fix invalid reference to handle->page
* eliminate data collection handle of pg_lookup_next
* accounting for queries with gaps
* query preprocessing the same way the processing is done; cache now supports all operations on Judy
* dynamic libuv workers based on number of processors; minimum libuv workers 8; replication query init ahead uses libuv workers - reserved ones (3)
* get into pdc all matching pages from main cache and open cache; do not do v2 scan if main cache and open cache can satisfy the query
* finer gaps calculation; accounting of overlapping pages in queries
* fix gaps accounting
* move datafile deletion to worker thread
* tune libuv workers and thread stack size
* stop netdata threads gradually
* run indexing together with cache flush/evict
* more work on clean shutdown
* limit the number of pages to evict per run
* do not lock the clean queue for accesses if it is not possible at that time - the page will be moved to the back of the list during eviction
* economies on flags for smaller page footprint; cleanup and renames
* eviction moves referenced pages to the end of the queue
* use murmur hash for indexing partition
* murmur should be static
* use more indexing partitions
* revert number of partitions to number of cpus
* cancel threads first, then stop services
* revert default thread stack size
* don't execute replication requests of disconnected senders
* wait more time for services that are exiting gradually
* fixed last commit
* finer control on page selection algorithm
* default stacksize of 1MB
* fix formatting
* fix worker utilization going crazy when the number is rotating
* avoid buffer full due to replication preprocessing of requests
* support query priorities
* add count of spins in spinlock when compiled with netdata internal checks
* remove prioritization from dbengine queries; cache now uses mutexes for the queues
* hot pages are now in sections judy arrays, like dirty
* align replication queries to optimal page size
* during flushing add to clean and evict in batches
* Revert "during flushing add to clean and evict in batches"
This reverts commit 8fb2b69d068499eacea6de8291c336e5e9f197c7.
* dont lock clean while evicting pages during flushing
* Revert "dont lock clean while evicting pages during flushing"
This reverts commit d6c82b5f40aeba86fc7aead062fab1b819ba58b3.
* Revert "Revert "during flushing add to clean and evict in batches""
This reverts commit ca7a187537fb8f743992700427e13042561211ec.
* don't cross locks during flushing, for the fastest flushes possible
* low-priority queries load pages synchronously
* Revert "low-priority queries load pages synchronously"
This reverts commit 1ef2662ddcd20fe5842b856c716df134c42d1dc7.
* cache uses spinlock again
* during flushing, don't lock the clean queue at all; each item is added atomically
* do smaller eviction runs
* evict one page at a time to minimize lock contention on the clean queue
* fix eviction statistics
* fix last commit
* plain should be main cache
* event loop cleanup; evictions and flushes can now happen concurrently
* run flush and evictions from tier0 only
* remove unneeded variables
* flushing open cache is not needed; flushing protection is irrelevant since flushing is global for all tiers; added protection to datafiles so that only one flusher can run per datafile at any given time
* added worker jobs in timer to find the slow part of it
* support fast eviction of pages when all_of_them is set
* revert default thread stack size
* bypass event loop for dispatching read extent commands to workers - send them directly
* Revert "bypass event loop for dispatching read extent commands to workers - send them directly"
This reverts commit 2c08bc5bab12881ae33bc73ce5dea03dfc4e1fce.
* cache work requests
* minimize memory operations during flushing; caching of extent_io_descriptors and page_descriptors
* publish flushed pages to open cache in the thread pool
* prevent eventloop requests from getting stacked in the event loop
* single threaded dbengine controller; support priorities for all queries; major cleanup and restructuring of rrdengine.c
* more rrdengine.c cleanup
* enable db rotation
* do not log when there is a filter
* do not run multiple migrations to journal v2
* load all extents async
* fix wrong paste
* report opcodes waiting, work requests dispatched, work requests executing
* cleanup event loop memory every 10 minutes
* don't dispatch more work requests than the number of threads available
* use the dispatched counter instead of the executing counter to check if the worker thread pool is full
* remove UV_RUN_NOWAIT
* replication to fill the queues
* caching of extent buffers; code cleanup
* caching of pdc and pd; rework on journal v2 indexing, datafile creation, database rotation
* single transaction wal
* synchronous flushing
* first cancel the threads, then signal them to exit
* caching of rrdeng query handles; added priority to query target; health is now low prio
* add priority to the missing points; do not allow critical priority in queries
* offload query preparation and routing to libuv thread pool
* updated timing charts for the offloaded query preparation
* caching of WALs
* accounting for struct caches (buffers); do not load extents with invalid sizes
* protection against memory ballooning during replication due to the optimal alignment of pages; sender thread buffer is now also reset when the circular buffer is reset
* also check that the expanded 'before' is not the chart's last updated time
* also check that the expanded 'before' is not after the wall clock time of when the query started
* Remove unused variable
* replication to queue less queries; cleanup of internal fatals
* Mark dimension to be updated async
* caching of extent_page_details_list (epdl) and datafile_extent_offset_list (deol)
* disable pgc stress test, under an ifdef
* disable mrg stress test under an ifdef
* Mark chart and host labels, host info for async check and store in the database
* dictionary items use arrayalloc
* cache section pages structure is allocated with arrayalloc
* Add function to wakeup the aclk query threads and check for exit
Register function to be called during shutdown after signaling the service to exit
* parallel preparation of all dimensions of queries
* be more sensitive to enable streaming after replication
* atomically finish chart replication
* fix last commit
* fix last commit again
* fix last commit again again
* fix last commit again again again
* unify the normalization of retention calculation for collected charts; do not enable streaming if more than 60 points are to be transferred; eliminate an allocation during replication
* do not cancel start streaming; use high priority queries when we have locked chart data collection
* prevent starvation on opcodes execution, by allowing 2% of the requests to be re-ordered
* opcode now uses 2 spinlocks one for the caching of allocations and one for the waiting queue
* Remove check locks and NETDATA_VERIFY_LOCKS as it is not needed anymore
* Fix bad memory allocation / cleanup
* Cleanup ACLK sync initialization (part 1)
* Don't update metric registry during shutdown (part 1)
* Prevent crash when dashboard is refreshed and host goes away
* Mark ctx that is shutting down.
Test not adding flushed pages to open cache as hot if we are shutting down
* make ML work
* Fix compile without NETDATA_INTERNAL_CHECKS
* shutdown each ctx independently
* fix completion of quiesce
* do not update shared ML charts
* Create ML charts on child hosts.
When a parent runs a ML for a child, the relevant-ML charts
should be created on the child host. These charts should use
the parent's hostname to differentiate multiple parents that might
run ML for a child.
The only exception to this rule is the training/prediction resource
usage charts. These are created on the localhost of the parent host,
because they provide information specific to said host.
* check new ml code
* first save the database, then free all memory
* dbengine prep exit before freeing all memory; fixed deadlock in cache hot to dirty; added missing check to query engine about metrics without any data in the db
* Cleanup metadata thread (part 2)
* increase refcount before dispatching prep command
* Do not try to stop anomaly detection threads twice.
A separate function call has been added to stop anomaly detection threads.
This commit removes the left over function calls that were made
internally when a host was being created/destroyed.
* Remove allocations when smoothing samples buffer
The number of dims per sample is always 1, ie. we are training and
predicting only individual dimensions.
* set the orphan flag when loading archived hosts
* track worker dispatch callbacks and threadpool worker init
* make ML threads joinable; mark ctx having flushing in progress as early as possible
* fix allocation counter
* Cleanup metadata thread (part 3)
* Cleanup metadata thread (part 4)
* Skip metadata host scan when running unittest
* unittest support during init
* don't use all the libuv threads for queries
* break an infinite loop when sleep_usec() is interrupted
* ml prediction is a collector for several charts
* sleep_usec() now makes sure it will never loop if it passes the time expected; sleep_usec() now uses nanosleep() because clock_nanosleep() misses signals on netdata exit
* worker_unregister() in netdata threads cleanup
* moved pdc/epdl/deol/extent_buffer related code to pdc.c and pdc.h
* fixed ML issues
* removed engine2 directory
* added dbengine2 files in CMakeLists.txt
* move query plan data to query target, so that they can be exposed by in jsonwrap
* uniform definition of query plan according to the other query target members
* event_loop should be in daemon, not libnetdata
* metric_retention_by_uuid() is now part of the storage engine abstraction
* unify time_t variables to have the suffix _s (meaning: seconds)
* old dbengine statistics become "dbengine io"
* do not enable ML resource usage charts by default
* unify ml chart families, plugins and modules
* cleanup query plans from query target
* cleanup all extent buffers
* added debug info for rrddim slot to time
* rrddim now does proper gap management
* full rewrite of the mem modes
* use library functions for madvise
* use CHECKSUM_SZ for the checksum size
* fix coverity warning about the impossible case of returning a page that is entirely in the past of the query
* fix dbengine shutdown
* keep the old datafile lock until a new datafile has been created, to avoid creating multiple datafiles concurrently
* fine tune cache evictions
* don't initialize health if the health service is not running - prevents crash on shutdown while children get connected
* rename AS threads to ACLK[hostname]
* prevent re-use of uninitialized memory in queries
* use JulyL instead of JudyL for PDC operations - to test it first
* add also JulyL files
* fix July memory accounting
* disable July for PDC (use Judy)
* use the function to remove datafiles from linked list
* fix july and event_loop
* add july to libnetdata subdirs
* rename time_t variables that end in _t to end in _s
* replicate when there is a gap at the beginning of the replication period
* reset postponing of sender connections when a receiver is connected
* Adjust update every properly
* fix replication infinite loop due to last change
* packed enums in rrd.h and cleanup of obsolete rrd structure members
* prevent deadlock in replication: replication_recalculate_buffer_used_ratio_unsafe() deadlocking with replication_sender_delete_pending_requests()
* void unused variable
* void unused variables
* fix indentation
* entries_by_time calculation in VD was wrong; restored internal checks for checking future timestamps
* macros to calculate page entries by time and size
* prevent statsd cleanup crash on exit
* cleanup health thread related variables
Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Co-authored-by: vkalintiris <vasilis@netdata.cloud>
Diffstat (limited to 'streaming/replication.c')
-rw-r--r--  streaming/replication.c  733
1 file changed, 480 insertions, 253 deletions
diff --git a/streaming/replication.c b/streaming/replication.c
index d659d701d5..5bfcb7540d 100644
--- a/streaming/replication.c
+++ b/streaming/replication.c
@@ -11,20 +11,21 @@
 #define WORKER_JOB_QUERYING 2
 #define WORKER_JOB_DELETE_ENTRY 3
 #define WORKER_JOB_FIND_CHART 4
-#define WORKER_JOB_CHECK_CONSISTENCY 5
-#define WORKER_JOB_BUFFER_COMMIT 6
-#define WORKER_JOB_CLEANUP 7
-#define WORKER_JOB_WAIT 8
+#define WORKER_JOB_PREPARE_QUERY 5
+#define WORKER_JOB_CHECK_CONSISTENCY 6
+#define WORKER_JOB_BUFFER_COMMIT 7
+#define WORKER_JOB_CLEANUP 8
+#define WORKER_JOB_WAIT 9
 
 // master thread worker jobs
-#define WORKER_JOB_STATISTICS 9
-#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 10
-#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 11
-#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 12
-#define WORKER_JOB_CUSTOM_METRIC_ADDED 13
-#define WORKER_JOB_CUSTOM_METRIC_DONE 14
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 15
-#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 16
+#define WORKER_JOB_STATISTICS 10
+#define WORKER_JOB_CUSTOM_METRIC_PENDING_REQUESTS 11
+#define WORKER_JOB_CUSTOM_METRIC_SKIPPED_NO_ROOM 12
+#define WORKER_JOB_CUSTOM_METRIC_COMPLETION 13
+#define WORKER_JOB_CUSTOM_METRIC_ADDED 14
+#define WORKER_JOB_CUSTOM_METRIC_DONE 15
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_RESETS 16
+#define WORKER_JOB_CUSTOM_METRIC_SENDER_FULL 17
 
 #define ITERATIONS_IDLE_WITHOUT_PENDING_TO_RUN_SENDER_VERIFICATION 30
 #define SECONDS_TO_RESET_POINT_IN_TIME 10
@@ -51,137 +52,313 @@ struct replication_dimension {
     STORAGE_POINT sp;
     struct storage_engine_query_handle handle;
     bool enabled;
+    bool skip;
 
     DICTIONARY *dict;
     const DICTIONARY_ITEM *rda;
     RRDDIM *rd;
 };
 
-static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, time_t before, bool enable_streaming, time_t wall_clock_time) {
+struct replication_query {
+    RRDSET *st;
+
+    struct {
+        time_t first_entry_t;
+        time_t last_entry_t;
+    } db;
+
+    struct {                    // what the parent requested
+        time_t after;
+        time_t before;
+        bool enable_streaming;
+    } request;
+
+    struct {                    // what the child will do
+        time_t after;
+        time_t before;
+        bool enable_streaming;
+
+        bool locked_data_collection;
+        bool execute;
+    } query;
+
+    time_t wall_clock_time;
+
+    size_t points_read;
+    size_t points_generated;
+
+    struct storage_engine_query_ops *ops;
+    struct replication_request *rq;
+
+    size_t dimensions;
+    struct replication_dimension data[];
+};
+
+static struct replication_query *replication_query_prepare(
+        RRDSET *st,
+        time_t db_first_entry,
+        time_t db_last_entry,
+        time_t requested_after,
+        time_t requested_before,
+        time_t query_after,
+        time_t query_before,
+        bool enable_streaming,
+        time_t wall_clock_time
+) {
     size_t dimensions = rrdset_number_of_dimensions(st);
-    size_t points_read = 0, points_generated = 0;
+    struct replication_query *q = callocz(1, sizeof(struct replication_query) + dimensions * sizeof(struct replication_dimension));
+    q->dimensions = dimensions;
+    q->st = st;
 
-    struct storage_engine_query_ops *ops = &st->rrdhost->db[0].eng->api.query_ops;
-    struct replication_dimension data[dimensions];
-    memset(data, 0, sizeof(data));
+    q->db.first_entry_t = db_first_entry;
+    q->db.last_entry_t = db_last_entry;
 
-    if(enable_streaming && st->last_updated.tv_sec > before) {
-        internal_error(true, "STREAM_SENDER REPLAY: 'host:%s/chart:%s' has start_streaming = true, adjusting replication before timestamp from %llu to %llu",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                       (unsigned long long)before,
-                       (unsigned long long)st->last_updated.tv_sec
-        );
-        before = st->last_updated.tv_sec;
+    q->request.after = requested_after,
+    q->request.before = requested_before,
+    q->request.enable_streaming = enable_streaming,
+
+    q->query.after = query_after;
+    q->query.before = query_before;
+    q->query.enable_streaming = enable_streaming;
+
+    q->wall_clock_time = wall_clock_time;
+
+    if (!q->dimensions || !q->query.after || !q->query.before) {
+        q->query.execute = false;
+        q->dimensions = 0;
+        return q;
+    }
+
+    if(q->query.enable_streaming) {
+        netdata_spinlock_lock(&st->data_collection_lock);
+        q->query.locked_data_collection = true;
+
+        if (st->last_updated.tv_sec > q->query.before) {
+            internal_error(true,
+                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s' "
+                           "has start_streaming = true, "
+                           "adjusting replication before timestamp from %llu to %llu",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
+                           (unsigned long long) q->query.before,
+                           (unsigned long long) st->last_updated.tv_sec
+            );
+            q->query.before = st->last_updated.tv_sec;
+        }
     }
 
+    q->ops = &st->rrdhost->db[0].eng->api.query_ops;
+
     // prepare our array of dimensions
-    {
-        RRDDIM *rd;
-        rrddim_foreach_read(rd, st) {
-            if(unlikely(!rd || !rd_dfe.item || !rd->exposed))
-                continue;
+    size_t count = 0;
+    RRDDIM *rd;
+    rrddim_foreach_read(rd, st) {
+        if (unlikely(!rd || !rd_dfe.item || !rd->exposed))
+            continue;
 
-            if (unlikely(rd_dfe.counter >= dimensions)) {
-                internal_error(true, "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
-                               rrdhost_hostname(st->rrdhost), rrdset_id(st));
-                break;
-            }
+        if (unlikely(rd_dfe.counter >= q->dimensions)) {
+            internal_error(true,
+                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' has more dimensions than the replicated ones",
+                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
+            break;
+        }
 
-            struct replication_dimension *d = &data[rd_dfe.counter];
+        struct replication_dimension *d = &q->data[rd_dfe.counter];
 
-            d->dict = rd_dfe.dict;
-            d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
-            d->rd = rd;
+        d->dict = rd_dfe.dict;
+        d->rda = dictionary_acquired_item_dup(rd_dfe.dict, rd_dfe.item);
+        d->rd = rd;
 
-            ops->init(rd->tiers[0]->db_metric_handle, &d->handle, after, before);
-            d->enabled = true;
+        q->ops->init(rd->tiers[0]->db_metric_handle, &d->handle, q->query.after, q->query.before,
+                     q->query.locked_data_collection ? STORAGE_PRIORITY_HIGH : STORAGE_PRIORITY_LOW);
+        d->enabled = true;
+        d->skip = false;
+        count++;
+    }
+    rrddim_foreach_done(rd);
+
+    if(!count) {
+        // no data for this chart
+
+        q->query.execute = false;
+
+        if(q->query.locked_data_collection) {
+            netdata_spinlock_unlock(&st->data_collection_lock);
+            q->query.locked_data_collection = false;
        }
-        rrddim_foreach_done(rd);
+
+    }
+    else {
+        // we have data for this chart
+
+        q->query.execute = true;
+    }
+
+    return q;
+}
+
+static time_t replication_query_finalize(struct replication_query *q, bool executed) {
+    time_t query_before = q->query.before;
+    size_t dimensions = q->dimensions;
+
+    // release all the dictionary items acquired
+    // finalize the queries
+    size_t queries = 0;
+
+    for (size_t i = 0; i < dimensions; i++) {
+        struct replication_dimension *d = &q->data[i];
+        if (unlikely(!d->enabled)) continue;
+
+        q->ops->finalize(&d->handle);
+
+        dictionary_acquired_item_release(d->dict, d->rda);
+
+        // update global statistics
+        queries++;
+    }
+
+    if(q->query.locked_data_collection) {
+        netdata_spinlock_unlock(&q->st->data_collection_lock);
+        q->query.locked_data_collection = false;
+    }
+
+    if(executed) {
+        netdata_spinlock_lock(&replication_queries.spinlock);
+        replication_queries.queries_started += queries;
+        replication_queries.queries_finished += queries;
+        replication_queries.points_read += q->points_read;
+        replication_queries.points_generated += q->points_generated;
+        netdata_spinlock_unlock(&replication_queries.spinlock);
+    }
 
-    time_t now = after + 1, actual_after = 0, actual_before = 0; (void)actual_before;
+    freez(q);
+
+    return query_before;
+}
+
+static void replication_query_align_to_optimal_before(struct replication_query *q) {
+    if(!q->query.execute || q->query.enable_streaming)
+        return;
+
+    size_t dimensions = q->dimensions;
+    time_t expanded_before = 0;
+
+    for (size_t i = 0; i < dimensions; i++) {
+        struct replication_dimension *d = &q->data[i];
+        if(unlikely(!d->enabled)) continue;
+
+        time_t new_before = q->ops->align_to_optimal_before(&d->handle);
+        if (!expanded_before || new_before < expanded_before)
+            expanded_before = new_before;
+    }
+
+    if(expanded_before > q->query.before &&                                // it is later than the original
+       (expanded_before - q->query.before) / q->st->update_every < 1024 && // it is reasonable (up to a page)
+       expanded_before < q->st->last_updated.tv_sec &&                     // it is not the chart's last updated time
+       expanded_before < q->wall_clock_time)                               // it is not later than the wall clock time
+        q->query.before = expanded_before;
+}
+
+static time_t replication_query_execute_and_finalize(BUFFER *wb, struct replication_query *q) {
+    if(!q->query.execute)
+        return replication_query_finalize(q, false);
+
+    replication_query_align_to_optimal_before(q);
+
+    time_t after = q->query.after;
+    time_t before = q->query.before;
+    size_t dimensions = q->dimensions;
+    struct storage_engine_query_ops *ops = q->ops;
+    time_t wall_clock_time = q->wall_clock_time;
+
+    size_t points_read = q->points_read, points_generated = q->points_generated;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+    time_t actual_after = 0, actual_before = 0;
+#endif
+
+    time_t now = after + 1;
     while(now <= before) {
         time_t min_start_time = 0, min_end_time = 0;
         for (size_t i = 0; i < dimensions ;i++) {
-            struct replication_dimension *d = &data[i];
-            if(unlikely(!d->enabled)) continue;
+            struct replication_dimension *d = &q->data[i];
+            if(unlikely(!d->enabled || d->skip)) continue;
 
             // fetch the first valid point for the dimension
-            int max_skip = 100;
-            while(d->sp.end_time < now && !ops->is_finished(&d->handle) && max_skip-- > 0) {
+            int max_skip = 1000;
+            while(d->sp.end_time_s < now && !ops->is_finished(&d->handle) && max_skip-- >= 0) {
                 d->sp = ops->next_metric(&d->handle);
                 points_read++;
            }
 
-            internal_error(max_skip <= 0,
-                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st), rrddim_id(d->rd), (unsigned long long) now);
+            if(max_skip <= 0) {
+                d->skip = true;
 
-            if(unlikely(d->sp.end_time < now || storage_point_is_unset(d->sp) || storage_point_is_empty(d->sp)))
-                continue;
+                error_limit_static_global_var(erl, 1, 0);
+                error_limit(&erl,
+                            "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s/dim:%s': db does not advance the query beyond time %llu (tried 1000 times to get the next point and always got back a point in the past)",
+                            rrdhost_hostname(q->st->rrdhost), rrdset_id(q->st), rrddim_id(d->rd),
+                            (unsigned long long) now);
 
-            if(unlikely(!min_start_time)) {
-                min_start_time = d->sp.start_time;
-                min_end_time = d->sp.end_time;
-            }
-            else {
-                min_start_time = MIN(min_start_time, d->sp.start_time);
-                min_end_time = MIN(min_end_time, d->sp.end_time);
+                continue;
             }
-        }
 
-        if(unlikely(min_start_time > wall_clock_time + 1 || min_end_time > wall_clock_time + st->update_every + 1)) {
-            internal_error(true,
-                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s': db provided future start time %llu or end time %llu (now is %llu)",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                           (unsigned long long)min_start_time,
-                           (unsigned long long)min_end_time,
-                           (unsigned long long)wall_clock_time);
-            break;
+            if(d->sp.end_time_s < now)
+                // this dimension does not have any more data
+                continue;
+
+            if(unlikely(!min_start_time))
+                min_start_time = d->sp.start_time_s;
+
+            if(unlikely(!min_end_time))
+                min_end_time = d->sp.end_time_s;
+
+            min_start_time = MIN(min_start_time, d->sp.start_time_s);
+            min_end_time = MIN(min_end_time, d->sp.end_time_s);
         }
 
-        if(unlikely(min_end_time < now)) {
-#ifdef NETDATA_LOG_REPLICATION_REQUESTS
-            internal_error(true,
-                           "STREAM_SENDER REPLAY: 'host:%s/chart:%s': no data on any dimension beyond time %llu",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st), (unsigned long long)now);
-#endif // NETDATA_LOG_REPLICATION_REQUESTS
+        if(unlikely(min_end_time < now))
            break;
-        }
 
-        if(unlikely(min_end_time <= min_start_time))
-            min_start_time = min_end_time - st->update_every;
+        if(likely(min_start_time <= now)) {
+            // we have a valid point
+
+            if (unlikely(min_end_time <= min_start_time))
+                min_start_time = min_end_time - q->st->update_every;
+
+#ifdef NETDATA_LOG_REPLICATION_REQUESTS
+            if (unlikely(!actual_after))
+                actual_after = min_end_time;
 
-        if(unlikely(!actual_after)) {
-            actual_after = min_end_time;
-            actual_before = min_end_time;
-        }
-        else actual_before = min_end_time;
+#endif
 
-        buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n"
-                       , (unsigned long long)min_start_time
-                       , (unsigned long long)min_end_time
-                       , (unsigned long long)wall_clock_time
-                       );
+            buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " '' %llu %llu %llu\n",
+                           (unsigned long long) min_start_time,
+                           (unsigned long long) min_end_time,
+                           (unsigned long long) wall_clock_time
+            );
 
-        // output the replay values for this time
-        for (size_t i = 0; i < dimensions ;i++) {
-            struct replication_dimension *d = &data[i];
-            if(unlikely(!d->enabled)) continue;
+            // output the replay values for this time
+            for (size_t i = 0; i < dimensions; i++) {
+                struct replication_dimension *d = &q->data[i];
+                if (unlikely(!d->enabled)) continue;
 
-            if(likely(d->sp.start_time <= min_end_time && d->sp.end_time >= min_end_time))
-                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
-                               rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
+                if (likely( d->sp.start_time_s <= min_end_time &&
+                            d->sp.end_time_s >= min_end_time &&
+                            !storage_point_is_unset(d->sp) &&
+                            !storage_point_is_empty(d->sp))) {
 
-            else
-                buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" NAN \"E\"\n",
-                               rrddim_id(d->rd));
+                    buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_SET " \"%s\" " NETDATA_DOUBLE_FORMAT " \"%s\"\n",
                                   rrddim_id(d->rd), d->sp.sum, d->sp.flags & SN_FLAG_RESET ? "R" : "");
 
-            points_generated++;
-        }
+                    points_generated++;
+                }
+            }
 
-        now = min_end_time + 1;
+            now = min_end_time + 1;
+        }
+        else
+            now = min_start_time;
     }
 
#ifdef NETDATA_LOG_REPLICATION_REQUESTS
@@ -202,32 +379,12 @@ static time_t replicate_chart_timeframe(BUFFER *wb, RRDSET *st, time_t after, ti
                    (unsigned long long)after, (unsigned long long)before);
 #endif // NETDATA_LOG_REPLICATION_REQUESTS
 
-    // release all the dictionary items acquired
-    // finalize the queries
-    size_t queries = 0;
-    for(size_t i = 0; i < dimensions ;i++) {
-        struct replication_dimension *d = &data[i];
-        if(unlikely(!d->enabled)) continue;
-
-        ops->finalize(&d->handle);
-
-        dictionary_acquired_item_release(d->dict, d->rda);
-
-        // update global statistics
-        queries++;
-    }
-
-    netdata_spinlock_lock(&replication_queries.spinlock);
-    replication_queries.queries_started += queries;
-    replication_queries.queries_finished += queries;
-    replication_queries.points_read += points_read;
-    replication_queries.points_generated += points_generated;
-    netdata_spinlock_unlock(&replication_queries.spinlock);
-
-    return before;
+    q->points_read = points_read;
+    q->points_generated = points_generated;
+    return replication_query_finalize(q, true);
 }
 
-static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
+static void replication_send_chart_collection_state(BUFFER *wb, RRDSET *st) {
     RRDDIM *rd;
     rrddim_foreach_read(rd, st) {
         if(!rd->exposed) continue;
@@ -248,56 +405,23 @@ static void replicate_chart_collection_state(BUFFER *wb, RRDSET *st) {
     );
 }
 
-bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, time_t after, time_t before) {
-    time_t query_after = after;
-    time_t query_before = before;
-    time_t now = now_realtime_sec();
-    time_t tolerance = 2;   // sometimes from the time we get this value, to the time we check,
-                            // a data collection has been made
-                            // so, we give this tolerance to detect invalid timestamps
-
-    // find the first entry we have
-    time_t first_entry_local = rrdset_first_entry_t(st);
-    if(first_entry_local > now + tolerance) {
-        internal_error(true,
-                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db first time %llu is in the future (now is %llu)",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                       (unsigned long long)first_entry_local, (unsigned long long)now);
-        first_entry_local = now;
-    }
+static struct replication_query *replication_response_prepare(RRDSET *st, bool start_streaming, time_t requested_after, time_t requested_before) {
+    time_t query_after = requested_after;
+    time_t query_before = requested_before;
+    time_t wall_clock_time = now_realtime_sec();
 
-    if (query_after < first_entry_local)
-        query_after = first_entry_local;
-
-    // find the latest entry we have
-    time_t last_entry_local = st->last_updated.tv_sec;
-    if(!last_entry_local) {
-        internal_error(true,
-                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' RRDSET reports last updated time zero.",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st));
-        last_entry_local = rrdset_last_entry_t(st);
-        if(!last_entry_local) {
-            internal_error(true,
-                           "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' db reports last time zero.",
-                           rrdhost_hostname(st->rrdhost), rrdset_id(st));
-            last_entry_local = now;
-        }
-    }
+    time_t db_first_entry, db_last_entry;
+    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
 
-    if(last_entry_local > now + tolerance) {
-        internal_error(true,
-                       "STREAM_SENDER REPLAY ERROR: 'host:%s/chart:%s' last updated time %llu is in the future (now is %llu)",
-                       rrdhost_hostname(st->rrdhost), rrdset_id(st),
-                       (unsigned long long)last_entry_local, (unsigned long long)now);
-        last_entry_local = now;
-    }
+    if (query_after < db_first_entry)
+        query_after = db_first_entry;
 
-    if (query_before > last_entry_local)
-        query_before = last_entry_local;
+    if (query_before > db_last_entry)
+        query_before = db_last_entry;
 
     // if the parent asked us to start streaming, then fill the rest with the data that we have
     if (start_streaming)
-        query_before = last_entry_local;
+        query_before = db_last_entry;
 
     if (query_after > query_before) {
         time_t tmp = query_before;
@@ -305,7 +429,29 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
         query_after = tmp;
     }
 
-    bool enable_streaming = (start_streaming || query_before == last_entry_local || !after || !before) ? true : false;
+    bool enable_streaming = (start_streaming || query_before == db_last_entry || !requested_after || !requested_before) ? true : false;
+
+    return replication_query_prepare(
+            st,
+            db_first_entry, db_last_entry,
+            requested_after, requested_before,
+            query_after, query_before, enable_streaming,
+            wall_clock_time);
+}
+
+void replication_response_cancel_and_finalize(struct replication_query *q) {
+    replication_query_finalize(q, false);
+}
+
+static bool sender_is_still_connected_for_this_request(struct replication_request *rq);
+
+bool replication_response_execute_and_finalize(struct replication_query *q) {
+    struct replication_request *rq = q->rq;
+    RRDSET *st = q->st;
+    RRDHOST *host = st->rrdhost;
+    time_t after = q->request.after;
+    time_t before; // the query will report this
+    bool enable_streaming = q->query.enable_streaming;
 
     // we might want to optimize this by filling a temporary buffer
     // and copying the result to the host's buffer in order to avoid
@@ -314,25 +460,22 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
 
     buffer_sprintf(wb, PLUGINSD_KEYWORD_REPLAY_BEGIN " \"%s\"\n", rrdset_id(st));
 
-    if(after != 0 && before != 0)
-        before = replicate_chart_timeframe(wb, st, query_after, query_before, enable_streaming, now);
-    else {
-        after = 0;
-        before = 0;
-        enable_streaming = true;
-    }
+    bool locked_data_collection = q->query.locked_data_collection;
+    q->query.locked_data_collection = false;
+
+    before = replication_query_execute_and_finalize(wb, q);
+
+    // IMPORTANT: q is invalid now
+    q = NULL;
 
     // get again the world clock time
-    time_t world_clock_time = now_realtime_sec();
-    if(enable_streaming) {
-        if(now < world_clock_time) {
-            // we needed time to execute this request
-            // so, the parent will need to replicate more data
-            enable_streaming = false;
-        }
-        else
-            replicate_chart_collection_state(wb, st);
-    }
+    if(enable_streaming)
+        replication_send_chart_collection_state(wb, st);
+
+    // get a fresh retention to send to the parent
+    time_t wall_clock_time = now_realtime_sec();
+    time_t db_first_entry, db_last_entry;
+    rrdset_get_retention_of_tier_for_collected_chart(st, &db_first_entry, &db_last_entry, wall_clock_time, 0);
 
     // end with first/last entries we have, and the first start time and
     // last end time of the data we sent
@@ -342,7 +485,7 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool start_streaming, t
             (int)st->update_every
 
             // child first db time, child end db time
-            , (unsigned long long)first_entry_local, (unsigned long long)last_entry_local
+            , (unsigned long long)db_first_entry, (unsigned long long)db_last_entry
 
             // start streaming boolean
            , enable_streaming ? "true" : "false"
@@ -351,13 +494,37 @@ bool replicate_chart_response(RRDHOST *host, RRDSET *st, bool s