Skip to content

Commit

Permalink
Refactored MySQL connector class, fixed bugs
Browse files Browse the repository at this point in the history
Eliminated duplicate data members. Reinforced anf refined the public API
and the implementation.
  • Loading branch information
iagaponenko committed Nov 7, 2023
1 parent 0d1e943 commit c423e02
Show file tree
Hide file tree
Showing 2 changed files with 229 additions and 177 deletions.
258 changes: 128 additions & 130 deletions src/mysql/MySqlConnection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

// System headers
#include <cstddef>
#include <stdexcept>
#include <sstream>

// Third-party headers
Expand All @@ -44,9 +45,6 @@ namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.mysql.MySqlConnection");

} // namespace

namespace {
// A class that calls mysql_thread_end when an instance is destroyed.
struct MySqlThreadJanitor {
~MySqlThreadJanitor() { mysql_thread_end(); }
Expand All @@ -67,130 +65,126 @@ struct InitializeMysqlLibrary {
janitor.reset(new MySqlThreadJanitor);
}
};
} // namespace

namespace lsst::qserv::mysql {
/**
* Establish a new MySQL connection.
* @param config Parameters of the connection.
* @return A pointer to the MySQL connection or nullptr.
*/
MYSQL* doConnect(std::shared_ptr<lsst::qserv::mysql::MySqlConfig> const& config) {
// We must call mysql_library_init() exactly once before calling mysql_init
// because it is not thread safe. Both mysql_library_init and mysql_init
// call mysql_thread_init, and so we must arrange to call mysql_thread_end
// when the calling thread exists. We do this by allocating a thread
// local object that calls mysql_thread_end from its destructor.
static std::once_flag initialized;
static thread_local std::unique_ptr<MySqlThreadJanitor> janitor;

MySqlConnection::MySqlConnection()
: _mysql(nullptr),
_mysql_res(nullptr),
_isConnected(false),
_isExecuting(false),
_interrupted(false) {}

MySqlConnection::MySqlConnection(MySqlConfig const& sqlConfig)
: _mysql(nullptr),
_mysql_res(nullptr),
_isConnected(false),
_sqlConfig(std::make_shared<MySqlConfig>(sqlConfig)),
_isExecuting(false),
_interrupted(false) {}

MySqlConnection::~MySqlConnection() {
if (_mysql) {
if (_mysql_res) {
MYSQL_ROW row;
while ((row = mysql_fetch_row(_mysql_res)))
; // Drain results.
_mysql_res = nullptr;
}
closeMySqlConn();
std::call_once(initialized, InitializeMysqlLibrary(janitor));
MYSQL* m = mysql_init(nullptr);
if (nullptr == m) return m;
if (nullptr == janitor) janitor.reset(new MySqlThreadJanitor);
unsigned long const clientFlag = CLIENT_MULTI_STATEMENTS;
mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0);
MYSQL* c = mysql_real_connect(m, config->socket.empty() ? config->hostname.c_str() : 0,
config->username.empty() ? 0 : config->username.c_str(),
config->password.empty() ? 0 : config->password.c_str(),
config->dbName.empty() ? 0 : config->dbName.c_str(), config->port,
config->socket.empty() ? 0 : config->socket.c_str(), clientFlag);
if (nullptr == c) {
// Failed to connect: free resources.
mysql_close(m);
return c;
}
return m;
}

bool MySqlConnection::checkConnection(mysql::MySqlConfig const& mysqlconfig) {
MySqlConnection conn(mysqlconfig);
} // namespace

namespace lsst::qserv::mysql {

bool MySqlConnection::checkConnection(mysql::MySqlConfig const& config) {
MySqlConnection conn(config);
if (conn.connect()) {
LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << mysqlconfig);
LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << config);
return true;
} else {
LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << mysqlconfig);
LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << config);
return false;
}
}

void MySqlConnection::closeMySqlConn() {
// Close mysql connection and set deallocated pointer to null
mysql_close(_mysql);
_mysql = nullptr;
}
MySqlConnection::MySqlConnection(MySqlConfig const& config)
: _config(std::make_shared<MySqlConfig>(config)), _mysql(nullptr), _mysql_res(nullptr) {}

MySqlConnection::~MySqlConnection() { _closeMySqlConnImpl(std::lock_guard<std::mutex>(_mtx)); }

void MySqlConnection::closeMySqlConn() { _closeMySqlConnImpl(std::lock_guard<std::mutex>(_mtx)); }

bool MySqlConnection::connect() {
// Cleanup garbage
if (_mysql != nullptr) {
closeMySqlConn();
std::lock_guard<std::mutex> const lock(_mtx);
_closeMySqlConnImpl(lock);
_mysql = ::doConnect(_config);
if (nullptr != _mysql) {
_threadId = mysql_thread_id(_mysql);
return true;
}
_isConnected = false;
// Make myself a thread
_mysql = _connectHelper();
_isConnected = (_mysql != nullptr);
return _isConnected;
return false;
}

bool MySqlConnection::queryUnbuffered(std::string const& query) {
// run query, store into list.
int rc;
{
std::lock_guard<std::mutex> lock(_interruptMutex);
_isExecuting = true;
_interrupted = false;
}
rc = mysql_real_query(_mysql, query.c_str(), query.length());
if (rc) {
return false;
}
std::lock_guard<std::mutex> lock(_mtx);
if (_mysql == nullptr) return false;
int const rc = mysql_real_query(_mysql, query.c_str(), query.length());
if (rc) return false;
_mysql_res = mysql_use_result(_mysql);
_isExecuting = false;
if (!_mysql_res) {
return false;
}
if (nullptr == _mysql_res) return false;
return true;
}

/// Cancel existing query
/// @return 0 on success.
/// 1 indicates error in connecting. (may try again)
/// 2 indicates error executing kill query. (do not try again)
/// -1 indicates NOP: No query in progress or query already interrupted.
int MySqlConnection::cancel() {
std::lock_guard<std::mutex> lock(_interruptMutex);
int rc;
if (_interrupted) {
// Should we log this?
return -1; // No further action needed.
}
_interrupted = true; // Prevent others from trying to interrupt
MYSQL* killMysql = _connectHelper();
if (!killMysql) {
_interrupted = false; // Didn't try
return 1;
// Handle broken connection
}
// KILL QUERY only, not KILL CONNECTION.
int threadId = mysql_thread_id(_mysql);
MySqlConnection::CancelStatus MySqlConnection::cancel() {
unsigned int const threadId = _threadId.load();
if (!(connected() && (0 != threadId))) return CancelStatus::CANCEL_NOP;
MYSQL* killMysql = ::doConnect(_config);
if (nullptr == killMysql) return CancelStatus::CANCEL_CONNECT_ERROR;
std::string const killSql = "KILL QUERY " + std::to_string(threadId);
rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size());
int const rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size());
mysql_close(killMysql);
if (rc) {
LOGS(_log, LOG_LVL_WARN,
"failed to kill MySQL thread: " << threadId << ", error: " << std::string(mysql_error(killMysql))
<< ", errno: " << std::to_string(mysql_errno(killMysql)));
return 2;
return CancelStatus::CANCEL_FAILED;
}
return 0;
return CancelStatus::CANCEL_SUCCESS;
}

bool MySqlConnection::selectDb(std::string const& dbName) {
if (!dbName.empty() && mysql_select_db(_mysql, dbName.c_str())) {
return false;
}
_sqlConfig->dbName = dbName;
return true;
MYSQL* MySqlConnection::getMySql() {
_throwIfNotConnected(__func__);
return _mysql;
}

MYSQL_RES* MySqlConnection::getResult() {
_throwIfNotConnected(__func__);
return _mysql_res;
}

void MySqlConnection::freeResult() {
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
mysql_free_result(_mysql_res);
_mysql_res = nullptr;
}

int MySqlConnection::getResultFieldCount() {
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
return mysql_field_count(_mysql);
}

std::vector<std::string> MySqlConnection::getColumnNames() const {
assert(_mysql);
assert(_mysql_res);
std::lock_guard<std::mutex> lock(_mtx);
_throwIfNotInProcessingResult(__func__);
std::vector<std::string> names;
if (0 != mysql_field_count(_mysql)) {
auto fields = mysql_fetch_fields(_mysql_res);
Expand All @@ -201,50 +195,54 @@ std::vector<std::string> MySqlConnection::getColumnNames() const {
return names;
}

////////////////////////////////////////////////////////////////////////
// MySqlConnection
// private:
////////////////////////////////////////////////////////////////////////

MYSQL* MySqlConnection::_connectHelper() {
// We must call mysql_library_init() exactly once before calling mysql_init
// because it is not thread safe. Both mysql_library_init and mysql_init
// call mysql_thread_init, and so we must arrange to call mysql_thread_end
// when the calling thread exists. We do this by allocating a thread
// local object that calls mysql_thread_end from its destructor.
static std::once_flag initialized;
static thread_local std::unique_ptr<MySqlThreadJanitor> janitor;
unsigned int MySqlConnection::getErrno() const {
_throwIfNotConnected(__func__);
return mysql_errno(_mysql);
}
const std::string MySqlConnection::getError() const {
_throwIfNotConnected(__func__);
return std::string(mysql_error(_mysql));
}

std::call_once(initialized, InitializeMysqlLibrary(janitor));
MYSQL* m = mysql_init(nullptr);
if (!m) {
return m;
}
if (!janitor.get()) {
janitor.reset(new MySqlThreadJanitor);
}
unsigned long clientFlag = CLIENT_MULTI_STATEMENTS;
mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0);
MYSQL* c =
mysql_real_connect(m, _sqlConfig->socket.empty() ? _sqlConfig->hostname.c_str() : 0,
_sqlConfig->username.empty() ? 0 : _sqlConfig->username.c_str(),
_sqlConfig->password.empty() ? 0 : _sqlConfig->password.c_str(),
_sqlConfig->dbName.empty() ? 0 : _sqlConfig->dbName.c_str(), _sqlConfig->port,
_sqlConfig->socket.empty() ? 0 : _sqlConfig->socket.c_str(), clientFlag);
if (!c) {
// Failed to connect: free resources.
mysql_close(m);
return c;
bool MySqlConnection::selectDb(std::string const& dbName) {
_throwIfNotConnected(__func__);
if (!dbName.empty() && (0 != mysql_select_db(_mysql, dbName.c_str()))) {
return false;
}
_threadId = mysql_thread_id(m);
return m;
_config->dbName = dbName;
return true;
}

std::string MySqlConnection::dump() {
std::ostringstream os;
os << "hostN=" << _sqlConfig->hostname << " sock=" << _sqlConfig->socket
<< " uname=" << _sqlConfig->username << " dbN=" << _sqlConfig->dbName << " port=" << _sqlConfig->port;
os << "hostN=" << _config->hostname << " sock=" << _config->socket << " uname=" << _config->username
<< " dbN=" << _config->dbName << " port=" << _config->port;
return os.str();
}

void MySqlConnection::_closeMySqlConnImpl(std::lock_guard<std::mutex> const& lock) {
if (nullptr != _mysql) {
mysql_close(_mysql);
_mysql = nullptr;
_threadId = 0;
if (nullptr != _mysql_res) {
mysql_free_result(_mysql_res);
_mysql_res = nullptr;
}
}
}

void MySqlConnection::_throwIfNotConnected(std::string const& func) const {
if (_mysql == nullptr) {
throw std::logic_error("MySqlConnection::" + func + " connection is not open.");
}
}

void MySqlConnection::_throwIfNotInProcessingResult(std::string const& func) const {
_throwIfNotConnected(func);
if (_mysql_res == nullptr) {
throw std::logic_error("MySqlConnection::" + func + " not in the result processing context.");
}
}

} // namespace lsst::qserv::mysql
Loading

0 comments on commit c423e02

Please sign in to comment.