diff --git a/src/mysql/MySqlConnection.cc b/src/mysql/MySqlConnection.cc index 4bdfa3980..8279b7505 100644 --- a/src/mysql/MySqlConnection.cc +++ b/src/mysql/MySqlConnection.cc @@ -30,6 +30,7 @@ // System headers #include +#include #include // Third-party headers @@ -44,9 +45,6 @@ namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.mysql.MySqlConnection"); -} // namespace - -namespace { // A class that calls mysql_thread_end when an instance is destroyed. struct MySqlThreadJanitor { ~MySqlThreadJanitor() { mysql_thread_end(); } @@ -67,130 +65,126 @@ struct InitializeMysqlLibrary { janitor.reset(new MySqlThreadJanitor); } }; -} // namespace -namespace lsst::qserv::mysql { +/** + * Establish a new MySQL connection. + * @param config Parameters of the connection. + * @return A pointer to the MySQL connection or nullptr. + */ +MYSQL* doConnect(std::shared_ptr const& config) { + // We must call mysql_library_init() exactly once before calling mysql_init + // because it is not thread safe. Both mysql_library_init and mysql_init + // call mysql_thread_init, and so we must arrange to call mysql_thread_end + // when the calling thread exists. We do this by allocating a thread + // local object that calls mysql_thread_end from its destructor. + static std::once_flag initialized; + static thread_local std::unique_ptr janitor; -MySqlConnection::MySqlConnection() - : _mysql(nullptr), - _mysql_res(nullptr), - _isConnected(false), - _isExecuting(false), - _interrupted(false) {} - -MySqlConnection::MySqlConnection(MySqlConfig const& sqlConfig) - : _mysql(nullptr), - _mysql_res(nullptr), - _isConnected(false), - _sqlConfig(std::make_shared(sqlConfig)), - _isExecuting(false), - _interrupted(false) {} - -MySqlConnection::~MySqlConnection() { - if (_mysql) { - if (_mysql_res) { - MYSQL_ROW row; - while ((row = mysql_fetch_row(_mysql_res))) - ; // Drain results. - _mysql_res = nullptr; - } - closeMySqlConn(); + std::call_once(initialized, InitializeMysqlLibrary(janitor)); + MYSQL* m = mysql_init(nullptr); + if (nullptr == m) return m; + if (nullptr == janitor) janitor.reset(new MySqlThreadJanitor); + unsigned long const clientFlag = CLIENT_MULTI_STATEMENTS; + mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0); + MYSQL* c = mysql_real_connect(m, config->socket.empty() ? config->hostname.c_str() : 0, + config->username.empty() ? 0 : config->username.c_str(), + config->password.empty() ? 0 : config->password.c_str(), + config->dbName.empty() ? 0 : config->dbName.c_str(), config->port, + config->socket.empty() ? 0 : config->socket.c_str(), clientFlag); + if (nullptr == c) { + // Failed to connect: free resources. + mysql_close(m); + return c; } + return m; } -bool MySqlConnection::checkConnection(mysql::MySqlConfig const& mysqlconfig) { - MySqlConnection conn(mysqlconfig); +} // namespace + +namespace lsst::qserv::mysql { + +bool MySqlConnection::checkConnection(mysql::MySqlConfig const& config) { + MySqlConnection conn(config); if (conn.connect()) { - LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << mysqlconfig); + LOGS(_log, LOG_LVL_DEBUG, "Successful MySQL connection check: " << config); return true; } else { - LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << mysqlconfig); + LOGS(_log, LOG_LVL_WARN, "Unsuccessful MySQL connection check: " << config); return false; } } -void MySqlConnection::closeMySqlConn() { - // Close mysql connection and set deallocated pointer to null - mysql_close(_mysql); - _mysql = nullptr; -} +MySqlConnection::MySqlConnection(MySqlConfig const& config) + : _config(std::make_shared(config)), _mysql(nullptr), _mysql_res(nullptr) {} + +MySqlConnection::~MySqlConnection() { _closeMySqlConnImpl(std::lock_guard(_mtx)); } + +void MySqlConnection::closeMySqlConn() { _closeMySqlConnImpl(std::lock_guard(_mtx)); } bool MySqlConnection::connect() { - // Cleanup garbage - if (_mysql != nullptr) { - closeMySqlConn(); + std::lock_guard const lock(_mtx); + _closeMySqlConnImpl(lock); + _mysql = ::doConnect(_config); + if (nullptr != _mysql) { + _threadId = mysql_thread_id(_mysql); + return true; } - _isConnected = false; - // Make myself a thread - _mysql = _connectHelper(); - _isConnected = (_mysql != nullptr); - return _isConnected; + return false; } bool MySqlConnection::queryUnbuffered(std::string const& query) { - // run query, store into list. - int rc; - { - std::lock_guard lock(_interruptMutex); - _isExecuting = true; - _interrupted = false; - } - rc = mysql_real_query(_mysql, query.c_str(), query.length()); - if (rc) { - return false; - } + std::lock_guard lock(_mtx); + if (_mysql == nullptr) return false; + int const rc = mysql_real_query(_mysql, query.c_str(), query.length()); + if (rc) return false; _mysql_res = mysql_use_result(_mysql); - _isExecuting = false; - if (!_mysql_res) { - return false; - } + if (nullptr == _mysql_res) return false; return true; } -/// Cancel existing query -/// @return 0 on success. -/// 1 indicates error in connecting. (may try again) -/// 2 indicates error executing kill query. (do not try again) -/// -1 indicates NOP: No query in progress or query already interrupted. -int MySqlConnection::cancel() { - std::lock_guard lock(_interruptMutex); - int rc; - if (_interrupted) { - // Should we log this? - return -1; // No further action needed. - } - _interrupted = true; // Prevent others from trying to interrupt - MYSQL* killMysql = _connectHelper(); - if (!killMysql) { - _interrupted = false; // Didn't try - return 1; - // Handle broken connection - } - // KILL QUERY only, not KILL CONNECTION. - int threadId = mysql_thread_id(_mysql); +MySqlConnection::CancelStatus MySqlConnection::cancel() { + unsigned int const threadId = _threadId.load(); + if (!(connected() && (0 != threadId))) return CancelStatus::CANCEL_NOP; + MYSQL* killMysql = ::doConnect(_config); + if (nullptr == killMysql) return CancelStatus::CANCEL_CONNECT_ERROR; std::string const killSql = "KILL QUERY " + std::to_string(threadId); - rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size()); + int const rc = mysql_real_query(killMysql, killSql.c_str(), killSql.size()); mysql_close(killMysql); if (rc) { LOGS(_log, LOG_LVL_WARN, "failed to kill MySQL thread: " << threadId << ", error: " << std::string(mysql_error(killMysql)) << ", errno: " << std::to_string(mysql_errno(killMysql))); - return 2; + return CancelStatus::CANCEL_FAILED; } - return 0; + return CancelStatus::CANCEL_SUCCESS; } -bool MySqlConnection::selectDb(std::string const& dbName) { - if (!dbName.empty() && mysql_select_db(_mysql, dbName.c_str())) { - return false; - } - _sqlConfig->dbName = dbName; - return true; +MYSQL* MySqlConnection::getMySql() { + _throwIfNotConnected(__func__); + return _mysql; +} + +MYSQL_RES* MySqlConnection::getResult() { + _throwIfNotConnected(__func__); + return _mysql_res; +} + +void MySqlConnection::freeResult() { + std::lock_guard lock(_mtx); + _throwIfNotInProcessingResult(__func__); + mysql_free_result(_mysql_res); + _mysql_res = nullptr; +} + +int MySqlConnection::getResultFieldCount() { + std::lock_guard lock(_mtx); + _throwIfNotInProcessingResult(__func__); + return mysql_field_count(_mysql); } std::vector MySqlConnection::getColumnNames() const { - assert(_mysql); - assert(_mysql_res); + std::lock_guard lock(_mtx); + _throwIfNotInProcessingResult(__func__); std::vector names; if (0 != mysql_field_count(_mysql)) { auto fields = mysql_fetch_fields(_mysql_res); @@ -201,50 +195,54 @@ std::vector MySqlConnection::getColumnNames() const { return names; } -//////////////////////////////////////////////////////////////////////// -// MySqlConnection -// private: -//////////////////////////////////////////////////////////////////////// - -MYSQL* MySqlConnection::_connectHelper() { - // We must call mysql_library_init() exactly once before calling mysql_init - // because it is not thread safe. Both mysql_library_init and mysql_init - // call mysql_thread_init, and so we must arrange to call mysql_thread_end - // when the calling thread exists. We do this by allocating a thread - // local object that calls mysql_thread_end from its destructor. - static std::once_flag initialized; - static thread_local std::unique_ptr janitor; +unsigned int MySqlConnection::getErrno() const { + _throwIfNotConnected(__func__); + return mysql_errno(_mysql); +} +const std::string MySqlConnection::getError() const { + _throwIfNotConnected(__func__); + return std::string(mysql_error(_mysql)); +} - std::call_once(initialized, InitializeMysqlLibrary(janitor)); - MYSQL* m = mysql_init(nullptr); - if (!m) { - return m; - } - if (!janitor.get()) { - janitor.reset(new MySqlThreadJanitor); - } - unsigned long clientFlag = CLIENT_MULTI_STATEMENTS; - mysql_options(m, MYSQL_OPT_LOCAL_INFILE, 0); - MYSQL* c = - mysql_real_connect(m, _sqlConfig->socket.empty() ? _sqlConfig->hostname.c_str() : 0, - _sqlConfig->username.empty() ? 0 : _sqlConfig->username.c_str(), - _sqlConfig->password.empty() ? 0 : _sqlConfig->password.c_str(), - _sqlConfig->dbName.empty() ? 0 : _sqlConfig->dbName.c_str(), _sqlConfig->port, - _sqlConfig->socket.empty() ? 0 : _sqlConfig->socket.c_str(), clientFlag); - if (!c) { - // Failed to connect: free resources. - mysql_close(m); - return c; +bool MySqlConnection::selectDb(std::string const& dbName) { + _throwIfNotConnected(__func__); + if (!dbName.empty() && (0 != mysql_select_db(_mysql, dbName.c_str()))) { + return false; } - _threadId = mysql_thread_id(m); - return m; + _config->dbName = dbName; + return true; } std::string MySqlConnection::dump() { std::ostringstream os; - os << "hostN=" << _sqlConfig->hostname << " sock=" << _sqlConfig->socket - << " uname=" << _sqlConfig->username << " dbN=" << _sqlConfig->dbName << " port=" << _sqlConfig->port; + os << "hostN=" << _config->hostname << " sock=" << _config->socket << " uname=" << _config->username + << " dbN=" << _config->dbName << " port=" << _config->port; return os.str(); } +void MySqlConnection::_closeMySqlConnImpl(std::lock_guard const& lock) { + if (nullptr != _mysql) { + mysql_close(_mysql); + _mysql = nullptr; + _threadId = 0; + if (nullptr != _mysql_res) { + mysql_free_result(_mysql_res); + _mysql_res = nullptr; + } + } +} + +void MySqlConnection::_throwIfNotConnected(std::string const& func) const { + if (_mysql == nullptr) { + throw std::logic_error("MySqlConnection::" + func + " connection is not open."); + } +} + +void MySqlConnection::_throwIfNotInProcessingResult(std::string const& func) const { + _throwIfNotConnected(func); + if (_mysql_res == nullptr) { + throw std::logic_error("MySqlConnection::" + func + " not in the result processing context."); + } +} + } // namespace lsst::qserv::mysql diff --git a/src/mysql/MySqlConnection.h b/src/mysql/MySqlConnection.h index 5263d55cd..25299fdc1 100644 --- a/src/mysql/MySqlConnection.h +++ b/src/mysql/MySqlConnection.h @@ -28,6 +28,7 @@ #define LSST_QSERV_MYSQL_MYSQLCONNECTION_H // System headers +#include #include #include #include @@ -36,81 +37,134 @@ #include // Third-party headers -#include "boost/utility.hpp" #include +// Forward declarations namespace lsst::qserv::mysql { - -// Forward class MySqlConfig; +} // namespace lsst::qserv::mysql + +namespace lsst::qserv::mysql { /// MySqlConnection is a thin wrapper around the MySQL C-API that partially /// shields clients from the raw API, while still providing raw access for /// clients that need it. -class MySqlConnection : boost::noncopyable { +class MySqlConnection { public: - MySqlConnection(); - MySqlConnection(MySqlConfig const& sqlConfig); + /// The completion status of the query cancelation operation. + enum CancelStatus { + CANCEL_SUCCESS = 0, ///< The operation was succesfull. + CANCEL_CONNECT_ERROR = 1, ///< Failed to establish a separate connection to MySQL. + CANCEL_FAILED = 2, ///< Failed to failure to kill the query. + CANCEL_NOP = -1 ///< Connection is not open. + }; + /** + * Check if a MySQL connection could be established for the given configuration. + * @return 'true' if MySQL connection succeeded. + */ + static bool checkConnection(mysql::MySqlConfig const& config); + + /** + * Construct the connector with specifid configuration. + * @param sqlConfig Parameters of the connection. + */ + explicit MySqlConnection(MySqlConfig const& config); + + MySqlConnection() = delete; + MySqlConnection(MySqlConnection const&) = delete; + MySqlConnection& operator=(MySqlConnection const&) = delete; + + /// Non-trivial destructor is needed to close the connection and release resources. ~MySqlConnection(); - void closeMySqlConn(); + MySqlConfig const& getConfig() const { return *_config; } + + /// Close the current connection (if any) and open the new one. + /// @return 'true' if the operation was succesfull. bool connect(); + bool connected() const { return nullptr != _mysql; } + + /// @note The identifier is set after making a connection, and it's reset + /// to 0 upon disconnects. + /// @return A thread identifier of the last succesfully established connection. + unsigned long threadId() const { return _threadId.load(); } + + /// Close the current connection (if any). + void closeMySqlConn(); + /** - * Check MySQL connection for a given configuration - * - * @return: true if MySQL connection succeeded, else false + * Execute a query. + * @param query The query to be executed. + * @return 'true' if the operation was successfull. */ - static bool checkConnection(mysql::MySqlConfig const& mysqlconfig); + bool queryUnbuffered(std::string const& query); - bool connected() const { return _isConnected; } - unsigned long threadId() const { return _threadId; } + /** + * Cancel existing query (if any). + * @note The method will only attempt to cancel the ognoing query (if any). + * The connection (if any) will be left intact, and it could be used for + * submitting other queries. + * @return CancelStatus The completion status of the operation. + */ + CancelStatus cancel(); - // instance destruction invalidates this return value - MYSQL* getMySql() { return _mysql; } - MySqlConfig const& getMySqlConfig() const { return *_sqlConfig; } + // The following methods require a valid connection. + // Otherwise std::logic_error will be thrown. - bool queryUnbuffered(std::string const& query); - int cancel(); - - MYSQL_RES* getResult() { return _mysql_res; } - void freeResult() { - mysql_free_result(_mysql_res); - _mysql_res = nullptr; - } - int getResultFieldCount() { - assert(_mysql); - return mysql_field_count(_mysql); - } - std::vector getColumnNames() const; - unsigned int getErrno() const { - assert(_mysql); - return mysql_errno(_mysql); - } - const std::string getError() const { - assert(_mysql); - return std::string(mysql_error(_mysql)); - } - MySqlConfig const& getConfig() const { return *_sqlConfig; } + MYSQL* getMySql(); + unsigned int getErrno() const; + const std::string getError() const; bool selectDb(std::string const& dbName); + /** + * The method requires a valid connection. + * @return A pointer to the result descriptor. The method returns nullptr + * if the last query failed, if no query submitted after establishing a connection, + * or if the result setof the last query was explicitly cleared by calling freeResult(). + * @throws std::logic_error if the connection is not open. + */ + MYSQL_RES* getResult(); + + // The following methods require must be called within the query procesing + // context (assuming the connection is open and the last query succeeded). + // Otherwise std::logic_error will be thrown. + + void freeResult(); + int getResultFieldCount(); + std::vector getColumnNames() const; + /// @return a string suitable for logging. std::string dump(); private: - MYSQL* _connectHelper(); - static std::mutex _mysqlShared; - static bool _mysqlReady; + /// Close the current connection (if open). + /// @param lock An exclusive lock on the _mtx must be acquired before calling the method. + void _closeMySqlConnImpl(std::lock_guard const& lock); + + /// Ensure a connection is establushed. + /// @param func A context the method was called from (for error reporting). + /// @throw std::logic_error If not in the desired state. + void _throwIfNotConnected(std::string const& func) const; + + /// Ensure the object in the result processing state (a connection is established and + /// the last submitted query succeeded). + /// @param func A context the method was called from (for error reporting). + /// @throw std::logic_error If not in the desired state. + void _throwIfNotInProcessingResult(std::string const& func) const; + + std::shared_ptr _config; ///< Input parameters of the connections. + + mutable std::mutex _mtx; ///< Guards state transitions. + + // The current state of the connection. Values of the data members + // are modified after establishing a connection, query completion, or + // uppon disconnects. MYSQL* _mysql; MYSQL_RES* _mysql_res; - bool _isConnected; - unsigned long _threadId = 0; ///< 0 if not connected - std::shared_ptr _sqlConfig; - bool _isExecuting; ///< true during mysql_real_query and mysql_use_result - bool _interrupted; ///< true if cancellation requested - std::mutex _interruptMutex; + std::atomic _threadId{0}; }; } // namespace lsst::qserv::mysql