diff options
author | Costa Tsaousis <costa@netdata.cloud> | 2022-11-15 23:49:42 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-11-15 23:49:42 +0200 |
commit | becd97a3660af34104c557ba6c2877f624143c2e (patch) | |
tree | 14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /streaming/receiver.c | |
parent | 1789d07c43182152437459a7a4f81267bbdd752c (diff) |
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)"
This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 25 |
1 files changed, 3 insertions, 22 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index a872642a44..40673f05b4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { return 0; } - // for compressed streams, the compression signature header ends with a new line - // so, here we read a single line from the stream. - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) { + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) { internal_error(true, "read_stream() failed (1)."); return 1; } @@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { } // Fill read buffer with decompressed data - r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(++localhost->senders_count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_connected(rpt->host); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, @@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_disconnected(rpt->host); #ifdef ENABLE_ACLK |