summaryrefslogtreecommitdiffstats
path: root/exporting/tests
diff options
context:
space:
mode:
authorVladimir Kobal <vlad@prokk.net>2020-02-25 21:08:41 +0200
committerGitHub <noreply@github.com>2020-02-25 21:08:41 +0200
commitd79bbbf943f72495e135eee4afc25723f886592f (patch)
tree1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting/tests
parent84421fdf0b513e9e7dc1351550b96044e92c363d (diff)
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector * Update the documentation * Rename functions in backends * Include the connector to the Netdata buid * Add initializers and a worker * Add Kinesis specific configuration options * Add a compile time configuration check * Remove the connector data structure * Restore unit tests * Fix the compile-time configuration check * Initialize AWS SDK only once * Don't create an instance for an unknown exporting connector * Separate client and request outcome data for every instance * Fix memory cleanup, document functions * Add unit tests * Update the documentation
Diffstat (limited to 'exporting/tests')
-rw-r--r--exporting/tests/exporting_doubles.c52
-rw-r--r--exporting/tests/exporting_fixtures.c8
-rw-r--r--exporting/tests/test_exporting_engine.c270
-rw-r--r--exporting/tests/test_exporting_engine.h10
4 files changed, 247 insertions, 93 deletions
diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c
index 1860d750ed..d36d09e5ed 100644
--- a/exporting/tests/exporting_doubles.c
+++ b/exporting/tests/exporting_doubles.c
@@ -16,13 +16,11 @@ struct engine *__mock_read_exporting_config()
engine->config.hostname = strdupz("test-host");
engine->config.update_every = 3;
- engine->connector_root = calloc(1, sizeof(struct connector));
- engine->connector_root->config.type = BACKEND_TYPE_GRAPHITE;
- engine->connector_root->engine = engine;
- engine->connector_root->instance_root = calloc(1, sizeof(struct instance));
- struct instance *instance = engine->connector_root->instance_root;
- instance->connector = engine->connector_root;
+ engine->instance_root = calloc(1, sizeof(struct instance));
+ struct instance *instance = engine->instance_root;
+ instance->engine = engine;
+ instance->config.type = BACKEND_TYPE_GRAPHITE;
instance->config.name = strdupz("instance_name");
instance->config.destination = strdupz("localhost");
instance->config.update_every = 1;
@@ -160,3 +158,45 @@ int __mock_end_batch_formatting(struct instance *instance)
check_expected_ptr(instance);
return mock_type(int);
}
+
+#if HAVE_KINESIS
+void __wrap_aws_sdk_init()
+{
+ function_called();
+}
+
+void __wrap_kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout)
+{
+ function_called();
+ check_expected_ptr(kinesis_specific_data_p);
+ check_expected_ptr(region);
+ check_expected_ptr(access_key_id);
+ check_expected_ptr(secret_key);
+ check_expected(timeout);
+}
+
+void __wrap_kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len)
+{
+ function_called();
+ check_expected_ptr(kinesis_specific_data_p);
+ check_expected_ptr(stream_name);
+ check_expected_ptr(partition_key);
+ check_expected_ptr(data);
+ check_expected_ptr(data);
+ check_expected(data_len);
+}
+
+int __wrap_kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes)
+{
+ function_called();
+ check_expected_ptr(request_outcomes_p);
+ check_expected_ptr(error_message);
+ check_expected_ptr(sent_bytes);
+ check_expected_ptr(lost_bytes);
+ return mock_type(int);
+}
+#endif //HAVE_KINESIS
diff --git a/exporting/tests/exporting_fixtures.c b/exporting/tests/exporting_fixtures.c
index f92575e337..677494cb88 100644
--- a/exporting/tests/exporting_fixtures.c
+++ b/exporting/tests/exporting_fixtures.c
@@ -15,15 +15,13 @@ int teardown_configured_engine(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
free((void *)instance->config.destination);
free((void *)instance->config.name);
simple_pattern_free(instance->config.charts_pattern);
simple_pattern_free(instance->config.hosts_pattern);
free(instance);
- free(engine->connector_root);
-
free((void *)engine->config.prefix);
free((void *)engine->config.hostname);
free(engine);
@@ -122,8 +120,8 @@ int teardown_initialized_engine(void **state)
struct engine *engine = *state;
teardown_rrdhost();
- buffer_free(engine->connector_root->instance_root->labels);
- buffer_free(engine->connector_root->instance_root->buffer);
+ buffer_free(engine->instance_root->labels);
+ buffer_free(engine->instance_root->buffer);
teardown_configured_engine(state);
return 0;
diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c
index 089c4a2004..1a5be4a680 100644
--- a/exporting/tests/test_exporting_engine.c
+++ b/exporting/tests/test_exporting_engine.c
@@ -21,16 +21,16 @@ void init_connectors_in_tests(struct engine *engine)
expect_function_call(__wrap_uv_thread_create);
- expect_value(__wrap_uv_thread_create, thread, &engine->connector_root->instance_root->thread);
+ expect_value(__wrap_uv_thread_create, thread, &engine->instance_root->thread);
expect_value(__wrap_uv_thread_create, worker, simple_connector_worker);
- expect_value(__wrap_uv_thread_create, arg, engine->connector_root->instance_root);
+ expect_value(__wrap_uv_thread_create, arg, engine->instance_root);
expect_function_call(__wrap_uv_thread_set_name_np);
assert_int_equal(__real_init_connectors(engine), 0);
assert_int_equal(engine->now, 2);
- assert_int_equal(engine->connector_root->instance_root->after, 2);
+ assert_int_equal(engine->instance_root->after, 2);
}
static void test_exporting_engine(void **state)
@@ -82,16 +82,12 @@ static void test_read_exporting_config(void **state)
assert_int_equal(engine->config.update_every, 3);
assert_int_equal(engine->instance_num, 0);
- struct connector *connector = engine->connector_root;
- assert_ptr_not_equal(connector, NULL);
- assert_ptr_equal(connector->next, NULL);
- assert_ptr_equal(connector->engine, engine);
- assert_int_equal(connector->config.type, BACKEND_TYPE_GRAPHITE);
- struct instance *instance = connector->instance_root;
+ struct instance *instance = engine->instance_root;
assert_ptr_not_equal(instance, NULL);
assert_ptr_equal(instance->next, NULL);
- assert_ptr_equal(instance->connector, connector);
+ assert_ptr_equal(instance->engine, engine);
+ assert_int_equal(instance->config.type, BACKEND_TYPE_GRAPHITE);
assert_string_equal(instance->config.destination, "localhost");
assert_int_equal(instance->config.update_every, 1);
assert_int_equal(instance->config.buffer_on_failures, 10);
@@ -111,16 +107,15 @@ static void test_init_connectors(void **state)
assert_int_equal(engine->instance_num, 1);
- struct connector *connector = engine->connector_root;
- assert_ptr_equal(connector->next, NULL);
- assert_ptr_equal(connector->worker, simple_connector_worker);
+ struct instance *instance = engine->instance_root;
- struct simple_connector_config *connector_specific_config = connector->config.connector_specific_config;
- assert_int_equal(connector_specific_config->default_port, 2003);
-
- struct instance *instance = connector->instance_root;
assert_ptr_equal(instance->next, NULL);
assert_int_equal(instance->index, 0);
+
+ struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config;
+ assert_int_equal(connector_specific_config->default_port, 2003);
+
+ assert_ptr_equal(instance->worker, simple_connector_worker);
assert_ptr_equal(instance->start_batch_formatting, NULL);
assert_ptr_equal(instance->start_host_formatting, format_host_labels_graphite_plaintext);
assert_ptr_equal(instance->start_chart_formatting, NULL);
@@ -138,16 +133,13 @@ static void test_init_connectors(void **state)
static void test_init_graphite_instance(void **state)
{
struct engine *engine = *state;
- struct connector *connector = engine->connector_root;
- struct instance *instance = connector->instance_root;
-
- init_graphite_connector(connector);
- assert_int_equal(
- ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 2003);
- freez(connector->config.connector_specific_config);
+ struct instance *instance = engine->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_graphite_instance(instance), 0);
+ assert_int_equal(
+ ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 2003);
+ freez(instance->config.connector_specific_config);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
@@ -160,16 +152,13 @@ static void test_init_graphite_instance(void **state)
static void test_init_json_instance(void **state)
{
struct engine *engine = *state;
- struct connector *connector = engine->connector_root;
- struct instance *instance = connector->instance_root;
-
- init_json_connector(connector);
- assert_int_equal(
- ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 5448);
- freez(connector->config.connector_specific_config);
+ struct instance *instance = engine->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_json_instance(instance), 0);
+ assert_int_equal(
+ ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 5448);
+ freez(instance->config.connector_specific_config);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_json_plaintext);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
@@ -179,25 +168,16 @@ static void test_init_json_instance(void **state)
assert_ptr_equal(instance->metric_formatting, format_dimension_stored_json_plaintext);
}
-static void test_init_opentsdb_connector(void **state)
-{
- struct engine *engine = *state;
- struct connector *connector = engine->connector_root;
-
- init_opentsdb_connector(connector);
- assert_int_equal(
- ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 4242);
- freez(connector->config.connector_specific_config);
-}
-
static void test_init_opentsdb_telnet_instance(void **state)
{
struct engine *engine = *state;
- struct connector *connector = engine->connector_root;
- struct instance *instance = connector->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_telnet_instance(instance), 0);
+ assert_int_equal(
+ ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 4242);
+ freez(instance->config.connector_specific_config);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_telnet);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
@@ -210,11 +190,13 @@ static void test_init_opentsdb_telnet_instance(void **state)
static void test_init_opentsdb_http_instance(void **state)
{
struct engine *engine = *state;
- struct connector *connector = engine->connector_root;
- struct instance *instance = connector->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
assert_int_equal(init_opentsdb_http_instance(instance), 0);
+ assert_int_equal(
+ ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 4242);
+ freez(instance->config.connector_specific_config);
assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_http);
assert_ptr_not_equal(instance->buffer, NULL);
buffer_free(instance->buffer);
@@ -230,7 +212,7 @@ static void test_mark_scheduled_instances(void **state)
assert_int_equal(__real_mark_scheduled_instances(engine), 1);
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
assert_int_equal(instance->scheduled, 1);
assert_int_equal(instance->before, 2);
}
@@ -238,7 +220,7 @@ static void test_mark_scheduled_instances(void **state)
static void test_rrdhost_is_exportable(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
expect_function_call(__wrap_info_int);
@@ -255,7 +237,7 @@ static void test_rrdhost_is_exportable(void **state)
static void test_false_rrdhost_is_exportable(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
simple_pattern_free(instance->config.hosts_pattern);
instance->config.hosts_pattern = simple_pattern_create("!*", NULL, SIMPLE_PATTERN_EXACT);
@@ -275,7 +257,7 @@ static void test_false_rrdhost_is_exportable(void **state)
static void test_rrdset_is_exportable(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
RRDSET *st = localhost->rrdset_root;
assert_ptr_equal(st->exporting_flags, NULL);
@@ -289,7 +271,7 @@ static void test_rrdset_is_exportable(void **state)
static void test_false_rrdset_is_exportable(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
RRDSET *st = localhost->rrdset_root;
simple_pattern_free(instance->config.charts_pattern);
@@ -306,7 +288,7 @@ static void test_false_rrdset_is_exportable(void **state)
static void test_exporting_calculate_value_from_stored_data(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
RRDDIM *rd = localhost->rrdset_root->dimensions;
time_t timestamp;
@@ -344,7 +326,7 @@ static void test_exporting_calculate_value_from_stored_data(void **state)
static void test_prepare_buffers(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->start_batch_formatting = __mock_start_batch_formatting;
instance->start_host_formatting = __mock_start_host_formatting;
@@ -434,9 +416,9 @@ static void test_format_dimension_collected_graphite_plaintext(void **state)
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_collected_graphite_plaintext(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_collected_graphite_plaintext(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n");
}
@@ -448,9 +430,9 @@ static void test_format_dimension_stored_graphite_plaintext(void **state)
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_stored_graphite_plaintext(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_stored_graphite_plaintext(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 690565856.0000000 15052\n");
}
@@ -459,9 +441,9 @@ static void test_format_dimension_collected_json_plaintext(void **state)
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_collected_json_plaintext(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_collected_json_plaintext(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\","
"\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\","
"\"chart_context\":\"(null)\",\"chart_type\":\"(null)\",\"units\":\"(null)\",\"id\":\"dimension_id\","
@@ -476,9 +458,9 @@ static void test_format_dimension_stored_json_plaintext(void **state)
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_stored_json_plaintext(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_stored_json_plaintext(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\","
"\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\"," \
"\"chart_context\": \"(null)\",\"chart_type\":\"(null)\",\"units\": \"(null)\",\"id\":\"dimension_id\","
@@ -490,9 +472,9 @@ static void test_format_dimension_collected_opentsdb_telnet(void **state)
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_collected_opentsdb_telnet(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_collected_opentsdb_telnet(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"put netdata.chart_name.dimension_name 15051 123000321 host=test-host TAG1=VALUE1 TAG2=VALUE2\n");
}
@@ -504,9 +486,9 @@ static void test_format_dimension_stored_opentsdb_telnet(void **state)
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_stored_opentsdb_telnet(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_stored_opentsdb_telnet(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"put netdata.chart_name.dimension_name 15052 690565856.0000000 host=test-host TAG1=VALUE1 TAG2=VALUE2\n");
}
@@ -515,9 +497,9 @@ static void test_format_dimension_collected_opentsdb_http(void **state)
struct engine *engine = *state;
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_collected_opentsdb_http(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_collected_opentsdb_http(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n"
"Host: test-host\r\n"
"Content-Type: application/json\r\n"
@@ -536,9 +518,9 @@ static void test_format_dimension_stored_opentsdb_http(void **state)
will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS));
RRDDIM *rd = localhost->rrdset_root->dimensions;
- assert_int_equal(format_dimension_stored_opentsdb_http(engine->connector_root->instance_root, rd), 0);
+ assert_int_equal(format_dimension_stored_opentsdb_http(engine->instance_root, rd), 0);
assert_string_equal(
- buffer_tostring(engine->connector_root->instance_root->buffer),
+ buffer_tostring(engine->instance_root->buffer),
"POST /api/put HTTP/1.1\r\n"
"Host: test-host\r\n"
"Content-Type: application/json\r\n"
@@ -558,7 +540,7 @@ static void test_exporting_discard_response(void **state)
expect_function_call(__wrap_info_int);
- assert_int_equal(exporting_discard_response(response, engine->connector_root->instance_root), 0);
+ assert_int_equal(exporting_discard_response(response, engine->instance_root), 0);
assert_string_equal(
log_line,
@@ -572,7 +554,7 @@ static void test_exporting_discard_response(void **state)
static void test_simple_connector_receive_response(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
struct stats *stats = &instance->stats;
int sock = 1;
@@ -599,7 +581,7 @@ static void test_simple_connector_receive_response(void **state)
static void test_simple_connector_send_buffer(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
struct stats *stats = &instance->stats;
BUFFER *buffer = instance->buffer;
@@ -645,7 +627,7 @@ static void test_simple_connector_send_buffer(void **state)
static void test_simple_connector_worker(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
BUFFER *buffer = instance->buffer;
__real_mark_scheduled_instances(engine);
@@ -721,7 +703,7 @@ static void test_sanitize_opentsdb_label_value(void **state)
static void test_format_host_labels_json_plaintext(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
@@ -733,7 +715,7 @@ static void test_format_host_labels_json_plaintext(void **state)
static void test_format_host_labels_graphite_plaintext(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
@@ -745,7 +727,7 @@ static void test_format_host_labels_graphite_plaintext(void **state)
static void test_format_host_labels_opentsdb_telnet(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
@@ -757,7 +739,7 @@ static void test_format_host_labels_opentsdb_telnet(void **state)
static void test_format_host_labels_opentsdb_http(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS;
instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS;
@@ -769,7 +751,7 @@ static void test_format_host_labels_opentsdb_http(void **state)
static void test_flush_host_labels(void **state)
{
struct engine *engine = *state;
- struct instance *instance = engine->connector_root->instance_root;
+ struct instance *instance = engine->instance_root;
instance->labels = buffer_create(12);
buffer_strcat(instance->labels, "check string");
@@ -779,6 +761,117 @@ static void test_flush_host_labels(void **state)
assert_int_equal(buffer_strlen(instance->labels), 0);
}
+#if HAVE_KINESIS
+static void test_init_aws_kinesis_instance(void **state)
+{
+ struct engine *engine = *state;
+ struct instance *instance = engine->instance_root;
+
+ instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES;
+
+ struct aws_kinesis_specific_config *connector_specific_config =
+ callocz(1, sizeof(struct aws_kinesis_specific_config));
+ instance->config.connector_specific_config = connector_specific_config;
+ connector_specific_config->stream_name = strdupz("test_stream");
+ connector_specific_config->auth_key_id = strdupz("test_auth_key_id");
+ connector_specific_config->secure_key = strdupz("test_secure_key");
+
+ expect_function_call(__wrap_aws_sdk_init);
+ expect_function_call(__wrap_kinesis_init);
+ expect_not_value(__wrap_kinesis_init, kinesis_specific_data_p, NULL);
+ expect_string(__wrap_kinesis_init, region, "localhost");
+ expect_string(__wrap_kinesis_init, access_key_id, "test_auth_key_id");
+ expect_string(__wrap_kinesis_init, secret_key, "test_secure_key");
+ expect_value(__wrap_kinesis_init, timeout, 10000);
+ assert_int_equal(init_aws_kinesis_instance(instance), 0);
+
+ assert_ptr_equal(instance->worker, aws_kinesis_connector_worker);
+ assert_ptr_equal(instance->start_batch_formatting, NULL);
+ assert_ptr_equal(instance->start_host_formatting, format_host_labels_json_plaintext);
+ assert_ptr_equal(instance->start_chart_formatting, NULL);
+ assert_ptr_equal(instance->metric_formatting, format_dimension_collected_json_plaintext);
+ assert_ptr_equal(instance->end_chart_formatting, NULL);
+ assert_ptr_equal(instance->end_host_formatting, flush_host_labels);
+ assert_ptr_equal(instance->end_batch_formatting, NULL);
+ assert_ptr_not_equal(instance->buffer, NULL);
+ buffer_free(instance->buffer);
+ assert_ptr_not_equal(instance->connector_specific_data, NULL);
+ freez(instance->connector_specific_data);
+
+ instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES;
+
+ expect_function_call(__wrap_kinesis_init);
+ expect_not_value(__wrap_kinesis_init, kinesis_specific_data_p, NULL);
+ expect_string(__wrap_kinesis_init, region, "localhost");
+ expect_string(__wrap_kinesis_init, access_key_id, "test_auth_key_id");
+ expect_string(__wrap_kinesis_init, secret_key, "test_secure_key");
+ expect_value(__wrap_kinesis_init, timeout, 10000);
+
+ assert_int_equal(init_aws_kinesis_instance(instance), 0);
+ assert_ptr_equal(instance->metric_formatting, format_dimension_stored_json_plaintext);
+
+ free(connector_specific_config->stream_name);
+ free(connector_specific_config->auth_key_id);
+ free(connector_specific_config->secure_key);
+}
+
+static void test_aws_kinesis_connector_worker(void **state)
+{
+ struct engine *engine = *state;
+ struct instance *instance = engine->instance_root;
+ BUFFER *buffer = instance->buffer;
+
+ __real_mark_scheduled_instances(engine);
+
+ expect_function_call(__wrap_rrdhost_is_exportable);
+ expect_value(__wrap_rrdhost_is_exportable, instance, instance);
+ expect_value(__wrap_rrdhost_is_exportable, host, localhost);
+ will_return(__wrap_rrdhost_is_exportable, 1);
+
+ RRDSET *st = localhost->rrdset_root;
+ expect_function_call(__wrap_rrdset_is_exportable);
+ expect_value(__wrap_rrdset_is_exportable, instance, instance);
+ expect_value(__wrap_rrdset_is_exportable, st, st);
+ will_return(__wrap_rrdset_is_exportable, 1);
+
+ __real_prepare_buffers(engine);
+
+ struct aws_kinesis_specific_config *connector_specific_config =
+ callocz(1, sizeof(struct aws_kinesis_specific_config));
+ instance->config.connector_specific_config = connector_specific_config;
+ connector_specific_config->stream_name = strdupz("test_stream");
+ connector_specific_config->auth_key_id = strdupz("test_auth_key_id");
+ connector_specific_config->secure_key = strdupz("test_secure_key");
+
+ struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data));
+ instance->connector_specific_data = (void *)connector_specific_data;
+
+ expect_function_call(__wrap_kinesis_put_record);
+ expect_not_value(__wrap_kinesis_put_record, kinesis_specific_data_p, NULL);
+ expect_string(__wrap_kinesis_put_record, stream_name, "test_stream");
+ expect_string(__wrap_kinesis_put_record, partition_key, "netdata_0");
+ expect_value(__wrap_kinesis_put_record, data, buffer_tostring(buffer));
+ // The buffer is prepated by Graphite exporting connector
+ expect_string(
+ __wrap_kinesis_put_record, data,
+ "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n");
+ expect_value(__wrap_kinesis_put_record, data_len, 84);
+
+ expect_function_call(__wrap_kinesis_get_result);
+ expect_value(__wrap_kinesis_get_result, request_outcomes_p, NULL);
+ expect_not_value(__wrap_kinesis_get_result, error_message, NULL);
+ expect_not_value(__wrap_kinesis_get_result, sent_bytes, NULL);
+ expect_not_value(__wrap_kinesis_get_result, lost_bytes, NULL);
+ will_return(__wrap_kinesis_get_result, 0);
+
+ aws_kinesis_connector_worker(instance);
+
+ free(connector_specific_config->stream_name);
+ free(connector_specific_config->auth_key_id);
+ free(connector_specific_config->secure_key);
+}
+#endif // HAVE_KINESIS
+
int main(void)
{
const struct CMUnitTest tests[] = {
@@ -790,8 +883,6 @@ int main(void)
cmocka_unit_test_setup_teardown(
test_init_json_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
- test_init_opentsdb_connector, setup_configured_engine, teardown_configured_engine),
- cmocka_unit_test_setup_teardown(
test_init_opentsdb_telnet_instance, setup_configured_engine, teardown_configured_engine),
cmocka_unit_test_setup_teardown(
test_init_opentsdb_http_instance, setup_configured_engine, teardown_configured_engine),
@@ -850,6 +941,21 @@ int main(void)
cmocka_unit_test_setup_teardown(test_flush_host_labels, setup_initialized_engine, teardown_initialized_engine),
};
- return cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL) +
- cmocka_run_group_tests_name("labels_in_exporting_engine", label_tests, NULL, NULL);
+#if HAVE_KINESIS
+ const struct CMUnitTest kinesis_tests[] = {
+ cmocka_unit_test_setup_teardown(
+ test_init_aws_kinesis_instance, setup_configured_engine, teardown_configured_engine),
+ cmocka_unit_test_setup_teardown(
+ test_aws_kinesis_connector_worker, setup_initialized_engine, teardown_initialized_engine),
+ };
+#endif
+
+ int test_res = cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL) +
+ cmocka_run_group_tests_name("labels_in_exporting_engine", label_tests, NULL, NULL);
+
+#if HAVE_KINESIS
+ test_res += cmocka_run_group_tests_name("kinesis_exporting_connector", kinesis_tests, NULL, NULL);
+#endif
+
+ return test_res;
}
diff --git a/exporting/tests/test_exporting_engine.h b/exporting/tests/test_exporting_engine.h
index c49cdc19b3..ed5cab7d85 100644
--- a/exporting/tests/test_exporting_engine.h
+++ b/exporting/tests/test_exporting_engine.h
@@ -9,6 +9,7 @@
#include "exporting/graphite/graphite.h"
#include "exporting/json/json.h"
#include "exporting/opentsdb/opentsdb.h"
+#include "exporting/aws_kinesis/aws_kinesis.h"
#include <stdarg.h>
#include <stddef.h>
@@ -95,6 +96,15 @@ int __mock_end_chart_formatting(struct instance *instance, RRDSET *st);
int __mock_end_host_formatting(struct instance *instance, RRDHOST *host);
int __mock_end_batch_formatting(struct instance *instance);
+void __wrap_aws_sdk_init();
+void __wrap_kinesis_init(
+ void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key,
+ const long timeout);
+void __wrap_kinesis_put_record(
+ void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data,
+ size_t data_len);
+int __wrap_kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes);
+
// -----------------------------------------------------------------------
// fixtures