diff --git a/admin/local/docker/compose/docker-compose.yml b/admin/local/docker/compose/docker-compose.yml index 5a3c37f99..dc5ca4880 100644 --- a/admin/local/docker/compose/docker-compose.yml +++ b/admin/local/docker/compose/docker-compose.yml @@ -415,6 +415,7 @@ services: --registry-host=repl-mgr-registry --controller-auto-register-workers=1 --qserv-sync-force + --qserv-chunk-map-update --debug expose: - "25081" diff --git a/src/ccontrol/UserQueryFactory.cc b/src/ccontrol/UserQueryFactory.cc index 589088692..68761c196 100644 --- a/src/ccontrol/UserQueryFactory.cc +++ b/src/ccontrol/UserQueryFactory.cc @@ -224,6 +224,8 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st // First check for SUBMIT and strip it std::string query = aQuery; + // TODO: DM-43386 need to have WorkerChunkMap info at this point + std::string stripped; bool async = false; if (UserQueryType::isSubmit(query, stripped)) { diff --git a/src/czar/CMakeLists.txt b/src/czar/CMakeLists.txt index 4b85ea97f..3b9d63e40 100644 --- a/src/czar/CMakeLists.txt +++ b/src/czar/CMakeLists.txt @@ -1,7 +1,9 @@ add_library(czar OBJECT) +add_dependencies(czar proto) target_sources(czar PRIVATE Czar.cc + CzarChunkMap.cc HttpCzarSvc.cc HttpCzarQueryModule.cc HttpModule.cc @@ -50,4 +52,32 @@ endfunction() czar_utils( qserv-czar-http -) \ No newline at end of file +) + +function(czar_tests) + foreach(TEST IN ITEMS ${ARGV}) + add_executable(${TEST} ${TEST}.cc) + target_link_libraries(${TEST} PUBLIC + cconfig + ccontrol + czar + global + mysql + parser + qana + qdisp + qproc + qserv_meta + query + rproc + sql + Boost::unit_test_framework + ) + add_test(NAME ${TEST} COMMAND ${TEST}) + endforeach() +endfunction() + +czar_tests( + testCzar +) + diff --git a/src/czar/Czar.cc b/src/czar/Czar.cc index 34ca6e74d..0e3bab5d8 100644 --- a/src/czar/Czar.cc +++ b/src/czar/Czar.cc @@ -43,6 +43,7 @@ #include "ccontrol/UserQueryResources.h" #include "ccontrol/UserQuerySelect.h" #include "ccontrol/UserQueryType.h" +#include "czar/CzarChunkMap.h" #include "czar/CzarErrors.h" #include "czar/HttpSvc.h" #include "czar/MessageTable.h" @@ -138,8 +139,7 @@ Czar::Czar(string const& configFilePath, string const& czarName) _czarConfig(cconfig::CzarConfig::create(configFilePath, czarName)), _idCounter(), _uqFactory(), - _clientToQuery(), - _mutex() { + _clientToQuery() { // set id counter to milliseconds since the epoch, mod 1 year. struct timeval tv; gettimeofday(&tv, nullptr); @@ -156,6 +156,12 @@ Czar::Czar(string const& configFilePath, string const& czarName) // the name of the Czar gets translated into a numeric identifier. _czarConfig->setId(_uqFactory->userQuerySharedResources()->qMetaCzarId); + try { + _czarChunkMap = CzarChunkMap::create(_uqFactory->userQuerySharedResources()->queryMetadata); + } catch (ChunkMapException const& exc) { + LOGS(_log, LOG_LVL_WARN, string(__func__) + " failed to create CzarChunkMap " + exc.what()); + } + // Tell workers to cancel any queries that were submitted before this restart of Czar. // Figure out which query (if any) was recorded in Czar database before the restart. // The id will be used as the high-watermark for queries that need to be cancelled. diff --git a/src/czar/Czar.h b/src/czar/Czar.h index 3fae22b83..32549fe9d 100644 --- a/src/czar/Czar.h +++ b/src/czar/Czar.h @@ -61,6 +61,8 @@ class FileMonitor; namespace lsst::qserv::czar { +class CzarChunkMap; + /// @addtogroup czar /** @@ -179,6 +181,9 @@ class Czar { /// The HTTP server processing Czar management requests. 
 std::shared_ptr<HttpSvc> _controlHttpSvc;
+
+    /// Map of which chunks are on which workers, and the shared-scan order of the chunks.
+    std::shared_ptr<CzarChunkMap> _czarChunkMap;
 };
 
 }  // namespace lsst::qserv::czar
diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc
new file mode 100644
index 000000000..ef7bb85d3
--- /dev/null
+++ b/src/czar/CzarChunkMap.cc
@@ -0,0 +1,360 @@
+// -*- LSST-C++ -*-
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program.  If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+
+// Class header
+#include "czar/CzarChunkMap.h"
+
+// System headers
+#include <algorithm>
+#include <limits>
+#include <set>
+#include <sstream>
+
+// LSST headers
+#include "lsst/log/Log.h"
+
+// Qserv headers
+#include "qmeta/Exceptions.h"
+#include "util/Bug.h"
+#include "util/TimeUtils.h"
+
+using namespace std;
+
+namespace {
+LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap");
+}  // namespace
+
+namespace lsst::qserv::czar {
+
+CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta) {
+    try {
+        auto mapsSet = _read();
+        if (!mapsSet) {
+            throw ChunkMapException(ERR_LOC, "CzarChunkMap maps were not set in constructor");
+        }
+    } catch (qmeta::QMetaError const& qExc) {
+        LOGS(_log, LOG_LVL_ERROR, __func__ << " CzarChunkMap could not read DB " << qExc.what());
+        throw ChunkMapException(ERR_LOC, string(" CzarChunkMap constructor failed read ") + qExc.what());
+    }
+}
+
+bool CzarChunkMap::_read() {
+    LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() start");
+    // If replacing the map, this may take a bit of time, but it's probably
+    // better to wait for new maps if something changed.
+    std::lock_guard<std::mutex> gLock(_mapMtx);
+    qmeta::QMeta::ChunkMap qChunkMap = _qmeta->getChunkMap();
+    if (_lastUpdateTime >= qChunkMap.updateTime) {
+        LOGS(_log, LOG_LVL_DEBUG,
+             __func__ << " CzarChunkMap no need to read "
+                      << util::TimeUtils::timePointToDateTimeString(_lastUpdateTime)
+                      << " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime));
+        return false;
+    }
+
+    // Make the new maps.
+    auto [chunkMapPtr, wcMapPtr] = makeNewMaps(qChunkMap);
+
+    verify(*chunkMapPtr, *wcMapPtr);
+    LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
+    LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));
+
+    _workerChunkMap = wcMapPtr;
+    _chunkMap = chunkMapPtr;
+    _lastUpdateTime = qChunkMap.updateTime;
+
+    LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() end");
+    return true;
+}
+
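Before diving into makeNewMaps() below, it may help to see the shape of its input. A minimal sketch of populating the nested qmeta::QMeta::ChunkMap structure consumed here (the typedefs appear in the qmeta/QMeta.h changes later in this patch; the worker/table names are made up):

    qmeta::QMeta::ChunkMap qChunkMap;
    // workers -> databases -> tables -> [(chunk, size), ...]
    qChunkMap.workers["worker-1"]["qcase01"]["Object"].push_back(
            qmeta::QMeta::ChunkMap::ChunkInfo{6630, 1460});  // chunk 6630, 1460 bytes
    qChunkMap.updateTime = std::chrono::system_clock::now();
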
+pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>> CzarChunkMap::makeNewMaps(
+        qmeta::QMeta::ChunkMap const& qChunkMap) {
+    // Create new maps.
+    auto wcMapPtr = make_shared<WorkerChunkMap>();
+    auto chunkMapPtr = make_shared<ChunkMap>();
+
+    // Workers -> Databases map
+    for (auto const& [workerId, dbs] : qChunkMap.workers) {
+        // Databases -> Tables map
+        for (auto const& [dbName, tables] : dbs) {
+            // Tables -> Chunks map
+            for (auto const& [tableName, chunks] : tables) {
+                // vector of ChunkInfo
+                for (qmeta::QMeta::ChunkMap::ChunkInfo const& chunkInfo : chunks) {
+                    try {
+                        int64_t chunkNum = chunkInfo.chunk;
+                        SizeT sz = chunkInfo.size;
+                        LOGS(_log, LOG_LVL_DEBUG,
+                             "workerId=" << workerId << " db=" << dbName << " table=" << tableName
+                                         << " chunk=" << chunkNum << " sz=" << sz);
+                        insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum,
+                                           sz);
+                    } catch (invalid_argument const& exc) {
+                        throw ChunkMapException(
+                                ERR_LOC, string(__func__) + " invalid_argument workerId=" + workerId +
+                                                 " db=" + dbName + " table=" + tableName +
+                                                 " chunk=" + to_string(chunkInfo.chunk) + " " + exc.what());
+                    } catch (out_of_range const& exc) {
+                        throw ChunkMapException(
+                                ERR_LOC, string(__func__) + " out_of_range workerId=" + workerId +
+                                                 " db=" + dbName + " table=" + tableName +
+                                                 " chunk=" + to_string(chunkInfo.chunk) + " " + exc.what());
+                    }
+                }
+            }
+        }
+    }
+
+    auto chunksSortedBySize = make_shared<ChunkVector>();
+    calcChunkMap(*chunkMapPtr, *chunksSortedBySize);
+
+    // At this point we have
+    //  - wcMapPtr, a map of WorkerChunksData by worker id, each worker having a map of ChunkData;
+    //  - chunkMapPtr, a map of all ChunkData by chunk id;
+    //  - chunksSortedBySize, a list of chunks sorted with the largest first.
+    // From here, shared scan chunk priorities need to be assigned:
+    // go through the chunksSortedBySize list and assign each chunk to the worker that has it
+    // and has the smallest totalScanSize.
+    for (auto&& chunkData : *chunksSortedBySize) {
+        SizeT smallest = std::numeric_limits<SizeT>::max();
+        WorkerChunksData::Ptr smallestWkr = nullptr;
+        for (auto&& [wkrId, wkrDataWeak] : chunkData->_workerHasThisMap) {
+            auto wkrData = wkrDataWeak.lock();
+            if (wkrData == nullptr) {
+                LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId);
+                continue;  // maybe the next one will be ok.
+            }
+            LOGS(_log, LOG_LVL_DEBUG,
+                 __func__ << " wkrId=" << wkrId << " tsz=" << wkrData->_sharedScanTotalSize
+                          << " smallest=" << smallest);
+            if (wkrData->_sharedScanTotalSize < smallest) {
+                smallestWkr = wkrData;
+                smallest = smallestWkr->_sharedScanTotalSize;
+            }
+        }
+        if (smallestWkr == nullptr) {
+            throw ChunkMapException(ERR_LOC, string(__func__) + " no smallestWkr found for chunk=" +
+                                                     to_string(chunkData->_chunkId));
+        }
+        smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
+        smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
+        chunkData->_primaryScanWorker = smallestWkr;
+        LOGS(_log, LOG_LVL_DEBUG,
+             " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
+    }
+
+    LOGS(_log, LOG_LVL_TRACE, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
+    LOGS(_log, LOG_LVL_TRACE, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));
+    return {chunkMapPtr, wcMapPtr};
+}
+
+void CzarChunkMap::insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId,
+                                      string const& dbName, string const& tableName, int64_t chunkIdNum,
+                                      SizeT sz) {
+    // Get or make the worker entry
+    WorkerChunksData::Ptr workerChunksData;
+    auto iterWC = wcMap.find(workerId);
+    if (iterWC == wcMap.end()) {
+        workerChunksData = WorkerChunksData::Ptr(new WorkerChunksData(workerId));
+        wcMap[workerId] = workerChunksData;
+    } else {
+        workerChunksData = iterWC->second;
+    }
+
+    // Get or make the ChunkData entry in chunkMap
+    ChunkData::Ptr chunkData;
+    auto iterChunkData = chunkMap.find(chunkIdNum);
+    if (iterChunkData == chunkMap.end()) {
+        chunkData = ChunkData::Ptr(new ChunkData(chunkIdNum));
+        chunkMap[chunkIdNum] = chunkData;
+    } else {
+        chunkData = iterChunkData->second;
+    }
+
+    // Set or verify the table information
+    auto iterDT = chunkData->_dbTableMap.find({dbName, tableName});
+    if (iterDT == chunkData->_dbTableMap.end()) {
+        // Doesn't exist, so set it up.
+        chunkData->_dbTableMap[{dbName, tableName}] = sz;
+    } else {
+        // Verify that it matches the other data.
+        auto const& dbTbl = iterDT->first;
+        auto tblSz = iterDT->second;
+        auto const& dbN = dbTbl.first;
+        auto const& tblN = dbTbl.second;
+        if (dbName != dbN || tblN != tableName || tblSz != sz) {
+            LOGS(_log, LOG_LVL_ERROR,
+                 __func__ << " data mismatch for " << dbName << "." << tableName << "=" << sz << " vs "
+                          << dbN << "." << tblN << "=" << tblSz);
+        }
+    }
+
+    // Link WorkerData to the single chunkData instance for the chunkId.
+    workerChunksData->_chunkDataMap[chunkIdNum] = chunkData;
+
+    // Add the worker to the list of workers containing the chunk.
+    chunkData->addToWorkerHasThis(workerChunksData);
+}
+
+void CzarChunkMap::calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) {
+    // Calculate total bytes for all chunks.
+    for (auto&& [chunkIdNum, chunkData] : chunkMap) {
+        chunkData->_calcTotalBytes();
+        chunksSortedBySize.push_back(chunkData);
+    }
+
+    sortChunks(chunksSortedBySize);
+}
+
+void CzarChunkMap::sortChunks(std::vector<ChunkData::Ptr>& chunksSortedBySize) {
+    /// Return true if a->_totalBytes > b->_totalBytes
+    auto sortBySizeDesc = [](ChunkData::Ptr const& a, ChunkData::Ptr const& b) {
+        if (b == nullptr && a != nullptr) return true;
+        if (a == nullptr) return false;
+        return a->_totalBytes > b->_totalBytes;
+    };
+
+    std::sort(chunksSortedBySize.begin(), chunksSortedBySize.end(), sortBySizeDesc);
+}
+
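The effect of the placement loop in makeNewMaps() above can be checked with a toy, self-contained version of the same greedy rule (hypothetical data and helper; not part of the patch):

    #include <cstdint>
    #include <map>
    #include <string>
    #include <utility>
    #include <vector>

    // Each chunk (sorted largest-first) goes to the least-loaded worker holding a replica.
    std::map<std::string, uint64_t> assignGreedy(
            std::vector<std::pair<uint64_t, std::vector<std::string>>> const& chunksDesc) {
        std::map<std::string, uint64_t> load;  // workerId -> bytes assigned so far
        for (auto const& [size, holders] : chunksDesc) {
            std::string best;
            for (auto const& w : holders) {
                if (best.empty() || load[w] < load[best]) best = w;
            }
            load[best] += size;
        }
        return load;
    }
    // E.g. chunk sizes {100, 80, 20}, all held by workers A and B, yield A:100, B:100 --
    // each chunk lands on its currently least-loaded holder, roughly levelling
    // the per-worker shared-scan totals.
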
+void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) {
+    // Use a set to prevent duplicate ids caused by replication levels > 1.
+    set<int64_t> allChunkIds;
+    int errorCount = 0;
+    for (auto const& [wkr, wkrData] : wcMap) {
+        for (auto const& [chunkId, chunkData] : wkrData->_chunkDataMap) {
+            allChunkIds.insert(chunkId);
+        }
+    }
+
+    for (auto const& [chunkId, chunkDataPtr] : chunkMap) {
+        if (chunkDataPtr == nullptr) {
+            LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " had nullptr");
+            ++errorCount;
+            continue;
+        }
+        auto primeScanWkr = chunkDataPtr->_primaryScanWorker.lock();
+        if (primeScanWkr == nullptr) {
+            LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " missing primaryScanWorker");
+            ++errorCount;
+            continue;
+        }
+        if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) {
+            LOGS(_log, LOG_LVL_ERROR,
+                 " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for "
+                             << primeScanWkr->_workerId);
+            ++errorCount;
+            continue;
+        }
+        auto iter = allChunkIds.find(chunkId);
+        if (iter != allChunkIds.end()) {
+            allChunkIds.erase(iter);
+        } else {
+            LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " was not in the allChunkIds list");
+            ++errorCount;
+            continue;
+        }
+    }
+
+    auto missing = allChunkIds.size();
+    if (missing > 0) {
+        string allMissingIds;
+        for (auto const& cId : allChunkIds) {
+            allMissingIds += to_string(cId) + ",";
+        }
+        LOGS(_log, LOG_LVL_ERROR,
+             " There were " << missing << " missing chunks from the scan list " << allMissingIds);
+        ++errorCount;
+    }
+
+    if (errorCount > 0) {
+        throw ChunkMapException(ERR_LOC, "verification failed with " + to_string(errorCount) + " errors");
+    }
+}
+
+string CzarChunkMap::dumpChunkMap(ChunkMap const& chunkMap) {
+    stringstream os;
+    os << "ChunkMap{";
+    for (auto const& [cId, cDataPtr] : chunkMap) {
+        os << "(cId=" << cId << ":";
+        os << ((cDataPtr == nullptr) ? "null" : cDataPtr->dump()) << ")";
+    }
+    os << "}";
+    return os.str();
+}
+
+string CzarChunkMap::dumpWorkerChunkMap(WorkerChunkMap const& wcMap) {
+    stringstream os;
+    os << "WorkerChunkMap{";
+    for (auto const& [wId, wDataPtr] : wcMap) {
+        os << "(wId=" << wId << ":";
+        os << ((wDataPtr == nullptr) ? "null" : wDataPtr->dump()) << ")";
+    }
+    os << "}";
+    return os.str();
+}
+
+void CzarChunkMap::ChunkData::_calcTotalBytes() {
+    _totalBytes = 0;
+    for (auto const& [key, val] : _dbTableMap) {
+        _totalBytes += val;
+    }
+}
+
+void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr<WorkerChunksData> const& worker) {
+    if (worker == nullptr) {
+        throw ChunkMapException(ERR_LOC, string(__func__) + " worker was null");
+    }
+
+    _workerHasThisMap[worker->_workerId] = worker;
+}
+
+string CzarChunkMap::ChunkData::dump() const {
+    stringstream os;
+    auto primaryWorker = _primaryScanWorker.lock();
+    os << "{ChunkData id=" << _chunkId << " totalBytes=" << _totalBytes;
+    os << " primaryWorker=" << ((primaryWorker == nullptr) ? "null" : primaryWorker->_workerId);
+    os << " workers{";
+    for (auto const& [wId, wData] : _workerHasThisMap) {
+        os << "(" << wId << ")";
+    }
+    os << "} tables{";
+    for (auto const& [dbTbl, sz] : _dbTableMap) {
+        os << "(" << dbTbl.first << "." << dbTbl.second << " sz=" << sz << ")";
+    }
+    os << "}}";
+    return os.str();
+}
+
+string CzarChunkMap::WorkerChunksData::dump() const {
+    stringstream os;
+    os << "{WorkerChunksData id=" << _workerId << " scanTotalSize=" << _sharedScanTotalSize;
+    os << " chunkDataIds{";
+    for (auto const& [chunkId, chunkData] : _chunkDataMap) {
+        os << "(" << chunkId << ")";
+    }
+    os << "} sharedScanChunks{";
+    for (auto const& [chunkId, chunkData] : _sharedScanChunkMap) {
+        os << "(" << chunkId << ")";
+    }
+    os << "}}";
+    return os.str();
+}
+
+}  // namespace lsst::qserv::czar
diff --git a/src/czar/CzarChunkMap.h b/src/czar/CzarChunkMap.h
new file mode 100644
index 000000000..ba4ca8968
--- /dev/null
+++ b/src/czar/CzarChunkMap.h
@@ -0,0 +1,226 @@
+// -*- LSST-C++ -*-
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program.  If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */
+
+#ifndef LSST_QSERV_CZAR_CZARCHUNKMAP_H
+#define LSST_QSERV_CZAR_CZARCHUNKMAP_H
+
+// System headers
+#include <cstdint>
+#include <map>
+#include <memory>
+#include <mutex>
+#include <string>
+#include <utility>
+#include <vector>
+
+// Qserv headers
+#include "global/clock_defs.h"
+#include "qmeta/QMeta.h"
+#include "util/Issue.h"
+
+namespace lsst::qserv::czar {
+
+class ChunkMapException : public util::Issue {
+public:
+    ChunkMapException(Context const& ctx, std::string const& msg) : util::Issue(ctx, msg) {}
+};
+
+/// This class is used to organize worker chunk table information so that it
+/// can be used to send jobs to the appropriate worker and inform workers
+/// what chunks they can expect to handle in shared scans.
+/// The data for the maps is provided by the Replicator and stored in QMeta.
+/// When the data changes, a timestamp in QMeta is updated, which causes this
+/// class to build new maps.
+///
+/// The maps generated are constant objects stored with shared pointers. As
+/// such, it should be possible for numerous threads to use each map
+/// simultaneously, provided they have their own pointers to the maps.
+/// The pointers to the maps are mutex protected to safely allow map updates.
+///
+/// The czar is expected to heavily use
+/// `getMaps() -> WorkerChunkMap -> getSharedScanChunkMap()`
+/// to send jobs to workers, as that gets an ordered list of all chunks
+/// the worker should handle during a shared scan.
+/// `getMaps() -> ChunkMap` is expected to be more useful if there is a
+/// failure and a chunk query needs to go to a different worker.
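To make the intended access pattern concrete, a hypothetical caller routing a query for a single chunk might look roughly like this (a sketch against the API declared below, not code from this patch; `czarChunkMap` and `chunkId` are assumed to exist):

    auto [chunkMapPtr, workerChunkMapPtr] = czarChunkMap->getMaps();  // held pointers keep the maps alive
    auto iter = chunkMapPtr->find(chunkId);
    if (iter != chunkMapPtr->end()) {
        if (auto wkr = iter->second->getPrimaryScanWorker().lock()) {
            // Send the chunk query to wkr->getWorkerId(); on failure, any other
            // worker listed for this chunk could be tried instead.
        }
    }
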
+class CzarChunkMap {
+public:
+    using Ptr = std::shared_ptr<CzarChunkMap>;
+    using SizeT = uint64_t;
+
+    CzarChunkMap() = delete;
+    CzarChunkMap(CzarChunkMap const&) = delete;
+    CzarChunkMap& operator=(CzarChunkMap const&) = delete;
+
+    static Ptr create(std::shared_ptr<qmeta::QMeta> const& qmeta) { return Ptr(new CzarChunkMap(qmeta)); }
+
+    class WorkerChunksData;
+
+    /// Essentially a structure for storing data about which tables and workers are associated with this
+    /// chunk.
+    class ChunkData {
+    public:
+        using Ptr = std::shared_ptr<ChunkData>;
+        ChunkData(int64_t chunkId_) : _chunkId(chunkId_) {}
+
+        int64_t getChunkId() const { return _chunkId; }
+
+        SizeT getTotalBytes() const { return _totalBytes; }
+
+        std::weak_ptr<WorkerChunksData> getPrimaryScanWorker() const { return _primaryScanWorker; }
+
+        /// Add `worker` to the `_workerHasThisMap` to indicate that worker has a copy
+        /// of this chunk.
+        void addToWorkerHasThis(std::shared_ptr<WorkerChunksData> const& worker);
+
+        std::string dump() const;
+
+        friend CzarChunkMap;
+
+    private:
+        int64_t const _chunkId;  ///< The Id number for this chunk.
+        SizeT _totalBytes = 0;   ///< The total number of bytes used by all tables in this chunk.
+        std::weak_ptr<WorkerChunksData> _primaryScanWorker;  ///< The worker to be used for shared scans.
+
+        /// Key is databaseName+tableName, value is size in bytes.
+        std::map<std::pair<std::string, std::string>, SizeT> _dbTableMap;
+
+        /// Map of workers that have this chunk
+        std::map<std::string, std::weak_ptr<WorkerChunksData>> _workerHasThisMap;
+
+        /// Add up the bytes in each table for this chunk to get `_totalBytes`
+        void _calcTotalBytes();
+    };
+
+    /// Essentially a structure for storing which chunks are associated with a worker.
+    class WorkerChunksData {
+    public:
+        using Ptr = std::shared_ptr<WorkerChunksData>;
+        WorkerChunksData(std::string const& workerId) : _workerId(workerId) {}
+
+        /// Return the worker's id string.
+        std::string const& getWorkerId() const { return _workerId; }
+
+        /// Return the number of bytes contained in all chunks/tables to be
+        /// accessed in a full table scan on this worker.
+        SizeT getSharedScanTotalSize() const { return _sharedScanTotalSize; }
+
+        /// Return a reference to `_sharedScanChunkMap`. A copy of the pointer
+        /// to this class (or the containing map) should be held to keep the
+        /// reference valid.
+        std::map<int64_t, ChunkData::Ptr> const& getSharedScanChunkMap() const {
+            return _sharedScanChunkMap;
+        }
+
+        std::string dump() const;
+
+        friend CzarChunkMap;
+
+    private:
+        std::string const _workerId;
+
+        /// Map of all chunks found on the worker where key is chunkId
+        std::map<int64_t, ChunkData::Ptr> _chunkDataMap;
+
+        /// Map of chunks this worker will handle during shared scans.
+        /// Since scans are done in order of chunk id numbers, it helps
+        /// to have this in chunk id number order.
+        /// At some point, this should be sent to workers so they
+        /// can make more accurate time estimates for chunk completion.
+        std::map<int64_t, ChunkData::Ptr> _sharedScanChunkMap;
+
+        /// The total size (in bytes) of all chunks on this worker that
+        /// are to be used in shared scans.
+        SizeT _sharedScanTotalSize = 0;
+    };
+
+    using WorkerChunkMap = std::map<std::string, WorkerChunksData::Ptr>;
+    using ChunkMap = std::map<int64_t, ChunkData::Ptr>;
+    using ChunkVector = std::vector<ChunkData::Ptr>;
+
+    /// Sort the chunks in `chunksSortedBySize` in descending order by total size in bytes.
+    static void sortChunks(ChunkVector& chunksSortedBySize);
+
+    /// Insert the chunk table described into the correct locations in
+    /// `wcMap` and `chunkMap`.
+    /// @param `wcMap` - WorkerChunkMap being constructed.
+    /// @param `chunkMap` - ChunkMap being constructed.
+    /// @param `workerId` - worker id where this table was found.
+    /// @param `dbName` - database name for the table being inserted.
+    /// @param `tableName` - table name for the table being inserted.
+    /// @param `chunkIdNum` - chunk id number for the table being inserted.
+    /// @param `sz` - size in bytes of the table being inserted.
+    static void insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, std::string const& workerId,
+                                   std::string const& dbName, std::string const& tableName,
+                                   int64_t chunkIdNum, SizeT sz);
+
+    /// Calculate the total bytes in each chunk and then sort the resulting ChunkVector by chunk size,
+    /// descending.
+    static void calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize);
+
+    /// Make a new ChunkMap and WorkerChunkMap from the data in `qChunkMap`.
+    static std::pair<std::shared_ptr<ChunkMap>, std::shared_ptr<WorkerChunkMap>> makeNewMaps(
+            qmeta::QMeta::ChunkMap const& qChunkMap);
+
+    /// Verify that all chunks belong to at least one worker and that all chunks are represented in shared
+    /// scans.
+    /// @throws ChunkMapException
+    static void verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap);
+
+    static std::string dumpChunkMap(ChunkMap const& chunkMap);
+
+    static std::string dumpWorkerChunkMap(WorkerChunkMap const& wcMap);
+
+    /// Return shared pointers to `_chunkMap` and `_workerChunkMap`, which should be held until
+    /// finished with the data.
+    std::pair<std::shared_ptr<ChunkMap const>, std::shared_ptr<WorkerChunkMap const>> getMaps() const {
+        std::lock_guard<std::mutex> lck(_mapMtx);
+        return {_chunkMap, _workerChunkMap};
+    }
+
+private:
+    /// Try to `_read` values for maps from `qmeta`.
+    CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta);
+
+    /// Read the json worker list from the database and update the maps if there's a new
+    /// version since the `_lastUpdateTime`.
+    /// @throws `qmeta::QMetaError`
+    bool _read();
+
+    std::shared_ptr<qmeta::QMeta> _qmeta;  ///< Database connection to collect the json worker list.
+
+    /// Map of all workers and which chunks they contain.
+    std::shared_ptr<WorkerChunkMap const> _workerChunkMap;
+
+    /// Map of all chunks in the system with the chunkId number as the key. The values contain
+    /// information about the tables in those chunks and which worker is responsible for
+    /// handling the chunk in a shared scan.
+    std::shared_ptr<ChunkMap const> _chunkMap;
+
+    /// The last time the maps were updated with information from the replicator.
+    TIMEPOINT _lastUpdateTime;  // initialized to 0;
+
+    mutable std::mutex _mapMtx;  ///< protects _workerChunkMap, _chunkMap, _lastUpdateTime, and _qmeta.
+};
+
+}  // namespace lsst::qserv::czar
+
+#endif  // LSST_QSERV_CZAR_CZARCHUNKMAP_H
diff --git a/src/czar/testCzar.cc b/src/czar/testCzar.cc
new file mode 100644
index 000000000..cde8e59f2
--- /dev/null
+++ b/src/czar/testCzar.cc
@@ -0,0 +1,202 @@
+// -*- LSST-C++ -*-
+/*
+ * LSST Data Management System
+ *
+ * This product includes software developed by the
+ * LSST Project (http://www.lsst.org/).
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation, either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the LSST License Statement and
+ * the GNU General Public License along with this program.  If not,
+ * see <http://www.lsstcorp.org/LegalNotices/>.
+ */ + +// System headers +#include +#include +#include + +// Third-party headers +#include "boost/asio.hpp" +#include "nlohmann/json.hpp" + +// Boost unit test header +#define BOOST_TEST_MODULE Czar_1 +#include + +// LSST headers +#include "lsst/log/Log.h" + +// Qserv headers +#include "czar/CzarChunkMap.h" +#include "qmeta/QMeta.h" + +namespace test = boost::test_tools; +using namespace lsst::qserv; + +namespace { +LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.testCzar"); +} + +using namespace std; + +BOOST_AUTO_TEST_SUITE(Suite) + +void insertIntoQChunkMap(qmeta::QMeta::ChunkMap& qChunkMap, string const& workerId, string const& dbName, + string const& tableName, unsigned int chunkNum, size_t sz) { + qChunkMap.workers[workerId][dbName][tableName].push_back(qmeta::QMeta::ChunkMap::ChunkInfo{chunkNum, sz}); +} + +qmeta::QMeta::ChunkMap convertJsonToChunkMap(nlohmann::json const& jsChunks) { + qmeta::QMeta::ChunkMap qChunkMap; + for (auto const& [workerId, dbs] : jsChunks.items()) { + for (auto const& [dbName, tables] : dbs.items()) { + for (auto const& [tableName, chunks] : tables.items()) { + for (auto const& [index, chunkNumNSz] : chunks.items()) { + try { + int64_t chunkNum = chunkNumNSz.at(0); + int64_t sz = chunkNumNSz.at(1); + LOGS(_log, LOG_LVL_DEBUG, + "workerdId=" << workerId << " db=" << dbName << " table=" << tableName + << " chunk=" << chunkNum << " sz=" << sz); + insertIntoQChunkMap(qChunkMap, workerId, dbName, tableName, chunkNum, sz); + } catch (invalid_argument const& exc) { + throw czar::ChunkMapException( + ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } catch (out_of_range const& exc) { + throw czar::ChunkMapException( + ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + + " db=" + dbName + " table=" + tableName + + " chunk=" + to_string(chunkNumNSz) + " " + exc.what()); + } + } + } + } + } + return qChunkMap; +} + +BOOST_AUTO_TEST_CASE(CzarChunkMap) { + // Each chunk only occurs on one worker + string test1 = R"( + { + "ce1c1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7310,0]], + "Source":[[1234567890,0],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[7165,76356]], + "RefObject":[[1234567890,0],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[7165,578396]] + } + }, + "ddc3f1b9-e6fb-11ee-a46b-0242c0a80304": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000]], + "Source":[[1234567890,0],[7480,2259419]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728]], + "RefObject":[[1234567890,0],[6995,10920]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834]], + "RunDeepSource":[[1234567890,0],[6995,58604]] + } + } + } + )"; + + /// 3 workers, each containing all chunks. 
+ string test2 = R"( + { + "ce1c1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + }, + "brnd1b79-e6fb-11ee-a46b-0242c0a80308": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + }, + "ddc3f1b9-e6fb-11ee-a46b-0242c0a80304": + {"qcase01": + {"Object":[[1234567890,0],[6631,1612],[6801,4752],[6970,5780],[7138,3212],[7308,2144],[7478,4608], + [6630,1460],[6800,6068],[6968,1000],[6971,2716],[7140,4556],[7310,2144],[7648,1568]], + "Source":[[1234567890,0],[6631,45724],[6801,123940],[6970,151660],[7138,97252],[7308,56784],[7478,99304], + [6630,37084],[6800,163888],[6968,33044],[6971,67016],[7140,145300],[7310,83872],[7648,30096]] + }, + "qcase02": + {"Object":[[1234567890,0],[7480,1055000],[7310,0]], + "Source":[[1234567890,0],[7480,2259419],[7310,0]] + }, + "qcase03": + {"RefDeepSrcMatch":[[1234567890,0],[6995,7728],[7165,76356]], + "RefObject":[[1234567890,0],[6995,10920],[7165,119616]], + "RunDeepForcedSource":[[1234567890,0],[6995,11708834],[7165,130617531]], + "RunDeepSource":[[1234567890,0],[6995,58604],[7165,578396]] + } + } + } + )"; + + auto jsTest1 = nlohmann::json::parse(test1); + qmeta::QMeta::ChunkMap qChunkMap1 = convertJsonToChunkMap(jsTest1); + auto [chunkMapPtr, wcMapPtr] = czar::CzarChunkMap::makeNewMaps(qChunkMap1); + czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. + LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 1 passed"); + + auto jsTest2 = nlohmann::json::parse(test2); + qmeta::QMeta::ChunkMap qChunkMap2 = convertJsonToChunkMap(jsTest2); + tie(chunkMapPtr, wcMapPtr) = czar::CzarChunkMap::makeNewMaps(qChunkMap2); + czar::CzarChunkMap::verify(*chunkMapPtr, *wcMapPtr); // Throws on failure. 
+    LOGS(_log, LOG_LVL_DEBUG, "CzarChunkMap test 2 passed");
+}
+
+BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/global/clock_defs.h b/src/global/clock_defs.h
index d582b416f..9db4dadbc 100644
--- a/src/global/clock_defs.h
+++ b/src/global/clock_defs.h
@@ -23,6 +23,7 @@
 #define LSST_QSERV_GLOBAL_CLOCKDEFS_H
 
 // System headers
+#include
 #include
 #include
 #include
diff --git a/src/qmeta/Exceptions.h b/src/qmeta/Exceptions.h
index deeccf73b..511e1cd35 100644
--- a/src/qmeta/Exceptions.h
+++ b/src/qmeta/Exceptions.h
@@ -109,6 +109,15 @@ class MissingTableError : public QMetaError {
     virtual std::string typeName() const override { return "MissingTableError"; }
 };
 
+/// Exception thrown when the specified metadata table is empty.
+class EmptyTableError : public QMetaError {
+public:
+    EmptyTableError(util::Issue::Context const& ctx, std::string const& table)
+            : QMetaError(ctx, "Query metadata table is empty: " + table) {}
+
+    virtual std::string typeName() const override { return "EmptyTableError"; }
+};
+
 /// Exception thrown when database consistency is violated.
 class ConsistencyError : public QMetaError {
 public:
diff --git a/src/qmeta/QMeta.h b/src/qmeta/QMeta.h
index 07e6afd91..a8c3a6672 100644
--- a/src/qmeta/QMeta.h
+++ b/src/qmeta/QMeta.h
@@ -23,6 +23,7 @@
 #define LSST_QSERV_QMETA_QMETA_H
 
 // System headers
+#include <chrono>
 #include
 #include
 #include
@@ -30,6 +31,7 @@
 #include
 
 // Qserv headers
+#include "global/clock_defs.h"
 #include "global/intTypes.h"
 #include "qmeta/QInfo.h"
 #include "qmeta/QStats.h"
@@ -46,7 +48,6 @@ namespace lsst::qserv::qmeta {
 
 /**
  * @ingroup qmeta
- *
  * @brief Interface for query metadata.
  */
 
@@ -58,6 +59,48 @@ class QMeta {
      */
     typedef std::vector<std::pair<std::string, std::string> > TableNames;
 
+    /**
+     * The structure ChunkMap encapsulates a disposition of chunks at Qserv workers
+     * along with a time when the map was last updated.
+     *
+     * Here is an example of how to use the map to get info on all chunks in
+     * a given context:
+     * @code
+     * std::string const worker = "worker-001";
+     * std::string const database = "LSST-DR01";
+     * std::string const table = "Object";
+     *
+     * ChunkMap const& chunkMap = ...;
+     * for (auto const& chunkInfo : chunkMap.workers.at(worker).at(database).at(table)) {
+     *     // use chunkInfo.chunk and chunkInfo.size
+     * }
+     * @endcode
+     */
+    struct ChunkMap {
+        /// @return 'true' if the map is empty (or was constructed using the default constructor)
+        bool empty() const {
+            return workers.empty() || (std::chrono::time_point<std::chrono::system_clock>() == updateTime);
+        }
+
+        // NOTE: Separate types are defined here for the sake of clarity, to avoid
+        // an unreadable nested map definition.
+
+        struct ChunkInfo {
+            unsigned int chunk = 0;  ///< The chunk number
+            size_t size = 0;         ///< The file size (in bytes) of the chunk table
+        };
+        typedef std::vector<ChunkInfo> Chunks;             ///< Collection of chunks
+        typedef std::map<std::string, Chunks> Tables;      ///< Tables-to-chunks
+        typedef std::map<std::string, Tables> Databases;   ///< Databases-to-tables
+        typedef std::map<std::string, Databases> Workers;  ///< Workers-to-databases
+
+        /// The chunk disposition map for all workers.
+        Workers workers;
+
+        /// The last time the map was updated (since the UNIX Epoch).
+        TIMEPOINT updateTime;
+    };
+
     /**
      * Create QMeta instance from configuration dictionary.
      *
@@ -287,6 +330,21 @@ class QMeta {
     /// Write messages/errors generated during the query to the QMessages table.
     virtual void addQueryMessages(QueryId queryId, std::shared_ptr<qdisp::MessageStore> const& msgStore) = 0;
 
+    /**
+     * Fetch the chunk map if it was updated after the specified time point.
+     * @param prevUpdateTime The cut-off time for the chunk map age.
+     * Note that the default value of the parameter represents the start time of the UNIX
+     * Epoch, which forces an attempt to read the map from the database if one exists there.
+     * @return The most current chunk disposition, or an empty object if the persistent
+     * map is not newer than the requested time point. The result can be evaluated by
+     * calling the method empty() on the result object.
+     * @throws EmptyTableError if the corresponding metadata table doesn't have any records
+     * @throws SqlError for any other error related to MySQL
+     */
+    virtual ChunkMap getChunkMap(
+            std::chrono::time_point<std::chrono::system_clock> const& prevUpdateTime =
+                    std::chrono::time_point<std::chrono::system_clock>()) = 0;
+
 protected:
     // Default constructor
     QMeta() {}
diff --git a/src/qmeta/QMetaMysql.cc b/src/qmeta/QMetaMysql.cc
index d669b05f8..40befc5e9 100644
--- a/src/qmeta/QMetaMysql.cc
+++ b/src/qmeta/QMetaMysql.cc
@@ -25,6 +25,7 @@
 
 // System headers
 #include
+#include <chrono>
 
 // Third-party headers
 #include "boost/lexical_cast.hpp"
@@ -34,6 +35,7 @@
 #include "lsst/log/Log.h"
 
 // Qserv headers
+#include "global/stringUtil.h"
 #include "qdisp/JobStatus.h"
 #include "qdisp/MessageStore.h"
 #include "qmeta/Exceptions.h"
@@ -47,7 +49,7 @@ using namespace std;
 namespace {
 
 // Current version of QMeta schema
-char const VERSION_STR[] = "9";
+char const VERSION_STR[] = "10";
 
 LOG_LOGGER _log = LOG_GET("lsst.qserv.qmeta.QMetaMysql");
 
@@ -840,6 +842,84 @@ void QMetaMysql::addQueryMessages(QueryId queryId, shared_ptr<qdisp::MessageStore> const&
+QMeta::ChunkMap QMetaMysql::getChunkMap(
+        chrono::time_point<chrono::system_clock> const& prevUpdateTime) {
+    lock_guard<mutex> lock(_dbMutex);
+
+    QMeta::ChunkMap chunkMap;
+
+    auto trans = QMetaTransaction::create(*_conn);
+
+    // Check if the table needs to be read. Note that the default value of
+    // the previous update timestamp always forces an attempt to read the map.
+    auto const updateTime = _getChunkMapUpdateTime(lock);
+    bool const force = (prevUpdateTime == chrono::time_point<chrono::system_clock>()) ||
+                       (prevUpdateTime < updateTime);
+    if (!force) {
+        trans->commit();
+        return QMeta::ChunkMap();
+    }
+
+    // Read the map itself.
+
+    sql::SqlErrorObject errObj;
+    sql::SqlResults results;
+
+    string const tableName = "chunkMap";
+    string const query = "SELECT `worker`,`database`,`table`,`chunk`,`size` FROM `" + tableName + "`";
+    LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query);
+    if (!_conn->runQuery(query, results, errObj)) {
+        LOGS(_log, LOG_LVL_ERROR, "query failed: " << query);
+        throw SqlError(ERR_LOC, errObj);
+    }
+    vector<vector<string>> const rows = results.extractFirstNColumns(5);
+    trans->commit();
+
+    if (rows.empty()) throw EmptyTableError(ERR_LOC, tableName);
+    try {
+        for (auto const& row : rows) {
+            string const& worker = row[0];
+            string const& database = row[1];
+            string const& table = row[2];
+            unsigned int chunk = lsst::qserv::stoui(row[3]);
+            size_t const size = stoull(row[4]);
+            chunkMap.workers[worker][database][table].push_back(ChunkMap::ChunkInfo{chunk, size});
+        }
+        chunkMap.updateTime = updateTime;
+    } catch (exception const& ex) {
+        string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what());
+        throw ConsistencyError(ERR_LOC, msg);
+    }
+    return chunkMap;
+}
+
+chrono::time_point<chrono::system_clock> QMetaMysql::_getChunkMapUpdateTime(
+        lock_guard<mutex> const& lock) {
+    sql::SqlErrorObject errObj;
+    sql::SqlResults results;
+    string const tableName = "chunkMapStatus";
+    string const query = "SELECT `update_time` FROM `" + tableName + "` ORDER BY `update_time` DESC LIMIT 1";
+    LOGS(_log, LOG_LVL_DEBUG, "Executing query: " << query);
+    if (!_conn->runQuery(query, results, errObj)) {
+        LOGS(_log, LOG_LVL_ERROR, "query failed: " << query);
"query failed: " << query); + throw SqlError(ERR_LOC, errObj); + } + vector updateTime; + if (!results.extractFirstColumn(updateTime, errObj)) { + LOGS(_log, LOG_LVL_ERROR, "Failed to extract result set of query " + query); + throw SqlError(ERR_LOC, errObj); + } + if (updateTime.empty()) { + throw EmptyTableError(ERR_LOC, tableName); + } else if (updateTime.size() > 1) { + throw ConsistencyError(ERR_LOC, "Too many rows in result set of query " + query); + } + try { + return chrono::time_point() + chrono::seconds(stol(updateTime[0])); + } catch (exception const& ex) { + string const msg = "Failed to parse result set of query " + query + ", ex: " + string(ex.what()); + throw ConsistencyError(ERR_LOC, msg); + } +} + void QMetaMysql::_addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount, int& completeCount, int& execFailCount, map& msgCountMap) { // Don't add duplicate messages. diff --git a/src/qmeta/QMetaMysql.h b/src/qmeta/QMetaMysql.h index 59664c2ac..34def9096 100644 --- a/src/qmeta/QMetaMysql.h +++ b/src/qmeta/QMetaMysql.h @@ -23,6 +23,7 @@ #define LSST_QSERV_QMETA_QMETAMYSQL_H // System headers +#include #include #include @@ -44,7 +45,6 @@ namespace lsst::qserv::qmeta { /** * @ingroup qmeta - * * @brief Mysql-based implementation of qserv metadata. */ @@ -263,6 +263,10 @@ class QMetaMysql : public QMeta { /// @see QMeta::addQueryMessages() void addQueryMessages(QueryId queryId, std::shared_ptr const& msgStore) override; + /// @see QMeta::getChunkMap + QMeta::ChunkMap getChunkMap(std::chrono::time_point const& prevUpdateTime = + std::chrono::time_point()) override; + protected: /// Check that all necessary tables exist void _checkDb(); @@ -277,6 +281,18 @@ class QMetaMysql : public QMeta { }; private: + /** + * Read the last update time of the chunk map. + * @param A lock acquired on the mutex _dbMutex. + * @return The update time + * @throw EmptyTableError If the corrresponding table is epty + * @throw SqlError For any SQL-specific error + * @throw ConsistencyError For any problem met when parsing or interpreting results read + * from the table. + */ + std::chrono::time_point _getChunkMapUpdateTime( + std::lock_guard const& lock); + /// Add qMsg to the permanent message table. 
     /// Add qMsg to the permanent message table.
     void _addQueryMessage(QueryId queryId, qdisp::QueryMessage const& qMsg, int& cancelCount,
                           int& completeCount, int& execFailCount,
diff --git a/src/qmeta/schema/migrate-9-to-10.sql b/src/qmeta/schema/migrate-9-to-10.sql
new file mode 100644
index 000000000..4bd87d83b
--- /dev/null
+++ b/src/qmeta/schema/migrate-9-to-10.sql
@@ -0,0 +1,13 @@
+CREATE TABLE IF NOT EXISTS `chunkMap` (
+    `worker`   VARCHAR(256)    NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica',
+    `database` VARCHAR(256)    NOT NULL COMMENT 'The name of a database',
+    `table`    VARCHAR(256)    NOT NULL COMMENT 'The name of a table',
+    `chunk`    INT UNSIGNED    NOT NULL COMMENT 'The number of a chunk',
+    `size`     BIGINT UNSIGNED NOT NULL COMMENT 'The size (in bytes) of a chunk table')
+ENGINE = InnoDB
+COMMENT = 'Chunk disposition across workers';
+
+CREATE TABLE IF NOT EXISTS `chunkMapStatus` (
+    `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map')
+ENGINE = InnoDB
+COMMENT = 'Status info on the chunk map';
diff --git a/src/qmeta/schema/migrate-None-to-9.sql.jinja b/src/qmeta/schema/migrate-None-to-10.sql.jinja
similarity index 90%
rename from src/qmeta/schema/migrate-None-to-9.sql.jinja
rename to src/qmeta/schema/migrate-None-to-10.sql.jinja
index 3fd0e7e38..b34f95f53 100644
--- a/src/qmeta/schema/migrate-None-to-9.sql.jinja
+++ b/src/qmeta/schema/migrate-None-to-10.sql.jinja
@@ -207,6 +207,28 @@ CREATE TABLE IF NOT EXISTS `QMessages` (
 ENGINE = InnoDB
 COMMENT = 'Table of messages generated during queries.';
 
+-- -----------------------------------------------------
+-- Table `chunkMap`
+-- -----------------------------------------------------
+
+CREATE TABLE IF NOT EXISTS `chunkMap` (
+    `worker`   VARCHAR(256)    NOT NULL COMMENT 'A unique identifier of a worker hosting the chunk replica',
+    `database` VARCHAR(256)    NOT NULL COMMENT 'The name of a database',
+    `table`    VARCHAR(256)    NOT NULL COMMENT 'The name of a table',
+    `chunk`    INT UNSIGNED    NOT NULL COMMENT 'The number of a chunk',
+    `size`     BIGINT UNSIGNED NOT NULL COMMENT 'The size (in bytes) of a chunk table')
+ENGINE = InnoDB
+COMMENT = 'Chunk disposition across workers';
+
+-- -----------------------------------------------------
+-- Table `chunkMapStatus`
+-- -----------------------------------------------------
+
+CREATE TABLE IF NOT EXISTS `chunkMapStatus` (
+    `update_time` TIMESTAMP NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT 'The most recent update time of the map')
+ENGINE = InnoDB
+COMMENT = 'Status info on the chunk map';
+
 -- Update version on every schema change.
 -- Version 0 corresponds to initial QMeta release and it had no
 -- QMetadata table at all.
@@ -219,4 +241,6 @@
 -- Version 7 added final row count to QInfo.
 -- Version 8 replaced INT with BIGINT in the byte and row counter columns of QInfo.
 -- Version 9 removed the full-text index on the query text from QInfo.
-INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '9');
+-- Version 10 added the worker-to-chunk map tables chunkMap and chunkMapStatus.
+
+INSERT INTO `QMetadata` (`metakey`, `value`) VALUES ('version', '10');
diff --git a/src/qmeta/testQMeta.cc b/src/qmeta/testQMeta.cc
index 3589f9b36..31cb287b2 100644
--- a/src/qmeta/testQMeta.cc
+++ b/src/qmeta/testQMeta.cc
@@ -413,4 +413,10 @@ BOOST_AUTO_TEST_CASE(messWithQueryStats) {
     BOOST_CHECK(caught);
 }
 
+BOOST_AUTO_TEST_CASE(getChunkMap) {
+    // The test assumes that the underlying tables exist and are empty.
+    QMeta::ChunkMap chunkMap;
+    BOOST_CHECK_THROW(qMeta->getChunkMap(), EmptyTableError);
+}
+
 BOOST_AUTO_TEST_SUITE_END()
diff --git a/src/replica/apps/MasterControllerHttpApp.cc b/src/replica/apps/MasterControllerHttpApp.cc
index 462371824..2b00107fc 100644
--- a/src/replica/apps/MasterControllerHttpApp.cc
+++ b/src/replica/apps/MasterControllerHttpApp.cc
@@ -147,6 +147,10 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
             " This affect replicas to be deleted from the workers during the synchronization"
             " stages.",
             _forceQservSync);
+    parser().flag("qserv-chunk-map-update",
+                  "The flag which, if provided, results in updating the chunk disposition map"
+                  " in Qserv's QMeta database.",
+                  _qservChunkMapUpdate);
     parser().flag("purge",
                   "The binary flag which, if provided, enables the 'purge' algorithm in"
                   " the end of each replication cycle that eliminates excess replicas which"
@@ -198,7 +202,7 @@ int MasterControllerHttpApp::runImpl() {
     _replicationTask = ReplicationTask::create(
             _controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec,
-            _forceQservSync, _replicationIntervalSec, _purge);
+            _forceQservSync, _qservChunkMapUpdate, _replicationIntervalSec, _purge);
     _replicationTask->start();
 
     _healthMonitorTask = HealthMonitorTask::create(
diff --git a/src/replica/apps/MasterControllerHttpApp.h b/src/replica/apps/MasterControllerHttpApp.h
index d3e17f782..f5ef4ed02 100644
--- a/src/replica/apps/MasterControllerHttpApp.h
+++ b/src/replica/apps/MasterControllerHttpApp.h
@@ -132,6 +132,7 @@ class MasterControllerHttpApp : public Application {
 
     bool _purge;
     bool _forceQservSync;
+    bool _qservChunkMapUpdate;
    bool _permanentDelete;
 
     /// A connection URL for the MySQL service of the Qserv master database.
diff --git a/src/replica/contr/ReplicationTask.cc b/src/replica/contr/ReplicationTask.cc
index c7dbdea2d..360f78927 100644
--- a/src/replica/contr/ReplicationTask.cc
+++ b/src/replica/contr/ReplicationTask.cc
@@ -22,23 +22,35 @@
 // Class header
 #include "replica/contr/ReplicationTask.h"
 
+// System headers
+#include <vector>
+
 // Qserv headers
+#include "replica/config/Configuration.h"
 #include "replica/jobs/FindAllJob.h"
 #include "replica/jobs/FixUpJob.h"
 #include "replica/jobs/ReplicateJob.h"
 #include "replica/jobs/RebalanceJob.h"
 #include "replica/jobs/PurgeJob.h"
+#include "replica/mysql/DatabaseMySQL.h"
+#include "replica/mysql/DatabaseMySQLGenerator.h"
+#include "replica/mysql/DatabaseMySQLUtils.h"
+#include "replica/services/DatabaseServices.h"
+#include "replica/util/ReplicaInfo.h"
 
 using namespace std;
 
 namespace lsst::qserv::replica {
 
+using namespace database::mysql;
+
 ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller,
                                              Task::AbnormalTerminationCallbackType const& onTerminated,
                                              unsigned int qservSyncTimeoutSec, bool forceQservSync,
-                                             unsigned int replicationIntervalSec, bool purge) {
+                                             bool qservChunkMapUpdate, unsigned int replicationIntervalSec,
+                                             bool purge) {
     return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync,
-                                   replicationIntervalSec, purge));
+                                   qservChunkMapUpdate, replicationIntervalSec, purge));
 }
 
 bool ReplicationTask::onRun() {
@@ -51,6 +63,8 @@ bool ReplicationTask::onRun() {
     launch<FindAllJob>(priority, saveReplicaInfo, allWorkers);
     sync(_qservSyncTimeoutSec, _forceQservSync);
 
+    if (_qservChunkMapUpdate) _updateChunkMap();
+
     launch<FixUpJob>(priority);
     sync(_qservSyncTimeoutSec, _forceQservSync);
 
@@ -73,10 +87,81 @@
 ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
                                  Task::AbnormalTerminationCallbackType const& onTerminated,
                                  unsigned int qservSyncTimeoutSec, bool forceQservSync,
-                                 unsigned int replicationIntervalSec, bool purge)
+                                 bool qservChunkMapUpdate, unsigned int replicationIntervalSec, bool purge)
         : Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec),
           _qservSyncTimeoutSec(qservSyncTimeoutSec),
           _forceQservSync(forceQservSync),
+          _qservChunkMapUpdate(qservChunkMapUpdate),
           _purge(purge) {}
 
+void ReplicationTask::_updateChunkMap() {
+    // Open the MySQL connection using the RAII-style handler that will automatically
+    // abort the transaction should any problem occur when loading data into the table.
+    ConnectionHandler h;
+    try {
+        h.conn = Connection::open(Configuration::qservCzarDbParams("qservMeta"));
+    } catch (exception const& ex) {
+        error("failed to connect to the czar's database server, ex: " + string(ex.what()));
+        return;
+    }
+    QueryGenerator const g(h.conn);
+
+    // Get info on known chunk replicas from the persistent store of the Replication system
+    // and package those into ready-to-ingest data.
+    bool const allDatabases = true;
+    string const emptyDatabaseFilter;
+    bool const isPublished = true;
+    bool const includeFileInfo = true;  // need this to access table sizes
+    vector<string> rows;
+    for (auto const& workerName : serviceProvider()->config()->workers()) {
+        vector<ReplicaInfo> replicas;
+        serviceProvider()->databaseServices()->findWorkerReplicas(replicas, workerName, emptyDatabaseFilter,
+                                                                  allDatabases, isPublished,
+                                                                  includeFileInfo);
+        for (auto const& replica : replicas) {
+            for (auto const& fileInfo : replica.fileInfo()) {
+                if (fileInfo.isData() && !fileInfo.isOverlap()) {
+                    rows.push_back(g.packVals(workerName, replica.database(), fileInfo.baseTable(),
+                                              replica.chunk(), fileInfo.size));
+                }
+            }
+        }
+    }
+    if (rows.empty()) {
+        warn("no replicas found in the persistent state of the Replication system");
+        return;
+    }
+
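For context, the statements generated and executed just below boil down to the following sequence (a sketch rendered as comments; exact quoting and batching are QueryGenerator's business):

    // DELETE FROM `chunkMap`;
    // DELETE FROM `chunkMapStatus`;
    // INSERT INTO `chunkMap` (`worker`,`database`,`table`,`chunk`,`size`)
    //     VALUES (...),(...),...;        -- repeated, split to fit max_allowed_packet
    // INSERT INTO `chunkMapStatus` VALUES (NOW());
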
+    // Get the limit for the length of the bulk insert queries. The limit is needed
+    // when generating the queries.
+    size_t maxQueryLength = 0;
+    string const globalVariableName = "max_allowed_packet";
+    try {
+        string const query = g.showVars(SqlVarScope::GLOBAL, globalVariableName);
+        h.conn->executeInOwnTransaction([&query, &maxQueryLength](auto conn) {
+            bool const noMoreThanOne = true;
+            if (!selectSingleValue(conn, query, maxQueryLength, "Value", noMoreThanOne)) {
+                throw runtime_error("no such variable found");
+            }
+        });
+    } catch (exception const& ex) {
+        error("failed to get a value of GLOBAL '" + globalVariableName + "', ex: " + string(ex.what()));
+        return;
+    }
+
+    // Execute a sequence of queries atomically.
+    vector<string> const deleteQueries = {g.delete_("chunkMap"), g.delete_("chunkMapStatus")};
+    vector<string> insertQueries = g.insertPacked(
+            "chunkMap", g.packIds("worker", "database", "table", "chunk", "size"), rows, maxQueryLength);
+    insertQueries.push_back(g.insert("chunkMapStatus", Sql::NOW));
+    try {
+        h.conn->executeInOwnTransaction([&deleteQueries, &insertQueries](auto conn) {
+            for (auto const& query : deleteQueries) conn->execute(query);
+            for (auto const& query : insertQueries) conn->execute(query);
+        });
+    } catch (exception const& ex) {
+        error("failed to update the chunk map in the Czar database, ex: " + string(ex.what()));
+        return;
+    }
+}
+
 }  // namespace lsst::qserv::replica
diff --git a/src/replica/contr/ReplicationTask.h b/src/replica/contr/ReplicationTask.h
index 2f8191e58..5bc99c76b 100644
--- a/src/replica/contr/ReplicationTask.h
+++ b/src/replica/contr/ReplicationTask.h
@@ -55,6 +55,7 @@ class ReplicationTask : public Task {
      * @param qservSyncTimeoutSec The maximum number of seconds to wait before giving
      *   up on the Qserv synchronization requests.
      * @param forceQservSync Force chunk removal at worker resource collections if 'true'.
+     * @param qservChunkMapUpdate Update the chunk disposition map in Qserv's QMeta database if 'true'.
      * @param replicationIntervalSec The number of seconds to wait at the end of each
      *   iteration loop before beginning the new one.
      * @param purge Purge excess replicas if 'true'.
      */
     static Ptr create(Controller::Ptr const& controller,
                       Task::AbnormalTerminationCallbackType const& onTerminated,
-                      unsigned int qservSyncTimeoutSec, bool forceQservSync,
+                      unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate,
                       unsigned int replicationIntervalSec, bool purge);
 
 protected:
@@ -72,15 +73,23 @@ class ReplicationTask : public Task {
 private:
     /// @see ReplicationTask::create()
     ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated,
-                    unsigned int qservSyncTimeoutSec, bool forceQservSync,
+                    unsigned int qservSyncTimeoutSec, bool forceQservSync, bool qservChunkMapUpdate,
                     unsigned int replicationIntervalSec, bool purge);
 
+    /// Harvest chunk replica info from the Replication system's persistent state and
+    /// rewrite the chunkMap/chunkMapStatus tables in Qserv's QMeta database.
+    void _updateChunkMap();
+
     /// The maximum number of seconds to wait before giving up
     /// on the Qserv synchronization requests.
     unsigned int const _qservSyncTimeoutSec;
 
+    /// Force removal at worker resource collections if 'true'.
+    bool const _forceQservSync;
+
+    /// Update the chunk disposition map in Qserv's QMeta database if 'true'.
+    bool const _qservChunkMapUpdate;
+
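End to end, the new option threads through as follows: the --qserv-chunk-map-update flag (enabled for the master controller in the docker-compose change at the top of this patch) sets MasterControllerHttpApp::_qservChunkMapUpdate, which is passed into ReplicationTask; once per replication cycle, onRun() then calls _updateChunkMap(), rewriting the chunkMap/chunkMapStatus tables that QMetaMysql::getChunkMap() — and, in turn, the czar's CzarChunkMap — consume.
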
+    /// Purge excess replicas if 'true'.
+    bool const _purge;
 };
 
 }  // namespace lsst::qserv::replica
diff --git a/src/replica/contr/Task.h b/src/replica/contr/Task.h
index b805b32f5..cc0151912 100644
--- a/src/replica/contr/Task.h
+++ b/src/replica/contr/Task.h
@@ -197,6 +197,12 @@ class Task : public EventLogger, public std::enable_shared_from_this<Task> {
      */
     void debug(std::string const& msg) { LOGS(_log, LOG_LVL_DEBUG, context() << msg); }
 
+    /**
+     * Log a message into the Logger's LOG_LVL_WARN stream.
+     * @param msg A message to be logged.
+     */
+    void warn(std::string const& msg) { LOGS(_log, LOG_LVL_WARN, context() << msg); }
+
     /**
      * Log a message into the Logger's LOG_LVL_ERROR stream.
      * @param msg A message to be logged.
diff --git a/src/sql/SqlResults.cc b/src/sql/SqlResults.cc
index c2cd8e8b0..495288977 100644
--- a/src/sql/SqlResults.cc
+++ b/src/sql/SqlResults.cc
@@ -173,6 +173,24 @@ bool SqlResults::extractFirst4Columns(std::vector<std::string>& col1, std::vecto
     return true;
 }
 
+std::vector<std::vector<std::string>> SqlResults::extractFirstNColumns(size_t numColumns) {
+    std::vector<std::vector<std::string>> rows;
+    for (int resultIdx = 0, numResults = _results.size(); resultIdx < numResults; ++resultIdx) {
+        MYSQL_ROW row;
+        while ((row = mysql_fetch_row(_results[resultIdx])) != nullptr) {
+            std::vector<std::string> columns;
+            columns.reserve(numColumns);
+            for (size_t colIdx = 0; colIdx < numColumns; ++colIdx) {
+                columns.push_back(row[colIdx]);
+            }
+            rows.push_back(std::move(columns));
+        }
+        mysql_free_result(_results[resultIdx]);
+    }
+    _results.clear();
+    return rows;
+}
+
 bool SqlResults::extractFirstValue(std::string& ret, SqlErrorObject& errObj) {
     if (_results.size() != 1) {
         std::stringstream ss;
diff --git a/src/sql/SqlResults.h b/src/sql/SqlResults.h
index 2f4e9b154..fae501cb2 100644
--- a/src/sql/SqlResults.h
+++ b/src/sql/SqlResults.h
@@ -51,8 +51,8 @@ namespace detail {
  * is the sequence of strings (pointers) and their lengths. Pointer may be NULL
 * if the column value is NONE.
 */
-class SqlResults_Iterator : public std::iterator<std::input_iterator_tag,
-                                                 std::vector<std::pair<char const*, unsigned long> > > {
+class SqlResults_Iterator
+        : public std::iterator<std::input_iterator_tag, std::vector<std::pair<char const*, unsigned long>>> {
 public:
     SqlResults_Iterator();
     SqlResults_Iterator(std::vector<MYSQL_RES*> const& results);
@@ -96,6 +96,15 @@ class SqlResults : boost::noncopyable {
                               std::vector<std::string>&, std::vector<std::string>&, SqlErrorObject&);
     bool extractFirst4Columns(std::vector<std::string>&, std::vector<std::string>&,
                               std::vector<std::string>&, std::vector<std::string>&, SqlErrorObject&);
+
+    /**
+     * Extract a result set into a 2D array.
+     * @param numColumns The number of columns to extract from each row.
+     * @return A 2D array, where the first index of the array represents rows
+     *   and the second index represents columns.
+ */ + std::vector> extractFirstNColumns(size_t numColumns); + void freeResults(); /// Return row iterator diff --git a/src/util/TimeUtils.cc b/src/util/TimeUtils.cc index 71a7e023d..f29ee47a6 100644 --- a/src/util/TimeUtils.cc +++ b/src/util/TimeUtils.cc @@ -51,4 +51,13 @@ uint64_t TimeUtils::tp2ms(chrono::system_clock::time_point const& tp) { return chrono::duration_cast(tp.time_since_epoch()).count(); } +string TimeUtils::timePointToDateTimeString(TIMEPOINT const& point) { + auto const timer = chrono::system_clock::to_time_t(point); + auto broken_time = *localtime(&timer); + + ostringstream ss; + ss << put_time(&broken_time, "%Y-%m-%d %H:%M:%S"); + return ss.str(); +} + } // namespace lsst::qserv::util diff --git a/src/util/TimeUtils.h b/src/util/TimeUtils.h index 4dd2aa4b8..b5d5a35bb 100644 --- a/src/util/TimeUtils.h +++ b/src/util/TimeUtils.h @@ -31,6 +31,9 @@ #include #include +// Qserv headers +#include "global/clock_defs.h" + // This header declarations namespace lsst::qserv::util { @@ -44,6 +47,9 @@ struct TimeUtils { /// @return a human-readable timestamp in a format 'YYYY-MM-DD HH:MM:SS.mmm' static std::string toDateTimeString(std::chrono::milliseconds const& millisecondsSinceEpoch); + /// @return a human-readable time in a format 'YYYY-MM-DD HH:MM:SS' + static std::string timePointToDateTimeString(TIMEPOINT const& point); + /** * @param tp The timepoint to be converted. * @return The number of milliseconds since UNIX Epoch