summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2023-02-07 22:26:16 +0200
committerGitHub <noreply@github.com>2023-02-07 22:26:16 +0200
commit8d3c3356ddeb6d62fa76b197e086e3e7fc5eb3dd (patch)
treee7661d49d0a0044cf1a5f1d3e0e6cc7dbc27f7a6 /streaming/receiver.c
parent12d92fe308f4107f67149ec9105b69ce2610a4f2 (diff)
Streaming interpolated values (#14431)
* first commit - untested * fix wrong begin command * added set v2 too * debug to log stream buffer * debug to log stream buffer * faster streaming printing * mark charts and dimensions as collected * use stream points even if sender is not enabled * comment out stream debug log * parse null as nan * custom begin v2 * custom set v2; replication now copies the anomalous flag too * custom end v2 * enabled stream log test * renamed to BEGIN2, SET2, END2 * dont mix up replay and v2 members in user object * fix typo * cleanup * support to v2 to v1 proxying * mark updated dimensions as such * do not log unknown flags * comment out stream debug log * send also the chart id on BEGIN2, v2 to v2 * update the data collections counter * v2 values are transferred in hex * faster hex parsing * a little more generic hex and dec printing and parsing * fix hex parsing * minor optimization in dbengine api * turn debugging into info message * generalized the timings tracking, so that it can be used in more places * commented out debug info * renamed conflicting variable with macro * remove wrong edits * integrated ML and added cleanup in case parsing is interrupted * disable data collection locking during v2 * cleanup stale ML locks; send updated chart variables during v2; add info to find stale locks * inject an END2 between repeated BEGIN2 from rrdset_done() * test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters * more fine grained dictionary atomics * remove unecessary return values * pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS * Revert "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" This reverts commit 846cdf2713e2a7ee2ff797f38db11714228800e9. * Revert "remove unecessary return values" This reverts commit 8c87d30f4d86f0f5d6b4562cf74fe7447138bbff. * Revert "more fine grained dictionary atomics" This reverts commit 984aec4234a340d197d45239ff9a10fd479fcf3c. * Revert "test: remove lockless single-threaded logic from dictionary and aral and apply the right acquire/release memory order to reference counters" This reverts commit c460b3d0ad497d2641bd0ea1d63cec7c052e74e4. * Apply again "pointer validation under NETDATA_DICTIONARY_VALIDATE_POINTERS" while keeping the improved atomic operations. This reverts commit f158d009 * fix last commit * fix last commit again * optimizations in dbengine * do not send anomaly bit on non-supporting agents (send it when the INTERPOLATED capability is available) * break long empty-points-loops in rrdset_done() * decide page alignment on new page allocation, not on every point collected * create max size pages but no smaller than 1/3 * Fix compilation when --disable-ml is specified * Return false * fixes for NETDATA_LOG_REPLICATION_REQUESTS * added compile option NETDATA_WITHOUT_WORKERS_LATENCY * put timings in BEGIN2, SET2, END2 * isolate begin2 ml * revert repositioning data collection lock * fixed multi-threading of statistics * do not lookup dimensions all the time if they come in the same order * update used on iteration, not on every points; also do better error handling --------- Co-authored-by: Stelios Fragkakis <52996999+stelfrag@users.noreply.github.com>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c8
1 files changed, 6 insertions, 2 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index 95652942e9..9378d2d825 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -332,6 +332,10 @@ static void streaming_parser_thread_cleanup(void *ptr) {
bool plugin_is_enabled(struct plugind *cd);
+void streaming_parser_cleanup(void *user) {
+ pluginsd_cleanup_v2(user);
+}
+
static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, int fd, void *ssl) {
size_t result;
@@ -343,7 +347,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
.trust_durations = 1
};
- PARSER *parser = parser_init(rpt->host, &user, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
+ PARSER *parser = parser_init(rpt->host, &user, streaming_parser_cleanup, NULL, NULL, fd, PARSER_INPUT_SPLIT, ssl);
rrd_collector_started();
@@ -416,7 +420,7 @@ static size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, i
}
done:
- result = user.count;
+ result = user.data_collections_count;
// free parser with the pop function
netdata_thread_cleanup_pop(1);