summaryrefslogtreecommitdiffstats
path: root/streaming/receiver.c
diff options
context:
space:
mode:
authorCosta Tsaousis <costa@netdata.cloud>2022-11-15 23:49:42 +0200
committerGitHub <noreply@github.com>2022-11-15 23:49:42 +0200
commitbecd97a3660af34104c557ba6c2877f624143c2e (patch)
tree14fa78305bd8c27bf1fa06c9c346b8f7a8a1d721 /streaming/receiver.c
parent1789d07c43182152437459a7a4f81267bbdd752c (diff)
Revert "New journal disk based indexing for agent memory reduction" (#14000)
Revert "New journal disk based indexing for agent memory reduction (#13885)" This reverts commit 224b051a2b2bab39a4b536e531ab9ca590bf31bb.
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r--streaming/receiver.c25
1 files changed, 3 insertions, 22 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c
index a872642a44..40673f05b4 100644
--- a/streaming/receiver.c
+++ b/streaming/receiver.c
@@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
return 0;
}
- // for compressed streams, the compression signature header ends with a new line
- // so, here we read a single line from the stream.
-
int ret = 0;
- if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) {
+ if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) {
internal_error(true, "read_stream() failed (1).");
return 1;
}
@@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) {
}
// Fill read buffer with decompressed data
- r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len);
+ r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer));
return 0;
}
@@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt)
rrdhost_set_is_parent_label(++localhost->senders_count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
rrdcontext_host_child_connected(rpt->host);
+
rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED);
size_t count = streaming_parser(rpt, &cd, fp_in, fp_out,
@@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt)
error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).",
rpt->hostname, rpt->client_ip, rpt->client_port, count);
- if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) {
- RRDSET *st;
- rrdset_foreach_read(st, rpt->host) {
- rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS);
- rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED);
- }
- rrdset_foreach_done(st);
- }
-
rrdcontext_host_child_disconnected(rpt->host);
#ifdef ENABLE_ACLK