diff options
author | Vladimir Kobal <vlad@prokk.net> | 2020-02-25 21:08:41 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2020-02-25 21:08:41 +0200 |
commit | d79bbbf943f72495e135eee4afc25723f886592f (patch) | |
tree | 1637e6f719f9923e92bad2e5033dce6207c2b9c1 /exporting/tests | |
parent | 84421fdf0b513e9e7dc1351550b96044e92c363d (diff) |
Add an AWS Kinesis connector to the exporting engine (#8145)
* Prepare files for the AWS Kinesis exporting connector
* Update the documentation
* Rename functions in backends
* Include the connector to the Netdata buid
* Add initializers and a worker
* Add Kinesis specific configuration options
* Add a compile time configuration check
* Remove the connector data structure
* Restore unit tests
* Fix the compile-time configuration check
* Initialize AWS SDK only once
* Don't create an instance for an unknown exporting connector
* Separate client and request outcome data for every instance
* Fix memory cleanup, document functions
* Add unit tests
* Update the documentation
Diffstat (limited to 'exporting/tests')
-rw-r--r-- | exporting/tests/exporting_doubles.c | 52 | ||||
-rw-r--r-- | exporting/tests/exporting_fixtures.c | 8 | ||||
-rw-r--r-- | exporting/tests/test_exporting_engine.c | 270 | ||||
-rw-r--r-- | exporting/tests/test_exporting_engine.h | 10 |
4 files changed, 247 insertions, 93 deletions
diff --git a/exporting/tests/exporting_doubles.c b/exporting/tests/exporting_doubles.c index 1860d750ed..d36d09e5ed 100644 --- a/exporting/tests/exporting_doubles.c +++ b/exporting/tests/exporting_doubles.c @@ -16,13 +16,11 @@ struct engine *__mock_read_exporting_config() engine->config.hostname = strdupz("test-host"); engine->config.update_every = 3; - engine->connector_root = calloc(1, sizeof(struct connector)); - engine->connector_root->config.type = BACKEND_TYPE_GRAPHITE; - engine->connector_root->engine = engine; - engine->connector_root->instance_root = calloc(1, sizeof(struct instance)); - struct instance *instance = engine->connector_root->instance_root; - instance->connector = engine->connector_root; + engine->instance_root = calloc(1, sizeof(struct instance)); + struct instance *instance = engine->instance_root; + instance->engine = engine; + instance->config.type = BACKEND_TYPE_GRAPHITE; instance->config.name = strdupz("instance_name"); instance->config.destination = strdupz("localhost"); instance->config.update_every = 1; @@ -160,3 +158,45 @@ int __mock_end_batch_formatting(struct instance *instance) check_expected_ptr(instance); return mock_type(int); } + +#if HAVE_KINESIS +void __wrap_aws_sdk_init() +{ + function_called(); +} + +void __wrap_kinesis_init( + void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key, + const long timeout) +{ + function_called(); + check_expected_ptr(kinesis_specific_data_p); + check_expected_ptr(region); + check_expected_ptr(access_key_id); + check_expected_ptr(secret_key); + check_expected(timeout); +} + +void __wrap_kinesis_put_record( + void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data, + size_t data_len) +{ + function_called(); + check_expected_ptr(kinesis_specific_data_p); + check_expected_ptr(stream_name); + check_expected_ptr(partition_key); + check_expected_ptr(data); + check_expected_ptr(data); + check_expected(data_len); +} + +int __wrap_kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes) +{ + function_called(); + check_expected_ptr(request_outcomes_p); + check_expected_ptr(error_message); + check_expected_ptr(sent_bytes); + check_expected_ptr(lost_bytes); + return mock_type(int); +} +#endif //HAVE_KINESIS diff --git a/exporting/tests/exporting_fixtures.c b/exporting/tests/exporting_fixtures.c index f92575e337..677494cb88 100644 --- a/exporting/tests/exporting_fixtures.c +++ b/exporting/tests/exporting_fixtures.c @@ -15,15 +15,13 @@ int teardown_configured_engine(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; free((void *)instance->config.destination); free((void *)instance->config.name); simple_pattern_free(instance->config.charts_pattern); simple_pattern_free(instance->config.hosts_pattern); free(instance); - free(engine->connector_root); - free((void *)engine->config.prefix); free((void *)engine->config.hostname); free(engine); @@ -122,8 +120,8 @@ int teardown_initialized_engine(void **state) struct engine *engine = *state; teardown_rrdhost(); - buffer_free(engine->connector_root->instance_root->labels); - buffer_free(engine->connector_root->instance_root->buffer); + buffer_free(engine->instance_root->labels); + buffer_free(engine->instance_root->buffer); teardown_configured_engine(state); return 0; diff --git a/exporting/tests/test_exporting_engine.c b/exporting/tests/test_exporting_engine.c index 089c4a2004..1a5be4a680 100644 --- a/exporting/tests/test_exporting_engine.c +++ b/exporting/tests/test_exporting_engine.c @@ -21,16 +21,16 @@ void init_connectors_in_tests(struct engine *engine) expect_function_call(__wrap_uv_thread_create); - expect_value(__wrap_uv_thread_create, thread, &engine->connector_root->instance_root->thread); + expect_value(__wrap_uv_thread_create, thread, &engine->instance_root->thread); expect_value(__wrap_uv_thread_create, worker, simple_connector_worker); - expect_value(__wrap_uv_thread_create, arg, engine->connector_root->instance_root); + expect_value(__wrap_uv_thread_create, arg, engine->instance_root); expect_function_call(__wrap_uv_thread_set_name_np); assert_int_equal(__real_init_connectors(engine), 0); assert_int_equal(engine->now, 2); - assert_int_equal(engine->connector_root->instance_root->after, 2); + assert_int_equal(engine->instance_root->after, 2); } static void test_exporting_engine(void **state) @@ -82,16 +82,12 @@ static void test_read_exporting_config(void **state) assert_int_equal(engine->config.update_every, 3); assert_int_equal(engine->instance_num, 0); - struct connector *connector = engine->connector_root; - assert_ptr_not_equal(connector, NULL); - assert_ptr_equal(connector->next, NULL); - assert_ptr_equal(connector->engine, engine); - assert_int_equal(connector->config.type, BACKEND_TYPE_GRAPHITE); - struct instance *instance = connector->instance_root; + struct instance *instance = engine->instance_root; assert_ptr_not_equal(instance, NULL); assert_ptr_equal(instance->next, NULL); - assert_ptr_equal(instance->connector, connector); + assert_ptr_equal(instance->engine, engine); + assert_int_equal(instance->config.type, BACKEND_TYPE_GRAPHITE); assert_string_equal(instance->config.destination, "localhost"); assert_int_equal(instance->config.update_every, 1); assert_int_equal(instance->config.buffer_on_failures, 10); @@ -111,16 +107,15 @@ static void test_init_connectors(void **state) assert_int_equal(engine->instance_num, 1); - struct connector *connector = engine->connector_root; - assert_ptr_equal(connector->next, NULL); - assert_ptr_equal(connector->worker, simple_connector_worker); + struct instance *instance = engine->instance_root; - struct simple_connector_config *connector_specific_config = connector->config.connector_specific_config; - assert_int_equal(connector_specific_config->default_port, 2003); - - struct instance *instance = connector->instance_root; assert_ptr_equal(instance->next, NULL); assert_int_equal(instance->index, 0); + + struct simple_connector_config *connector_specific_config = instance->config.connector_specific_config; + assert_int_equal(connector_specific_config->default_port, 2003); + + assert_ptr_equal(instance->worker, simple_connector_worker); assert_ptr_equal(instance->start_batch_formatting, NULL); assert_ptr_equal(instance->start_host_formatting, format_host_labels_graphite_plaintext); assert_ptr_equal(instance->start_chart_formatting, NULL); @@ -138,16 +133,13 @@ static void test_init_connectors(void **state) static void test_init_graphite_instance(void **state) { struct engine *engine = *state; - struct connector *connector = engine->connector_root; - struct instance *instance = connector->instance_root; - - init_graphite_connector(connector); - assert_int_equal( - ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 2003); - freez(connector->config.connector_specific_config); + struct instance *instance = engine->instance_root; instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES; assert_int_equal(init_graphite_instance(instance), 0); + assert_int_equal( + ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 2003); + freez(instance->config.connector_specific_config); assert_ptr_equal(instance->metric_formatting, format_dimension_collected_graphite_plaintext); assert_ptr_not_equal(instance->buffer, NULL); buffer_free(instance->buffer); @@ -160,16 +152,13 @@ static void test_init_graphite_instance(void **state) static void test_init_json_instance(void **state) { struct engine *engine = *state; - struct connector *connector = engine->connector_root; - struct instance *instance = connector->instance_root; - - init_json_connector(connector); - assert_int_equal( - ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 5448); - freez(connector->config.connector_specific_config); + struct instance *instance = engine->instance_root; instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES; assert_int_equal(init_json_instance(instance), 0); + assert_int_equal( + ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 5448); + freez(instance->config.connector_specific_config); assert_ptr_equal(instance->metric_formatting, format_dimension_collected_json_plaintext); assert_ptr_not_equal(instance->buffer, NULL); buffer_free(instance->buffer); @@ -179,25 +168,16 @@ static void test_init_json_instance(void **state) assert_ptr_equal(instance->metric_formatting, format_dimension_stored_json_plaintext); } -static void test_init_opentsdb_connector(void **state) -{ - struct engine *engine = *state; - struct connector *connector = engine->connector_root; - - init_opentsdb_connector(connector); - assert_int_equal( - ((struct simple_connector_config *)(connector->config.connector_specific_config))->default_port, 4242); - freez(connector->config.connector_specific_config); -} - static void test_init_opentsdb_telnet_instance(void **state) { struct engine *engine = *state; - struct connector *connector = engine->connector_root; - struct instance *instance = connector->instance_root; + struct instance *instance = engine->instance_root; instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES; assert_int_equal(init_opentsdb_telnet_instance(instance), 0); + assert_int_equal( + ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 4242); + freez(instance->config.connector_specific_config); assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_telnet); assert_ptr_not_equal(instance->buffer, NULL); buffer_free(instance->buffer); @@ -210,11 +190,13 @@ static void test_init_opentsdb_telnet_instance(void **state) static void test_init_opentsdb_http_instance(void **state) { struct engine *engine = *state; - struct connector *connector = engine->connector_root; - struct instance *instance = connector->instance_root; + struct instance *instance = engine->instance_root; instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES; assert_int_equal(init_opentsdb_http_instance(instance), 0); + assert_int_equal( + ((struct simple_connector_config *)(instance->config.connector_specific_config))->default_port, 4242); + freez(instance->config.connector_specific_config); assert_ptr_equal(instance->metric_formatting, format_dimension_collected_opentsdb_http); assert_ptr_not_equal(instance->buffer, NULL); buffer_free(instance->buffer); @@ -230,7 +212,7 @@ static void test_mark_scheduled_instances(void **state) assert_int_equal(__real_mark_scheduled_instances(engine), 1); - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; assert_int_equal(instance->scheduled, 1); assert_int_equal(instance->before, 2); } @@ -238,7 +220,7 @@ static void test_mark_scheduled_instances(void **state) static void test_rrdhost_is_exportable(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; expect_function_call(__wrap_info_int); @@ -255,7 +237,7 @@ static void test_rrdhost_is_exportable(void **state) static void test_false_rrdhost_is_exportable(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; simple_pattern_free(instance->config.hosts_pattern); instance->config.hosts_pattern = simple_pattern_create("!*", NULL, SIMPLE_PATTERN_EXACT); @@ -275,7 +257,7 @@ static void test_false_rrdhost_is_exportable(void **state) static void test_rrdset_is_exportable(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; RRDSET *st = localhost->rrdset_root; assert_ptr_equal(st->exporting_flags, NULL); @@ -289,7 +271,7 @@ static void test_rrdset_is_exportable(void **state) static void test_false_rrdset_is_exportable(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; RRDSET *st = localhost->rrdset_root; simple_pattern_free(instance->config.charts_pattern); @@ -306,7 +288,7 @@ static void test_false_rrdset_is_exportable(void **state) static void test_exporting_calculate_value_from_stored_data(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; RRDDIM *rd = localhost->rrdset_root->dimensions; time_t timestamp; @@ -344,7 +326,7 @@ static void test_exporting_calculate_value_from_stored_data(void **state) static void test_prepare_buffers(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->start_batch_formatting = __mock_start_batch_formatting; instance->start_host_formatting = __mock_start_host_formatting; @@ -434,9 +416,9 @@ static void test_format_dimension_collected_graphite_plaintext(void **state) struct engine *engine = *state; RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_collected_graphite_plaintext(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_collected_graphite_plaintext(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); } @@ -448,9 +430,9 @@ static void test_format_dimension_stored_graphite_plaintext(void **state) will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS)); RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_stored_graphite_plaintext(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_stored_graphite_plaintext(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 690565856.0000000 15052\n"); } @@ -459,9 +441,9 @@ static void test_format_dimension_collected_json_plaintext(void **state) struct engine *engine = *state; RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_collected_json_plaintext(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_collected_json_plaintext(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\"," "\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\"," "\"chart_context\":\"(null)\",\"chart_type\":\"(null)\",\"units\":\"(null)\",\"id\":\"dimension_id\"," @@ -476,9 +458,9 @@ static void test_format_dimension_stored_json_plaintext(void **state) will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS)); RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_stored_json_plaintext(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_stored_json_plaintext(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "{\"prefix\":\"netdata\",\"hostname\":\"test-host\",\"host_tags\":\"TAG1=VALUE1 TAG2=VALUE2\"," "\"chart_id\":\"chart_id\",\"chart_name\":\"chart_name\",\"chart_family\":\"(null)\"," \ "\"chart_context\": \"(null)\",\"chart_type\":\"(null)\",\"units\": \"(null)\",\"id\":\"dimension_id\"," @@ -490,9 +472,9 @@ static void test_format_dimension_collected_opentsdb_telnet(void **state) struct engine *engine = *state; RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_collected_opentsdb_telnet(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_collected_opentsdb_telnet(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "put netdata.chart_name.dimension_name 15051 123000321 host=test-host TAG1=VALUE1 TAG2=VALUE2\n"); } @@ -504,9 +486,9 @@ static void test_format_dimension_stored_opentsdb_telnet(void **state) will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS)); RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_stored_opentsdb_telnet(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_stored_opentsdb_telnet(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "put netdata.chart_name.dimension_name 15052 690565856.0000000 host=test-host TAG1=VALUE1 TAG2=VALUE2\n"); } @@ -515,9 +497,9 @@ static void test_format_dimension_collected_opentsdb_http(void **state) struct engine *engine = *state; RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_collected_opentsdb_http(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_collected_opentsdb_http(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "POST /api/put HTTP/1.1\r\n" "Host: test-host\r\n" "Content-Type: application/json\r\n" @@ -536,9 +518,9 @@ static void test_format_dimension_stored_opentsdb_http(void **state) will_return(__wrap_exporting_calculate_value_from_stored_data, pack_storage_number(27, SN_EXISTS)); RRDDIM *rd = localhost->rrdset_root->dimensions; - assert_int_equal(format_dimension_stored_opentsdb_http(engine->connector_root->instance_root, rd), 0); + assert_int_equal(format_dimension_stored_opentsdb_http(engine->instance_root, rd), 0); assert_string_equal( - buffer_tostring(engine->connector_root->instance_root->buffer), + buffer_tostring(engine->instance_root->buffer), "POST /api/put HTTP/1.1\r\n" "Host: test-host\r\n" "Content-Type: application/json\r\n" @@ -558,7 +540,7 @@ static void test_exporting_discard_response(void **state) expect_function_call(__wrap_info_int); - assert_int_equal(exporting_discard_response(response, engine->connector_root->instance_root), 0); + assert_int_equal(exporting_discard_response(response, engine->instance_root), 0); assert_string_equal( log_line, @@ -572,7 +554,7 @@ static void test_exporting_discard_response(void **state) static void test_simple_connector_receive_response(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; struct stats *stats = &instance->stats; int sock = 1; @@ -599,7 +581,7 @@ static void test_simple_connector_receive_response(void **state) static void test_simple_connector_send_buffer(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; struct stats *stats = &instance->stats; BUFFER *buffer = instance->buffer; @@ -645,7 +627,7 @@ static void test_simple_connector_send_buffer(void **state) static void test_simple_connector_worker(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; BUFFER *buffer = instance->buffer; __real_mark_scheduled_instances(engine); @@ -721,7 +703,7 @@ static void test_sanitize_opentsdb_label_value(void **state) static void test_format_host_labels_json_plaintext(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; @@ -733,7 +715,7 @@ static void test_format_host_labels_json_plaintext(void **state) static void test_format_host_labels_graphite_plaintext(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; @@ -745,7 +727,7 @@ static void test_format_host_labels_graphite_plaintext(void **state) static void test_format_host_labels_opentsdb_telnet(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; @@ -757,7 +739,7 @@ static void test_format_host_labels_opentsdb_telnet(void **state) static void test_format_host_labels_opentsdb_http(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->config.options |= EXPORTING_OPTION_SEND_CONFIGURED_LABELS; instance->config.options |= EXPORTING_OPTION_SEND_AUTOMATIC_LABELS; @@ -769,7 +751,7 @@ static void test_format_host_labels_opentsdb_http(void **state) static void test_flush_host_labels(void **state) { struct engine *engine = *state; - struct instance *instance = engine->connector_root->instance_root; + struct instance *instance = engine->instance_root; instance->labels = buffer_create(12); buffer_strcat(instance->labels, "check string"); @@ -779,6 +761,117 @@ static void test_flush_host_labels(void **state) assert_int_equal(buffer_strlen(instance->labels), 0); } +#if HAVE_KINESIS +static void test_init_aws_kinesis_instance(void **state) +{ + struct engine *engine = *state; + struct instance *instance = engine->instance_root; + + instance->config.options = EXPORTING_SOURCE_DATA_AS_COLLECTED | EXPORTING_OPTION_SEND_NAMES; + + struct aws_kinesis_specific_config *connector_specific_config = + callocz(1, sizeof(struct aws_kinesis_specific_config)); + instance->config.connector_specific_config = connector_specific_config; + connector_specific_config->stream_name = strdupz("test_stream"); + connector_specific_config->auth_key_id = strdupz("test_auth_key_id"); + connector_specific_config->secure_key = strdupz("test_secure_key"); + + expect_function_call(__wrap_aws_sdk_init); + expect_function_call(__wrap_kinesis_init); + expect_not_value(__wrap_kinesis_init, kinesis_specific_data_p, NULL); + expect_string(__wrap_kinesis_init, region, "localhost"); + expect_string(__wrap_kinesis_init, access_key_id, "test_auth_key_id"); + expect_string(__wrap_kinesis_init, secret_key, "test_secure_key"); + expect_value(__wrap_kinesis_init, timeout, 10000); + assert_int_equal(init_aws_kinesis_instance(instance), 0); + + assert_ptr_equal(instance->worker, aws_kinesis_connector_worker); + assert_ptr_equal(instance->start_batch_formatting, NULL); + assert_ptr_equal(instance->start_host_formatting, format_host_labels_json_plaintext); + assert_ptr_equal(instance->start_chart_formatting, NULL); + assert_ptr_equal(instance->metric_formatting, format_dimension_collected_json_plaintext); + assert_ptr_equal(instance->end_chart_formatting, NULL); + assert_ptr_equal(instance->end_host_formatting, flush_host_labels); + assert_ptr_equal(instance->end_batch_formatting, NULL); + assert_ptr_not_equal(instance->buffer, NULL); + buffer_free(instance->buffer); + assert_ptr_not_equal(instance->connector_specific_data, NULL); + freez(instance->connector_specific_data); + + instance->config.options = EXPORTING_SOURCE_DATA_AVERAGE | EXPORTING_OPTION_SEND_NAMES; + + expect_function_call(__wrap_kinesis_init); + expect_not_value(__wrap_kinesis_init, kinesis_specific_data_p, NULL); + expect_string(__wrap_kinesis_init, region, "localhost"); + expect_string(__wrap_kinesis_init, access_key_id, "test_auth_key_id"); + expect_string(__wrap_kinesis_init, secret_key, "test_secure_key"); + expect_value(__wrap_kinesis_init, timeout, 10000); + + assert_int_equal(init_aws_kinesis_instance(instance), 0); + assert_ptr_equal(instance->metric_formatting, format_dimension_stored_json_plaintext); + + free(connector_specific_config->stream_name); + free(connector_specific_config->auth_key_id); + free(connector_specific_config->secure_key); +} + +static void test_aws_kinesis_connector_worker(void **state) +{ + struct engine *engine = *state; + struct instance *instance = engine->instance_root; + BUFFER *buffer = instance->buffer; + + __real_mark_scheduled_instances(engine); + + expect_function_call(__wrap_rrdhost_is_exportable); + expect_value(__wrap_rrdhost_is_exportable, instance, instance); + expect_value(__wrap_rrdhost_is_exportable, host, localhost); + will_return(__wrap_rrdhost_is_exportable, 1); + + RRDSET *st = localhost->rrdset_root; + expect_function_call(__wrap_rrdset_is_exportable); + expect_value(__wrap_rrdset_is_exportable, instance, instance); + expect_value(__wrap_rrdset_is_exportable, st, st); + will_return(__wrap_rrdset_is_exportable, 1); + + __real_prepare_buffers(engine); + + struct aws_kinesis_specific_config *connector_specific_config = + callocz(1, sizeof(struct aws_kinesis_specific_config)); + instance->config.connector_specific_config = connector_specific_config; + connector_specific_config->stream_name = strdupz("test_stream"); + connector_specific_config->auth_key_id = strdupz("test_auth_key_id"); + connector_specific_config->secure_key = strdupz("test_secure_key"); + + struct aws_kinesis_specific_data *connector_specific_data = callocz(1, sizeof(struct aws_kinesis_specific_data)); + instance->connector_specific_data = (void *)connector_specific_data; + + expect_function_call(__wrap_kinesis_put_record); + expect_not_value(__wrap_kinesis_put_record, kinesis_specific_data_p, NULL); + expect_string(__wrap_kinesis_put_record, stream_name, "test_stream"); + expect_string(__wrap_kinesis_put_record, partition_key, "netdata_0"); + expect_value(__wrap_kinesis_put_record, data, buffer_tostring(buffer)); + // The buffer is prepated by Graphite exporting connector + expect_string( + __wrap_kinesis_put_record, data, + "netdata.test-host.chart_name.dimension_name;TAG1=VALUE1 TAG2=VALUE2 123000321 15051\n"); + expect_value(__wrap_kinesis_put_record, data_len, 84); + + expect_function_call(__wrap_kinesis_get_result); + expect_value(__wrap_kinesis_get_result, request_outcomes_p, NULL); + expect_not_value(__wrap_kinesis_get_result, error_message, NULL); + expect_not_value(__wrap_kinesis_get_result, sent_bytes, NULL); + expect_not_value(__wrap_kinesis_get_result, lost_bytes, NULL); + will_return(__wrap_kinesis_get_result, 0); + + aws_kinesis_connector_worker(instance); + + free(connector_specific_config->stream_name); + free(connector_specific_config->auth_key_id); + free(connector_specific_config->secure_key); +} +#endif // HAVE_KINESIS + int main(void) { const struct CMUnitTest tests[] = { @@ -790,8 +883,6 @@ int main(void) cmocka_unit_test_setup_teardown( test_init_json_instance, setup_configured_engine, teardown_configured_engine), cmocka_unit_test_setup_teardown( - test_init_opentsdb_connector, setup_configured_engine, teardown_configured_engine), - cmocka_unit_test_setup_teardown( test_init_opentsdb_telnet_instance, setup_configured_engine, teardown_configured_engine), cmocka_unit_test_setup_teardown( test_init_opentsdb_http_instance, setup_configured_engine, teardown_configured_engine), @@ -850,6 +941,21 @@ int main(void) cmocka_unit_test_setup_teardown(test_flush_host_labels, setup_initialized_engine, teardown_initialized_engine), }; - return cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL) + - cmocka_run_group_tests_name("labels_in_exporting_engine", label_tests, NULL, NULL); +#if HAVE_KINESIS + const struct CMUnitTest kinesis_tests[] = { + cmocka_unit_test_setup_teardown( + test_init_aws_kinesis_instance, setup_configured_engine, teardown_configured_engine), + cmocka_unit_test_setup_teardown( + test_aws_kinesis_connector_worker, setup_initialized_engine, teardown_initialized_engine), + }; +#endif + + int test_res = cmocka_run_group_tests_name("exporting_engine", tests, NULL, NULL) + + cmocka_run_group_tests_name("labels_in_exporting_engine", label_tests, NULL, NULL); + +#if HAVE_KINESIS + test_res += cmocka_run_group_tests_name("kinesis_exporting_connector", kinesis_tests, NULL, NULL); +#endif + + return test_res; } diff --git a/exporting/tests/test_exporting_engine.h b/exporting/tests/test_exporting_engine.h index c49cdc19b3..ed5cab7d85 100644 --- a/exporting/tests/test_exporting_engine.h +++ b/exporting/tests/test_exporting_engine.h @@ -9,6 +9,7 @@ #include "exporting/graphite/graphite.h" #include "exporting/json/json.h" #include "exporting/opentsdb/opentsdb.h" +#include "exporting/aws_kinesis/aws_kinesis.h" #include <stdarg.h> #include <stddef.h> @@ -95,6 +96,15 @@ int __mock_end_chart_formatting(struct instance *instance, RRDSET *st); int __mock_end_host_formatting(struct instance *instance, RRDHOST *host); int __mock_end_batch_formatting(struct instance *instance); +void __wrap_aws_sdk_init(); +void __wrap_kinesis_init( + void *kinesis_specific_data_p, const char *region, const char *access_key_id, const char *secret_key, + const long timeout); +void __wrap_kinesis_put_record( + void *kinesis_specific_data_p, const char *stream_name, const char *partition_key, const char *data, + size_t data_len); +int __wrap_kinesis_get_result(void *request_outcomes_p, char *error_message, size_t *sent_bytes, size_t *lost_bytes); + // ----------------------------------------------------------------------- // fixtures |