diff options
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 25 |
1 files changed, 3 insertions, 22 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index a872642a44..40673f05b4 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -240,11 +240,8 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { return 0; } - // for compressed streams, the compression signature header ends with a new line - // so, here we read a single line from the stream. - int ret = 0; - if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len, &ret)) { + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) { internal_error(true, "read_stream() failed (1)."); return 1; } @@ -287,7 +284,7 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { } // Fill read buffer with decompressed data - r->read_len += (int)r->decompressor->get(r->decompressor, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len); + r->read_len = r->decompressor->get(r->decompressor, r->read_buffer, sizeof(r->read_buffer)); return 0; } @@ -727,16 +724,9 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdhost_set_is_parent_label(++localhost->senders_count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS | RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_connected(rpt->host); + rrdhost_flag_clear(rpt->host, RRDHOST_FLAG_RRDPUSH_RECEIVER_DISCONNECTED); size_t count = streaming_parser(rpt, &cd, fp_in, fp_out, @@ -756,15 +746,6 @@ static int rrdpush_receive(struct receiver_state *rpt) error("STREAM %s [receive from [%s]:%s]: disconnected (completed %zu updates).", rpt->hostname, rpt->client_ip, rpt->client_port, count); - if(stream_has_capability(rpt->host->receiver, STREAM_CAP_REPLICATION)) { - RRDSET *st; - rrdset_foreach_read(st, rpt->host) { - rrdset_flag_clear(st, RRDSET_FLAG_RECEIVER_REPLICATION_IN_PROGRESS); - rrdset_flag_set(st, RRDSET_FLAG_RECEIVER_REPLICATION_FINISHED); - } - rrdset_foreach_done(st); - } - rrdcontext_host_child_disconnected(rpt->host); #ifdef ENABLE_ACLK |