diff --git a/be/src/common/kerberos/kerberos_ticket_mgr.cpp b/be/src/common/kerberos/kerberos_ticket_mgr.cpp index d120d158fe1517..2f721de29db0cf 100644 --- a/be/src/common/kerberos/kerberos_ticket_mgr.cpp +++ b/be/src/common/kerberos/kerberos_ticket_mgr.cpp @@ -47,7 +47,11 @@ void KerberosTicketMgr::_stop_cleanup_thread() { } void KerberosTicketMgr::_cleanup_loop() { +#ifdef BE_TEST + static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose +#else static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds +#endif uint64_t last_cleanup_time = std::time(nullptr); while (!_should_stop_cleanup_thread) { @@ -65,8 +69,9 @@ void KerberosTicketMgr::_cleanup_loop() { // 2. One temporary reference in entry.second.cache if (entry.second.cache.use_count() == 2) { LOG(INFO) << "Found unused Kerberos ticket cache for principal: " - << entry.second.cache->get_config().get_principal() - << ", keytab: " << entry.second.cache->get_config().get_keytab_path(); + << entry.second.cache->get_config().get_principal() + << ", keytab: " + << entry.second.cache->get_config().get_keytab_path(); keys_to_remove.push_back(entry.first); } } diff --git a/be/src/common/kerberos/kerberos_ticket_mgr.h b/be/src/common/kerberos/kerberos_ticket_mgr.h index d5271df9743755..ed0fedbb7063cc 100644 --- a/be/src/common/kerberos/kerberos_ticket_mgr.h +++ b/be/src/common/kerberos/kerberos_ticket_mgr.h @@ -47,8 +47,8 @@ class KerberosTicketMgr { // Get or create a ticket cache for the given Kerberos configuration // Logic: Checks if cache exists, if not creates new one, initializes it, // performs login, and starts periodic refresh - virtual Status get_or_set_ticket_cache(const KerberosConfig& config, - std::shared_ptr* ticket_cache); + Status get_or_set_ticket_cache(const KerberosConfig& config, + std::shared_ptr* ticket_cache); // Retrieve the cache file path for a given principal and keytab combination // Logic: Generates key from principal and keytab, looks up in cache map, @@ -63,9 +63,7 @@ class KerberosTicketMgr { const std::string& keytab_path); // Set the cleanup interval for testing purpose - void set_cleanup_interval(std::chrono::seconds interval) { - _cleanup_interval = interval; - } + void set_cleanup_interval(std::chrono::seconds interval) { _cleanup_interval = interval; } virtual ~KerberosTicketMgr(); diff --git a/be/src/io/fs/hdfs/hdfs_mgr.cpp b/be/src/io/fs/hdfs/hdfs_mgr.cpp index a4dc05a0f7248c..d5cde499a5d46e 100644 --- a/be/src/io/fs/hdfs/hdfs_mgr.cpp +++ b/be/src/io/fs/hdfs/hdfs_mgr.cpp @@ -54,15 +54,17 @@ void HdfsMgr::_stop_cleanup_thread() { } void HdfsMgr::_cleanup_loop() { +#ifdef BE_TEST + static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose +#else static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds +#endif uint64_t last_cleanup_time = std::time(nullptr); while (!_should_stop_cleanup_thread) { uint64_t current_time = std::time(nullptr); // Only perform cleanup if enough time has passed - std::cout << "yy debug: current_time: " << current_time << ", last_cleanup_time: " << last_cleanup_time - << ", _cleanup_interval_seconds: " << _cleanup_interval_seconds << std::endl; if (current_time - last_cleanup_time >= _cleanup_interval_seconds) { // Collect expired handlers under lock std::vector> handlers_to_cleanup; @@ -72,7 +74,8 @@ void HdfsMgr::_cleanup_loop() { // Find expired handlers for (const auto& entry : _fs_handlers) { - if (current_time - entry.second->last_access_time >= _instance_timeout_seconds) { + if (current_time - entry.second->last_access_time >= + _instance_timeout_seconds) { LOG(INFO) << "Found expired HDFS handler, hash_code=" << entry.first << ", last_access_time=" << entry.second->last_access_time << ", is_kerberos=" << entry.second->is_kerberos_auth @@ -103,8 +106,6 @@ void HdfsMgr::_cleanup_loop() { LOG(INFO) << "Finished cleanup HDFS handler" << ", fs_name=" << handler->fs_name; - std::cout << "Finished cleanup HDFS handler" - << ", fs_name=" << handler->fs_name << std::endl; } handlers_to_cleanup.clear(); @@ -120,7 +121,6 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri std::shared_ptr* fs_handler) { uint64_t hash_code = _hdfs_hash_code(hdfs_params, fs_name); - std::cout << "yy debug 1" << std::endl; // First check without lock { std::lock_guard lock(_mutex); @@ -135,7 +135,6 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri } } - std::cout << "yy debug 2" << std::endl; // Create new hdfsFS handler outside the lock LOG(INFO) << "Start to create new HDFS handler, hash_code=" << hash_code << ", fs_name=" << fs_name; @@ -143,7 +142,6 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri std::shared_ptr new_fs_handler; RETURN_IF_ERROR(_create_hdfs_fs(hdfs_params, fs_name, &new_fs_handler)); - std::cout << "yy debug 3" << std::endl; // Double check with lock before inserting { std::lock_guard lock(_mutex); @@ -167,29 +165,23 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri << ", principal=" << new_fs_handler->principal << ", fs_name=" << fs_name; } - std::cout << "yy debug 4" << std::endl; return Status::OK(); } Status HdfsMgr::_create_hdfs_fs_impl(const THdfsParams& hdfs_params, const std::string& fs_name, std::shared_ptr* fs_handler) { - std::cout << "yy debug xx 1" << std::endl; HDFSCommonBuilder builder; RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder)); - std::cout << "yy debug xx 2" << std::endl; hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get()); - std::cout << "yy debug xx 3" << std::endl; if (hdfs_fs == nullptr) { return Status::InternalError("failed to connect to hdfs {}: {}", fs_name, hdfs_error()); } - std::cout << "yy debug xx 4" << std::endl; bool is_kerberos = builder.is_kerberos(); *fs_handler = std::make_shared( hdfs_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "", is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name, builder.get_ticket_cache()); - std::cout << "yy debug xx 5" << std::endl; return Status::OK(); } diff --git a/be/src/io/hdfs_util.h b/be/src/io/hdfs_util.h index 687503805b49d1..a492d95faaea00 100644 --- a/be/src/io/hdfs_util.h +++ b/be/src/io/hdfs_util.h @@ -64,7 +64,6 @@ class HdfsHandler { ~HdfsHandler() { // The ticket_cache will be automatically released when the last reference is gone // No need to explicitly cleanup kerberos ticket - std::cout << "zzz debug ~HdfsHandler" << std::endl; } void update_access_time() { last_access_time = std::time(nullptr); } diff --git a/be/test/io/fs/hdfs/hdfs_mgr_test.cpp b/be/test/io/fs/hdfs/hdfs_mgr_test.cpp index e5cab4bccbc2ef..83f31b5b7ceec8 100644 --- a/be/test/io/fs/hdfs/hdfs_mgr_test.cpp +++ b/be/test/io/fs/hdfs/hdfs_mgr_test.cpp @@ -76,11 +76,6 @@ class MockKerberosTicketMgr : public kerberos::KerberosTicketMgr { public: explicit MockKerberosTicketMgr(const std::string& root_path) : KerberosTicketMgr(root_path) {} - MOCK_METHOD(Status, get_or_set_ticket_cache, - (const kerberos::KerberosConfig& config, - std::shared_ptr* ticket_cache), - (override)); - protected: std::shared_ptr _make_new_ticket_cache( const kerberos::KerberosConfig& config) override { @@ -94,7 +89,7 @@ class HdfsMgrTest : public testing::Test { _hdfs_mgr = std::make_unique>(); // Set shorter timeout for testing - _hdfs_mgr->set_instance_timeout_seconds(5); + _hdfs_mgr->set_instance_timeout_seconds(2); _hdfs_mgr->set_cleanup_interval_seconds(1); // Reset MockKerberosTicketCache instance count @@ -197,7 +192,7 @@ TEST_F(HdfsMgrTest, CleanupExpiredHandlers) { ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 1); // Wait for cleanup - std::this_thread::sleep_for(std::chrono::seconds(7)); + std::this_thread::sleep_for(std::chrono::seconds(4)); // Handler should be removed ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0); @@ -243,20 +238,17 @@ TEST_F(HdfsMgrTest, SharedKerberosTicketCache) { // Create a shared ticket cache that will be used by both handlers auto shared_ticket_mgr = std::make_shared>("/tmp/kerberos"); - auto shared_ticket_cache = std::make_shared( - kerberos::KerberosConfig(), "/tmp/kerberos"); - - // Setup mock to return the shared ticket cache - ON_CALL(*shared_ticket_mgr, get_or_set_ticket_cache(_, _)) - .WillByDefault(DoAll(SetArgPointee<1>(shared_ticket_cache), Return(Status::OK()))); + // Set cleanup interval to 1 second for testing + shared_ticket_mgr->set_cleanup_interval(std::chrono::seconds(1)); // Setup mock to create handlers with Kerberos ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _)) - .WillByDefault([shared_ticket_mgr](const THdfsParams& params, const std::string& fs_name, - std::shared_ptr* fs_handler) { + .WillByDefault([shared_ticket_mgr](const THdfsParams& params, + const std::string& fs_name, + std::shared_ptr* fs_handler) { kerberos::KerberosConfig config; config.set_principal_and_keytab(params.hdfs_kerberos_principal, - params.hdfs_kerberos_keytab); + params.hdfs_kerberos_keytab); std::shared_ptr ticket_cache; RETURN_IF_ERROR(shared_ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache)); *fs_handler = std::make_shared( @@ -275,7 +267,7 @@ TEST_F(HdfsMgrTest, SharedKerberosTicketCache) { // Verify both handlers share the same ticket cache ASSERT_EQ(handler1->ticket_cache, handler2->ticket_cache); - ASSERT_EQ(handler1->ticket_cache, shared_ticket_cache); + // ASSERT_EQ(handler1->ticket_cache, shared_ticket_cache); } // Test cleanup of KerberosTicketCache when handlers are destroyed @@ -291,42 +283,31 @@ TEST_F(HdfsMgrTest, KerberosTicketCacheCleanup) { // Setup mock to create handler with Kerberos ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _)) .WillByDefault([ticket_mgr](const THdfsParams& params, const std::string& fs_name, - std::shared_ptr* fs_handler) { + std::shared_ptr* fs_handler) { kerberos::KerberosConfig config; config.set_principal_and_keytab(params.hdfs_kerberos_principal, - params.hdfs_kerberos_keytab); + params.hdfs_kerberos_keytab); std::shared_ptr ticket_cache; - // Create a new ticket cache for each call - ON_CALL(*ticket_mgr, get_or_set_ticket_cache(_, _)) - .WillByDefault(DoAll( - SetArgPointee<1>(std::make_shared(config, "/tmp/kerberos")), - Return(Status::OK()))); - - std::cout << "uuu debug1: " << ticket_cache.use_count() << std::endl; RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache)); - std::cout << "uuu debug2: " << ticket_cache.use_count() << std::endl; *fs_handler = std::make_shared( nullptr, true, params.hdfs_kerberos_principal, params.hdfs_kerberos_keytab, fs_name, ticket_cache); - std::cout << "uuu debug3: " << ticket_cache.use_count() << std::endl; return Status::OK(); }); // Create handler ASSERT_TRUE(_hdfs_mgr->get_or_create_fs(params, "test_fs", &handler).ok()); std::shared_ptr ticket_cache_holder = handler->ticket_cache; - std::cout << "xxx debug1: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl; handler.reset(); - std::cout << "xxx debug2: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl; ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 1); // Wait for cleanup - std::this_thread::sleep_for(std::chrono::seconds(7)); - std::cout << "xxx debug3: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl; + std::this_thread::sleep_for(std::chrono::seconds(6)); // Verify handler and ticket cache are cleaned up ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0); + ticket_cache_holder.reset(); ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 0); } @@ -345,18 +326,12 @@ TEST_F(HdfsMgrTest, DifferentKerberosCredentials) { // Setup mock to create handlers with Kerberos ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _)) .WillByDefault([ticket_mgr](const THdfsParams& params, const std::string& fs_name, - std::shared_ptr* fs_handler) { + std::shared_ptr* fs_handler) { kerberos::KerberosConfig config; config.set_principal_and_keytab(params.hdfs_kerberos_principal, - params.hdfs_kerberos_keytab); + params.hdfs_kerberos_keytab); std::shared_ptr ticket_cache; - // Create a new ticket cache for each unique principal/keytab combination - ON_CALL(*ticket_mgr, get_or_set_ticket_cache(_, _)) - .WillByDefault(DoAll( - SetArgPointee<1>(std::make_shared(config, "/tmp/kerberos")), - Return(Status::OK()))); - RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache)); *fs_handler = std::make_shared( nullptr, true, params.hdfs_kerberos_principal, params.hdfs_kerberos_keytab, @@ -373,10 +348,14 @@ TEST_F(HdfsMgrTest, DifferentKerberosCredentials) { ASSERT_NE(handler1->ticket_cache, handler2->ticket_cache); // Wait for cleanup - std::this_thread::sleep_for(std::chrono::seconds(7)); + std::this_thread::sleep_for(std::chrono::seconds(6)); // Verify all handlers and ticket caches are cleaned up ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0); + + // Also need to reset this 2 temp references + handler1.reset(); + handler2.reset(); ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 0); }