Skip to content

Commit

Permalink
Added unit test.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Mar 27, 2024
1 parent 07d0820 commit 0a63ea8
Show file tree
Hide file tree
Showing 10 changed files with 402 additions and 94 deletions.
2 changes: 1 addition & 1 deletion src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
// First check for SUBMIT and strip it
std::string query = aQuery;

// &&& need to have WorkerChunkMap info at this point
// TODO: DM-43386 need to have WorkerChunkMap info at this point

std::string stripped;
bool async = false;
Expand Down
31 changes: 30 additions & 1 deletion src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
add_library(czar OBJECT)
add_dependencies(czar proto)

target_sources(czar PRIVATE
Czar.cc
Expand Down Expand Up @@ -51,4 +52,32 @@ endfunction()

czar_utils(
qserv-czar-http
)
)

# Build and register one Boost.Test executable per listed test name.
# Every argument <name> must have a matching <name>.cc source file in
# this directory; each resulting binary is also registered with CTest.
function(czar_tests)
    # Shared link interface for all czar unit tests.
    set(CZAR_TEST_LIBS
        cconfig
        ccontrol
        czar
        global
        mysql
        parser
        qana
        qdisp
        qproc
        qserv_meta
        query
        rproc
        sql
        Boost::unit_test_framework
    )
    foreach(test_name ${ARGV})
        add_executable(${test_name} ${test_name}.cc)
        target_link_libraries(${test_name} PUBLIC ${CZAR_TEST_LIBS})
        add_test(NAME ${test_name} COMMAND ${test_name})
    endforeach()
endfunction()

# Register the czar unit tests; each name must match a <name>.cc file
# in this directory (built and added to CTest by czar_tests above).
czar_tests(
testCzar
)

138 changes: 79 additions & 59 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
@@ -1,25 +1,4 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
// -*- LSST-C++ -*-
/*
* LSST Data Management System
*
Expand Down Expand Up @@ -54,6 +33,7 @@
#include "qmeta/Exceptions.h"
#include "qmeta/QMeta.h"
#include "util/Bug.h"
#include "util/TimeUtils.h"

using namespace std;

Expand All @@ -63,23 +43,52 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap");

namespace lsst::qserv::czar {

CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta){
CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta) {
try {
_read();
auto mapsSet = _read();
if (!mapsSet) {
throw ChunkMapException(ERR_LOC, "CzarChunkMap maps were not set in contructor");
}
} catch (qmeta::QMetaError const& qExc) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " CzarChunkMap could not read DB " << qExc.what());
LOGS(_log, LOG_LVL_ERROR, __func__ << " &&& CzarChunkMap could not read DB " << qExc.what());
// &&& maybe flag invalid
throw ChunkMapException(ERR_LOC, string(" CzarChunkMap constructor failed read ") + qExc.what());
}
}

void CzarChunkMap::_read() {
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() start");
qmeta::QMeta::ChunkMap chunkMap = _qmeta->getChunkMap();
auto const& jsChunks = chunkMap.chunks;
LOGS(_log, LOG_LVL_WARN, "&&& chunkMap=" << jsChunks);
bool CzarChunkMap::_read() {
LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() start");
// If replacing the map, this may take a bit of time, but it's probably
// better to wait for new maps if something changed.
std::lock_guard gLock(_mapMtx);
qmeta::QMeta::ChunkMap qChunkMap = _qmeta->getChunkMap();
if (_lastUpdateTime >= qChunkMap.updateTime) {
LOGS(_log, LOG_LVL_DEBUG,
__func__ << " CzarChunkMap no need to read "
<< util::TimeUtils::timePointToDateTimeString(_lastUpdateTime)
<< " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime));
return false;
}

auto const& jsChunks = qChunkMap.chunks;
LOGS(_log, LOG_LVL_DEBUG, "chunkMap=" << jsChunks);

// Make the new maps.
auto [chunkMapPtr, wcMapPtr] = makeNewMaps(jsChunks);

verify(*chunkMapPtr, *wcMapPtr);
LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));

_workerChunkMap = wcMapPtr;
_chunkMap = chunkMapPtr;
_lastUpdateTime = qChunkMap.updateTime;

LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() end");
return true;
}

pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>> CzarChunkMap::makeNewMaps(
nlohmann::json const& jsChunks) {
// Create new maps.
auto wcMapPtr = make_shared<WorkerChunkMap>();
auto chunkMapPtr = make_shared<ChunkMap>();
Expand All @@ -88,27 +97,32 @@ void CzarChunkMap::_read() {
for (auto const& [dbName, tables] : dbs.items()) {
for (auto const& [tableName, chunks] : tables.items()) {
for (auto const& [index, chunkNumNSz] : chunks.items()) {
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() " << index);
try {
int64_t chunkNum = chunkNumNSz.at(0);
int64_t sz = chunkNumNSz.at(1);
LOGS(_log, LOG_LVL_WARN, "&&& workerdId=" << workerId << " db=" << dbName << " table=" << tableName << " chunk=" << chunkNum << " sz=" << sz);
_insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum, sz);
LOGS(_log, LOG_LVL_DEBUG,
"workerdId=" << workerId << " db=" << dbName << " table=" << tableName
<< " chunk=" << chunkNum << " sz=" << sz);
insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum,
sz);
} catch (invalid_argument const& exc) {
// &&& better option? Cancel mapping and inform the replicator that the table is bad?
throw ChunkMapException(ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what());
throw ChunkMapException(
ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
} catch (out_of_range const& exc) {
// &&& better option?
throw ChunkMapException(ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what());
throw ChunkMapException(
ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
}
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() e1");
}
}
}
}

auto chunksSortedBySize = make_shared<ChunkVector>();
_calcChunkMap(*chunkMapPtr, *chunksSortedBySize);
calcChunkMap(*chunkMapPtr, *chunksSortedBySize);

// At this point we have
// - wcMapPtr has a map of workerData by worker id with each worker having a map of ChunkData
Expand All @@ -124,33 +138,36 @@ void CzarChunkMap::_read() {
auto wkrData = wkrDataWeak.lock();
if (wkrData == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId);
continue; // maybe the next one will be ok.
continue; // maybe the next one will be ok.
}
LOGS(_log, LOG_LVL_INFO, " &&& wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize << " smallest=" << smallest);
LOGS(_log, LOG_LVL_DEBUG,
__func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
<< " smallest=" << smallest);
if (wkrData->_sharedScanTotalSize < smallest) {
smallestWkr = wkrData;
smallest = smallestWkr->_sharedScanTotalSize;
}
}
if (smallestWkr == nullptr) {
throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + to_string(chunkData->_chunkId));
throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" +
to_string(chunkData->_chunkId));
}
smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
chunkData->_primaryScanWorker = smallestWkr;
LOGS(_log, LOG_LVL_DEBUG, " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
LOGS(_log, LOG_LVL_DEBUG,
" chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
}

verify(*chunkMapPtr, *wcMapPtr);
LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));

LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() end");
LOGS(_log, LOG_LVL_TRACE, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_TRACE, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));
return {chunkMapPtr, wcMapPtr};
}

void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId, string const& dbName, string const& tableName, int64_t chunkIdNum, int64_t sz) {
void CzarChunkMap::insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId,
string const& dbName, string const& tableName, int64_t chunkIdNum,
int64_t sz) {
// Get or make the worker entry
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap start");
WorkerChunksData::Ptr workerChunksData;
auto iterWC = wcMap.find(workerId);
if (iterWC == wcMap.end()) {
Expand Down Expand Up @@ -182,9 +199,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap
auto const& dbN = dbTbl.first;
auto const& tblN = dbTbl.second;
if (dbName != dbN || tblN != tableName || tblSz != sz) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " data mismatch for "
<< dbName << "." << tableName << "=" << sz << " vs "
<< dbN << "." << tblN << "=" << tblSz);
LOGS(_log, LOG_LVL_ERROR,
__func__ << " data mismatch for " << dbName << "." << tableName << "=" << sz << " vs " << dbN
<< "." << tblN << "=" << tblSz);
}
}

Expand All @@ -193,10 +210,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap

// Add worker to the list of workers containing the chunk.
chunkData->addToWorkerHasThis(workerChunksData);
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap end");
}

void CzarChunkMap::_calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) {
void CzarChunkMap::calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) {
// Calculate total bytes for all chunks.
for (auto&& [chunkIdNum, chunkData] : chunkMap) {
chunkData->calcTotalBytes();
Expand All @@ -218,6 +234,7 @@ void CzarChunkMap::sortChunks(std::vector<ChunkData::Ptr>& chunksSortedBySize) {
}

void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) {
// Use a set to prevent duplicate ids caused by replication levels > 1.
set<int64_t> allChunkIds;
int errorCount = 0;
for (auto const& [wkr, wkrData] : wcMap) {
Expand All @@ -239,7 +256,9 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap)
continue;
}
if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) {
LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " << primeScanWkr->_workerId);
LOGS(_log, LOG_LVL_ERROR,
" chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for "
<< primeScanWkr->_workerId);
++errorCount;
continue;
}
Expand All @@ -259,7 +278,8 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap)
for (auto const& cId : allChunkIds) {
allMissingIds += to_string(cId) + ",";
}
LOGS(_log, LOG_LVL_ERROR, " There were " << missing << " missing chunks from the scan list " << allMissingIds);
LOGS(_log, LOG_LVL_ERROR,
" There were " << missing << " missing chunks from the scan list " << allMissingIds);
++errorCount;
}

Expand Down Expand Up @@ -307,7 +327,7 @@ void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr<WorkerChunksDat

string CzarChunkMap::ChunkData::dump() const {
stringstream os;
auto primaryWorker = _primaryScanWorker.lock();
auto primaryWorker = _primaryScanWorker.lock();
os << "{ChunkData id=" << _chunkId << " totalBytes=" << _totalBytes;
os << " primaryWorker=" << ((primaryWorker == nullptr) ? "null" : primaryWorker->_workerId);
os << " workers{";
Expand Down Expand Up @@ -337,4 +357,4 @@ string CzarChunkMap::WorkerChunksData::dump() const {
return os.str();
}

} // namespace lsst::qserv::wconfig
} // namespace lsst::qserv::czar
Loading

0 comments on commit 0a63ea8

Please sign in to comment.