From 2f7358ac6d149e456b97f9e6a2606e8ce451a46a Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Sun, 12 Nov 2023 20:07:31 +0100 Subject: [PATCH 1/7] [C] Implement low storage warning checks in the C driver and extract storage checks into a common function. --- aeron-driver/src/main/c/aeron_driver.c | 1 + .../src/main/c/aeron_driver_context.c | 78 +++++++++++++++---- .../src/main/c/aeron_driver_context.h | 3 + .../src/main/c/aeron_ipc_publication.c | 12 +-- .../src/main/c/aeron_network_publication.c | 12 +-- .../src/main/c/aeron_publication_image.c | 12 +-- aeron-driver/src/main/c/aeronmd.h | 8 ++ .../c/aeron_driver_context_config_test.cpp | 39 ++++++++++ .../test/c/aeron_publication_image_test.cpp | 58 ++++++++++++-- aeron-driver/src/test/c/aeron_receiver_test.h | 3 + 10 files changed, 176 insertions(+), 50 deletions(-) diff --git a/aeron-driver/src/main/c/aeron_driver.c b/aeron-driver/src/main/c/aeron_driver.c index ee6173a62a..c08d8af099 100644 --- a/aeron-driver/src/main/c/aeron_driver.c +++ b/aeron-driver/src/main/c/aeron_driver.c @@ -586,6 +586,7 @@ void aeron_driver_context_print_configuration(aeron_driver_context_t *context) fprintf(fpout, "\n mtu_length=%" PRIu64, (uint64_t)context->mtu_length); fprintf(fpout, "\n ipc_mtu_length=%" PRIu64, (uint64_t)context->ipc_mtu_length); fprintf(fpout, "\n file_page_size=%" PRIu64, (uint64_t)context->file_page_size); + fprintf(fpout, "\n low_file_store_warning_threshold=%" PRIu64, (uint64_t)context->low_file_store_warning_threshold); fprintf(fpout, "\n publication_reserved_session_id_low=%" PRId32, context->publication_reserved_session_id_low); fprintf(fpout, "\n publication_reserved_session_id_high=%" PRId32, context->publication_reserved_session_id_high); fprintf(fpout, "\n loss_report_length=%" PRIu64, (uint64_t)context->loss_report_length); diff --git a/aeron-driver/src/main/c/aeron_driver_context.c b/aeron-driver/src/main/c/aeron_driver_context.c index da2f7751d0..2388206800 100644 --- a/aeron-driver/src/main/c/aeron_driver_context.c +++ b/aeron-driver/src/main/c/aeron_driver_context.c @@ -276,18 +276,19 @@ static void aeron_driver_untethered_subscription_state_change_null( #define AERON_THREADING_MODE_DEFAULT AERON_THREADING_MODE_DEDICATED #define AERON_DIR_DELETE_ON_START_DEFAULT false #define AERON_DIR_DELETE_ON_SHUTDOWN_DEFAULT false -#define AERON_CLIENT_LIVENESS_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * 1000LL) +#define AERON_CLIENT_LIVENESS_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * INT64_C(1000)) #define AERON_TERM_BUFFER_LENGTH_DEFAULT (16 * 1024 * 1024) #define AERON_IPC_TERM_BUFFER_LENGTH_DEFAULT (64 * 1024 * 1024) #define AERON_TERM_BUFFER_SPARSE_FILE_DEFAULT (false) #define AERON_PERFORM_STORAGE_CHECKS_DEFAULT (true) +#define AERON_LOW_FILE_STORE_WARNING_THRESHOLD_DEFAULT (AERON_TERM_BUFFER_LENGTH_DEFAULT * INT64_C(10)) #define AERON_SPIES_SIMULATE_CONNECTION_DEFAULT (false) #define AERON_FILE_PAGE_SIZE_DEFAULT (4 * 1024) #define AERON_MTU_LENGTH_DEFAULT (1408) #define AERON_IPC_MTU_LENGTH_DEFAULT (1408) #define AERON_IPC_PUBLICATION_TERM_WINDOW_LENGTH_DEFAULT (0) #define AERON_PUBLICATION_TERM_WINDOW_LENGTH_DEFAULT (0) -#define AERON_PUBLICATION_LINGER_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * 1000LL) +#define AERON_PUBLICATION_LINGER_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * INT64_C(1000)) #define AERON_SOCKET_SO_RCVBUF_DEFAULT (128 * 1024) #define AERON_SOCKET_SO_SNDBUF_DEFAULT (0) #define AERON_SOCKET_MULTICAST_TTL_DEFAULT (0) @@ -295,31 +296,31 @@ static void aeron_driver_untethered_subscription_state_change_null( #define AERON_RECEIVER_GROUP_TAG_VALUE_DEFAULT (-1) #define AERON_FLOW_CONTROL_GROUP_TAG_DEFAULT (-1) #define AERON_FLOW_CONTROL_GROUP_MIN_SIZE_DEFAULT (0) -#define AERON_FLOW_CONTROL_RECEIVER_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * 1000LL) +#define AERON_FLOW_CONTROL_RECEIVER_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * INT64_C(1000)) #define AERON_SEND_TO_STATUS_POLL_RATIO_DEFAULT (6) -#define AERON_RCV_STATUS_MESSAGE_TIMEOUT_NS_DEFAULT (200 * 1000 * 1000LL) +#define AERON_RCV_STATUS_MESSAGE_TIMEOUT_NS_DEFAULT (200 * 1000 * INT64_C(1000)) #define AERON_MULTICAST_FLOWCONTROL_SUPPLIER_DEFAULT ("aeron_max_multicast_flow_control_strategy_supplier") #define AERON_UNICAST_FLOWCONTROL_SUPPLIER_DEFAULT ("aeron_unicast_flow_control_strategy_supplier") #define AERON_CONGESTIONCONTROL_SUPPLIER_DEFAULT ("aeron_congestion_control_default_strategy_supplier") -#define AERON_IMAGE_LIVENESS_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * 1000LL) +#define AERON_IMAGE_LIVENESS_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * INT64_C(1000)) #define AERON_RCV_INITIAL_WINDOW_LENGTH_DEFAULT (128 * 1024) #define AERON_LOSS_REPORT_BUFFER_LENGTH_DEFAULT (1024 * 1024) -#define AERON_PUBLICATION_UNBLOCK_TIMEOUT_NS_DEFAULT (15 * 1000 * 1000 * 1000LL) -#define AERON_PUBLICATION_CONNECTION_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * 1000LL) -#define AERON_TIMER_INTERVAL_NS_DEFAULT (1 * 1000 * 1000 * 1000LL) +#define AERON_PUBLICATION_UNBLOCK_TIMEOUT_NS_DEFAULT (15 * 1000 * 1000 * INT64_C(1000)) +#define AERON_PUBLICATION_CONNECTION_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * INT64_C(1000)) +#define AERON_TIMER_INTERVAL_NS_DEFAULT (1 * 1000 * 1000 * INT64_C(1000)) #define AERON_IDLE_STRATEGY_BACKOFF_DEFAULT "aeron_idle_strategy_backoff" -#define AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_NS_DEFAULT (1 * 1000 * 1000 * 1000LL) +#define AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_NS_DEFAULT (1 * 1000 * 1000 * INT64_C(1000)) #define AERON_PRINT_CONFIGURATION_DEFAULT (false) #define AERON_RELIABLE_STREAM_DEFAULT (true) #define AERON_TETHER_SUBSCRIPTIONS_DEFAULT (true) -#define AERON_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * 1000LL) -#define AERON_UNTETHERED_RESTING_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * 1000LL) +#define AERON_UNTETHERED_WINDOW_LIMIT_TIMEOUT_NS_DEFAULT (5 * 1000 * 1000 * INT64_C(1000)) +#define AERON_UNTETHERED_RESTING_TIMEOUT_NS_DEFAULT (10 * 1000 * 1000 * INT64_C(1000)) #define AERON_DRIVER_TIMEOUT_MS_DEFAULT (10 * 1000) #define AERON_RETRANSMIT_UNICAST_DELAY_NS_DEFAULT (0) -#define AERON_RETRANSMIT_UNICAST_LINGER_NS_DEFAULT (60 * 1000 * 1000LL) +#define AERON_RETRANSMIT_UNICAST_LINGER_NS_DEFAULT (60 * 1000 * INT64_C(1000)) #define AERON_NAK_MULTICAST_GROUP_SIZE_DEFAULT (10) -#define AERON_NAK_MULTICAST_MAX_BACKOFF_NS_DEFAULT (60 * 1000 * 1000LL) -#define AERON_NAK_UNICAST_DELAY_NS_DEFAULT (60 * 1000 * 1000LL) +#define AERON_NAK_MULTICAST_MAX_BACKOFF_NS_DEFAULT (60 * 1000 * INT64_C(1000)) +#define AERON_NAK_UNICAST_DELAY_NS_DEFAULT (60 * 1000 * INT64_C(1000)) #define AERON_UDP_CHANNEL_TRANSPORT_BINDINGS_MEDIA_DEFAULT ("default") #define AERON_UDP_CHANNEL_TRANSPORT_BINDINGS_INTERCEPTORS_DEFAULT ("") #define AERON_RECEIVER_GROUP_CONSIDERATION_DEFAULT (AERON_INFER) @@ -530,6 +531,7 @@ int aeron_driver_context_init(aeron_driver_context_t **context) _context->initial_window_length = AERON_RCV_INITIAL_WINDOW_LENGTH_DEFAULT; _context->loss_report_length = AERON_LOSS_REPORT_BUFFER_LENGTH_DEFAULT; _context->file_page_size = AERON_FILE_PAGE_SIZE_DEFAULT; + _context->low_file_store_warning_threshold = AERON_LOW_FILE_STORE_WARNING_THRESHOLD_DEFAULT; _context->publication_unblock_timeout_ns = AERON_PUBLICATION_UNBLOCK_TIMEOUT_NS_DEFAULT; _context->publication_connection_timeout_ns = AERON_PUBLICATION_CONNECTION_TIMEOUT_NS_DEFAULT; _context->counter_free_to_reuse_ns = AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_NS_DEFAULT; @@ -826,6 +828,13 @@ int aeron_driver_context_init(aeron_driver_context_t **context) 4 * 1024, INT32_MAX); + _context->low_file_store_warning_threshold = aeron_config_parse_size64( + AERON_LOW_FILE_STORE_WARNING_THRESHOLD_ENV_VAR, + getenv(AERON_LOW_FILE_STORE_WARNING_THRESHOLD_ENV_VAR), + _context->low_file_store_warning_threshold, + 0, + INT64_MAX); + _context->publication_unblock_timeout_ns = aeron_config_parse_duration_ns( AERON_PUBLICATION_UNBLOCK_TIMEOUT_ENV_VAR, getenv(AERON_PUBLICATION_UNBLOCK_TIMEOUT_ENV_VAR), @@ -1244,6 +1253,33 @@ static void aeron_driver_context_free_bindings(const aeron_udp_channel_intercept } } +inline int aeron_driver_context_run_storage_checks(aeron_driver_context_t *context, uint64_t log_length) +{ + if (context->perform_storage_checks) + { + const uint64_t usable_space = context->usable_fs_space_func(context->aeron_dir); + if (usable_space < log_length) + { + AERON_SET_ERR( + -AERON_ERROR_CODE_STORAGE_SPACE, + "insufficient usable storage for new log of length=%" PRId64 " usable=%" PRId64 " in %s", + log_length, usable_space, context->aeron_dir); + return -1; + } + + if (usable_space <= context->low_file_store_warning_threshold) + { + AERON_SET_ERR( + -AERON_ERROR_CODE_STORAGE_SPACE, + "WARNING: space is running low: threshold=%" PRId64 " usable=%" PRId64 " in %s", + context->low_file_store_warning_threshold, usable_space, context->aeron_dir); + aeron_distinct_error_log_record(context->error_log, aeron_errcode(), aeron_errmsg()); + aeron_err_clear(); + } + } + return 0; +} + int aeron_driver_context_bindings_clientd_create_entries(aeron_driver_context_t *context) { const aeron_udp_channel_interceptor_bindings_t *interceptor_bindings; @@ -1751,6 +1787,20 @@ bool aeron_driver_context_get_perform_storage_checks(aeron_driver_context_t *con return NULL != context ? context->perform_storage_checks : AERON_PERFORM_STORAGE_CHECKS_DEFAULT; } +int aeron_driver_context_set_low_file_store_warning_threshold(aeron_driver_context_t *context, uint64_t value) +{ + AERON_DRIVER_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context); + + context->low_file_store_warning_threshold = value; + return 0; +} + +uint64_t aeron_driver_context_get_low_file_store_warning_threshold(aeron_driver_context_t *context) +{ + return NULL != context ? + context->low_file_store_warning_threshold : AERON_LOW_FILE_STORE_WARNING_THRESHOLD_DEFAULT; +} + int aeron_driver_context_set_spies_simulate_connection(aeron_driver_context_t *context, bool value) { AERON_DRIVER_CONTEXT_SET_CHECK_ARG_AND_RETURN(-1, context); diff --git a/aeron-driver/src/main/c/aeron_driver_context.h b/aeron-driver/src/main/c/aeron_driver_context.h index 3c38224a54..eb07ff2342 100644 --- a/aeron-driver/src/main/c/aeron_driver_context.h +++ b/aeron-driver/src/main/c/aeron_driver_context.h @@ -132,6 +132,7 @@ typedef struct aeron_driver_context_stct uint64_t nak_unicast_delay_ns; /* aeron.nak.unicast.delay = 60ms */ uint64_t nak_multicast_max_backoff_ns; /* aeron.nak.multicast.max.backoff = 60ms */ uint64_t re_resolution_check_interval_ns; /* aeron.driver.reresolution.check.interval = 1s */ + uint64_t low_file_store_warning_threshold; /* aeron.low.file.store.warning.threshold = 160MB */ size_t to_driver_buffer_length; /* aeron.conductor.buffer.length = 1MB + trailer*/ size_t to_clients_buffer_length; /* aeron.clients.buffer.length = 1MB + trailer */ size_t counters_values_buffer_length; /* aeron.counters.buffer.length = 1MB */ @@ -332,6 +333,8 @@ int aeron_driver_context_validate_mtu_length(uint64_t mtu_length); size_t aeron_cnc_length(aeron_driver_context_t *context); +int aeron_driver_context_run_storage_checks(aeron_driver_context_t *context, uint64_t log_length); + int aeron_driver_context_bindings_clientd_create_entries(aeron_driver_context_t *context); int aeron_driver_context_bindings_clientd_delete_entries(aeron_driver_context_t *context); int aeron_driver_context_bindings_clientd_find_first_free_index(aeron_driver_context_t *context); diff --git a/aeron-driver/src/main/c/aeron_ipc_publication.c b/aeron-driver/src/main/c/aeron_ipc_publication.c index 2ded4a2711..0e0d2832b7 100644 --- a/aeron-driver/src/main/c/aeron_ipc_publication.c +++ b/aeron-driver/src/main/c/aeron_ipc_publication.c @@ -45,17 +45,9 @@ int aeron_ipc_publication_create( *publication = NULL; - if (context->perform_storage_checks) + if (aeron_driver_context_run_storage_checks(context, log_length) < 0) { - const uint64_t usable_space = context->usable_fs_space_func(context->aeron_dir); - if (usable_space < log_length) - { - AERON_SET_ERR( - -AERON_ERROR_CODE_STORAGE_SPACE, - "Insufficient usable storage for new log of length=%" PRId64 " usable=%" PRId64 - " in %s", log_length, usable_space, context->aeron_dir); - return -1; - } + return -1; } if (aeron_alloc((void **)&_pub, sizeof(aeron_ipc_publication_t)) < 0) diff --git a/aeron-driver/src/main/c/aeron_network_publication.c b/aeron-driver/src/main/c/aeron_network_publication.c index dec3db48de..8659485313 100644 --- a/aeron-driver/src/main/c/aeron_network_publication.c +++ b/aeron-driver/src/main/c/aeron_network_publication.c @@ -62,17 +62,9 @@ int aeron_network_publication_create( *publication = NULL; - if (context->perform_storage_checks) + if (aeron_driver_context_run_storage_checks(context, log_length) < 0) { - const uint64_t usable_space = context->usable_fs_space_func(context->aeron_dir); - if (usable_space < log_length) - { - AERON_SET_ERR( - -AERON_ERROR_CODE_STORAGE_SPACE, - "Insufficient usable storage for new log of length=%" PRId64 " usable=%" PRId64 - " in %s", log_length, usable_space, context->aeron_dir); - return -1; - } + return -1; } if (aeron_alloc((void **)&_pub, sizeof(aeron_network_publication_t)) < 0) diff --git a/aeron-driver/src/main/c/aeron_publication_image.c b/aeron-driver/src/main/c/aeron_publication_image.c index 560586eb36..b0d2b997c4 100644 --- a/aeron-driver/src/main/c/aeron_publication_image.c +++ b/aeron-driver/src/main/c/aeron_publication_image.c @@ -82,17 +82,9 @@ int aeron_publication_image_create( *image = NULL; - if (context->perform_storage_checks) + if (aeron_driver_context_run_storage_checks(context, log_length) < 0) { - const uint64_t usable_space = context->usable_fs_space_func(context->aeron_dir); - if (usable_space < log_length) - { - AERON_SET_ERR( - -AERON_ERROR_CODE_STORAGE_SPACE, - "Insufficient usable storage for new log of length=%" PRId64 " usable=%" PRId64 - " in %s", log_length, usable_space, context->aeron_dir); - return -1; - } + return -1; } if (aeron_alloc((void **)&_image, sizeof(aeron_publication_image_t)) < 0) diff --git a/aeron-driver/src/main/c/aeronmd.h b/aeron-driver/src/main/c/aeronmd.h index e4d5a3b70c..7da6976bdc 100644 --- a/aeron-driver/src/main/c/aeronmd.h +++ b/aeron-driver/src/main/c/aeronmd.h @@ -153,6 +153,14 @@ bool aeron_driver_context_get_term_buffer_sparse_file(aeron_driver_context_t *co int aeron_driver_context_set_perform_storage_checks(aeron_driver_context_t *context, bool value); bool aeron_driver_context_get_perform_storage_checks(aeron_driver_context_t *context); +/** + * Specify the interval which checks for re-resolutions of names occurs. + */ +#define AERON_LOW_FILE_STORE_WARNING_THRESHOLD_ENV_VAR "AERON_LOW_FILE_STORE_WARNING_THRESHOLD" + +int aeron_driver_context_set_low_file_store_warning_threshold(aeron_driver_context_t *context, uint64_t value); +uint64_t aeron_driver_context_get_low_file_store_warning_threshold(aeron_driver_context_t *context); + /** * Should a spy subscription simulate a connection to a network publication. */ diff --git a/aeron-driver/src/test/c/aeron_driver_context_config_test.cpp b/aeron-driver/src/test/c/aeron_driver_context_config_test.cpp index 2644ca08a7..b3eadf536c 100644 --- a/aeron-driver/src/test/c/aeron_driver_context_config_test.cpp +++ b/aeron-driver/src/test/c/aeron_driver_context_config_test.cpp @@ -181,3 +181,42 @@ TEST_F(DriverContextConfigTest, shouldHandleValuesOutsideOfUint32Range) EXPECT_EQ(1, aeron_driver_context_get_resource_free_limit(context)); aeron_driver_context_close(context); } + +TEST_F(DriverContextConfigTest, shouldReturnDefaultLowFileStoreWarningThresholdIfNoneProvided) +{ + const uint64_t default_low_storage_warning_threshold = 160 * 1024 * 1024; + + aeron_driver_context_t *context = nullptr; + EXPECT_EQ(default_low_storage_warning_threshold, aeron_driver_context_get_low_file_store_warning_threshold(context)); + + ASSERT_EQ(0, aeron_driver_context_init(&context)); + EXPECT_EQ(default_low_storage_warning_threshold, aeron_driver_context_get_low_file_store_warning_threshold(context)); + aeron_driver_context_close(context); +} + +TEST_F(DriverContextConfigTest, shouldAssignLowStoreWarningThreshold) +{ + aeron_driver_context_t *context = nullptr; + EXPECT_EQ(-1, aeron_driver_context_set_low_file_store_warning_threshold(context, 42)); + + const uint64_t threshold = 1024 * 1024; + ASSERT_EQ(0, aeron_driver_context_init(&context)); + EXPECT_EQ(0, aeron_driver_context_set_low_file_store_warning_threshold(context, threshold)); + EXPECT_EQ(threshold, aeron_driver_context_get_low_file_store_warning_threshold(context)); + aeron_driver_context_close(context); +} + +TEST_F(DriverContextConfigTest, shouldReadLowFileStoreWarningThresholdFromAnEnvironmentVariable) +{ + aeron_driver_context_t *context = nullptr; + aeron_env_set(AERON_LOW_FILE_STORE_WARNING_THRESHOLD_ENV_VAR, "2m"); + ASSERT_EQ(0, aeron_driver_context_init(&context)); + EXPECT_EQ(2 * 1024 * 1024, aeron_driver_context_get_low_file_store_warning_threshold(context)); + aeron_driver_context_close(context); + + const uint64_t default_low_storage_warning_threshold = 160 * 1024 * 1024; + aeron_env_set(AERON_LOW_FILE_STORE_WARNING_THRESHOLD_ENV_VAR, "garbage"); + ASSERT_EQ(0, aeron_driver_context_init(&context)); + EXPECT_EQ(default_low_storage_warning_threshold, aeron_driver_context_get_low_file_store_warning_threshold(context)); + aeron_driver_context_close(context); +} diff --git a/aeron-driver/src/test/c/aeron_publication_image_test.cpp b/aeron-driver/src/test/c/aeron_publication_image_test.cpp index 7d3b8ab74e..f3a2c9a970 100644 --- a/aeron-driver/src/test/c/aeron_publication_image_test.cpp +++ b/aeron-driver/src/test/c/aeron_publication_image_test.cpp @@ -36,11 +36,6 @@ static bool always_measure_rtt(void *state, int64_t now_ns) return true; } -static uint64_t no_space_available(const char* path) -{ - return 0; -} - class PublicationImageTest : public ReceiverTestBase { }; @@ -486,9 +481,60 @@ TEST_F(PublicationImageTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpac endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); - m_context->usable_fs_space_func = no_space_available; + m_context->usable_fs_space_func = [](const char* path) + { + return 42ULL; + }; aeron_publication_image_t *image = createImage(endpoint, dest, stream_id, session_id); ASSERT_EQ(nullptr, image) << aeron_errmsg(); EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, aeron_errcode()); + auto expected_error_text = + std::string("insufficient usable storage for new log of length=200704 usable=42 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, std::string(aeron_errmsg()).find(expected_error_text)); +} + +TEST_F(PublicationImageTest, shouldLogWarningIfStorageSpaceIsLow) +{ + const char *uri = "aeron:udp?endpoint=localhost:9090"; + aeron_receive_channel_endpoint_t *endpoint = createMdsEndpoint(); + int32_t stream_id = 1001; + int32_t session_id = 1000001; + int64_t registration_id = 0; + + aeron_udp_channel_t *channel; + aeron_receive_destination_t *dest; + + aeron_udp_channel_parse(strlen(uri), uri, &m_resolver, &channel, false); + + ASSERT_LE(0, aeron_receive_destination_create( + &dest, + channel, + channel, + m_context, + &m_counters_manager, + registration_id, + endpoint->channel_status.counter_id)); + ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); + + m_context->usable_fs_space_func = [](const char* path) + { + return 123456789ULL; + }; + m_context->low_file_store_warning_threshold = 987654321ULL; + aeron_publication_image_t *image = createImage(endpoint, dest, stream_id, session_id); + + ASSERT_NE(nullptr, image) << aeron_errmsg(); + EXPECT_EQ(0, aeron_errcode()); + auto errors_list = m_context->error_log->observation_list; + EXPECT_NE(0, errors_list->num_observations); + auto last_error = errors_list->observations[errors_list->num_observations - 1]; + EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, last_error.error_code); + auto error_text = std::string(last_error.description); + EXPECT_EQ(error_text.size(), last_error.description_length); + auto expected_warning = + std::string("WARNING: space is running low: threshold=987654321 usable=123456789 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, error_text.find(expected_warning)); } diff --git a/aeron-driver/src/test/c/aeron_receiver_test.h b/aeron-driver/src/test/c/aeron_receiver_test.h index d42e365e2f..33c9820db9 100644 --- a/aeron-driver/src/test/c/aeron_receiver_test.h +++ b/aeron-driver/src/test/c/aeron_receiver_test.h @@ -102,6 +102,9 @@ class ReceiverTestBase : public testing::Test m_receiver_proxy.receiver = &m_receiver; m_context->receiver_proxy = &m_receiver_proxy; + m_context->error_log = &m_error_log; + m_context->error_buffer = m_error_log_buffer.data(); + m_context->error_buffer_length = m_error_log_buffer.size(); } void TearDown() override From fe5010422ecc6abf541e5e85dc2321882acf3bd7 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Sun, 12 Nov 2023 20:08:04 +0100 Subject: [PATCH 2/7] [C] Add failing tests for the low storage warning. --- .../test/c/aeron_network_publication_test.cpp | 53 ++++++++++++++++--- 1 file changed, 47 insertions(+), 6 deletions(-) diff --git a/aeron-driver/src/test/c/aeron_network_publication_test.cpp b/aeron-driver/src/test/c/aeron_network_publication_test.cpp index 62f5e23c0d..c8cc9a0050 100644 --- a/aeron-driver/src/test/c/aeron_network_publication_test.cpp +++ b/aeron-driver/src/test/c/aeron_network_publication_test.cpp @@ -56,11 +56,17 @@ class NetworkPublicationTest : public testing::Test aeron_system_counters_init(&m_system_counters, &m_counters_manager); + aeron_distinct_error_log_t m_error_log = {}; + aeron_distinct_error_log_init( + &m_error_log, m_error_log_buffer.data(), m_error_log_buffer.size(), aeron_epoch_clock); aeron_driver_sender_init(&m_sender, m_context, &m_system_counters, nullptr); m_sender_proxy.sender = &m_sender; m_context->sender_proxy = &m_sender_proxy; + m_context->error_log = &m_error_log; + m_context->error_buffer = m_error_log_buffer.data(); + m_context->error_buffer_length = m_error_log_buffer.size(); aeron_driver_ensure_dir_is_recreated(m_context); } @@ -188,11 +194,6 @@ class NetworkPublicationTest : public testing::Test return publication; } - static uint64_t noSpaceAvailable(const char* path) - { - return 0; - } - aeron_driver_context_t *m_context = nullptr; private: aeron_clock_cache_t m_cached_clock = {}; @@ -201,6 +202,7 @@ class NetworkPublicationTest : public testing::Test aeron_system_counters_t m_system_counters = {}; AERON_DECL_ALIGNED(buffer_t m_counter_value_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_4x_t m_counter_meta_buffer, 16) = {}; + AERON_DECL_ALIGNED(buffer_t m_error_log_buffer, 16) = {}; std::vector m_endpoints; std::vector m_publications; aeron_name_resolver_t m_resolver = {}; @@ -256,8 +258,47 @@ TEST_F(NetworkPublicationTest, shouldSendHeartbeatWhileSendingPeriodicSetups) TEST_F(NetworkPublicationTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpaceAvailable) { - m_context->usable_fs_space_func = noSpaceAvailable; + m_context->usable_fs_space_func = [](const char* path) + { + return 190ULL; + }; + m_context->perform_storage_checks = true; + aeron_network_publication_t *publication = createPublication("aeron:udp?endpoint=localhost:23245"); + ASSERT_EQ(nullptr, publication) << aeron_errmsg(); EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, aeron_errcode()); + auto expected_error_text = + std::string("insufficient usable storage for new log of length=4096 usable=190 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, std::string(aeron_errmsg()).find(expected_error_text)); +} + +TEST_F(NetworkPublicationTest, shouldWarnIfRemainingStorageSpaceIsLow) +{ + m_context->usable_fs_space_func = [](const char* path) + { + return 1048576ULL; + }; + m_context->low_file_store_warning_threshold = 4194304ULL; + m_context->perform_storage_checks = true; + + aeron_network_publication_t *publication = createPublication("aeron:udp?endpoint=localhost:23245"); + + ASSERT_NE(nullptr, publication) << aeron_errmsg(); + std::cout << "[before]: error_log: " << m_context->error_log << std::endl; + std::cout << "[before]: error_log.observation_list: " << m_context->error_log->observation_list << std::endl; + EXPECT_EQ(0, aeron_errcode()); + std::cout << "[after]: error_log: " << m_context->error_log << std::endl; + std::cout << "[after]: error_log.observation_list: " << m_context->error_log->observation_list << std::endl; + auto errors_list = m_context->error_log->observation_list; + EXPECT_NE(0, errors_list->num_observations); + auto last_error = errors_list->observations[errors_list->num_observations - 1]; + EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, last_error.error_code); + auto error_text = std::string(last_error.description); + EXPECT_EQ(error_text.size(), last_error.description_length); + auto expected_warning = + std::string("WARNING: space is running low: threshold=4194304 usable=1048576 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, error_text.find(expected_warning)); } From 0d7fb3c8c331868816de332c9454aca4aff0e7ce Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:33:36 +0100 Subject: [PATCH 3/7] [C] Fix GCC compilation warnings. --- .../src/test/c/aeron_network_publication_test.cpp | 8 ++++---- aeron-driver/src/test/c/aeron_publication_image_test.cpp | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/aeron-driver/src/test/c/aeron_network_publication_test.cpp b/aeron-driver/src/test/c/aeron_network_publication_test.cpp index c8cc9a0050..a31710ed54 100644 --- a/aeron-driver/src/test/c/aeron_network_publication_test.cpp +++ b/aeron-driver/src/test/c/aeron_network_publication_test.cpp @@ -258,9 +258,9 @@ TEST_F(NetworkPublicationTest, shouldSendHeartbeatWhileSendingPeriodicSetups) TEST_F(NetworkPublicationTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpaceAvailable) { - m_context->usable_fs_space_func = [](const char* path) + m_context->usable_fs_space_func = [](const char* path) -> uint64_t { - return 190ULL; + return 190; }; m_context->perform_storage_checks = true; @@ -276,9 +276,9 @@ TEST_F(NetworkPublicationTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSp TEST_F(NetworkPublicationTest, shouldWarnIfRemainingStorageSpaceIsLow) { - m_context->usable_fs_space_func = [](const char* path) + m_context->usable_fs_space_func = [](const char *path) -> uint64_t { - return 1048576ULL; + return 1048576; }; m_context->low_file_store_warning_threshold = 4194304ULL; m_context->perform_storage_checks = true; diff --git a/aeron-driver/src/test/c/aeron_publication_image_test.cpp b/aeron-driver/src/test/c/aeron_publication_image_test.cpp index f3a2c9a970..1dd3940e32 100644 --- a/aeron-driver/src/test/c/aeron_publication_image_test.cpp +++ b/aeron-driver/src/test/c/aeron_publication_image_test.cpp @@ -481,9 +481,9 @@ TEST_F(PublicationImageTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpac endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); - m_context->usable_fs_space_func = [](const char* path) + m_context->usable_fs_space_func = [](const char* path) -> uint64_t { - return 42ULL; + return 42; }; aeron_publication_image_t *image = createImage(endpoint, dest, stream_id, session_id); @@ -518,9 +518,9 @@ TEST_F(PublicationImageTest, shouldLogWarningIfStorageSpaceIsLow) endpoint->channel_status.counter_id)); ASSERT_EQ(1, aeron_receive_channel_endpoint_add_destination(endpoint, dest)); - m_context->usable_fs_space_func = [](const char* path) + m_context->usable_fs_space_func = [](const char* path) -> uint64_t { - return 123456789ULL; + return 123456789; }; m_context->low_file_store_warning_threshold = 987654321ULL; aeron_publication_image_t *image = createImage(endpoint, dest, stream_id, session_id); From 69e8beb0b18e9b9c8a573377e6d422c289319dab Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 13 Nov 2023 11:46:37 +0100 Subject: [PATCH 4/7] [C] Close distinct error log after each test. --- .../src/test/c/aeron_network_publication_test.cpp | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/aeron-driver/src/test/c/aeron_network_publication_test.cpp b/aeron-driver/src/test/c/aeron_network_publication_test.cpp index a31710ed54..60464e755c 100644 --- a/aeron-driver/src/test/c/aeron_network_publication_test.cpp +++ b/aeron-driver/src/test/c/aeron_network_publication_test.cpp @@ -56,7 +56,6 @@ class NetworkPublicationTest : public testing::Test aeron_system_counters_init(&m_system_counters, &m_counters_manager); - aeron_distinct_error_log_t m_error_log = {}; aeron_distinct_error_log_init( &m_error_log, m_error_log_buffer.data(), m_error_log_buffer.size(), aeron_epoch_clock); @@ -87,6 +86,7 @@ class NetworkPublicationTest : public testing::Test aeron_driver_sender_on_close(&m_sender); aeron_system_counters_close(&m_system_counters); aeron_counters_manager_close(&m_counters_manager); + aeron_distinct_error_log_close(&m_error_log); aeron_driver_context_close(m_context); } @@ -202,6 +202,7 @@ class NetworkPublicationTest : public testing::Test aeron_system_counters_t m_system_counters = {}; AERON_DECL_ALIGNED(buffer_t m_counter_value_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_4x_t m_counter_meta_buffer, 16) = {}; + aeron_distinct_error_log_t m_error_log = {}; AERON_DECL_ALIGNED(buffer_t m_error_log_buffer, 16) = {}; std::vector m_endpoints; std::vector m_publications; @@ -286,12 +287,9 @@ TEST_F(NetworkPublicationTest, shouldWarnIfRemainingStorageSpaceIsLow) aeron_network_publication_t *publication = createPublication("aeron:udp?endpoint=localhost:23245"); ASSERT_NE(nullptr, publication) << aeron_errmsg(); - std::cout << "[before]: error_log: " << m_context->error_log << std::endl; - std::cout << "[before]: error_log.observation_list: " << m_context->error_log->observation_list << std::endl; EXPECT_EQ(0, aeron_errcode()); - std::cout << "[after]: error_log: " << m_context->error_log << std::endl; - std::cout << "[after]: error_log.observation_list: " << m_context->error_log->observation_list << std::endl; auto errors_list = m_context->error_log->observation_list; + EXPECT_NE(nullptr, errors_list); EXPECT_NE(0, errors_list->num_observations); auto last_error = errors_list->observations[errors_list->num_observations - 1]; EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, last_error.error_code); From b031deaa0fe9d9b1b7030fbe6ece4ecc9def6c9c Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 13 Nov 2023 14:18:49 +0100 Subject: [PATCH 5/7] [C++] Add low storage space for IPC publications. --- .../src/test/c/aeron_ipc_publication_test.cpp | 53 ++++++++++++++++--- .../test/c/aeron_network_publication_test.cpp | 2 +- 2 files changed, 48 insertions(+), 7 deletions(-) diff --git a/aeron-driver/src/test/c/aeron_ipc_publication_test.cpp b/aeron-driver/src/test/c/aeron_ipc_publication_test.cpp index 1817ffc680..64b6c8f2ec 100644 --- a/aeron-driver/src/test/c/aeron_ipc_publication_test.cpp +++ b/aeron-driver/src/test/c/aeron_ipc_publication_test.cpp @@ -48,6 +48,13 @@ class IpcPublicationTest : public testing::Test aeron_system_counters_init(&m_system_counters, &m_counters_manager); + aeron_distinct_error_log_init( + &m_error_log, m_error_log_buffer.data(), m_error_log_buffer.size(), aeron_epoch_clock); + + m_context->error_log = &m_error_log; + m_context->error_buffer = m_error_log_buffer.data(); + m_context->error_buffer_length = m_error_log_buffer.size(); + aeron_driver_ensure_dir_is_recreated(m_context); } @@ -61,6 +68,7 @@ class IpcPublicationTest : public testing::Test aeron_system_counters_close(&m_system_counters); aeron_counters_manager_close(&m_counters_manager); + aeron_distinct_error_log_close(&m_error_log); aeron_driver_context_close(m_context); } @@ -118,18 +126,15 @@ class IpcPublicationTest : public testing::Test return publication; } - static uint64_t noSpaceAvailable(const char* path) - { - return 0; - } - aeron_driver_context_t *m_context = nullptr; private: aeron_clock_cache_t m_cached_clock = {}; aeron_counters_manager_t m_counters_manager = {}; aeron_system_counters_t m_system_counters = {}; + aeron_distinct_error_log_t m_error_log = {}; AERON_DECL_ALIGNED(buffer_t m_counter_value_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_4x_t m_counter_meta_buffer, 16) = {}; + AERON_DECL_ALIGNED(buffer_t m_error_log_buffer, 16) = {}; std::vector m_publications; }; @@ -148,8 +153,44 @@ TEST_F(IpcPublicationTest, shouldCreatePublication) TEST_F(IpcPublicationTest, shouldReturnStorageSpaceErrorIfNotEnoughStorageSpaceAvailable) { - m_context->usable_fs_space_func = noSpaceAvailable; + m_context->usable_fs_space_func = [](const char* path) -> uint64_t + { + return 2049; + }; + m_context->perform_storage_checks = true; + aeron_ipc_publication_t *publication = createPublication("aeron:ipc"); + ASSERT_EQ(nullptr, publication) << aeron_errmsg(); EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, aeron_errcode()); + auto expected_error_text = + std::string("insufficient usable storage for new log of length=4096 usable=2049 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, std::string(aeron_errmsg()).find(expected_error_text)); +} + +TEST_F(IpcPublicationTest, shouldWarnIfRemainingStorageSpaceIsLow) +{ + m_context->usable_fs_space_func = [](const char *path) -> uint64_t + { + return 1000000; + }; + m_context->low_file_store_warning_threshold = 2020202020ULL; + m_context->perform_storage_checks = true; + + aeron_ipc_publication_t *publication = createPublication("aeron:ipc"); + + ASSERT_NE(nullptr, publication) << aeron_errmsg(); + EXPECT_EQ(0, aeron_errcode()); + auto errors_list = m_context->error_log->observation_list; + EXPECT_NE(nullptr, errors_list); + EXPECT_NE(0, errors_list->num_observations); + auto last_error = errors_list->observations[errors_list->num_observations - 1]; + EXPECT_EQ(-AERON_ERROR_CODE_STORAGE_SPACE, last_error.error_code); + auto error_text = std::string(last_error.description); + EXPECT_EQ(error_text.size(), last_error.description_length); + auto expected_warning = + std::string("WARNING: space is running low: threshold=2020202020 usable=1000000 in ") + .append(m_context->aeron_dir); + EXPECT_NE(std::string::npos, error_text.find(expected_warning)); } diff --git a/aeron-driver/src/test/c/aeron_network_publication_test.cpp b/aeron-driver/src/test/c/aeron_network_publication_test.cpp index 60464e755c..f96b675472 100644 --- a/aeron-driver/src/test/c/aeron_network_publication_test.cpp +++ b/aeron-driver/src/test/c/aeron_network_publication_test.cpp @@ -200,9 +200,9 @@ class NetworkPublicationTest : public testing::Test aeron_udp_channel_transport_bindings_t m_transport_bindings = {}; aeron_counters_manager_t m_counters_manager = {}; aeron_system_counters_t m_system_counters = {}; + aeron_distinct_error_log_t m_error_log = {}; AERON_DECL_ALIGNED(buffer_t m_counter_value_buffer, 16) = {}; AERON_DECL_ALIGNED(buffer_4x_t m_counter_meta_buffer, 16) = {}; - aeron_distinct_error_log_t m_error_log = {}; AERON_DECL_ALIGNED(buffer_t m_error_log_buffer, 16) = {}; std::vector m_endpoints; std::vector m_publications; From bb960c0f90a8182ac488c678ab53afd40e7e6799 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 13 Nov 2023 14:30:12 +0100 Subject: [PATCH 6/7] [C] Fix typo. --- aeron-driver/src/main/c/media/aeron_udp_channel_transport.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c b/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c index cf883d3a4c..a17f2b38bc 100644 --- a/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c +++ b/aeron-driver/src/main/c/media/aeron_udp_channel_transport.c @@ -293,7 +293,7 @@ int aeron_udp_channel_transport_init( { if (aeron_udp_channel_transport_setup_media_rcv_timestamps(transport) < 0) { - AERON_APPEND_ERR("%s", "WARNING, unable to setup media timestamping"); + AERON_APPEND_ERR("%s", "WARNING: unable to setup media timestamping"); aeron_distinct_error_log_record(context->error_log, aeron_errcode(), aeron_errmsg()); aeron_err_clear(); } From 2a450fc5bc9b0f5dcef0ca934b3d33d1c876e468 Mon Sep 17 00:00:00 2001 From: Dmitry Vyazelenko <696855+vyazelenko@users.noreply.github.com> Date: Mon, 13 Nov 2023 15:59:21 +0100 Subject: [PATCH 7/7] [C] Remove `inline` to fix linker error on Windows. --- aeron-driver/src/main/c/aeron_driver_context.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/aeron-driver/src/main/c/aeron_driver_context.c b/aeron-driver/src/main/c/aeron_driver_context.c index 2388206800..e47f95de9c 100644 --- a/aeron-driver/src/main/c/aeron_driver_context.c +++ b/aeron-driver/src/main/c/aeron_driver_context.c @@ -1253,7 +1253,7 @@ static void aeron_driver_context_free_bindings(const aeron_udp_channel_intercept } } -inline int aeron_driver_context_run_storage_checks(aeron_driver_context_t *context, uint64_t log_length) +int aeron_driver_context_run_storage_checks(aeron_driver_context_t *context, uint64_t log_length) { if (context->perform_storage_checks) {