From becd97a3660af34104c557ba6c2877f624143c2e Mon Sep 17 00:00:00 2001 From: Costa Tsaousis Date: Tue, 15 Nov 2022 23:49:42 +0200 Subject: Revert "New journal disk based indexing for agent memory reduction" (#14000) Revert "New journal disk based indexing for agent memory reduction (#13885)" This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb. --- streaming/receiver.c | 25 +++---------------------- 1 file changed, 3 insertions(+), 22 deletions(-) (limited to 'streaming/receiver.c') diff --git a/streaming/receiver.c b/streaming/receiver.c index a872642a44..40673f05b4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { return 0; } - // for compressed streams, the compression signature header ends with a new line - // so, here we read a single line from the stream. - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) { + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) { internal_error(true, "read_stream() failed (1)."); return 1; } @@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { } // Fill read buffer with decompressed data - r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(++localhost->senders_count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_connected(rpt->host); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, @@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_disconnected(rpt->host); #ifdef ENABLE_ACLK -- cgit v1.2.3