summaryrefslogtreecommitdiffstats
path: root/src/aclk/mqtt_websockets/mqtt_ng.c
diff options
context:
space:
mode:
Diffstat (limited to 'src/aclk/mqtt_websockets/mqtt_ng.c')
-rw-r--r--src/aclk/mqtt_websockets/mqtt_ng.c77
1 files changed, 34 insertions, 43 deletions
diff --git a/src/aclk/mqtt_websockets/mqtt_ng.c b/src/aclk/mqtt_websockets/mqtt_ng.c
index f570fde712..96099aa687 100644
--- a/src/aclk/mqtt_websockets/mqtt_ng.c
+++ b/src/aclk/mqtt_websockets/mqtt_ng.c
@@ -1,16 +1,10 @@
-// Copyright: SPDX-License-Identifier: GPL-3.0-only
+// SPDX-License-Identifier: GPL-3.0-or-later
#ifndef _GNU_SOURCE
#define _GNU_SOURCE
#endif
-#include <stdint.h>
-#include <stdlib.h>
-#include <string.h>
-#include <pthread.h>
-#include <inttypes.h>
-
-#include "c_rhash/c_rhash.h"
+#include "libnetdata/libnetdata.h"
#include "common_internal.h"
#include "mqtt_constants.h"
@@ -26,10 +20,8 @@
#define SMALL_STRING_DONT_FRAGMENT_LIMIT 128
-#define MIN(a,b) (((a)<(b))?(a):(b))
-
-#define LOCK_HDR_BUFFER(buffer) pthread_mutex_lock(&((buffer)->mutex))
-#define UNLOCK_HDR_BUFFER(buffer) pthread_mutex_unlock(&((buffer)->mutex))
+#define LOCK_HDR_BUFFER(buffer) spinlock_lock(&((buffer)->spinlock))
+#define UNLOCK_HDR_BUFFER(buffer) spinlock_unlock(&((buffer)->spinlock))
#define BUFFER_FRAG_GARBAGE_COLLECT 0x01
// some packets can be marked for garbage collection
@@ -75,7 +67,7 @@ struct transaction_buffer {
// to be able to revert state easily
// in case of error mid processing
struct header_buffer state_backup;
- pthread_mutex_t mutex;
+ SPINLOCK spinlock;
struct buffer_fragment *sending_frag;
};
@@ -423,7 +415,7 @@ static void buffer_frag_free_data(struct buffer_fragment *frag)
if ( frag->flags & BUFFER_FRAG_DATA_EXTERNAL && frag->data != NULL) {
switch (ptr2memory_mode(frag->free_fnc)) {
case MEMCPY:
- mw_free(frag->data);
+ freez(frag->data);
break;
case EXTERNAL_FREE_AFTER_USE:
frag->free_fnc(frag->data);
@@ -563,7 +555,7 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_
if (buf->hdr_buffer.size > max)
buf->hdr_buffer.size = max;
- void *ret = mw_realloc(buf->hdr_buffer.data, buf->hdr_buffer.size);
+ void *ret = reallocz(buf->hdr_buffer.data, buf->hdr_buffer.size);
if (ret == NULL) {
mws_warn(log_ctx, "Buffer growth failed (realloc)");
return 1;
@@ -578,10 +570,10 @@ static int transaction_buffer_grow(struct transaction_buffer *buf, mqtt_wss_log_
inline static int transaction_buffer_init(struct transaction_buffer *to_init, size_t size)
{
- pthread_mutex_init(&to_init->mutex, NULL);
+ spinlock_init(&to_init->spinlock);
to_init->hdr_buffer.size = size;
- to_init->hdr_buffer.data = mw_malloc(size);
+ to_init->hdr_buffer.data = mallocz(size);
if (to_init->hdr_buffer.data == NULL)
return 1;
@@ -593,8 +585,7 @@ inline static int transaction_buffer_init(struct transaction_buffer *to_init, si
static void transaction_buffer_destroy(struct transaction_buffer *to_init)
{
buffer_purge(&to_init->hdr_buffer);
- pthread_mutex_destroy(&to_init->mutex);
- mw_free(to_init->hdr_buffer.data);
+ freez(to_init->hdr_buffer.data);
}
// Creates transaction
@@ -628,7 +619,7 @@ void transaction_buffer_transaction_rollback(struct transaction_buffer *buf, str
#define RX_ALIASES_INITIALIZE() c_rhash_new(UINT16_MAX >> 8)
struct mqtt_ng_client *mqtt_ng_init(struct mqtt_ng_init *settings)
{
- struct mqtt_ng_client *client = mw_calloc(1, sizeof(struct mqtt_ng_client));
+ struct mqtt_ng_client *client = callocz(1, sizeof(struct mqtt_ng_client));
if (client == NULL)
return NULL;
@@ -672,7 +663,7 @@ err_free_rx_alias:
err_free_trx_buf:
transaction_buffer_destroy(&client->main_buffer);
err_free_client:
- mw_free(client);
+ freez(client);
return NULL;
}
@@ -688,7 +679,7 @@ static void mqtt_ng_destroy_rx_alias_hash(c_rhash hash)
void *to_free;
while(!c_rhash_iter_uint64_keys(hash, &i, &stored_key)) {
c_rhash_get_ptr_by_uint64(hash, stored_key, &to_free);
- mw_free(to_free);
+ freez(to_free);
}
c_rhash_destroy(hash);
}
@@ -700,7 +691,7 @@ static void mqtt_ng_destroy_tx_alias_hash(c_rhash hash)
void *to_free;
while(!c_rhash_iter_str_keys(hash, &i, &stored_key)) {
c_rhash_get_ptr_by_str(hash, stored_key, &to_free);
- mw_free(to_free);
+ freez(to_free);
}
c_rhash_destroy(hash);
}
@@ -714,7 +705,7 @@ void mqtt_ng_destroy(struct mqtt_ng_client *client)
pthread_rwlock_destroy(&client->tx_topic_aliases.rwlock);
mqtt_ng_destroy_rx_alias_hash(client->rx_aliases);
- mw_free(client);
+ freez(client);
}
int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag, void *data, size_t data_len, free_fnc_t data_free_fnc)
@@ -730,7 +721,7 @@ int frag_set_external_data(mqtt_wss_log_ctx_t log, struct buffer_fragment *frag,
switch (ptr2memory_mode(data_free_fnc)) {
case MEMCPY:
- frag->data = mw_malloc(data_len);
+ frag->data = mallocz(data_len);
if (frag->data == NULL) {
mws_error(log, UNIT_LOG_PREFIX "OOM while malloc @_optimized_add");
return 1;
@@ -1408,12 +1399,12 @@ static void mqtt_properties_parser_ctx_reset(struct mqtt_properties_parser_ctx *
struct mqtt_property *f = ctx->head;
ctx->head = ctx->head->next;
if (f->type == MQTT_TYPE_STR || f->type == MQTT_TYPE_STR_PAIR)
- mw_free(f->data.strings[0]);
+ freez(f->data.strings[0]);
if (f->type == MQTT_TYPE_STR_PAIR)
- mw_free(f->data.strings[1]);
+ freez(f->data.strings[1]);
if (f->type == MQTT_TYPE_BIN)
- mw_free(f->data.bindata);
- mw_free(f);
+ freez(f->data.bindata);
+ freez(f);
}
ctx->tail = NULL;
ctx->properties_length = 0;
@@ -1498,7 +1489,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
return rc;
case PROPERTY_CREATE:
BUF_READ_CHECK_AT_LEAST(data, 1);
- struct mqtt_property *prop = mw_calloc(1, sizeof(struct mqtt_property));
+ struct mqtt_property *prop = callocz(1, sizeof(struct mqtt_property));
if (ctx->head == NULL) {
ctx->head = prop;
ctx->tail = prop;
@@ -1558,7 +1549,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
break;
case PROPERTY_TYPE_STR:
BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- ctx->tail->data.strings[ctx->str_idx] = mw_malloc(ctx->tail->bindata_len + 1);
+ ctx->tail->data.strings[ctx->str_idx] = mallocz(ctx->tail->bindata_len + 1);
rbuf_pop(data, ctx->tail->data.strings[ctx->str_idx], ctx->tail->bindata_len);
ctx->tail->data.strings[ctx->str_idx][ctx->tail->bindata_len] = 0;
ctx->str_idx++;
@@ -1571,7 +1562,7 @@ static int parse_properties_array(struct mqtt_properties_parser_ctx *ctx, rbuf_t
break;
case PROPERTY_TYPE_BIN:
BUF_READ_CHECK_AT_LEAST(data, ctx->tail->bindata_len);
- ctx->tail->data.bindata = mw_malloc(ctx->tail->bindata_len);
+ ctx->tail->data.bindata = mallocz(ctx->tail->bindata_len);
rbuf_pop(data, ctx->tail->data.bindata, ctx->tail->bindata_len);
ctx->bytes_consumed += ctx->tail->bindata_len;
ctx->state = PROPERTY_NEXT;
@@ -1721,7 +1712,7 @@ static int parse_suback_varhdr(struct mqtt_ng_client *client)
return rc;
parser->mqtt_parsed_len += parser->properties_parser.bytes_consumed;
suback->reason_code_count = parser->mqtt_fixed_hdr_remaining_length - parser->mqtt_parsed_len;
- suback->reason_codes = mw_calloc(suback->reason_code_count, sizeof(*suback->reason_codes));
+ suback->reason_codes = callocz(suback->reason_code_count, sizeof(*suback->reason_codes));
suback->reason_codes_pending = suback->reason_code_count;
parser->varhdr_state = MQTT_PARSE_REASONCODES;
/* FALLTHROUGH */
@@ -1760,7 +1751,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
parser->varhdr_state = MQTT_PARSE_VARHDR_POST_TOPICNAME;
break;
}
- publish->topic = mw_calloc(1, publish->topic_len + 1 /* add 0x00 */);
+ publish->topic = callocz(1, publish->topic_len + 1 /* add 0x00 */);
if (publish->topic == NULL)
return MQTT_NG_CLIENT_OOM;
parser->varhdr_state = MQTT_PARSE_VARHDR_TOPICNAME;
@@ -1796,7 +1787,7 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
/* FALLTHROUGH */
case MQTT_PARSE_PAYLOAD:
if (parser->mqtt_fixed_hdr_remaining_length < parser->mqtt_parsed_len) {
- mw_free(publish->topic);
+ freez(publish->topic);
publish->topic = NULL;
ERROR("Error parsing PUBLISH message");
return MQTT_NG_CLIENT_PROTOCOL_ERROR;
@@ -1808,9 +1799,9 @@ static int parse_publish_varhdr(struct mqtt_ng_client *client)
}
BUF_READ_CHECK_AT_LEAST(parser->received_data, publish->data_len);
- publish->data = mw_malloc(publish->data_len);
+ publish->data = mallocz(publish->data_len);
if (publish->data == NULL) {
- mw_free(publish->topic);
+ freez(publish->topic);
publish->topic = NULL;
return MQTT_NG_CLIENT_OOM;
}
@@ -1867,7 +1858,7 @@ static int parse_data(struct mqtt_ng_client *client)
case MQTT_CPT_SUBACK:
rc = parse_suback_varhdr(client);
if (rc != MQTT_NG_CLIENT_NEED_MORE_BYTES && rc != MQTT_NG_CLIENT_OK_CALL_AGAIN) {
- mw_free(parser->mqtt_packet.suback.reason_codes);
+ freez(parser->mqtt_packet.suback.reason_codes);
}
if (rc == MQTT_NG_CLIENT_PARSE_DONE) {
parser->state = MQTT_PARSE_MQTT_PACKET_DONE;
@@ -2096,8 +2087,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
#endif
pub = &client->parser.mqtt_packet.publish;
if (pub->qos > 1) {
- mw_free(pub->topic);
- mw_free(pub->data);
+ freez(pub->topic);
+ freez(pub->data);
return MQTT_NG_CLIENT_NOT_IMPL_YET;
}
if ( pub->qos == 1 && (rc = mqtt_ng_puback(client, pub->packet_id, 0)) ) {
@@ -2127,8 +2118,8 @@ int handle_incoming_traffic(struct mqtt_ng_client *client)
// in case we have property topic alias and we have topic we take over the string
// and add pointer to it into topic alias list
if (prop == NULL)
- mw_free(pub->topic);
- mw_free(pub->data);
+ freez(pub->topic);
+ freez(pub->data);
return MQTT_NG_CLIENT_WANT_WRITE;
case MQTT_CPT_DISCONNECT:
INFO ("Got MQTT DISCONNECT control packet from server. Reason code: %d", (int)client->parser.mqtt_packet.disconnect.reason_code);
@@ -2225,7 +2216,7 @@ int mqtt_ng_set_topic_alias(struct mqtt_ng_client *client, const char *topic)
return idx;
}
- alias = mw_malloc(sizeof(struct topic_alias_data));
+ alias = mallocz(sizeof(struct topic_alias_data));
idx = ++client->tx_topic_aliases.idx_assigned;
alias->idx = idx;
__atomic_store_n(&alias->usage_count, 0, __ATOMIC_SEQ_CST);