Skip to content

Commit

Permalink
hdfs mgr ut ok
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 30, 2025
1 parent 4d790d3 commit 9b4a38f
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 63 deletions.
9 changes: 7 additions & 2 deletions be/src/common/kerberos/kerberos_ticket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,11 @@ void KerberosTicketMgr::_stop_cleanup_thread() {
}

void KerberosTicketMgr::_cleanup_loop() {
#ifdef BE_TEST
static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose
#else
static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds
#endif
uint64_t last_cleanup_time = std::time(nullptr);

while (!_should_stop_cleanup_thread) {
Expand All @@ -65,8 +69,9 @@ void KerberosTicketMgr::_cleanup_loop() {
// 2. One temporary reference in entry.second.cache
if (entry.second.cache.use_count() == 2) {
LOG(INFO) << "Found unused Kerberos ticket cache for principal: "
<< entry.second.cache->get_config().get_principal()
<< ", keytab: " << entry.second.cache->get_config().get_keytab_path();
<< entry.second.cache->get_config().get_principal()
<< ", keytab: "
<< entry.second.cache->get_config().get_keytab_path();
keys_to_remove.push_back(entry.first);
}
}
Expand Down
8 changes: 3 additions & 5 deletions be/src/common/kerberos/kerberos_ticket_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ class KerberosTicketMgr {
// Get or create a ticket cache for the given Kerberos configuration
// Logic: Checks if cache exists, if not creates new one, initializes it,
// performs login, and starts periodic refresh
virtual Status get_or_set_ticket_cache(const KerberosConfig& config,
std::shared_ptr<KerberosTicketCache>* ticket_cache);
Status get_or_set_ticket_cache(const KerberosConfig& config,
std::shared_ptr<KerberosTicketCache>* ticket_cache);

// Retrieve the cache file path for a given principal and keytab combination
// Logic: Generates key from principal and keytab, looks up in cache map,
Expand All @@ -63,9 +63,7 @@ class KerberosTicketMgr {
const std::string& keytab_path);

// Set the cleanup interval for testing purpose
void set_cleanup_interval(std::chrono::seconds interval) {
_cleanup_interval = interval;
}
void set_cleanup_interval(std::chrono::seconds interval) { _cleanup_interval = interval; }

virtual ~KerberosTicketMgr();

Expand Down
20 changes: 6 additions & 14 deletions be/src/io/fs/hdfs/hdfs_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,17 @@ void HdfsMgr::_stop_cleanup_thread() {
}

void HdfsMgr::_cleanup_loop() {
#ifdef BE_TEST
static constexpr int64_t CHECK_INTERVAL_SECONDS = 1; // For testing purpose
#else
static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds
#endif
uint64_t last_cleanup_time = std::time(nullptr);

while (!_should_stop_cleanup_thread) {
uint64_t current_time = std::time(nullptr);

// Only perform cleanup if enough time has passed
std::cout << "yy debug: current_time: " << current_time << ", last_cleanup_time: " << last_cleanup_time
<< ", _cleanup_interval_seconds: " << _cleanup_interval_seconds << std::endl;
if (current_time - last_cleanup_time >= _cleanup_interval_seconds) {
// Collect expired handlers under lock
std::vector<std::shared_ptr<HdfsHandler>> handlers_to_cleanup;
Expand All @@ -72,7 +74,8 @@ void HdfsMgr::_cleanup_loop() {

// Find expired handlers
for (const auto& entry : _fs_handlers) {
if (current_time - entry.second->last_access_time >= _instance_timeout_seconds) {
if (current_time - entry.second->last_access_time >=
_instance_timeout_seconds) {
LOG(INFO) << "Found expired HDFS handler, hash_code=" << entry.first
<< ", last_access_time=" << entry.second->last_access_time
<< ", is_kerberos=" << entry.second->is_kerberos_auth
Expand Down Expand Up @@ -103,8 +106,6 @@ void HdfsMgr::_cleanup_loop() {

LOG(INFO) << "Finished cleanup HDFS handler"
<< ", fs_name=" << handler->fs_name;
std::cout << "Finished cleanup HDFS handler"
<< ", fs_name=" << handler->fs_name << std::endl;
}

handlers_to_cleanup.clear();
Expand All @@ -120,7 +121,6 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri
std::shared_ptr<HdfsHandler>* fs_handler) {
uint64_t hash_code = _hdfs_hash_code(hdfs_params, fs_name);

std::cout << "yy debug 1" << std::endl;
// First check without lock
{
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -135,15 +135,13 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri
}
}

std::cout << "yy debug 2" << std::endl;
// Create new hdfsFS handler outside the lock
LOG(INFO) << "Start to create new HDFS handler, hash_code=" << hash_code
<< ", fs_name=" << fs_name;

std::shared_ptr<HdfsHandler> new_fs_handler;
RETURN_IF_ERROR(_create_hdfs_fs(hdfs_params, fs_name, &new_fs_handler));

std::cout << "yy debug 3" << std::endl;
// Double check with lock before inserting
{
std::lock_guard<std::mutex> lock(_mutex);
Expand All @@ -167,29 +165,23 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri
<< ", principal=" << new_fs_handler->principal << ", fs_name=" << fs_name;
}

std::cout << "yy debug 4" << std::endl;
return Status::OK();
}

Status HdfsMgr::_create_hdfs_fs_impl(const THdfsParams& hdfs_params, const std::string& fs_name,
std::shared_ptr<HdfsHandler>* fs_handler) {
std::cout << "yy debug xx 1" << std::endl;
HDFSCommonBuilder builder;
RETURN_IF_ERROR(create_hdfs_builder(hdfs_params, fs_name, &builder));
std::cout << "yy debug xx 2" << std::endl;
hdfsFS hdfs_fs = hdfsBuilderConnect(builder.get());
std::cout << "yy debug xx 3" << std::endl;
if (hdfs_fs == nullptr) {
return Status::InternalError("failed to connect to hdfs {}: {}", fs_name, hdfs_error());
}

std::cout << "yy debug xx 4" << std::endl;
bool is_kerberos = builder.is_kerberos();
*fs_handler = std::make_shared<HdfsHandler>(
hdfs_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "",
is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name,
builder.get_ticket_cache());
std::cout << "yy debug xx 5" << std::endl;
return Status::OK();
}

Expand Down
1 change: 0 additions & 1 deletion be/src/io/hdfs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class HdfsHandler {
~HdfsHandler() {
// The ticket_cache will be automatically released when the last reference is gone
// No need to explicitly cleanup kerberos ticket
std::cout << "zzz debug ~HdfsHandler" << std::endl;
}

void update_access_time() { last_access_time = std::time(nullptr); }
Expand Down
61 changes: 20 additions & 41 deletions be/test/io/fs/hdfs/hdfs_mgr_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,6 @@ class MockKerberosTicketMgr : public kerberos::KerberosTicketMgr {
public:
explicit MockKerberosTicketMgr(const std::string& root_path) : KerberosTicketMgr(root_path) {}

MOCK_METHOD(Status, get_or_set_ticket_cache,
(const kerberos::KerberosConfig& config,
std::shared_ptr<kerberos::KerberosTicketCache>* ticket_cache),
(override));

protected:
std::shared_ptr<kerberos::KerberosTicketCache> _make_new_ticket_cache(
const kerberos::KerberosConfig& config) override {
Expand All @@ -94,7 +89,7 @@ class HdfsMgrTest : public testing::Test {
_hdfs_mgr = std::make_unique<NiceMock<MockHdfsMgr>>();

// Set shorter timeout for testing
_hdfs_mgr->set_instance_timeout_seconds(5);
_hdfs_mgr->set_instance_timeout_seconds(2);
_hdfs_mgr->set_cleanup_interval_seconds(1);

// Reset MockKerberosTicketCache instance count
Expand Down Expand Up @@ -197,7 +192,7 @@ TEST_F(HdfsMgrTest, CleanupExpiredHandlers) {
ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 1);

// Wait for cleanup
std::this_thread::sleep_for(std::chrono::seconds(7));
std::this_thread::sleep_for(std::chrono::seconds(4));

// Handler should be removed
ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0);
Expand Down Expand Up @@ -243,20 +238,17 @@ TEST_F(HdfsMgrTest, SharedKerberosTicketCache) {

// Create a shared ticket cache that will be used by both handlers
auto shared_ticket_mgr = std::make_shared<NiceMock<MockKerberosTicketMgr>>("/tmp/kerberos");
auto shared_ticket_cache = std::make_shared<MockKerberosTicketCache>(
kerberos::KerberosConfig(), "/tmp/kerberos");

// Setup mock to return the shared ticket cache
ON_CALL(*shared_ticket_mgr, get_or_set_ticket_cache(_, _))
.WillByDefault(DoAll(SetArgPointee<1>(shared_ticket_cache), Return(Status::OK())));
// Set cleanup interval to 1 second for testing
shared_ticket_mgr->set_cleanup_interval(std::chrono::seconds(1));

// Setup mock to create handlers with Kerberos
ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _))
.WillByDefault([shared_ticket_mgr](const THdfsParams& params, const std::string& fs_name,
std::shared_ptr<HdfsHandler>* fs_handler) {
.WillByDefault([shared_ticket_mgr](const THdfsParams& params,
const std::string& fs_name,
std::shared_ptr<HdfsHandler>* fs_handler) {
kerberos::KerberosConfig config;
config.set_principal_and_keytab(params.hdfs_kerberos_principal,
params.hdfs_kerberos_keytab);
params.hdfs_kerberos_keytab);
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;
RETURN_IF_ERROR(shared_ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
*fs_handler = std::make_shared<HdfsHandler>(
Expand All @@ -275,7 +267,7 @@ TEST_F(HdfsMgrTest, SharedKerberosTicketCache) {

// Verify both handlers share the same ticket cache
ASSERT_EQ(handler1->ticket_cache, handler2->ticket_cache);
ASSERT_EQ(handler1->ticket_cache, shared_ticket_cache);
// ASSERT_EQ(handler1->ticket_cache, shared_ticket_cache);
}

// Test cleanup of KerberosTicketCache when handlers are destroyed
Expand All @@ -291,42 +283,31 @@ TEST_F(HdfsMgrTest, KerberosTicketCacheCleanup) {
// Setup mock to create handler with Kerberos
ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _))
.WillByDefault([ticket_mgr](const THdfsParams& params, const std::string& fs_name,
std::shared_ptr<HdfsHandler>* fs_handler) {
std::shared_ptr<HdfsHandler>* fs_handler) {
kerberos::KerberosConfig config;
config.set_principal_and_keytab(params.hdfs_kerberos_principal,
params.hdfs_kerberos_keytab);
params.hdfs_kerberos_keytab);
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;

// Create a new ticket cache for each call
ON_CALL(*ticket_mgr, get_or_set_ticket_cache(_, _))
.WillByDefault(DoAll(
SetArgPointee<1>(std::make_shared<MockKerberosTicketCache>(config, "/tmp/kerberos")),
Return(Status::OK())));

std::cout << "uuu debug1: " << ticket_cache.use_count() << std::endl;
RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
std::cout << "uuu debug2: " << ticket_cache.use_count() << std::endl;
*fs_handler = std::make_shared<HdfsHandler>(
nullptr, true, params.hdfs_kerberos_principal, params.hdfs_kerberos_keytab,
fs_name, ticket_cache);
std::cout << "uuu debug3: " << ticket_cache.use_count() << std::endl;
return Status::OK();
});

// Create handler
ASSERT_TRUE(_hdfs_mgr->get_or_create_fs(params, "test_fs", &handler).ok());
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache_holder = handler->ticket_cache;
std::cout << "xxx debug1: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl;
handler.reset();
std::cout << "xxx debug2: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl;
ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 1);

// Wait for cleanup
std::this_thread::sleep_for(std::chrono::seconds(7));
std::cout << "xxx debug3: " << handler.use_count() << ", " << ticket_cache_holder.use_count() << std::endl;
std::this_thread::sleep_for(std::chrono::seconds(6));

// Verify handler and ticket cache are cleaned up
ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0);
ticket_cache_holder.reset();
ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 0);
}

Expand All @@ -345,18 +326,12 @@ TEST_F(HdfsMgrTest, DifferentKerberosCredentials) {
// Setup mock to create handlers with Kerberos
ON_CALL(*_hdfs_mgr, _create_hdfs_fs_impl(_, _, _))
.WillByDefault([ticket_mgr](const THdfsParams& params, const std::string& fs_name,
std::shared_ptr<HdfsHandler>* fs_handler) {
std::shared_ptr<HdfsHandler>* fs_handler) {
kerberos::KerberosConfig config;
config.set_principal_and_keytab(params.hdfs_kerberos_principal,
params.hdfs_kerberos_keytab);
params.hdfs_kerberos_keytab);
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;

// Create a new ticket cache for each unique principal/keytab combination
ON_CALL(*ticket_mgr, get_or_set_ticket_cache(_, _))
.WillByDefault(DoAll(
SetArgPointee<1>(std::make_shared<MockKerberosTicketCache>(config, "/tmp/kerberos")),
Return(Status::OK())));

RETURN_IF_ERROR(ticket_mgr->get_or_set_ticket_cache(config, &ticket_cache));
*fs_handler = std::make_shared<HdfsHandler>(
nullptr, true, params.hdfs_kerberos_principal, params.hdfs_kerberos_keytab,
Expand All @@ -373,10 +348,14 @@ TEST_F(HdfsMgrTest, DifferentKerberosCredentials) {
ASSERT_NE(handler1->ticket_cache, handler2->ticket_cache);

// Wait for cleanup
std::this_thread::sleep_for(std::chrono::seconds(7));
std::this_thread::sleep_for(std::chrono::seconds(6));

// Verify all handlers and ticket caches are cleaned up
ASSERT_EQ(_hdfs_mgr->get_fs_handlers_size(), 0);

// Also need to reset this 2 temp references
handler1.reset();
handler2.reset();
ASSERT_EQ(MockKerberosTicketCache::get_instance_count(), 0);
}

Expand Down

0 comments on commit 9b4a38f

Please sign in to comment.