Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tickets/DM-43384 #846

Closed
wants to merge 9 commits into from
2 changes: 1 addition & 1 deletion src/ccontrol/UserQueryFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ UserQuery::Ptr UserQueryFactory::newUserQuery(std::string const& aQuery, std::st
// First check for SUBMIT and strip it
std::string query = aQuery;

// &&& need to have WorkerChunkMap info at this point
// TODO: DM-43386 need to have WorkerChunkMap info at this point

std::string stripped;
bool async = false;
Expand Down
31 changes: 30 additions & 1 deletion src/czar/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
add_library(czar OBJECT)
add_dependencies(czar proto)

target_sources(czar PRIVATE
Czar.cc
Expand Down Expand Up @@ -51,4 +52,32 @@ endfunction()

czar_utils(
qserv-czar-http
)
)

# czar_tests(<test>...)
#
# For every test name passed as an argument, build an executable from
# <test>.cc, link it against the libraries listed below (including the
# Boost unit test framework), and register it with add_test() under the
# same name.
function(czar_tests)
# ARGV holds all arguments given to the function, i.e. the test names.
foreach(TEST IN ITEMS ${ARGV})
# The source path is relative, so ${TEST}.cc is looked up in the current source directory.
add_executable(${TEST} ${TEST}.cc)
target_link_libraries(${TEST} PUBLIC
cconfig
ccontrol
czar
global
mysql
parser
qana
qdisp
qproc
qserv_meta
query
rproc
sql
Boost::unit_test_framework
)
# The test name and the command that runs it are both ${TEST}.
add_test(NAME ${TEST} COMMAND ${TEST})
endforeach()
endfunction()

# Build testCzar from testCzar.cc and register it as a test.
czar_tests(
testCzar
)

138 changes: 79 additions & 59 deletions src/czar/CzarChunkMap.cc
Original file line number Diff line number Diff line change
@@ -1,25 +1,4 @@
// -*- LSST-C++ -*-
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
// -*- LSST-C++ -*-
/*
* LSST Data Management System
*
Expand Down Expand Up @@ -54,6 +33,7 @@
#include "qmeta/Exceptions.h"
#include "qmeta/QMeta.h"
#include "util/Bug.h"
#include "util/TimeUtils.h"

using namespace std;

Expand All @@ -63,23 +43,52 @@ LOG_LOGGER _log = LOG_GET("lsst.qserv.czar.CzarChunkMap");

namespace lsst::qserv::czar {

CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta){
CzarChunkMap::CzarChunkMap(std::shared_ptr<qmeta::QMeta> const& qmeta) : _qmeta(qmeta) {
try {
_read();
auto mapsSet = _read();
if (!mapsSet) {
throw ChunkMapException(ERR_LOC, "CzarChunkMap maps were not set in contructor");
}
} catch (qmeta::QMetaError const& qExc) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " CzarChunkMap could not read DB " << qExc.what());
LOGS(_log, LOG_LVL_ERROR, __func__ << " &&& CzarChunkMap could not read DB " << qExc.what());
// &&& maybe flag invalid
throw ChunkMapException(ERR_LOC, string(" CzarChunkMap constructor failed read ") + qExc.what());
}
}

void CzarChunkMap::_read() {
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() start");
qmeta::QMeta::ChunkMap chunkMap = _qmeta->getChunkMap();
auto const& jsChunks = chunkMap.chunks;
LOGS(_log, LOG_LVL_WARN, "&&& chunkMap=" << jsChunks);
bool CzarChunkMap::_read() {
LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() start");
// If replacing the map, this may take a bit of time, but it's probably
// better to wait for new maps if something changed.
std::lock_guard gLock(_mapMtx);
qmeta::QMeta::ChunkMap qChunkMap = _qmeta->getChunkMap();
if (_lastUpdateTime >= qChunkMap.updateTime) {
LOGS(_log, LOG_LVL_DEBUG,
__func__ << " CzarChunkMap no need to read "
<< util::TimeUtils::timePointToDateTimeString(_lastUpdateTime)
<< " db=" << util::TimeUtils::timePointToDateTimeString(qChunkMap.updateTime));
return false;
}

auto const& jsChunks = qChunkMap.chunks;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I thought the chunk map was migrated away from the JSON library.

LOGS(_log, LOG_LVL_DEBUG, "chunkMap=" << jsChunks);

// Make the new maps.
auto [chunkMapPtr, wcMapPtr] = makeNewMaps(jsChunks);

verify(*chunkMapPtr, *wcMapPtr);
LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));

_workerChunkMap = wcMapPtr;
_chunkMap = chunkMapPtr;
_lastUpdateTime = qChunkMap.updateTime;

LOGS(_log, LOG_LVL_TRACE, "CzarChunkMap::_read() end");
return true;
}

pair<shared_ptr<CzarChunkMap::ChunkMap>, shared_ptr<CzarChunkMap::WorkerChunkMap>> CzarChunkMap::makeNewMaps(
nlohmann::json const& jsChunks) {
// Create new maps.
auto wcMapPtr = make_shared<WorkerChunkMap>();
auto chunkMapPtr = make_shared<ChunkMap>();
Expand All @@ -88,27 +97,32 @@ void CzarChunkMap::_read() {
for (auto const& [dbName, tables] : dbs.items()) {
for (auto const& [tableName, chunks] : tables.items()) {
for (auto const& [index, chunkNumNSz] : chunks.items()) {
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() " << index);
try {
int64_t chunkNum = chunkNumNSz.at(0);
int64_t sz = chunkNumNSz.at(1);
LOGS(_log, LOG_LVL_WARN, "&&& workerdId=" << workerId << " db=" << dbName << " table=" << tableName << " chunk=" << chunkNum << " sz=" << sz);
_insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum, sz);
LOGS(_log, LOG_LVL_DEBUG,
"workerdId=" << workerId << " db=" << dbName << " table=" << tableName
<< " chunk=" << chunkNum << " sz=" << sz);
insertIntoChunkMap(*wcMapPtr, *chunkMapPtr, workerId, dbName, tableName, chunkNum,
sz);
} catch (invalid_argument const& exc) {
// &&& better option? Cancel mapping and inform the replicator that the table is bad?
throw ChunkMapException(ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what());
throw ChunkMapException(
ERR_LOC, string(__func__) + " invalid_argument workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
} catch (out_of_range const& exc) {
// &&& better option?
throw ChunkMapException(ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId + " db=" + dbName + " table=" + tableName + " chunk=" + to_string(chunkNumNSz) + " " + exc.what());
throw ChunkMapException(
ERR_LOC, string(__func__) + " out_of_range workerdId=" + workerId +
" db=" + dbName + " table=" + tableName +
" chunk=" + to_string(chunkNumNSz) + " " + exc.what());
}
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() e1");
}
}
}
}

auto chunksSortedBySize = make_shared<ChunkVector>();
_calcChunkMap(*chunkMapPtr, *chunksSortedBySize);
calcChunkMap(*chunkMapPtr, *chunksSortedBySize);

// At this point we have
// - wcMapPtr has a map of workerData by worker id with each worker having a map of ChunkData
Expand All @@ -124,33 +138,36 @@ void CzarChunkMap::_read() {
auto wkrData = wkrDataWeak.lock();
if (wkrData == nullptr) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " unexpected null weak ptr for " << wkrId);
continue; // maybe the next one will be ok.
continue; // maybe the next one will be ok.
}
LOGS(_log, LOG_LVL_INFO, " &&& wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize << " smallest=" << smallest);
LOGS(_log, LOG_LVL_DEBUG,
__func__ << " wkrId=" << wkrData << " tsz=" << wkrData->_sharedScanTotalSize
<< " smallest=" << smallest);
if (wkrData->_sharedScanTotalSize < smallest) {
smallestWkr = wkrData;
smallest = smallestWkr->_sharedScanTotalSize;
}
}
if (smallestWkr == nullptr) {
throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" + to_string(chunkData->_chunkId));
throw ChunkMapException(ERR_LOC, string(__func__) + " no smallesWkr found for chunk=" +
to_string(chunkData->_chunkId));
}
smallestWkr->_sharedScanChunkMap[chunkData->_chunkId] = chunkData;
smallestWkr->_sharedScanTotalSize += chunkData->_totalBytes;
chunkData->_primaryScanWorker = smallestWkr;
LOGS(_log, LOG_LVL_DEBUG, " chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
LOGS(_log, LOG_LVL_DEBUG,
" chunk=" << chunkData->_chunkId << " assigned to scan on " << smallestWkr->_workerId);
}

verify(*chunkMapPtr, *wcMapPtr);
LOGS(_log, LOG_LVL_DEBUG, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_DEBUG, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));

LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_read() end");
LOGS(_log, LOG_LVL_TRACE, " chunkMap=" << dumpChunkMap(*chunkMapPtr));
LOGS(_log, LOG_LVL_TRACE, " workerChunkMap=" << dumpWorkerChunkMap(*wcMapPtr));
return {chunkMapPtr, wcMapPtr};
}

void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId, string const& dbName, string const& tableName, int64_t chunkIdNum, int64_t sz) {
void CzarChunkMap::insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap, string const& workerId,
string const& dbName, string const& tableName, int64_t chunkIdNum,
int64_t sz) {
// Get or make the worker entry
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap start");
WorkerChunksData::Ptr workerChunksData;
auto iterWC = wcMap.find(workerId);
if (iterWC == wcMap.end()) {
Expand Down Expand Up @@ -182,9 +199,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap
auto const& dbN = dbTbl.first;
auto const& tblN = dbTbl.second;
if (dbName != dbN || tblN != tableName || tblSz != sz) {
LOGS(_log, LOG_LVL_ERROR, __func__ << " data mismatch for "
<< dbName << "." << tableName << "=" << sz << " vs "
<< dbN << "." << tblN << "=" << tblSz);
LOGS(_log, LOG_LVL_ERROR,
__func__ << " data mismatch for " << dbName << "." << tableName << "=" << sz << " vs " << dbN
<< "." << tblN << "=" << tblSz);
}
}

Expand All @@ -193,10 +210,9 @@ void CzarChunkMap::_insertIntoChunkMap(WorkerChunkMap& wcMap, ChunkMap& chunkMap

// Add worker to the list of workers containing the chunk.
chunkData->addToWorkerHasThis(workerChunksData);
LOGS(_log, LOG_LVL_WARN, "&&& CzarChunkMap::_insertIntoChunkMap end");
}

void CzarChunkMap::_calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) {
void CzarChunkMap::calcChunkMap(ChunkMap& chunkMap, ChunkVector& chunksSortedBySize) {
// Calculate total bytes for all chunks.
for (auto&& [chunkIdNum, chunkData] : chunkMap) {
chunkData->calcTotalBytes();
Expand All @@ -218,6 +234,7 @@ void CzarChunkMap::sortChunks(std::vector<ChunkData::Ptr>& chunksSortedBySize) {
}

void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap) {
// Use a set to prevent duplicate ids caused by replication levels > 1.
set<int64_t> allChunkIds;
int errorCount = 0;
for (auto const& [wkr, wkrData] : wcMap) {
Expand All @@ -239,7 +256,9 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap)
continue;
}
if (primeScanWkr->_sharedScanChunkMap.find(chunkId) == primeScanWkr->_sharedScanChunkMap.end()) {
LOGS(_log, LOG_LVL_ERROR, " chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for " << primeScanWkr->_workerId);
LOGS(_log, LOG_LVL_ERROR,
" chunkId=" << chunkId << " should have been (and was not) in the sharedScanChunkMap for "
<< primeScanWkr->_workerId);
++errorCount;
continue;
}
Expand All @@ -259,7 +278,8 @@ void CzarChunkMap::verify(ChunkMap const& chunkMap, WorkerChunkMap const& wcMap)
for (auto const& cId : allChunkIds) {
allMissingIds += to_string(cId) + ",";
}
LOGS(_log, LOG_LVL_ERROR, " There were " << missing << " missing chunks from the scan list " << allMissingIds);
LOGS(_log, LOG_LVL_ERROR,
" There were " << missing << " missing chunks from the scan list " << allMissingIds);
++errorCount;
}

Expand Down Expand Up @@ -307,7 +327,7 @@ void CzarChunkMap::ChunkData::addToWorkerHasThis(std::shared_ptr<WorkerChunksDat

string CzarChunkMap::ChunkData::dump() const {
stringstream os;
auto primaryWorker = _primaryScanWorker.lock();
auto primaryWorker = _primaryScanWorker.lock();
os << "{ChunkData id=" << _chunkId << " totalBytes=" << _totalBytes;
os << " primaryWorker=" << ((primaryWorker == nullptr) ? "null" : primaryWorker->_workerId);
os << " workers{";
Expand Down Expand Up @@ -337,4 +357,4 @@ string CzarChunkMap::WorkerChunksData::dump() const {
return os.str();
}

} // namespace lsst::qserv::wconfig
} // namespace lsst::qserv::czar
Loading