Skip to content

Commit

Permalink
auto remove cache
Browse files Browse the repository at this point in the history
  • Loading branch information
morningman committed Jan 29, 2025
1 parent de67ad8 commit 43c5017
Show file tree
Hide file tree
Showing 4 changed files with 91 additions and 79 deletions.
36 changes: 15 additions & 21 deletions be/src/common/kerberos/kerberos_ticket_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ Status KerberosTicketMgr::get_or_set_ticket_cache(const KerberosConfig& config,
RETURN_IF_ERROR(ticket_cache->write_ticket_cache());
ticket_cache->start_periodic_refresh();

// Add to map
KerberosTicketEntry entry {.cache = std::move(ticket_cache)};
// Convert to shared_ptr and add to map
auto shared_cache = std::shared_ptr<KerberosTicketCache>(ticket_cache.release());
KerberosTicketEntry entry {.cache = shared_cache};

auto [inserted_it, success] = _ticket_caches.emplace(key, std::move(entry));
if (!success) {
Expand All @@ -72,6 +73,18 @@ Status KerberosTicketMgr::get_or_set_ticket_cache(const KerberosConfig& config,
return Status::OK();
}

std::shared_ptr<KerberosTicketCache> KerberosTicketMgr::get_ticket_cache(
const std::string& principal, const std::string& keytab_path) {
std::string key = _generate_key(principal, keytab_path);

std::lock_guard<std::mutex> lock(_mutex);
auto it = _ticket_caches.find(key);
if (it != _ticket_caches.end()) {
return it->second.cache;
}
return nullptr;
}

std::unique_ptr<KerberosTicketCache> KerberosTicketMgr::_make_new_ticket_cache(
const KerberosConfig& config) {
return std::make_unique<KerberosTicketCache>(config, _root_path);
Expand All @@ -93,23 +106,4 @@ Status KerberosTicketMgr::get_cache_file_path(const std::string& principal,
return Status::OK();
}

// TODO: 1. reference
// TODO: 2. add ut
Status KerberosTicketMgr::remove_ticket_cache(const std::string& principal,
const std::string& keytab_path) {
std::string key = _generate_key(principal, keytab_path);

std::lock_guard<std::mutex> lock(_mutex);

auto it = _ticket_caches.find(key);
if (it == _ticket_caches.end()) {
return Status::NotFound("Kerberos ticket cache not found for principal: " + principal);
}

// Remove the ticket cache
_ticket_caches.erase(it);
LOG(INFO) << "Removed kerberos ticket cache for principal: " << principal;
return Status::OK();
}

} // namespace doris::kerberos
6 changes: 5 additions & 1 deletion be/src/common/kerberos/kerberos_ticket_mgr.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ namespace doris::kerberos {

// Structure to hold a ticket cache instance and its last access time
struct KerberosTicketEntry {
std::unique_ptr<KerberosTicketCache> cache;
std::shared_ptr<KerberosTicketCache> cache;
std::chrono::steady_clock::time_point last_access_time;
};

Expand All @@ -57,6 +57,10 @@ class KerberosTicketMgr {

Status remove_ticket_cache(const std::string& principal, const std::string& keytab_path);

// Get the ticket cache object. This is used by HdfsHandler to hold a reference
std::shared_ptr<KerberosTicketCache> get_ticket_cache(const std::string& principal,
const std::string& keytab_path);

virtual ~KerberosTicketMgr();

protected:
Expand Down
99 changes: 52 additions & 47 deletions be/src/io/fs/hdfs/hdfs_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,20 +53,6 @@ void HdfsMgr::_stop_cleanup_thread() {
}
}

void HdfsMgr::_cleanup_kerberos_ticket(const HdfsHandler& handler) {
if (!handler.is_kerberos_auth) {
return;
}
// Remove ticket cache from KerberosTicketMgr
auto* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
std::string cache_path;
Status st = ticket_mgr->remove_ticket_cache(handler.principal, handler.keytab_path);
if (!st.ok()) {
LOG(WARNING) << "Failed to remove kerberos ticket cache for principal: "
<< handler.principal << ", error: " << st;
}
}

void HdfsMgr::_cleanup_loop() {
static constexpr int64_t CHECK_INTERVAL_SECONDS = 5; // Check stop flag every 5 seconds
uint64_t last_cleanup_time = std::time(nullptr);
Expand All @@ -76,12 +62,13 @@ void HdfsMgr::_cleanup_loop() {

// Only perform cleanup if enough time has passed
if (current_time - last_cleanup_time >= CLEANUP_INTERVAL_SECONDS) {
// Collect expired handlers under lock
std::vector<std::shared_ptr<HdfsHandler>> handlers_to_cleanup;
{
std::lock_guard<std::mutex> lock(_mutex);

// TODO: outside lock
// Collect handlers to remove
std::vector<uint64_t> to_remove;

// Find expired handlers
for (const auto& entry : _fs_handlers) {
if (current_time - entry.second->last_access_time >= INSTANCE_TIMEOUT_SECONDS) {
LOG(INFO) << "Found expired HDFS handler, hash_code=" << entry.first
Expand All @@ -90,30 +77,32 @@ void HdfsMgr::_cleanup_loop() {
<< ", principal=" << entry.second->principal
<< ", fs_name=" << entry.second->fs_name;
to_remove.push_back(entry.first);
handlers_to_cleanup.push_back(entry.second);
}
}

// Remove expired handlers
// Remove expired handlers from map under lock
for (uint64_t hash_code : to_remove) {
auto it = _fs_handlers.find(hash_code);
if (it != _fs_handlers.end()) {
LOG(INFO) << "Start to cleanup HDFS handler, hash_code=" << hash_code
<< ", is_kerberos=" << it->second->is_kerberos_auth
<< ", principal=" << it->second->principal
<< ", fs_name=" << it->second->fs_name;

// Clean up kerberos ticket if necessary
_cleanup_kerberos_ticket(*it->second);
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
// even if we create a new one
// hdfsDisconnect(it->second->hdfs_fs);
_fs_handlers.erase(it);

LOG(INFO) << "Finished cleanup HDFS handler, hash_code=" << hash_code
<< ", fs_name=" << it->second->fs_name;
}
_fs_handlers.erase(hash_code);
}
}

// Cleanup handlers outside lock
for (const auto& handler : handlers_to_cleanup) {
LOG(INFO) << "Start to cleanup HDFS handler"
<< ", is_kerberos=" << handler->is_kerberos_auth
<< ", principal=" << handler->principal
<< ", fs_name=" << handler->fs_name;

// The kerberos ticket cache will be automatically cleaned up when the last reference is gone
// DO NOT call hdfsDisconnect(), or we will meet "Filesystem closed"
// even if we create a new one
// hdfsDisconnect(handler->hdfs_fs);

LOG(INFO) << "Finished cleanup HDFS handler"
<< ", fs_name=" << handler->fs_name;
}

last_cleanup_time = current_time;
}

Expand All @@ -126,6 +115,7 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri
std::shared_ptr<HdfsHandler>* fs_handler) {
uint64_t hash_code = _hdfs_hash_code(hdfs_params, fs_name);

// First check without lock
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _fs_handlers.find(hash_code);
Expand All @@ -137,22 +127,37 @@ Status HdfsMgr::get_or_create_fs(const THdfsParams& hdfs_params, const std::stri
*fs_handler = it->second;
return Status::OK();
}
}

LOG(INFO) << "Start to create new HDFS handler, hash_code=" << hash_code
<< ", fs_name=" << fs_name;
// Create new hdfsFS handler outside the lock
LOG(INFO) << "Start to create new HDFS handler, hash_code=" << hash_code
<< ", fs_name=" << fs_name;

hdfsFS new_fs = nullptr;
bool is_kerberos = hdfs_params.__isset.hdfs_kerberos_principal;
RETURN_IF_ERROR(create_hdfs_fs(hdfs_params, fs_name, &new_fs));

// TODO: outside lock
// Create new hdfsFS handler
hdfsFS new_fs = nullptr;
bool is_kerberos = hdfs_params.__isset.hdfs_kerberos_principal;
RETURN_IF_ERROR(create_hdfs_fs(hdfs_params, fs_name, &new_fs));
auto new_handler = std::make_shared<HdfsHandler>(
new_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "",
is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name);

// Double check with lock before inserting
{
std::lock_guard<std::mutex> lock(_mutex);
auto it = _fs_handlers.find(hash_code);
if (it != _fs_handlers.end()) {
// Another thread has created the handler, use it instead
LOG(INFO) << "Another thread created HDFS handler, reuse it, hash_code=" << hash_code
<< ", is_kerberos=" << it->second->is_kerberos_auth
<< ", principal=" << it->second->principal << ", fs_name=" << fs_name;
it->second->update_access_time();
*fs_handler = it->second;
return Status::OK();
}

// Store the new handler
auto handler = std::make_shared<HdfsHandler>(
new_fs, is_kerberos, is_kerberos ? hdfs_params.hdfs_kerberos_principal : "",
is_kerberos ? hdfs_params.hdfs_kerberos_keytab : "", fs_name);
*fs_handler = handler;
_fs_handlers[hash_code] = std::move(handler);
*fs_handler = new_handler;
_fs_handlers[hash_code] = new_handler;

LOG(INFO) << "Finished create new HDFS handler, hash_code=" << hash_code
<< ", is_kerberos=" << is_kerberos
Expand Down
29 changes: 19 additions & 10 deletions be/src/io/hdfs_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,25 +39,34 @@ class THdfsParams;

namespace io {

class HdfsHandler {
public:
struct HdfsHandler {
hdfsFS hdfs_fs;
uint64_t last_access_time;
bool is_kerberos_auth;
std::string principal;
std::string keytab_path;
std::string fs_name;
std::atomic<uint64_t> last_access_time;
std::shared_ptr<kerberos::KerberosTicketCache> ticket_cache;

HdfsHandler(hdfsFS fs, bool is_kerberos = false, const std::string& principal = "",
const std::string& keytab = "", const std::string& fs_name = "")
HdfsHandler(hdfsFS fs, bool is_kerberos, const std::string& principal_,
const std::string& keytab_path_, const std::string& fs_name_)
: hdfs_fs(fs),
last_access_time(0),
is_kerberos_auth(is_kerberos),
principal(principal),
keytab_path(keytab),
fs_name(fs_name) {
update_access_time();
principal(principal_),
keytab_path(keytab_path_),
fs_name(fs_name_),
last_access_time(std::time(nullptr)) {
if (is_kerberos_auth) {
auto* ticket_mgr = ExecEnv::GetInstance()->kerberos_ticket_mgr();
ticket_cache = ticket_mgr->get_ticket_cache(principal, keytab_path);
}
}

~HdfsHandler() {
// The ticket_cache will be automatically released when the last reference is gone
// No need to explicitly cleanup kerberos ticket
}

void update_access_time() { last_access_time = std::time(nullptr); }
};

Expand Down

0 comments on commit 43c5017

Please sign in to comment.