diff options
author | avstrakhov <57530959+avstrakhov@users.noreply.github.com> | 2022-01-19 18:57:49 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2022-01-19 17:57:49 +0200 |
commit | b003e5fd40c3b42dfacc87db5a7730b76eff0e92 (patch) | |
tree | da105b1b74c98cb567ff18aa73d282566bbff454 /streaming/receiver.c | |
parent | 8a01dabf89add0ef473d37bc8f2310cabfc1db43 (diff) |
Add code for LZ4 streaming data compression (#11821)
* Add code for LZ4 streaming data compression
* Fix LGTM alert
* Add lz4 library for link when compression enabled
* Add LZ4_resetStream_fast presence detection
* Disable compression for older LZ4 libraries
* Correct LZ4 API check
* [Testing Stream Compression] Debug msgs and report.md
* Add LZ4 library version using LZ4_initStream
* Fixed bug in SSL mode
* [Testing compression] - Add compression info messages
* Set compression enabled by default, update doc
* Update streaming/README.md
Co-authored-by: DShreve2 <david@netdata.cloud>
* [Agent Negotiation] Compression as separate capability
* [Agent Negotiation] Compression as separate capability - default compression variable always active
* Add code to negotiate compression
* [Agent Negotiation] Based on stream version
* [Agent Negotiation] Version based - fix compilation error
* [Agent Negotiation] Fix glob var default_compression_enbaled=0 affects all the connections - Handle compression - stream version based
* [Agent Negotiation - Compression] - Add control flag in 1. sender/receiver state & 2. stream.conf per child
* [Agent Negotiation - Compression] Fix stream.conf key, mguid control
* [Agent Negotiate Compression] Fine control on stream.conf per key,mguid for each child
* [Agent Negotiation Compression] Stop destroying compressor for runtime configuration + Update Readme.md
* [Agent Negotiation Compression] Use stream_version 4 if compression is disabled
* Correct child's compression check
* [Agent Negotiation Compression] Create streaming compression section in docs.
* [Agent Negotiation Compresion] Remove redundant debug msgs
* [Stream Compression] - integrate compression build info & config info in api/v1/info endpoint.
* [Agent Negotiation] Finalize README.md
* [Agent Stream Compression] Fix buildinfo json, Finalize readme.md
* [Agent Stream Compression] Negotiate compression based on stream version
* [Agent Stream Compression] Stream compression control per child in stream.conf | per AP_KEY, MACHINE_GUID
* [Agent Stream Compression] Avoid destroying compressor enabling runtime configuration + Update Readme.md
* [Agent Stream Compression] - Provide stream compression build info & config info in api/v1/info endpoint + Update Readme.md
* [Agent Stream Compression] Fix rebase conflicts
* [Agent Stream Compression] Fix more rebase conflicts
* [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api
* [Agent Stream Compression] 1. Stream version based negotiation 2. per child stream.conf control 3. finalize docs 4. stream compression build info in web api
* [Agent Stream Compression] Change unsuccessful buffer check to error
* [Agent Stream Compression] Readme.md proof-read corrections, downgrade to stream_version_clabels, add shields for supported versions, EOF lint
* [Agent Stream Compression] Fix missed lz4 library on Alpine Linux
* Phrasal review
Co-authored-by: odynik <odynik.ee@gmail.com>
Co-authored-by: DShreve2 <david@netdata.cloud>
Co-authored-by: Tina Lüdtke <tina@kickoke.com>
Diffstat (limited to 'streaming/receiver.c')
-rw-r--r-- | streaming/receiver.c | 156 |
1 files changed, 152 insertions, 4 deletions
diff --git a/streaming/receiver.c b/streaming/receiver.c index a0920e563d..5b8d7d7048 100644 --- a/streaming/receiver.c +++ b/streaming/receiver.c @@ -22,6 +22,10 @@ void destroy_receiver_state(struct receiver_state *rpt) { SSL_free(rpt->ssl.conn); } #endif +#ifdef ENABLE_COMPRESSION + if (rpt->decompressor) + rpt->decompressor->destroy(&rpt->decompressor); +#endif freez(rpt); } @@ -159,6 +163,8 @@ PARSER_RC streaming_claimed_id(char **words, void *user, PLUGINSD_ACTION *plugin return PARSER_RC_OK; } + +#ifndef ENABLE_COMPRESSION /* The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. */ static int receiver_read(struct receiver_state *r, FILE *fp) { @@ -186,6 +192,130 @@ static int receiver_read(struct receiver_state *r, FILE *fp) { r->read_len = strlen(r->read_buffer); return 0; } +#else +/* + * The receiver socket is blocking, perform a single read into a buffer so that we can reassemble lines for parsing. + * if SSL encryption is on, then use SSL API for reading stream data. + * Use line oriented fgets() in buffer from receiver_state is provided. + * In other cases use fread to read binary data from socket. + * Return zero on success and the number of bytes were read using pointer in the last argument. + */ +static int read_stream(struct receiver_state *r, FILE *fp, char* buffer, size_t size, int* ret) { + if (!ret) + return 1; + *ret = 0; +#ifdef ENABLE_HTTPS + if (r->ssl.conn && !r->ssl.flags) { + ERR_clear_error(); + if (buffer != r->read_buffer + r->read_len) { + *ret = SSL_read(r->ssl.conn, buffer, size); + if (*ret > 0 ) + return 0; + } else { + // we need to receive data with LF to parse compression header + size_t ofs = 0; + int res = 0; + while (ofs < size) { + do { + res = SSL_read(r->ssl.conn, buffer + ofs, 1); + } while (res == 0); + + if (res < 0) + break; + if (buffer[ofs] == '\n') + break; + ofs += res; + } + if (res > 0) { + ofs += res; + *ret = ofs; + buffer[ofs] = 0; + return 0; + } + } + // Don't treat SSL_ERROR_WANT_READ or SSL_ERROR_WANT_WRITE differently on blocking socket + u_long err; + char buf[256]; + while ((err = ERR_get_error()) != 0) { + ERR_error_string_n(err, buf, sizeof(buf)); + error("STREAM %s [receive from %s] ssl error: %s", r->hostname, r->client_ip, buf); + } + return 1; + } +#endif + if (buffer != r->read_buffer + r->read_len) { + // read to external buffer + *ret = fread(buffer, 1, size, fp); + if (!*ret) + return 1; + } else { + if (!fgets(r->read_buffer, sizeof(r->read_buffer), fp)) + return 1; + *ret = strlen(r->read_buffer); + } + return 0; +} + +/* + * Get the next line of data for parsing. + * Return data from the decompressor buffer if available. + * Otherwise read next line from the socket and check for compression header. + * Return the line was read If no compression header was found. + * Otherwise read the entire block of compressed data, decompress it + * and return it in receiver_state buffer. + * Return zero on success. + */ +static int receiver_read(struct receiver_state *r, FILE *fp) { + // check any decompressed data present + if (r->decompressor && + r->decompressor->decompressed_bytes_in_buffer(r->decompressor)) { + size_t available = sizeof(r->read_buffer) - r->read_len; + if (available) { + size_t len = r->decompressor->get(r->decompressor, + r->read_buffer + r->read_len, available); + if (!len) + return 1; + r->read_len += len; + } + return 0; + } + int ret = 0; + if (read_stream(r, fp, r->read_buffer + r->read_len, sizeof(r->read_buffer) - r->read_len - 1, &ret)) + return 1; + + if (!is_compressed_data(r->read_buffer, ret)) { + r->read_len += ret; + return 0; + } + + if (unlikely(!r->decompressor)) + r->decompressor = create_decompressor(); + + size_t bytes_to_read = r->decompressor->start(r->decompressor, + r->read_buffer, ret); + + // Read the entire block of compressed data because + // we're unable to decompress incomplete block + char compressed[bytes_to_read]; + do { + if (read_stream(r, fp, compressed, bytes_to_read, &ret)) + return 1; + // Send input data to decompressor + if (ret) + r->decompressor->put(r->decompressor, compressed, ret); + bytes_to_read -= ret; + } while (bytes_to_read > 0); + // Decompress + size_t bytes_to_parse = r->decompressor->decompress(r->decompressor); + if (!bytes_to_parse) + return 1; + // Fill read buffer with decompressed data + r->read_len = r->decompressor->get(r->decompressor, + r->read_buffer, sizeof(r->read_buffer)); + return 0; +} + +#endif /* Produce a full line if one exists, statefully return where we start next time. * When we hit the end of the buffer with a partial line move it to the beginning for the next fill. @@ -208,7 +338,6 @@ static char *receiver_next_line(struct receiver_state *r, int *pos) { return NULL; } - size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp) { size_t result; PARSER_USER_OBJECT *user = callocz(1, sizeof(*user)); @@ -239,12 +368,14 @@ size_t streaming_parser(struct receiver_state *rpt, struct plugind *cd, FILE *fp parser->plugins_action->overwrite_action = &pluginsd_overwrite_action; parser->plugins_action->chart_action = &pluginsd_chart_action; parser->plugins_action->set_action = &pluginsd_set_action; - parser->plugins_action->clabel_commit_action = &pluginsd_clabel_commit_action; - parser->plugins_action->clabel_action = &pluginsd_clabel_action; user->parser = parser; - do { +#ifdef ENABLE_COMPRESSION + if (rpt->decompressor) + rpt->decompressor->reset(rpt->decompressor); +#endif + do{ if (receiver_read(rpt, fp)) break; int pos = 0; @@ -311,6 +442,13 @@ static int rrdpush_receive(struct receiver_state *rpt) rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->key, "default proxy send charts matching", rrdpush_send_charts_matching); rrdpush_send_charts_matching = appconfig_get(&stream_config, rpt->machine_guid, "proxy send charts matching", rrdpush_send_charts_matching); +#ifdef ENABLE_COMPRESSION + unsigned int rrdpush_compression = default_compression_enabled; + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->key, "enable compression", rrdpush_compression); + rrdpush_compression = appconfig_get_boolean(&stream_config, rpt->machine_guid, "enable compression", rrdpush_compression); + rpt->rrdpush_compression = (rrdpush_compression && default_compression_enabled); +#endif //ENABLE_COMPRESSION + (void)appconfig_set_default(&stream_config, rpt->machine_guid, "host tags", (rpt->tags)?rpt->tags:""); if (strcmp(rpt->machine_guid, localhost->machine_guid) == 0) { @@ -432,6 +570,16 @@ static int rrdpush_receive(struct receiver_state *rpt) info("STREAM %s [receive from [%s]:%s]: initializing communication...", rpt->host->hostname, rpt->client_ip, rpt->client_port); char initial_response[HTTP_HEADER_SIZE]; if (rpt->stream_version > 1) { + if(rpt->stream_version >= STREAM_VERSION_COMPRESSION){ +#ifdef ENABLE_COMPRESSION + if(!rpt->rrdpush_compression) + rpt->stream_version = STREAM_VERSION_CLABELS; +#else + if(STREAMING_PROTOCOL_CURRENT_VERSION < rpt->stream_version) { + rpt->stream_version = STREAMING_PROTOCOL_CURRENT_VERSION; + } +#endif + } info("STREAM %s [receive from [%s]:%s]: Netdata is using the stream version %u.", rpt->host->hostname, rpt->client_ip, rpt->client_port, rpt->stream_version); sprintf(initial_response, "%s%u", START_STREAMING_PROMPT_VN, rpt->stream_version); } else if (rpt->stream_version == 1) { |