From e0fc435b1b7344a497d80477d4aaaea9cafdbacf Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 5 Apr 2024 00:12:14 +0000 Subject: [PATCH 1/8] Extended schema of qmeta to support persistent chunk map --- src/qmeta/QMetaMysql.cc | 2 +- src/qmeta/schema/migrate-9-to-10.sql | 13 ++++++++++ ...sql.jinja => migrate-None-to-10.sql.jinja} | 26 ++++++++++++++++++- 3 files changed, 39 insertions(+), 2 deletions(-) create mode 100644 src/qmeta/schema/migrate-9-to-10.sql rename src/qmeta/schema/{migrate-None-to-9.sql.jinja => migrate-None-to-10.sql.jinja} (90%) diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index d669b05f8..7314550a8 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -47,7 +47,7 @@ using namespace std; namespace { // Current version of QMeta schema -char const VERSION_STR[] = "9"; +char const VERSION_STR[] = "10"; LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql"); diff --git a/src/qmeta/schema/migrate-9-to-10.sql b/src/qmeta/schema/migrate-9-to-10.sql new file mode 100644 index 000000000..4bd87d83b --- /dev/null +++ b/src/qmeta/schema/migrate-9-to-10.sql @@ -0,0 +1,13 @@ +CREATE TABLE IF NOT EXISTS `chunkMap` ( + `worker` VARCHAR(256) NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica', + `database` VARCHAR(256) NOT NULL COMMENT 'The name of a database', + `table` VARCHAR(256) NOT NULL COMMENT 'The name of a table', + `chunk` INT UNSIGNED NOT NULL COMMENT 'The number of a chunk', + `size` BIGINT UNSIGNED NOT NULL COMMENT 'The size of a chunk') +ENGINE = InnoDB +COMMENT = 'Chunk disposition across workers'; + +CREATE TABLE IF NOT EXISTS `chunkMapStatus` ( + `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map') +ENGINE = InnoDB +COMMENT = 'Status info on the chunk map'; diff --git a/src/qmeta/schema/migrate-None-to-9.sql.jinja b/src/qmeta/schema/migrate-None-to-10.sql.jinja similarity index 90% rename from 
src/qmeta/schema/migrate-None-to-9.sql.jinja rename to src/qmeta/schema/migrate-None-to-10.sql.jinja index 3fd0e7e38..b34f95f53 100644 --- a/src/qmeta/schema/migrate-None-to-9.sql.jinja +++ b/src/qmeta/schema/migrate-None-to-10.sql.jinja @@ -207,6 +207,28 @@ CREATE TABLE IF NOT EXISTS `QMessages` ( ENGINE = InnoDB COMMENT = 'Table of messages generated during queries.'; +-- ----------------------------------------------------- +-- Table `chunkMap` +-- ----------------------------------------------------- + +CREATE TABLE IF NOT EXISTS `chunkMap` ( + `worker` VARCHAR(256) NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica', + `database` VARCHAR(256) NOT NULL COMMENT 'The name of a database', + `table` VARCHAR(256) NOT NULL COMMENT 'The name of a table', + `chunk` INT UNSIGNED NOT NULL COMMENT 'The number of a chunk', + `size` BIGINT UNSIGNED NOT NULL COMMENT 'The size of a chunk') +ENGINE = InnoDB +COMMENT = 'Chunk disposition across workers'; + +-- ----------------------------------------------------- +-- Table `chunkMapStatus` +-- ----------------------------------------------------- + +CREATE TABLE IF NOT EXISTS `chunkMapStatus` ( + `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map') +ENGINE = InnoDB +COMMENT = 'Status info on the chunk map'; + -- Update version on every schema change. -- Version 0 corresponds to initial QMeta release and it had no -- QMetadata table at all. @@ -219,4 +241,6 @@ COMMENT = 'Table of messages generated during queries.'; -- Version 7 added final row count to QInfo. -- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo. -- Version 9 removed the full-text index on the query text from QInfo. 
-INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '9'); +-- Version 10 added the worker-to-chunk map tables chunkMap and chunkMapStatus + +INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10'); From f3c46cbcc0f7160e185798e916fad5ba0c8702fc Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 5 Apr 2024 00:18:15 +0000 Subject: [PATCH 2/8] Extended transient API of QMeta to read workers-to-chunks map from database Also extended MySQL API to extract results from 5 columns. --- src/qmeta/Exceptions.h | 9 +++++ src/qmeta/QMeta.h | 59 +++++++++++++++++++++++++++++- src/qmeta/QMetaMysql.cc | 80 +++++++++++++++++++++++++++++++++++++++++ src/qmeta/QMetaMysql.h | 18 +++++++++- src/sql/SqlResults.cc | 18 ++++++++++ src/sql/SqlResults.h | 13 +++++-- 6 files changed, 193 insertions(+), 4 deletions(-) diff --git a/src/qmeta/Exceptions.h b/src/qmeta/Exceptions.h index deeccf73b..511e1cd35 100644 --- a/src/qmeta/Exceptions.h +++ b/src/qmeta/Exceptions.h @@ -109,6 +109,15 @@ class MissingTableError : public QMetaError { virtual std::string typeName() const override { return "MissingTableError"; } }; +/// Exception thrown when the specified metadata table is empty. +class EmptyTableError : public QMetaError { +public: + EmptyTableError(util::Issue::Context const& ctx, std::string const& table) + : QMetaError(ctx, "Query metadata table is empty: " + table) {} + + virtual std::string typeName() const override { return "EmptyTableError"; } +}; + /// Exception thrown when database consistency is violated. class ConsistencyError : public QMetaError { public: diff --git a/src/qmeta/QMeta.h b/src/qmeta/QMeta.h index 07e6afd91..4f5935dda 100644 --- a/src/qmeta/QMeta.h +++ b/src/qmeta/QMeta.h @@ -23,6 +23,7 @@ #define LSST_QSERV_QMETA_QMETA_H // System headers +#include #include #include #include @@ -46,7 +47,6 @@ namespace lsst::qserv::qmeta { /** * @ingroup qmeta - * * @brief Interface for query metadata. 
*/ @@ -58,6 +58,48 @@ class QMeta { */ typedef std::vector > TableNames; + /** + * The structure ChunkMap encapsulates a disposition of chunks at Qserv workers + * along with a time when the map was updated. + * + * Here is an example of how to use the map for getting info on all chunks in + * the given context: + * @code + * std::string const worker = "worker-001"; + * std::string const database = "LSST-DR01"; + * std::string const table = "Object"; + * + * ChunkMap const& chunkMap = ...; + * for (auto const& [chunk, size] : chunkMap[worker][database][table]) { + * ... + * } + * @endcode + */ + struct ChunkMap { + /// @return 'true' if the map is empty (or constructed using the default constructor) + bool empty() const { + return workers.empty() || (std::chrono::time_point() == updateTime); + } + + // NOTE: Separate types were added here for the sake of clarity to avoid + // a definition of the unreadable nested map. + + struct ChunkInfo { + unsigned int chunk = 0; ///< The chunk number + size_t size = 0; ///< The file size (in bytes) of the chunk table + }; + typedef std::vector Chunks; ///< Collection of chunks + typedef std::map Tables; ///< tables-to-chunks + typedef std::map Databases; ///< Databases-to-tables + typedef std::map Workers; ///< Workers-to-databases + + /// The chunk disposition map for all workers. + Workers workers; + + /// The last time the map was updated (since UNIX Epoch). + std::chrono::time_point updateTime; + }; + /** * Create QMeta instance from configuration dictionary. * @@ -287,6 +329,21 @@ class QMeta { /// Write messages/errors generated during the query to the QMessages table. virtual void addQueryMessages(QueryId queryId, std::shared_ptr const& msgStore) = 0; + /** + * Fetch the chunk map which was updated after the specified time point. + * @param prevUpdateTime The cut off time for the chunk map age. Note that the default + * value of the parameter represents the start time of the UNIX Epoch. 
Leaving the default + * value forces an attempt to read the map from the database if one exists + * in there. + * @return Return the most current chunk disposition or the empty object if the persistent + * map is older than it was requested. The result could be evaluated by calling + * method empty() on the result object. + * @throws EmptyTableError if the corresponding metadata table doesn't have any record + * @throws SqlError for any other error related to MySQL + */ + virtual ChunkMap getChunkMap(std::chrono::time_point const& prevUpdateTime = + std::chrono::time_point()) = 0; + protected: // Default constructor QMeta() {} diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index 7314550a8..40befc5e9 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -25,6 +25,7 @@ // System headers #include +#include // Third-party headers #include "boost/lexical_cast.hpp" @@ -34,6 +35,7 @@ #include "lsst/log/Log.h" // Qserv headers +#include "global/stringUtil.h" #include "qdisp/JobStatus.h" #include "qdisp/MessageStore.h" #include "qmeta/Exceptions.h" @@ -840,6 +842,84 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr const& prevUpdateTime) { + lock_guard lock(_dbMutex); + + QMeta::ChunkMap chunkMap; + + auto trans = QMetaTransaction::create(*_conn); + + // Check if the table needs to be read. Note that the default value of + // the previous update timestamp always forces an attempt to read the map. 
+ auto const updateTime = _getChunkMapUpdateTime(lock); + bool const force = + (prevUpdateTime == chrono::time_point()) || (prevUpdateTime < updateTime); + if (!force) { + trans->commit(); + return QMeta::ChunkMap(); + } + + // Read the map itself + + sql::SqlErrorObject errObj; + sql::SqlResults results; + + string const tableName = "chunkMap"; + string const query = "SELECT `worker`,`database`,`table`,`chunk`,`size` FROM `" + tableName + "`"; + LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query); + if (!_conn->runQuery(query, results, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "query failed: " << query); + throw SqlError(ERR_LOC, errObj); + } + vector> const rows = results.extractFirstNColumns(5); + trans->commit(); + + if (rows.empty()) throw EmptyTableError(ERR_LOC, tableName); + try { + for (auto const& row : rows) { + string const& worker = row[0]; + string const& database = row[1]; + string const& table = row[2]; + unsigned int chunk = lsst::qserv::stoui(row[3]); + size_t const size = stoull(row[4]); + chunkMap.workers[worker][database][table].push_back(ChunkMap::ChunkInfo{chunk, size}); + } + chunkMap.updateTime = updateTime; + } catch (exception const& ex) { + string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what()); + throw ConsistencyError(ERR_LOC, msg); + } + return chunkMap; +} + +chrono::time_point QMetaMysql::_getChunkMapUpdateTime(lock_guard const& lock) { + sql::SqlErrorObject errObj; + sql::SqlResults results; + string const tableName = "chunkMapStatus"; + string const query = "SELECT `update_time` FROM `" + tableName + "` ORDER BY `update_time` DESC LIMIT 1"; + LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query); + if (!_conn->runQuery(query, results, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "query failed: " << query); + throw SqlError(ERR_LOC, errObj); + } + vector updateTime; + if (!results.extractFirstColumn(updateTime, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "Failed to extract result set of query " + 
query); + throw SqlError(ERR_LOC, errObj); + } + if (updateTime.empty()) { + throw EmptyTableError(ERR_LOC, tableName); + } else if (updateTime.size() > 1) { + throw ConsistencyError(ERR_LOC, "Too many rows in result set of query " + query); + } + try { + return chrono::time_point() + chrono::seconds(stol(updateTime[0])); + } catch (exception const& ex) { + string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what()); + throw ConsistencyError(ERR_LOC, msg); + } +} + void QMetaMysql::_addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount, int& completeCount, int& execFailCount, map& msgCountMap) { // Don't add duplicate messages. diff --git a/src/qmeta/QMetaMysql.h b/src/qmeta/QMetaMysql.h index 59664c2ac..34def9096 100644 --- a/src/qmeta/QMetaMysql.h +++ b/src/qmeta/QMetaMysql.h @@ -23,6 +23,7 @@ #define LSST_QSERV_QMETA_QMETAMYSQL_H // System headers +#include #include #include @@ -44,7 +45,6 @@ namespace lsst::qserv::qmeta { /** * @ingroup qmeta - * * @brief Mysql-based implementation of qserv metadata. */ @@ -263,6 +263,10 @@ class QMetaMysql : public QMeta { /// @see QMeta::addQueryMessages() void addQueryMessages(QueryId queryId, std::shared_ptr const& msgStore) override; + /// @see QMeta::getChunkMap + QMeta::ChunkMap getChunkMap(std::chrono::time_point const& prevUpdateTime = + std::chrono::time_point()) override; + protected: /// Check that all necessary tables exist void _checkDb(); @@ -277,6 +281,18 @@ class QMetaMysql : public QMeta { }; private: + /** + * Read the last update time of the chunk map. + * @param lock A lock acquired on the mutex _dbMutex. + * @return The update time + * @throw EmptyTableError If the corresponding table is empty + * @throw SqlError For any SQL-specific error + * @throw ConsistencyError For any problem met when parsing or interpreting results read + * from the table. 
+ */ + std::chrono::time_point _getChunkMapUpdateTime( + std::lock_guard const& lock); + /// Add qMsg to the permanent message table. void _addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount, int& completeCount, int& execFailCount, diff --git a/src/sql/SqlResults.cc b/src/sql/SqlResults.cc index c2cd8e8b0..495288977 100644 --- a/src/sql/SqlResults.cc +++ b/src/sql/SqlResults.cc @@ -173,6 +173,24 @@ bool SqlResults::extractFirst4Columns(std::vector& col1, std::vecto return true; } +std::vector> SqlResults::extractFirstNColumns(size_t numColumns) { + std::vector> rows; + for (int resultIdx = 0, numResults = _results.size(); resultIdx < numResults; ++resultIdx) { + MYSQL_ROW row; + while ((row = mysql_fetch_row(_results[resultIdx])) != nullptr) { + std::vector columns; + columns.reserve(numColumns); + for (size_t colIdx = 0; colIdx < numColumns; ++colIdx) { + columns.push_back(row[colIdx]); + } + rows.push_back(std::move(columns)); + } + mysql_free_result(_results[resultIdx]); + } + _results.clear(); + return rows; +} + bool SqlResults::extractFirstValue(std::string& ret, SqlErrorObject& errObj) { if (_results.size() != 1) { std::stringstream ss; diff --git a/src/sql/SqlResults.h b/src/sql/SqlResults.h index 2f4e9b154..fae501cb2 100644 --- a/src/sql/SqlResults.h +++ b/src/sql/SqlResults.h @@ -51,8 +51,8 @@ namespace detail { * is the sequence of strings (pointers) and their lengths. Pointer may be NULL * if the column value is NONE. */ -class SqlResults_Iterator : public std::iterator > > { +class SqlResults_Iterator + : public std::iterator>> { public: SqlResults_Iterator(); SqlResults_Iterator(std::vector const& results); @@ -96,6 +96,15 @@ class SqlResults : boost::noncopyable { std::vector&, std::vector&, SqlErrorObject&); bool extractFirst4Columns(std::vector&, std::vector&, std::vector&, std::vector&, SqlErrorObject&); + + /** + * Extract a result set into the 2D array. + * @param numColumns The number of columns in the array. 
+ * @return a 2D array, where the first index of the array represents rows + * and the second index represents columns. + */ + std::vector> extractFirstNColumns(size_t numColumns); + void freeResults(); /// Return row iterator From 10105396bd77a5d79adf5414f017cf92645f596f Mon Sep 17 00:00:00 2001 From: Igor Gaponenko Date: Fri, 5 Apr 2024 00:19:25 +0000 Subject: [PATCH 3/8] Extended Replication Controller to update chunk map at Czar --- admin/local/docker/compose/docker-compose.yml | 1 + src/replica/apps/MasterControllerHttpApp.cc | 6 +- src/replica/apps/MasterControllerHttpApp.h | 1 + src/replica/contr/ReplicationTask.cc | 91 ++++++++++++++++++- src/replica/contr/ReplicationTask.h | 17 +++- src/replica/contr/Task.h | 6 ++ 6 files changed, 114 insertions(+), 8 deletions(-) diff --git a/admin/local/docker/compose/docker-compose.yml b/admin/local/docker/compose/docker-compose.yml index 5a3c37f99..dc5ca4880 100644 --- a/admin/local/docker/compose/docker-compose.yml +++ b/admin/local/docker/compose/docker-compose.yml @@ -415,6 +415,7 @@ services: --registry-host=repl-mgr-registry --controller-auto-register-workers=1 --qserv-sync-force + --qserv-chunk-map-update --debug expose: - "25081" diff --git a/src/replica/apps/MasterControllerHttpApp.cc b/src/replica/apps/MasterControllerHttpApp.cc index 462371824..2b00107fc 100644 --- a/src/replica/apps/MasterControllerHttpApp.cc +++ b/src/replica/apps/MasterControllerHttpApp.cc @@ -147,6 +147,10 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[]) " This affect replicas to be deleted from the workers during the synchronization" " stages.", _forceQservSync); + parser().flag("qserv-chunk-map-update", + "The flag which would result in updating the chunk disposition map" + " in Qserv's QMeta database.", + _qservChunkMapUpdate); parser().flag("purge", "The binary flag which, if provided, enables the 'purge' algorithm in" " the end of each replication cycle that eliminates excess replicas which" @@ -198,7 
+202,7 @@ int MasterControllerHttpApp::runImpl() { _replicationTask = ReplicationTask::create( _controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec, - _forceQservSync, _replicationIntervalSec, _purge); + _forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge); _replicationTask->start(); _healthMonitorTask = HealthMonitorTask::create( diff --git a/src/replica/apps/MasterControllerHttpApp.h b/src/replica/apps/MasterControllerHttpApp.h index d3e17f782..f5ef4ed02 100644 --- a/src/replica/apps/MasterControllerHttpApp.h +++ b/src/replica/apps/MasterControllerHttpApp.h @@ -132,6 +132,7 @@ class MasterControllerHttpApp : public Application { bool _purge; bool _forceQservSync; + bool _qservChunkMapUpdate; bool _permanentDelete; /// A connection URL for the MySQL service of the Qserv master database. diff --git a/src/replica/contr/ReplicationTask.cc b/src/replica/contr/ReplicationTask.cc index c7dbdea2d..360f78927 100644 --- a/src/replica/contr/ReplicationTask.cc +++ b/src/replica/contr/ReplicationTask.cc @@ -22,23 +22,35 @@ // Class header #include "replica/contr/ReplicationTask.h" +// System headers +#include + // Qserv headers +#include "replica/config/Configuration.h" #include "replica/jobs/FindAllJob.h" #include "replica/jobs/FixUpJob.h" #include "replica/jobs/ReplicateJob.h" #include "replica/jobs/RebalanceJob.h" #include "replica/jobs/PurgeJob.h" +#include "replica/mysql/DatabaseMySQL.h" +#include "replica/mysql/DatabaseMySQLGenerator.h" +#include "replica/mysql/DatabaseMySQLUtils.h" +#include "replica/services/DatabaseServices.h" +#include "replica/util/ReplicaInfo.h" using namespace std; namespace lsst::qserv::replica { +using namespace database::mysql; + ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - unsigned int replicationIntervalSec, bool purge) { + bool 
qservChunkMapUpdate, unsigned int replicationIntervalSec, + bool purge) { return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync, - replicationIntervalSec, purge)); + qservChunkMapUpdate, replicationIntervalSec, purge)); } bool ReplicationTask::onRun() { @@ -51,6 +63,8 @@ bool ReplicationTask::onRun() { launch(priority, saveReplicaInfo, allWorkers); sync(_qservSyncTimeoutSec, _forceQservSync); + if (_qservChunkMapUpdate) _updateChunkMap(); + launch(priority); sync(_qservSyncTimeoutSec, _forceQservSync); @@ -73,10 +87,81 @@ bool ReplicationTask::onRun() { ReplicationTask::ReplicationTask(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, unsigned int qservSyncTimeoutSec, bool forceQservSync, - unsigned int replicationIntervalSec, bool purge) + bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge) : Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec), _qservSyncTimeoutSec(qservSyncTimeoutSec), _forceQservSync(forceQservSync), + _qservChunkMapUpdate(qservChunkMapUpdate), _purge(purge) {} +void ReplicationTask::_updateChunkMap() { + // Open MySQL connection using the RAII-style handler that would automatically + // abort the transaction should any problem occur when loading data into the table. + ConnectionHandler h; + try { + h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta")); + } catch (exception const& ex) { + error("failed to connect to the czar's database server, ex: " + string(ex.what())); + return; + } + QueryGenerator const g(h.conn); + + // Get info on known chunk replicas from the persistent store of the Replication system + // and package those into ready-to-ingest data. 
+ bool const allDatabases = true; + string const emptyDatabaseFilter; + bool const isPublished = true; + bool const includeFileInfo = true; // need this to access tables sizes + vector rows; + for (auto const& workerName : serviceProvider()->config()->workers()) { + vector replicas; + serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter, + allDatabases, isPublished, includeFileInfo); + for (auto const& replica : replicas) { + for (auto const& fileInfo : replica.fileInfo()) { + if (fileInfo.isData() && !fileInfo.isOverlap()) { + rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(), + replica.chunk(), fileInfo.size)); + } + } + } + } + if (rows.empty()) { + warn("no replicas found in the persistent state of the Replication system"); + return; + } + + // Get the limit for the length of the bulk insert queries. The limit is needed + // to run the query generation. + size_t maxQueryLength = 0; + string const globalVariableName = "max_allowed_packet"; + try { + string const query = g.showVars(SqlVarScope::GLOBAL, globalVariableName); + h.conn->executeInOwnTransaction([&query, &maxQueryLength](auto conn) { + bool const noMoreThanOne = true; + if (!selectSingleValue(conn, query, maxQueryLength, "Value", noMoreThanOne)) { + throw runtime_error("no such variable found"); + } + }); + } catch (exception const& ex) { + error("failed to get a value of GLOBAL '" + globalVariableName + "', ex: " + string(ex.what())); + return; + } + + // Execute a sequence of queries atomically + vector const deleteQueries = {g.delete_("chunkMap"), g.delete_("chunkMapStatus")}; + vector insertQueries = g.insertPacked( + "chunkMap", g.packIds("worker", "database", "table", "chunk", "size"), rows, maxQueryLength); + insertQueries.push_back(g.insert("chunkMapStatus", Sql::NOW)); + try { + h.conn->executeInOwnTransaction([&deleteQueries, &insertQueries](auto conn) { + for (auto const& query : deleteQueries) conn->execute(query); 
+ for (auto const& query : insertQueries) conn->execute(query); + }); + } catch (exception const& ex) { + error("failed to update chunk map in the Czar database, ex: " + string(ex.what())); + return; + } +} + } // namespace lsst::qserv::replica diff --git a/src/replica/contr/ReplicationTask.h b/src/replica/contr/ReplicationTask.h index 2f8191e58..5bc99c76b 100644 --- a/src/replica/contr/ReplicationTask.h +++ b/src/replica/contr/ReplicationTask.h @@ -55,6 +55,7 @@ class ReplicationTask : public Task { * @param qservSyncTimeoutSec The maximum number of seconds to be waited before giving * up on the Qserv synchronization requests. * @param forceQservSync Force chunk removal at worker resource collections if 'true'. + * @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'. * @param replicationIntervalSec The number of seconds to wait in the end of each * iteration loop before to begin the new one. * @param purge Purge excess replicas if 'true'. @@ -62,7 +63,7 @@ class ReplicationTask : public Task { */ static Ptr create(Controller::Ptr const& controller, Task::AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, + unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge); protected: @@ -72,15 +73,23 @@ class ReplicationTask : public Task { private: /// @see ReplicationTask::create() ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated, - unsigned int qservSyncTimeoutSec, bool forceQservSync, + unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge); + void _updateChunkMap(); + /// The maximum number of seconds to be waited before giving up /// on the Qserv synchronization requests. 
unsigned int const _qservSyncTimeoutSec; - bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'. - bool const _purge; ///< Purge excess replicas if 'true'. + /// Force removal at worker resource collections if 'true'. + bool const _forceQservSync; + + /// Update the chunk disposition map in Qserv's QMeta database if 'true'. + bool const _qservChunkMapUpdate; + + /// Purge excess replicas if 'true'. + bool const _purge; }; } // namespace lsst::qserv::replica diff --git a/src/replica/contr/Task.h b/src/replica/contr/Task.h index b805b32f5..cc0151912 100644 --- a/src/replica/contr/Task.h +++ b/src/replica/contr/Task.h @@ -197,6 +197,12 @@ class Task : public EventLogger, public std::enable_shared_from_this { */ void debug(std::string const& msg) { LOGS(_log, LOG_LVL_DEBUG, context() << msg); } + /** + * Log a message into the Logger's LOG_LVL_WARN stream. + * @param msg A message to be logged. + */ + void warn(std::string const& msg) { LOGS(_log, LOG_LVL_WARN, context() << msg); } + /** * Log a message into the Logger's LOG_LVL_ERROR stream. * @param msg A message to be logged. From c46224af3d5da6bf5cc8e5d070c40804dd9de691 Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 22 Mar 2024 16:49:46 -0700 Subject: [PATCH 4/8] Added code to read chunk disposition map and organize for czar use. 
--- src/ccontrol/UserQueryFactory.cc | 2 + src/czar/CMakeLists.txt | 1 + src/czar/Czar.cc | 6 +- src/czar/Czar.h | 5 + src/czar/CzarChunkMap.cc | 340 +++++++++++++++++++++++++++++++ src/czar/CzarChunkMap.h | 149 ++++++++++++++ src/qmeta/QMetaMysql.cc | 5 + src/qmeta/testQMeta.cc | 13 +- 8 files changed, 518 insertions(+), 3 deletions(-) create mode 100644 src/czar/CzarChunkMap.cc create mode 100644 src/czar/CzarChunkMap.h diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 589088692..d5c557b09 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -224,6 +224,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st // First check for SUBMIT and strip it std::string query = aQuery; + // &&& need to have WorkerChunkMap info at this point + std::string stripped; bool async = false; if (UserQueryType::isSubmit(query, stripped)) { diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 4b85ea97f..0ecb340a4 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -2,6 +2,7 @@ add_library(czar OBJECT) target_sources(czar PRIVATE Czar.cc + CzarChunkMap.cc HttpCzarSvc.cc HttpCzarQueryModule.cc HttpModule.cc diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 34ca6e74d..cc9bf11c9 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -43,6 +43,7 @@ #include "ccontrol/UserQueryResources.h" #include "ccontrol/UserQuerySelect.h" #include "ccontrol/UserQueryType.h" +#include "czar/CzarChunkMap.h" #include "czar/CzarErrors.h" #include "czar/HttpSvc.h" #include "czar/MessageTable.h" @@ -138,8 +139,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) _czarConfig(cconfig::CzarConfig::create(configFilePath, czarName)), _idCounter(), _uqFactory(), - _clientToQuery(), - _mutex() { + _clientToQuery() { // set id counter to milliseconds since the epoch, mod 1 year. 
struct timeval tv; gettimeofday(&tv, nullptr); @@ -156,6 +156,8 @@ Czar::Czar(string const& configFilePath, string const& czarName) // the name of the Czar gets translated into a numeric identifier. _czarConfig->setId(_uqFactory->userQuerySharedResources()->qMetaCzarId); + _czarChunkMap = CzarChunkMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); + // Tell workers to cancel any queries that were submitted before this restart of Czar. // Figure out which query (if any) was recorded in Czar database before the restart. // The id will be used as the high-watermark for queries that need to be cancelled. diff --git a/src/czar/Czar.h b/src/czar/Czar.h index 3fae22b83..32549fe9d 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -61,6 +61,8 @@ class FileMonitor; namespace lsst::qserv::czar { +class CzarChunkMap; + /// @addtogroup czar /** @@ -179,6 +181,9 @@ class Czar { /// The HTTP server processing Czar management requests. std::shared_ptr _controlHttpSvc; + + /// Map of which chunks on which workers and shared scan order. + std::shared_ptr _czarChunkMap; }; } // namespace lsst::qserv::czar diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc new file mode 100644 index 000000000..2592ca4f7 --- /dev/null +++ b/src/czar/CzarChunkMap.cc @@ -0,0 +1,340 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. 
+ * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + +// Class header +#include "czar/CzarChunkMap.h" + +// System headers +#include + +// LSST headers +#include "lsst/log/Log.h" + +// Qserv headers +#include "qmeta/Exceptions.h" +#include "qmeta/QMeta.h" +#include "util/Bug.h" + +using namespace std; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap"); +} // namespace + +namespace lsst::qserv::czar { + +CzarChunkMap::CzarChunkMap(std::shared_ptr const& qmeta) : _qmeta(qmeta){ + try { + _read(); + } catch (qmeta::QMetaError const& qExc) { + LOGS(_log, LOG_LVL_ERROR, __func__ << " CzarChunkMap could not read DB " << qExc.what()); + LOGS(_log, LOG_LVL_ERROR, __func__ << " &&& CzarChunkMap could not read DB " << qExc.what()); + // &&& maybe flag invalid + throw ChunkMapException(ERR_LOC, string(" CzarChunkMap constructor failed read ") + qExc.what()); + } +} + +void CzarChunkMap::_read() { + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() start"); + qmeta::QMeta::ChunkMap chunkMap = _qmeta->getChunkMap(); + auto const& jsChunks = 
chunkMap.chunks; + LOGS(_log, LOG_LVL_WARN, "&&& chunkMap=" << jsChunks); + + // Create new maps. + auto wcMapPtr = make_shared(); + auto chunkMapPtr = make_shared(); + + for (auto const& [workerId, dbs] : jsChunks.items()) { + for (auto const& [dbName, tables] : dbs.items()) { + for (auto const& [tableName, chunks] : tables.items()) { + for (auto const& [index, chunkNumNSz] : chunks.items()) { + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() " << index); + try { + int64_t chunkNum = chunkNumNSz.at(0); + int64_t sz = chunkNumNSz.at(1); + LOGS(_log, LOG_LVL_WARN, "&&& workerdId=" << workerId << " db=" << dbName << " table=" << tableName << " chunk=" << chunkNum << " sz=" << sz); + _insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum, sz); + } catch (invalid_argument const& exc) { + // &&& better option? Cancel mapping and inform the replicator that the table is bad? + throw ChunkMapException(ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } catch (out_of_range const& exc) { + // &&& better option? + throw ChunkMapException(ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() e1"); + } + } + } + } + + auto chunksSortedBySize = make_shared(); + _calcChunkMap(*chunkMapPtr, *chunksSortedBySize); + + // At this point we have + // - wcMapPtr has a map of workerData by worker id with each worker having a map of ChunkData + // - chunkMapPtr has a map of all chunkData by chunk id + // - chunksSortedBySize a list of chunks sorted with largest first. + // From here need to assign shared scan chunk priority + // Go through the chunksSortedBySize list and assign each chunk to worker that has it with the smallest + // totalScanSize. 
+ for (auto&& chunkData : *chunksSortedBySize) { + int64_t smallest = std::numeric_limits::max(); + WorkerChunksData::Ptr smallestWkr = nullptr; + for (auto&& [wkrId, wkrDataWeak] : chunkData->_workerHasThisMap) { + auto wkrData = wkrDataWeak.lock(); + if (wkrData == nullptr) { + LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId); + continue; // maybe the next one will be ok. + } + LOGS(_log, LOG_LVL_INFO, " &&& wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize << " smallest=" << smallest); + if (wkrData->_sharedScanTotalSize < smallest) { + smallestWkr = wkrData; + smallest = smallestWkr->_sharedScanTotalSize; + } + } + if (smallestWkr == nullptr) { + throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + to_string(chunkData->_chunkId)); + } + smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData; + smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes; + chunkData->_primaryScanWorker = smallestWkr; + LOGS(_log, LOG_LVL_DEBUG, " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId); + } + + verify(*chunkMapPtr, *wcMapPtr); + LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); + LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr)); + + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() end"); +} + +void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId, string const& dbName, string const& tableName, int64_t chunkIdNum, int64_t sz) { + // Get or make the worker entry + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap start"); + WorkerChunksData::Ptr workerChunksData; + auto iterWC = wcMap.find(workerId); + if (iterWC == wcMap.end()) { + workerChunksData = WorkerChunksData::Ptr(new WorkerChunksData(workerId)); + wcMap[workerId] = workerChunksData; + } else { + workerChunksData = iterWC->second; + } + + // Get or make the ChunkData 
entry in chunkMap + ChunkData::Ptr chunkData; + auto iterChunkData = chunkMap.find(chunkIdNum); + if (iterChunkData == chunkMap.end()) { + chunkData = ChunkData::Ptr(new ChunkData(chunkIdNum)); + chunkMap[chunkIdNum] = chunkData; + } else { + chunkData = iterChunkData->second; + } + + // Set or verify the table information + auto iterDT = chunkData->_dbTableMap.find({dbName, tableName}); + if (iterDT == chunkData->_dbTableMap.end()) { + // doesn't exist so set it up + chunkData->_dbTableMap[{dbName, tableName}] = sz; + } else { + // Verify that it matches other data + auto const& dbTbl = iterDT->first; + auto tblSz = iterDT->second; + auto const& dbN = dbTbl.first; + auto const& tblN = dbTbl.second; + if (dbName != dbN || tblN != tableName || tblSz != sz) { + LOGS(_log, LOG_LVL_ERROR, __func__ << " data mismatch for " + << dbName << "." << tableName << "=" << sz << " vs " + << dbN << "." << tblN << "=" << tblSz); + } + } + + // Link WorkerData the single chunkData instance for the chunkId + workerChunksData->_chunkDataMap[chunkIdNum] = chunkData; + + // Add worker to the list of workers containing the chunk. + chunkData->addToWorkerHasThis(workerChunksData); + LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap end"); +} + +void CzarChunkMap::_calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) { + // Calculate total bytes for all chunks. 
+ for (auto&& [chunkIdNum, chunkData] : chunkMap) { + chunkData->calcTotalBytes(); + chunksSortedBySize.push_back(chunkData); + } + + sortChunks(chunksSortedBySize); +} + +void CzarChunkMap::sortChunks(std::vector& chunksSortedBySize) { + /// Return true if a->_totalBytes > b->_totalBytes + auto sortBySizeDesc = [](ChunkData::Ptr const& a, ChunkData::Ptr const& b) { + if (b == nullptr && a != nullptr) return true; + if (a == nullptr) return false; + return a->_totalBytes > b->_totalBytes; + }; + + std::sort(chunksSortedBySize.begin(), chunksSortedBySize.end(), sortBySizeDesc); +} + +void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) { + set allChunkIds; + int errorCount = 0; + for (auto const& [wkr, wkrData] : wcMap) { + for (auto const& [chunkId, chunkData] : wkrData->_chunkDataMap) { + allChunkIds.insert(chunkId); + } + } + + for (auto const& [chunkId, chunkDataPtr] : chunkMap) { + if (chunkDataPtr == nullptr) { + LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " had nullptr"); + ++errorCount; + continue; + } + auto primeScanWkr = chunkDataPtr->_primaryScanWorker.lock(); + if (primeScanWkr == nullptr) { + LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " missing primaryScanWorker"); + ++errorCount; + continue; + } + if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) { + LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " << primeScanWkr->_workerId); + ++errorCount; + continue; + } + auto iter = allChunkIds.find(chunkId); + if (iter != allChunkIds.end()) { + allChunkIds.erase(iter); + } else { + LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " chunkId was not in allChunks list"); + ++errorCount; + continue; + } + } + + auto missing = allChunkIds.size(); + if (missing > 0) { + string allMissingIds; + for (auto const& cId : allChunkIds) { + allMissingIds += to_string(cId) + ","; + } + LOGS(_log, LOG_LVL_ERROR, " 
There were " << missing << " missing chunks from the scan list " << allMissingIds); + ++errorCount; + } + + if (errorCount > 0) { + throw ChunkMapException(ERR_LOC, "verification failed with " + to_string(errorCount) + " errors"); + } +} + +string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) { + stringstream os; + os << "ChunkMap{"; + for (auto const& [cId, cDataPtr] : chunkMap) { + os << "(cId=" << cId << ":"; + os << ((cDataPtr == nullptr) ? "null" : cDataPtr->dump()) << ")"; + } + os << "}"; + return os.str(); +} + +string CzarChunkMap::dumpWorkerChunkMap(WorkerChunkMap const& wcMap) { + stringstream os; + os << "WorkerChunkMap{"; + for (auto const& [wId, wDataPtr] : wcMap) { + os << "(wId=" << wId << ":"; + os << ((wDataPtr == nullptr) ? "null" : wDataPtr->dump()) << ")"; + } + os << "}"; + return os.str(); +} + +void CzarChunkMap::ChunkData::calcTotalBytes() { + _totalBytes = 0; + for (auto const& [key, val] : _dbTableMap) { + _totalBytes += val; + } +} + +void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr const& worker) { + if (worker == nullptr) { + throw ChunkMapException(ERR_LOC, string(__func__) + " worker was null"); + } + + _workerHasThisMap[worker->_workerId] = worker; +} + +string CzarChunkMap::ChunkData::dump() const { + stringstream os; + auto primaryWorker = _primaryScanWorker.lock(); + os << "{ChunkData id=" << _chunkId << " totalBytes=" << _totalBytes; + os << " primaryWorker=" << ((primaryWorker == nullptr) ? "null" : primaryWorker->_workerId); + os << " workers{"; + for (auto const& [wId, wData] : _workerHasThisMap) { + os << "(" << wId << ")"; + } + os << "} tables{"; + for (auto const& [dbTbl, sz] : _dbTableMap) { + os << "(" << dbTbl.first << "." 
<< dbTbl.second << " sz=" << sz << ")"; + } + os << "}}"; + return os.str(); +} + +string CzarChunkMap::WorkerChunksData::dump() const { + stringstream os; + os << "{WorkerChunksData id=" << _workerId << " scanTotalSize=" << _sharedScanTotalSize; + os << " chunkDataIds{"; + for (auto const& [chunkId, chunkData] : _chunkDataMap) { + os << "(" << chunkId << ")"; + } + os << "} sharedScanChunks{"; + for (auto const& [chunkId, chunkData] : _sharedScanChunkMap) { + os << "(" << chunkId << ")"; + } + os << "}}"; + return os.str(); +} + +} // namespace lsst::qserv::wconfig diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h new file mode 100644 index 000000000..1962551b2 --- /dev/null +++ b/src/czar/CzarChunkMap.h @@ -0,0 +1,149 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +#ifndef LSST_QSERV_CZAR_CZARCHUNKMAP_H +#define LSST_QSERV_CZAR_CZARCHUNKMAP_H + +// System headers +#include +#include +#include +#include +#include +#include +#include + +// Third party headers +#include + +// Qserv headers +#include "util/Issue.h" + +namespace lsst::qserv::qmeta { +class QMeta; +} + +namespace lsst::qserv::czar { + +class ChunkMapException : public util::Issue { +public: + ChunkMapException(Context const& ctx, std::string const& msg) : util::Issue(ctx, msg) {} +}; + +class CzarChunkMap { +public: + using Ptr = std::shared_ptr; + + CzarChunkMap() = delete; + + static Ptr create(std::shared_ptr const& qmeta) { + return Ptr(new CzarChunkMap(qmeta)); + } + + class WorkerChunksData; + + class ChunkData { + public: + using Ptr = std::shared_ptr; + ChunkData(int chunkId_) : _chunkId(chunkId_) {} + int64_t const _chunkId; + int64_t _totalBytes = 0; + std::weak_ptr _primaryScanWorker; + + /// &&& doc + void calcTotalBytes(); + + /// &&& doc + void addToWorkerHasThis(std::shared_ptr const& worker); + + /// Key is databaseName+tableName, value is size in bytes. + std::map, int64_t> _dbTableMap; + + /// Map of workers that have this chunk + std::map> _workerHasThisMap; + + std::string dump() const; + }; + + class WorkerChunksData { + public: + using Ptr = std::shared_ptr; + WorkerChunksData(std::string const& wId) : _workerId(wId) {} + + std::string dump() const; + + std::string const _workerId; + + /// Map of all chunks found on the worker + std::map _chunkDataMap; ///< key is chunkId // &&& needed? + + /// Map of chunks this worker will handle during shared scans. 
+ std::map _sharedScanChunkMap; + int64_t _sharedScanTotalSize = 0; + }; + + using WorkerChunkMap = std::map; + using ChunkMap = std::map; + using ChunkVector = std::vector; + + /// &&& doc + static void sortChunks(ChunkVector& chunksSortedBySize); + + /// &&& doc + /// @throws ChunkMapException + static void verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); + + static std::string dumpChunkMap(ChunkMap const& chunkMap); + + static std::string dumpWorkerChunkMap(WorkerChunkMap const& wcMap); + +private: + /// Try to retrieve + CzarChunkMap(std::shared_ptr const& qmeta); + + /// &&& doc + /// @throws `qmeta::QMetaError` + void _read(); + + /// &&& doc + void _insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, int64_t chunkIdNum, int64_t sz); + + /// &&& doc + void _calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize); + + std::shared_ptr _qmeta; ///< Database connection to collect json work list. + + /// &&& doc + std::shared_ptr _workerChunksMap; + + /// &&& doc + std::shared_ptr _ChunksMap; + + /// List of chunks sorted by the total size of all tables in the chunk. + std::shared_ptr _chunksSortedBySize; + +}; + + +} // namespace lsst::qserv::czar + +#endif // LSST_QSERV_CZAR_CZARCHUNKMAP_H diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index 40befc5e9..f399c49d6 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -846,6 +846,11 @@ QMeta::ChunkMap QMetaMysql::getChunkMap(chrono::time_point lock_guard lock(_dbMutex); QMeta::ChunkMap chunkMap; +/* &&& +QMeta::ChunkMap QMetaMysql::getChunkMap() { // &&& + lock_guard sync(_dbMutex); +>>>>>>> 07d082050 (Added code to read chunk disposition map and organize for czar use.) 
+*/ auto trans = QMetaTransaction::create(*_conn); diff --git a/src/qmeta/testQMeta.cc b/src/qmeta/testQMeta.cc index 3589f9b36..986b9d501 100644 --- a/src/qmeta/testQMeta.cc +++ b/src/qmeta/testQMeta.cc @@ -412,5 +412,16 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) { } BOOST_CHECK(caught); } - +/* &&& +<<<<<<< HEAD +======= +*/ +BOOST_AUTO_TEST_CASE(getChunkMap) { + // The test assumes that the underlying tables exists and it's empty. + QMeta::ChunkMap chunkMap; + BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError); +} +/* &&& +>>>>>>> 07d082050 (Added code to read chunk disposition map and organize for czar use.) +*/ BOOST_AUTO_TEST_SUITE_END() From f908d6b43329a910b60afda79f94fb8a55e28a06 Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 26 Mar 2024 16:35:33 -0700 Subject: [PATCH 5/8] Added unit test. --- src/ccontrol/UserQueryFactory.cc | 2 +- src/czar/CMakeLists.txt | 31 +++++- src/czar/CzarChunkMap.cc | 138 +++++++++++++++----------- src/czar/CzarChunkMap.h | 142 +++++++++++++++++++++------ src/czar/testCzar.cc | 162 +++++++++++++++++++++++++++++++ src/global/clock_defs.h | 1 + src/qmeta/QMeta.h | 3 +- src/qmeta/QMetaMysql.cc | 6 +- src/util/TimeUtils.cc | 9 ++ src/util/TimeUtils.h | 6 ++ 10 files changed, 402 insertions(+), 98 deletions(-) create mode 100644 src/czar/testCzar.cc diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index d5c557b09..68761c196 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -224,7 +224,7 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st // First check for SUBMIT and strip it std::string query = aQuery; - // &&& need to have WorkerChunkMap info at this point + // TODO: DM-43386 need to have WorkerChunkMap info at this point std::string stripped; bool async = false; diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 0ecb340a4..3b9d63e40 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ 
-1,4 +1,5 @@ add_library(czar OBJECT) +add_dependencies(czar proto) target_sources(czar PRIVATE Czar.cc @@ -51,4 +52,32 @@ endfunction() czar_utils( qserv-czar-http -) \ No newline at end of file +) + +function(czar_tests) + foreach(TEST IN ITEMS ${ARGV}) + add_executable(${TEST} ${TEST}.cc) + target_link_libraries(${TEST} PUBLIC + cconfig + ccontrol + czar + global + mysql + parser + qana + qdisp + qproc + qserv_meta + query + rproc + sql + Boost::unit_test_framework + ) + add_test(NAME ${TEST} COMMAND ${TEST}) + endforeach() +endfunction() + +czar_tests( + testCzar +) + diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 2592ca4f7..13dfdf161 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -1,25 +1,4 @@ // -*- LSST-C++ -*- -/* - * LSST Data Management System - * - * This product includes software developed by the - * LSST Project (http://www.lsst.org/). - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU General Public License as published by - * the Free Software Foundation, either version 3 of the License, or - * (at your option) any later version. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU General Public License for more details. - * - * You should have received a copy of the LSST License Statement and - * the GNU General Public License along with this program. If not, - * see . 
- */ -// -*- LSST-C++ -*- /* * LSST Data Management System * @@ -54,6 +33,7 @@ #include "qmeta/Exceptions.h" #include "qmeta/QMeta.h" #include "util/Bug.h" +#include "util/TimeUtils.h" using namespace std; @@ -63,23 +43,52 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap"); namespace lsst::qserv::czar { -CzarChunkMap::CzarChunkMap(std::shared_ptr const& qmeta) : _qmeta(qmeta){ +CzarChunkMap::CzarChunkMap(std::shared_ptr const& qmeta) : _qmeta(qmeta) { try { - _read(); + auto mapsSet = _read(); + if (!mapsSet) { + throw ChunkMapException(ERR_LOC, "CzarChunkMap maps were not set in contructor"); + } } catch (qmeta::QMetaError const& qExc) { LOGS(_log, LOG_LVL_ERROR, __func__ << " CzarChunkMap could not read DB " << qExc.what()); - LOGS(_log, LOG_LVL_ERROR, __func__ << " &&& CzarChunkMap could not read DB " << qExc.what()); - // &&& maybe flag invalid throw ChunkMapException(ERR_LOC, string(" CzarChunkMap constructor failed read ") + qExc.what()); } } -void CzarChunkMap::_read() { - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() start"); - qmeta::QMeta::ChunkMap chunkMap = _qmeta->getChunkMap(); - auto const& jsChunks = chunkMap.chunks; - LOGS(_log, LOG_LVL_WARN, "&&& chunkMap=" << jsChunks); +bool CzarChunkMap::_read() { + LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() start"); + // If replacing the map, this may take a bit of time, but it's probably + // better to wait for new maps if something changed. + std::lock_guard gLock(_mapMtx); + qmeta::QMeta::ChunkMap qChunkMap = _qmeta->getChunkMap(); + if (_lastUpdateTime >= qChunkMap.updateTime) { + LOGS(_log, LOG_LVL_DEBUG, + __func__ << " CzarChunkMap no need to read " + << util::TimeUtils::timePointToDateTimeString(_lastUpdateTime) + << " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime)); + return false; + } + + auto const& jsChunks = qChunkMap.chunks; + LOGS(_log, LOG_LVL_DEBUG, "chunkMap=" << jsChunks); + + // Make the new maps. 
+ auto [chunkMapPtr, wcMapPtr] = makeNewMaps(jsChunks); + + verify(*chunkMapPtr, *wcMapPtr); + LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); + LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr)); + + _workerChunkMap = wcMapPtr; + _chunkMap = chunkMapPtr; + _lastUpdateTime = qChunkMap.updateTime; + + LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() end"); + return true; +} +pair, shared_ptr> CzarChunkMap::makeNewMaps( + nlohmann::json const& jsChunks) { // Create new maps. auto wcMapPtr = make_shared(); auto chunkMapPtr = make_shared(); @@ -88,27 +97,32 @@ void CzarChunkMap::_read() { for (auto const& [dbName, tables] : dbs.items()) { for (auto const& [tableName, chunks] : tables.items()) { for (auto const& [index, chunkNumNSz] : chunks.items()) { - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() " << index); try { int64_t chunkNum = chunkNumNSz.at(0); int64_t sz = chunkNumNSz.at(1); - LOGS(_log, LOG_LVL_WARN, "&&& workerdId=" << workerId << " db=" << dbName << " table=" << tableName << " chunk=" << chunkNum << " sz=" << sz); - _insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum, sz); + LOGS(_log, LOG_LVL_DEBUG, + "workerdId=" << workerId << " db=" << dbName << " table=" << tableName + << " chunk=" << chunkNum << " sz=" << sz); + insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum, + sz); } catch (invalid_argument const& exc) { - // &&& better option? Cancel mapping and inform the replicator that the table is bad? 
- throw ChunkMapException(ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + throw ChunkMapException( + ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); } catch (out_of_range const& exc) { - // &&& better option? - throw ChunkMapException(ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + throw ChunkMapException( + ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); } - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() e1"); } } } } auto chunksSortedBySize = make_shared(); - _calcChunkMap(*chunkMapPtr, *chunksSortedBySize); + calcChunkMap(*chunkMapPtr, *chunksSortedBySize); // At this point we have // - wcMapPtr has a map of workerData by worker id with each worker having a map of ChunkData @@ -124,33 +138,36 @@ void CzarChunkMap::_read() { auto wkrData = wkrDataWeak.lock(); if (wkrData == nullptr) { LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId); - continue; // maybe the next one will be ok. + continue; // maybe the next one will be ok. 
} - LOGS(_log, LOG_LVL_INFO, " &&& wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize << " smallest=" << smallest); + LOGS(_log, LOG_LVL_DEBUG, + __func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize + << " smallest=" << smallest); if (wkrData->_sharedScanTotalSize < smallest) { smallestWkr = wkrData; smallest = smallestWkr->_sharedScanTotalSize; } } if (smallestWkr == nullptr) { - throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + to_string(chunkData->_chunkId)); + throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + + to_string(chunkData->_chunkId)); } smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData; smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes; chunkData->_primaryScanWorker = smallestWkr; - LOGS(_log, LOG_LVL_DEBUG, " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId); + LOGS(_log, LOG_LVL_DEBUG, + " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId); } - verify(*chunkMapPtr, *wcMapPtr); - LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); - LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr)); - - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() end"); + LOGS(_log, LOG_LVL_TRACE, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); + LOGS(_log, LOG_LVL_TRACE, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr)); + return {chunkMapPtr, wcMapPtr}; } -void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId, string const& dbName, string const& tableName, int64_t chunkIdNum, int64_t sz) { +void CzarChunkMap::insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId, + string const& dbName, string const& tableName, int64_t chunkIdNum, + int64_t sz) { // Get or make the worker entry - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap 
start"); WorkerChunksData::Ptr workerChunksData; auto iterWC = wcMap.find(workerId); if (iterWC == wcMap.end()) { @@ -182,9 +199,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap auto const& dbN = dbTbl.first; auto const& tblN = dbTbl.second; if (dbName != dbN || tblN != tableName || tblSz != sz) { - LOGS(_log, LOG_LVL_ERROR, __func__ << " data mismatch for " - << dbName << "." << tableName << "=" << sz << " vs " - << dbN << "." << tblN << "=" << tblSz); + LOGS(_log, LOG_LVL_ERROR, + __func__ << " data mismatch for " << dbName << "." << tableName << "=" << sz << " vs " << dbN + << "." << tblN << "=" << tblSz); } } @@ -193,10 +210,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap // Add worker to the list of workers containing the chunk. chunkData->addToWorkerHasThis(workerChunksData); - LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap end"); } -void CzarChunkMap::_calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) { +void CzarChunkMap::calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) { // Calculate total bytes for all chunks. for (auto&& [chunkIdNum, chunkData] : chunkMap) { chunkData->calcTotalBytes(); @@ -218,6 +234,7 @@ void CzarChunkMap::sortChunks(std::vector& chunksSortedBySize) { } void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) { + // Use a set to prevent duplicate ids caused by replication levels > 1. 
set allChunkIds; int errorCount = 0; for (auto const& [wkr, wkrData] : wcMap) { @@ -239,7 +256,9 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) continue; } if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) { - LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " << primeScanWkr->_workerId); + LOGS(_log, LOG_LVL_ERROR, + " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " + << primeScanWkr->_workerId); ++errorCount; continue; } @@ -259,7 +278,8 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) for (auto const& cId : allChunkIds) { allMissingIds += to_string(cId) + ","; } - LOGS(_log, LOG_LVL_ERROR, " There were " << missing << " missing chunks from the scan list " << allMissingIds); + LOGS(_log, LOG_LVL_ERROR, + " There were " << missing << " missing chunks from the scan list " << allMissingIds); ++errorCount; } @@ -307,7 +327,7 @@ void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr_workerId); os << " workers{"; @@ -337,4 +357,4 @@ string CzarChunkMap::WorkerChunksData::dump() const { return os.str(); } -} // namespace lsst::qserv::wconfig +} // namespace lsst::qserv::czar diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h index 1962551b2..edc941ab0 100644 --- a/src/czar/CzarChunkMap.h +++ b/src/czar/CzarChunkMap.h @@ -36,6 +36,7 @@ #include // Qserv headers +#include "global/clock_defs.h" #include "util/Issue.h" namespace lsst::qserv::qmeta { @@ -49,55 +50,106 @@ class ChunkMapException : public util::Issue { ChunkMapException(Context const& ctx, std::string const& msg) : util::Issue(ctx, msg) {} }; +/// This class is used to organize worker chunk table information so that it +/// can be used to send jobs to the appropriate worker and inform workers +/// what chunks they can expect to handle in shared scans. 
+/// The data for the maps is provided by the Replicator and stored in QMeta. +/// When the data is changed, there is a timestamp that is updated, which +/// will cause new maps to be made by this class. +/// +/// The maps generated are constant objects stored with shared pointers. As +/// such, it should be possible for numerous threads to use each map +/// simultaneously provided they have their own pointers to the maps. +/// The pointers to the maps are mutex protected to safely allow map updates. +/// +/// The czar is expected to heavily use the +/// `getMaps() -> WorkerChunkMap -> getSharedScanChunkMap()` +/// to send jobs to workers, as that gets an ordered list of all chunks +/// the worker should handle during a shared scan. +/// `getMaps() -> ChunkMap` is expected to be more useful if there is a +/// failure and a chunk query needs to go to a different worker. class CzarChunkMap { public: using Ptr = std::shared_ptr; CzarChunkMap() = delete; - static Ptr create(std::shared_ptr const& qmeta) { - return Ptr(new CzarChunkMap(qmeta)); - } + static Ptr create(std::shared_ptr const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); } class WorkerChunksData; + /// Essentially a structure for storing data about which tables and workers are associated with this + /// chunk. class ChunkData { public: using Ptr = std::shared_ptr; ChunkData(int chunkId_) : _chunkId(chunkId_) {} - int64_t const _chunkId; - int64_t _totalBytes = 0; - std::weak_ptr _primaryScanWorker; - /// &&& doc + int64_t getChunkId() const { return _chunkId; } + + int64_t getTotalBytes() const { return _totalBytes; } + + std::weak_ptr getPrimaryScanWorker() const { return _primaryScanWorker; } + + /// Add up the bytes in each table for this chunk to get `_totalBytes` void calcTotalBytes(); - /// &&& doc + /// Add `worker` to the `_workerHasThisMap` to indicate that worker has a copy + /// of this chunk. 
void addToWorkerHasThis(std::shared_ptr const& worker); + std::string dump() const; + + friend CzarChunkMap; + + private: + int64_t const _chunkId; ///< The Id number for this chunk. + int64_t _totalBytes = 0; ///< The total number of bytes used by all tables in this chunk. + std::weak_ptr _primaryScanWorker; ///< The worker to be used for shared scans. + /// Key is databaseName+tableName, value is size in bytes. std::map, int64_t> _dbTableMap; /// Map of workers that have this chunk std::map> _workerHasThisMap; - - std::string dump() const; }; + /// Essentially a structure for storing which chunks are associated with a worker. class WorkerChunksData { public: using Ptr = std::shared_ptr; WorkerChunksData(std::string const& wId) : _workerId(wId) {} + /// Return the worker's id string. + std::string const& getWorkerId() const { return _workerId; } + + /// Return the number of bytes contained in all chunks/tables to be + /// accessed in a full table scan on this worker. + int64_t getSharedScanTotalSize() const { return _sharedScanTotalSize; } + + /// Return a reference to `_sharedScanChunkMap`. A copy of the pointer + /// to this class (or the containing map) should be held to ensure the reference. + std::map const& getSharedScanChunkMap() const { return _sharedScanChunkMap; } + std::string dump() const; + friend CzarChunkMap; + + private: std::string const _workerId; - /// Map of all chunks found on the worker - std::map _chunkDataMap; ///< key is chunkId // &&& needed? + /// Map of all chunks found on the worker where key is chunkId + std::map _chunkDataMap; /// Map of chunks this worker will handle during shared scans. + /// Since scans are done in order of chunk id numbers, it helps + /// to have this in chunk id number order. + /// At some point, this should be sent to workers so they + can make more accurate time estimates for chunk completion. 
std::map _sharedScanChunkMap; + + /// The total size (in bytes) of all chunks on this worker that + /// are to be used in shared scans. int64_t _sharedScanTotalSize = 0; }; @@ -105,10 +157,32 @@ class CzarChunkMap { using ChunkMap = std::map; using ChunkVector = std::vector; - /// &&& doc + /// Sort the chunks in `chunksSortedBySize` in descending order by total size in bytes. static void sortChunks(ChunkVector& chunksSortedBySize); - /// &&& doc + /// Insert the chunk table described into the correct locations in + /// `wcMap` and `chunkMap`. + /// @param `wcMap` - WorkerChunkMap being constructed. + /// @param `chunkMap` - ChunkMap being constructed. + /// @param `workerId` - worker id where this table was found. + /// @param `dbName` - database name for the table being inserted. + /// @param `tableName` - table name for the table being inserted. + /// @param `chunkIdNum` - chunk id number for the table being inserted. + /// @param `sz` - size in bytes of the table being inserted. + static void insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId, + std::string const& dbName, std::string const& tableName, + int64_t chunkIdNum, int64_t sz); + + /// Calculate the total bytes in each chunk and then sort the resulting ChunkVector by chunk size, + /// descending. + static void calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize); + + /// Make new ChunkMap and WorkerChunkMap from the data in `jsChunks`. + static std::pair, std::shared_ptr> + makeNewMaps(nlohmann::json const& jsChunks); + + /// Verify that all chunks belong to at least one worker and that all chunks are represented in shared + /// scans. 
/// @throws ChunkMapException static void verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap); @@ -116,34 +190,40 @@ class CzarChunkMap { static std::string dumpWorkerChunkMap(WorkerChunkMap const& wcMap); + /// Return shared pointers to `_chunkMap` and `_workerChunkMap`, which should be held until + /// finished with the data. + std::pair, + std::shared_ptr> + getMaps() const { + std::lock_guard lck(_mapMtx); + return {_chunkMap, _workerChunkMap}; + } + private: - /// Try to retrieve + /// Try to `_read` values for maps from `qmeta`. CzarChunkMap(std::shared_ptr const& qmeta); - /// &&& doc + /// Read the json worker list from the database and update the maps if there's a new + /// version since the `_lastUpdateTime`. /// @throws `qmeta::QMetaError` - void _read(); - - /// &&& doc - void _insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, int64_t chunkIdNum, int64_t sz); + bool _read(); - /// &&& doc - void _calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize); + std::shared_ptr _qmeta; ///< Database connection to collect json worker list. - std::shared_ptr _qmeta; ///< Database connection to collect json work list. + /// Map of all workers and which chunks they contain. + std::shared_ptr _workerChunkMap; - /// &&& doc - std::shared_ptr _workerChunksMap; + /// Map of all chunks in the system with chunkId number as the key and the values contain + /// information about the tables in those chunks and which worker is responsible for + /// handling the chunk in a shared scan. + std::shared_ptr _chunkMap; - /// &&& doc - std::shared_ptr _ChunksMap; - - /// List of chunks sorted by the total size of all tables in the chunk. - std::shared_ptr _chunksSortedBySize; + /// The last time the maps were updated with information from the replicator. 
+ TIMEPOINT _lastUpdateTime; // initialized to 0; + mutable std::mutex _mapMtx; ///< protects _workerChunkMap, _chunkMap, _lastUpdateTime, and _qmeta. }; - } // namespace lsst::qserv::czar #endif // LSST_QSERV_CZAR_CZARCHUNKMAP_H diff --git a/src/czar/testCzar.cc b/src/czar/testCzar.cc new file mode 100644 index 000000000..5c0c8787c --- /dev/null +++ b/src/czar/testCzar.cc @@ -0,0 +1,162 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . 
+ */ + +// System headers +#include +#include +#include + +// Third-party headers +#include "boost/asio.hpp" + +// Boost unit test header +#define BOOST_TEST_MODULE Czar_1 +#include + +// LSST headers +#include "lsst/log/Log.h" + +// Qserv headers +#include "czar/CzarChunkMap.h" + +namespace test = boost::test_tools; +using namespace lsst::qserv; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.testCzar"); +} + +using namespace std; + +BOOST_AUTO_TEST_SUITE(Suite) + +BOOST_AUTO_TEST_CASE(CzarChunkMap) { + // Each chunk only occurs on one worker + string test1 = R"( + { + "ce1c1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7310,0]], + "Source":[[1234567890,0],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[7165,76356]], + "RefObject":[[1234567890,0],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[7165,578396]] + } + }, + "ddc3f1b9-e6fb-11ee-a46b-0242c0a80304": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000]], + "Source":[[1234567890,0],[7480,2259419]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728]], + "RefObject":[[1234567890,0],[6995,10920]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834]], + "RunDeepSource":[[1234567890,0],[6995,58604]] + } + } + } + )"; + + /// 3 workers, each containing all chunks. 
+ string test2 = R"( + { + "ce1c1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + }, + "brnd1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + }, + "ddc3f1b9-e6fb-11ee-a46b-0242c0a80304": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + 
"Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + } + } + )"; + + auto jsTest1 = nlohmann::json::parse(test1); + auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(jsTest1); + czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. + LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 1 passed"); + + auto jsTest2 = nlohmann::json::parse(test2); + tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(jsTest2); + czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. + LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 2 passed"); +} + +BOOST_AUTO_TEST_SUITE_END() diff --git a/src/global/clock_defs.h b/src/global/clock_defs.h index d582b416f..9db4dadbc 100644 --- a/src/global/clock_defs.h +++ b/src/global/clock_defs.h @@ -23,6 +23,7 @@ #define LSST_QSERV_GLOBAL_CLOCKDEFS_H // System headers +#include #include #include #include diff --git a/src/qmeta/QMeta.h b/src/qmeta/QMeta.h index 4f5935dda..a8c3a6672 100644 --- a/src/qmeta/QMeta.h +++ b/src/qmeta/QMeta.h @@ -31,6 +31,7 @@ #include // Qserv headers +#include "global/clock_defs.h" #include "global/intTypes.h" #include "qmeta/QInfo.h" #include "qmeta/QStats.h" @@ -97,7 +98,7 @@ class QMeta { Workers workers; /// The last time the map was updated (since UNIX Epoch). 
- std::chrono::time_point updateTime; + TIMEPOINT updateTime; }; /** diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index f399c49d6..ea1d61094 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -842,15 +842,11 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr const& prevUpdateTime) { lock_guard lock(_dbMutex); QMeta::ChunkMap chunkMap; -/* &&& -QMeta::ChunkMap QMetaMysql::getChunkMap() { // &&& - lock_guard sync(_dbMutex); ->>>>>>> 07d082050 (Added code to read chunk disposition map and organize for czar use.) -*/ auto trans = QMetaTransaction::create(*_conn); diff --git a/src/util/TimeUtils.cc b/src/util/TimeUtils.cc index 71a7e023d..f29ee47a6 100644 --- a/src/util/TimeUtils.cc +++ b/src/util/TimeUtils.cc @@ -51,4 +51,13 @@ uint64_t TimeUtils::tp2ms(chrono::system_clock::time_point const& tp) { return chrono::duration_cast(tp.time_since_epoch()).count(); } +string TimeUtils::timePointToDateTimeString(TIMEPOINT const& point) { + auto const timer = chrono::system_clock::to_time_t(point); + auto broken_time = *localtime(&timer); + + ostringstream ss; + ss << put_time(&broken_time, "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} + } // namespace lsst::qserv::util diff --git a/src/util/TimeUtils.h b/src/util/TimeUtils.h index 4dd2aa4b8..b5d5a35bb 100644 --- a/src/util/TimeUtils.h +++ b/src/util/TimeUtils.h @@ -31,6 +31,9 @@ #include #include +// Qserv headers +#include "global/clock_defs.h" + // This header declarations namespace lsst::qserv::util { @@ -44,6 +47,9 @@ struct TimeUtils { /// @return a human-readable timestamp in a format 'YYYY-MM-DD HH:MM:SS.mmm' static std::string toDateTimeString(std::chrono::milliseconds const& millisecondsSinceEpoch); + /// @return a human-readable time in a format 'YYYY-MM-DD HH:MM:SS' + static std::string timePointToDateTimeString(TIMEPOINT const& point); + /** * @param tp The timepoint to be converted. 
* @return The number of milliseconds since UNIX Epoch From 71acedf10ec1b5f9f82e277eeb59806ab028efe7 Mon Sep 17 00:00:00 2001 From: John Gates Date: Wed, 27 Mar 2024 11:57:02 -0700 Subject: [PATCH 6/8] Made failure to create CzarChunkMaps a non-fatal error. --- src/czar/Czar.cc | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index cc9bf11c9..0e3bab5d8 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -156,7 +156,11 @@ Czar::Czar(string const& configFilePath, string const& czarName) // the name of the Czar gets translated into a numeric identifier. _czarConfig->setId(_uqFactory->userQuerySharedResources()->qMetaCzarId); - _czarChunkMap = CzarChunkMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); + try { + _czarChunkMap = CzarChunkMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); + } catch (ChunkMapException const& exc) { + LOGS(_log, LOG_LVL_WARN, string(__func__) + " failed to create CzarChunkMap " + exc.what()); + } // Tell workers to cancel any queries that were submitted before this restart of Czar. // Figure out which query (if any) was recorded in Czar database before the restart. From 23e0bcd69b54b25be6a531713ac764be8d5cca1e Mon Sep 17 00:00:00 2001 From: John Gates Date: Mon, 8 Apr 2024 12:58:24 -0700 Subject: [PATCH 7/8] Changed code to use QMeta::ChunkMap instead of json object. 
--- src/czar/CzarChunkMap.cc | 28 +++++++++++++------------- src/czar/CzarChunkMap.h | 9 +++------ src/czar/testCzar.cc | 43 ++++++++++++++++++++++++++++++++++++++-- src/qmeta/QMetaMysql.cc | 1 - src/qmeta/testQMeta.cc | 9 ++------- 5 files changed, 60 insertions(+), 30 deletions(-) diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 13dfdf161..e92990e52 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -31,7 +31,6 @@ // Qserv headers #include "qmeta/Exceptions.h" -#include "qmeta/QMeta.h" #include "util/Bug.h" #include "util/TimeUtils.h" @@ -69,11 +68,8 @@ bool CzarChunkMap::_read() { return false; } - auto const& jsChunks = qChunkMap.chunks; - LOGS(_log, LOG_LVL_DEBUG, "chunkMap=" << jsChunks); - // Make the new maps. - auto [chunkMapPtr, wcMapPtr] = makeNewMaps(jsChunks); + auto [chunkMapPtr, wcMapPtr] = makeNewMaps(qChunkMap); verify(*chunkMapPtr, *wcMapPtr); LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr)); @@ -88,18 +84,22 @@ bool CzarChunkMap::_read() { } pair, shared_ptr> CzarChunkMap::makeNewMaps( - nlohmann::json const& jsChunks) { + qmeta::QMeta::ChunkMap const& qChunkMap) { // Create new maps. 
auto wcMapPtr = make_shared(); auto chunkMapPtr = make_shared(); - for (auto const& [workerId, dbs] : jsChunks.items()) { - for (auto const& [dbName, tables] : dbs.items()) { - for (auto const& [tableName, chunks] : tables.items()) { - for (auto const& [index, chunkNumNSz] : chunks.items()) { + // Workers -> Databases map + for (auto const& [workerId, dbs] : qChunkMap.workers) { + // Databases -> Tables map + for (auto const& [dbName, tables] : dbs) { + // Tables -> Chunks map + for (auto const& [tableName, chunks] : tables) { + // vector of ChunkInfo + for (qmeta::QMeta::ChunkMap::ChunkInfo const& chunkInfo : chunks) { try { - int64_t chunkNum = chunkNumNSz.at(0); - int64_t sz = chunkNumNSz.at(1); + int64_t chunkNum = chunkInfo.chunk; + int64_t sz = chunkInfo.size; LOGS(_log, LOG_LVL_DEBUG, "workerdId=" << workerId << " db=" << dbName << " table=" << tableName << " chunk=" << chunkNum << " sz=" << sz); @@ -109,12 +109,12 @@ pair, shared_ptr, std::shared_ptr> - makeNewMaps(nlohmann::json const& jsChunks); + makeNewMaps(qmeta::QMeta::ChunkMap const& qChunkMap); /// Verify that all chunks belong to at least one worker and that all chunks are represented in shared /// scans. 
diff --git a/src/czar/testCzar.cc b/src/czar/testCzar.cc index 5c0c8787c..fcc38dda4 100644 --- a/src/czar/testCzar.cc +++ b/src/czar/testCzar.cc @@ -37,6 +37,7 @@ // Qserv headers #include "czar/CzarChunkMap.h" +#include "qmeta/QMeta.h" namespace test = boost::test_tools; using namespace lsst::qserv; @@ -49,6 +50,42 @@ using namespace std; BOOST_AUTO_TEST_SUITE(Suite) +void insertIntoQChunkMap(qmeta::QMeta::ChunkMap& qChunkMap, string const& workerId, string const& dbName, + string const& tableName, unsigned int chunkNum, size_t sz) { + qChunkMap.workers[workerId][dbName][tableName].push_back(qmeta::QMeta::ChunkMap::ChunkInfo{chunkNum, sz}); +} + +qmeta::QMeta::ChunkMap convertJsonToChunkMap(nlohmann::json const& jsChunks) { + qmeta::QMeta::ChunkMap qChunkMap; + for (auto const& [workerId, dbs] : jsChunks.items()) { + for (auto const& [dbName, tables] : dbs.items()) { + for (auto const& [tableName, chunks] : tables.items()) { + for (auto const& [index, chunkNumNSz] : chunks.items()) { + try { + int64_t chunkNum = chunkNumNSz.at(0); + int64_t sz = chunkNumNSz.at(1); + LOGS(_log, LOG_LVL_DEBUG, + "workerdId=" << workerId << " db=" << dbName << " table=" << tableName + << " chunk=" << chunkNum << " sz=" << sz); + insertIntoQChunkMap(qChunkMap, workerId, dbName, tableName, chunkNum, sz); + } catch (invalid_argument const& exc) { + throw czar::ChunkMapException( + ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } catch (out_of_range const& exc) { + throw czar::ChunkMapException( + ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } + } + } + } + } + return qChunkMap; +} + BOOST_AUTO_TEST_CASE(CzarChunkMap) { // Each chunk only occurs on one worker string test1 = R"( @@ -149,12 +186,14 @@ BOOST_AUTO_TEST_CASE(CzarChunkMap) { 
)"; auto jsTest1 = nlohmann::json::parse(test1); - auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(jsTest1); + qmeta::QMeta::ChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1); + auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1); czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 1 passed"); auto jsTest2 = nlohmann::json::parse(test2); - tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(jsTest2); + qmeta::QMeta::ChunkMap qChunkMap2 = convertJsonToChunkMap(jsTest2); + tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(qChunkMap2); czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 2 passed"); } diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc index ea1d61094..40befc5e9 100644 --- a/src/qmeta/QMetaMysql.cc +++ b/src/qmeta/QMetaMysql.cc @@ -842,7 +842,6 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr const& prevUpdateTime) { lock_guard lock(_dbMutex); diff --git a/src/qmeta/testQMeta.cc b/src/qmeta/testQMeta.cc index 986b9d501..31cb287b2 100644 --- a/src/qmeta/testQMeta.cc +++ b/src/qmeta/testQMeta.cc @@ -412,16 +412,11 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) { } BOOST_CHECK(caught); } -/* &&& -<<<<<<< HEAD -======= -*/ + BOOST_AUTO_TEST_CASE(getChunkMap) { // The test assumes that the underlying tables exists and it's empty. QMeta::ChunkMap chunkMap; BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError); } -/* &&& ->>>>>>> 07d082050 (Added code to read chunk disposition map and organize for czar use.) -*/ + BOOST_AUTO_TEST_SUITE_END() From fa63d4956d3edc3bdf7e4e1340cde4ef2c3f62ba Mon Sep 17 00:00:00 2001 From: John Gates Date: Tue, 9 Apr 2024 09:20:16 -0700 Subject: [PATCH 8/8] Changes for review. 
--- src/czar/CzarChunkMap.cc | 10 +++++----- src/czar/CzarChunkMap.h | 28 ++++++++++++++-------------- src/czar/testCzar.cc | 1 + 3 files changed, 20 insertions(+), 19 deletions(-) diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index e92990e52..ef7bb85d3 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -99,7 +99,7 @@ pair, shared_ptr; + using SizeT = uint64_t; CzarChunkMap() = delete; + CzarChunkMap(CzarChunkMap const&) = delete; + CzarChunkMap& operator=(CzarChunkMap const&) = delete; static Ptr create(std::shared_ptr const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); } @@ -84,13 +84,10 @@ class CzarChunkMap { int64_t getChunkId() const { return _chunkId; } - int64_t getTotalBytes() const { return _totalBytes; } + SizeT getTotalBytes() const { return _totalBytes; } std::weak_ptr getPrimaryScanWorker() const { return _primaryScanWorker; } - /// Add up the bytes in each table for this chunk to get `_totalBytes` - void calcTotalBytes(); - /// Add `worker` to the `_workerHasThisMap` to indicate that worker has a copy /// of this chunk. void addToWorkerHasThis(std::shared_ptr const& worker); @@ -100,29 +97,32 @@ class CzarChunkMap { friend CzarChunkMap; private: - int64_t const _chunkId; ///< The Id number for this chunk. - int64_t _totalBytes = 0; ///< The total number of bytes used by all tables in this chunk. + int64_t const _chunkId; ///< The Id number for this chunk. + SizeT _totalBytes = 0; ///< The total number of bytes used by all tables in this chunk. std::weak_ptr _primaryScanWorker; ///< The worker to be used to shared scans. /// Key is databaseName+tableName, value is size in bytes. - std::map, int64_t> _dbTableMap; + std::map, SizeT> _dbTableMap; /// Map of workers that have this chunk std::map> _workerHasThisMap; + + /// Add up the bytes in each table for this chunk to get `_totalBytes` + void _calcTotalBytes(); }; /// Essentially a structure for storing which chunks are associated with a worker. 
class WorkerChunksData { public: using Ptr = std::shared_ptr; - WorkerChunksData(std::string const& wId) : _workerId(wId) {} + WorkerChunksData(std::string const& workerId) : _workerId(workerId) {} /// Return the worker's id string. std::string const& getWorkerId() const { return _workerId; } /// Return the number of bytes contained in all chunks/tables to be /// accessed in a full table scan on this worker. - int64_t getSharedScanTotalSize() const { return _sharedScanTotalSize; } + SizeT getSharedScanTotalSize() const { return _sharedScanTotalSize; } /// Return a reference to `_sharedScanChunkMap`. A copy of the pointer /// to this class (or the containing map) should be held to ensure the reference. @@ -147,7 +147,7 @@ class CzarChunkMap { /// The total size (in bytes) of all chunks on this worker that /// are to be used in shared scans. - int64_t _sharedScanTotalSize = 0; + SizeT _sharedScanTotalSize = 0; }; using WorkerChunkMap = std::map; @@ -168,7 +168,7 @@ class CzarChunkMap { /// @param `sz` - size in bytes of the table being inserted. static void insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId, std::string const& dbName, std::string const& tableName, - int64_t chunkIdNum, int64_t sz); + int64_t chunkIdNum, SizeT sz); /// Calculate the total bytes in each chunk and then sort the resulting ChunkVector by chunk size, /// descending. diff --git a/src/czar/testCzar.cc b/src/czar/testCzar.cc index fcc38dda4..cde8e59f2 100644 --- a/src/czar/testCzar.cc +++ b/src/czar/testCzar.cc @@ -27,6 +27,7 @@ // Third-party headers #include "boost/asio.hpp" +#include "nlohmann/json.hpp" // Boost unit test header #define BOOST_TEST_MODULE Czar_1