From 26d928ff4dc5642759df8c3af8711e45438c61fa Mon Sep 17 00:00:00 2001 From: John Gates Date: Fri, 18 Jun 2021 16:25:07 -0700 Subject: [PATCH] Added code for testing. --- .../configuration/etc/log4cxx.czar.properties | 4 +- .../etc/log4cxx.worker.properties | 3 +- .../configuration/etc/qserv-czar.cnf | 6 +- core/modules/ccontrol/UserQuerySelect.cc | 6 +- core/modules/czar/WorkerResourceLists.cc | 54 +++++- core/modules/proto/worker.proto | 4 +- core/modules/qdisp/Executive.cc | 6 +- core/modules/qdisp/QdispPool.cc | 11 +- core/modules/qdisp/QueryRequest.cc | 31 +++- core/modules/qdisp/UberJob.cc | 3 + core/modules/qdisp/UberJob.h | 2 + core/modules/qproc/TaskMsgFactory.cc | 4 +- core/modules/util/InstanceCount.cc | 4 +- core/modules/wbase/SendChannel.cc | 6 + core/modules/wbase/SendChannel.h | 2 + core/modules/wbase/SendChannelShared.cc | 173 ++++++++++++------ core/modules/wbase/SendChannelShared.h | 21 ++- core/modules/wbase/Task.cc | 2 +- core/modules/wcontrol/SqlConnMgr.h | 3 + core/modules/wdb/QueryRunner.cc | 13 +- core/modules/xrdsvc/ChannelStream.cc | 11 +- core/modules/xrdsvc/ChannelStream.h | 2 + core/modules/xrdsvc/SsiRequest.cc | 47 ++++- core/modules/xrdsvc/SsiRequest.h | 2 + 24 files changed, 325 insertions(+), 95 deletions(-) diff --git a/admin/templates/configuration/etc/log4cxx.czar.properties b/admin/templates/configuration/etc/log4cxx.czar.properties index b157ef43b9..94644a008c 100644 --- a/admin/templates/configuration/etc/log4cxx.czar.properties +++ b/admin/templates/configuration/etc/log4cxx.czar.properties @@ -26,4 +26,6 @@ log4j.appender.FILE.layout.conversionPattern=%d{yyyy-MM-ddTHH:mm:ss.SSSZ} %X %-5 #log4j.logger.lsst.qserv.qproc=DEBUG #log4j.logger.lsst.qserv.util=DEBUG #log4j.logger.lsst.qserv.qana=DEBUG -log4j.logger.lsst.qserv.xrdssi.msgs=WARN +#log4j.logger.lsst.qserv.xrdssi.msgs=WARN //&&& +log4j.logger.lsst.qserv.xrdssi.msgs=DEBUG + diff --git a/admin/templates/configuration/etc/log4cxx.worker.properties b/admin/templates/configuration/etc/log4cxx.worker.properties index 29514138e7..ec708b4889 100644 --- a/admin/templates/configuration/etc/log4cxx.worker.properties +++ b/admin/templates/configuration/etc/log4cxx.worker.properties @@ -9,4 +9,5 @@ log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] %X %-5p %c{2} (%F:%L) - %m%n -log4j.logger.lsst.qserv.xrdssi.msgs=WARN +#log4j.logger.lsst.qserv.xrdssi.msgs=WARN &&& +log4j.logger.lsst.qserv.xrdssi.msgs=DEBUG diff --git a/admin/templates/configuration/etc/qserv-czar.cnf b/admin/templates/configuration/etc/qserv-czar.cnf index bf9763a27e..ea650b7132 100755 --- a/admin/templates/configuration/etc/qserv-czar.cnf +++ b/admin/templates/configuration/etc/qserv-czar.cnf @@ -106,14 +106,16 @@ qMetaSecsBetweenChunkCompletionUpdates = 59 # Please see qdisp/QdispPool.h QdispPool::QdispPool for more information [qdisppool] #size of the pool -poolSize = 50 +#poolSize = 50 +poolSize = 1500 # Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3 # Must be greater than 0. largestPriority = 3 # Maximum number of threads running for each queue. No spaces. Values separated by ':' # Using largestPriority = 2 and vectRunsizes = 3:5:8 # queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8. 
-vectRunSizes = 50:50:50:50
+#vectRunSizes = 50:50:50:50
+vectRunSizes = 1500:1500:1500:1500
 # Minimum number of threads running for each queue. No spaces. Values separated by ':'
 vectMinRunningSizes = 0:1:3:3
diff --git a/core/modules/ccontrol/UserQuerySelect.cc b/core/modules/ccontrol/UserQuerySelect.cc
index 0bff19004d..c216964d02 100644
--- a/core/modules/ccontrol/UserQuerySelect.cc
+++ b/core/modules/ccontrol/UserQuerySelect.cc
@@ -365,7 +365,7 @@ void UserQuerySelect::submit() {
     // &&& TODO:UJ skipping in proof of concept: get a list of all databases in jobs (all jobs should use the same databases) Use this to check for conflicts
     // assign jobs to uberJobs
-    int maxChunksPerUber = 10;
+    int maxChunksPerUber = 50;
     // keep cycling through workers until no more chunks to place.
 
     /// make a map that will be destroyed as chunks are checked/used
@@ -404,6 +404,7 @@ void UserQuerySelect::submit() {
             }
         }
 
+        //LOGS(_log, LOG_LVL_INFO, "&&& making UberJob " << uberResultName << " chunks=" << chunksInUber);
         if (chunksInUber > 0) {
             uberJobs.push_back(uJob);
         }
@@ -424,10 +425,13 @@ void UserQuerySelect::submit() {
     for (auto&& uJob:uberJobs) {
         uJob->runUberJob();
     }
+    LOGS(_log, LOG_LVL_INFO, "&&& All UberJobs sent.");
     // If any chunks in the query were not found on a worker's list, run them individually.
     for (auto& ciq:chunksInQuery) {
+        LOGS(_log, LOG_LVL_INFO, "&&& running remaining jobs ");
         qdisp::JobQuery* jqRaw = ciq.second;
         qdisp::JobQuery::Ptr job = _executive->getSharedPtrForRawJobPtr(jqRaw);
+        LOGS(_log, LOG_LVL_INFO, "&&& running remaining jobs " << job->getIdStr());
         std::function<void(util::CmdData*)> funcBuildJob =
                 [this, job{move(job)}](util::CmdData*) { // references in captures cause races
             QSERV_LOGCONTEXT_QUERY(_qMetaQueryId);
diff --git a/core/modules/czar/WorkerResourceLists.cc b/core/modules/czar/WorkerResourceLists.cc
index 9e38afac59..0d50213fc4 100644
--- a/core/modules/czar/WorkerResourceLists.cc
+++ b/core/modules/czar/WorkerResourceLists.cc
@@ -26,6 +26,7 @@
 #include
 #include
 #include
+#include <list>

 // Third-party headers

@@ -188,13 +189,15 @@ bool WorkerResourceLists::readIn(std::string const& fName) {
     map<string, deque<string>> resourceMap;
     if (fs.is_open()){
         string line;
+        map<string, set<int>> tmpMap;
         while(getline(fs, line)) {
             auto pos = line.find_first_of(" ");
             string wNameShort = line.substr(0, pos);
             string chunkIdStr = line.substr(pos+1);
             int chunkId = std::stoi(chunkIdStr);
-            LOGS(_log, LOG_LVL_INFO, "&&& line='" << line << "' name=" << wNameShort << " chunk=" << chunkIdStr << " c=" << chunkId);
+            //LOGS(_log, LOG_LVL_INFO, "&&& line='" << line << "' name=" << wNameShort << " chunk=" << chunkIdStr << " c=" << chunkId);
+            /* &&&
             // Avoid making duplicate chunk entries
             auto ret = foundChunks.insert(chunkId);
             bool elementWasInserted = ret.second;
             if (elementWasInserted) {
                 set<int>& chunkSet = workerChunkMap[wNameShort];
                 chunkSet.insert(chunkId);
             }
+            */
+            // need to add entry to the worker
+            tmpMap[wNameShort].insert(chunkId);
+
+        }
+
+        // Try to make a fairly even distribution across workers.
+        list<string> workerNames;
+        for (auto const& elem:tmpMap) {
+            string name = elem.first;
+            workerNames.push_back(name);
+        }
+
+        while (!workerNames.empty()) {
+            for (auto wIter = workerNames.begin(); wIter != workerNames.end();) {
+                auto wIterAdvanced = false;
+                string wName = *wIter;
+                set<int>& tmpChunkSet = tmpMap[wName];
+                set<int>& chunkSet = workerChunkMap[wName];
+                bool done = false;
+                auto cIter = tmpChunkSet.begin();
+                for (int j=0; j<10 && !done;) {
+                    if (cIter == tmpChunkSet.end()) {
+                        // no more chunks on this worker
+                        done = true;
+                        LOGS(_log, LOG_LVL_INFO, "&&& worker empty " << wName << " elemCount=" << chunkSet.size());
+                        auto wIterToDel = wIter++;
+                        wIterAdvanced = true;
+                        workerNames.erase(wIterToDel);
+                    } else {
+                        int chunkId = *cIter;
+                        auto ret = foundChunks.insert(chunkId);
+                        bool elementInserted = ret.second;
+                        if (elementInserted) {
+                            ++j;
+                            chunkSet.insert(chunkId);
+                        }
+                        auto cIterToDel = cIter++;
+                        tmpChunkSet.erase(cIterToDel);
+                    }
+                }
+                if (!wIterAdvanced) ++wIter;
+            }
+        }
+
+
     // At this point, there's a map of short worker names and integer chunkIds.
     // This needs to be turned in to a map of sets of chunk resource name keyed by
     // worker resource name.
@@ -211,7 +260,7 @@ bool WorkerResourceLists::readIn(std::string const& fName) {
     for (auto const& elem:workerChunkMap) {
         string shortName = elem.first;
         set<int> const& chunkInts = elem.second;
-        string workerResourceN = "/worker/worker-" + shortName;
+        string workerResourceN = "/worker/" + shortName;
         auto& chunkStrs = resourceMap[workerResourceN];
         for (int j:chunkInts) {
             string chunkResourceN = "/chk/wise_01/" + to_string(j);
@@ -235,7 +284,6 @@ bool WorkerResourceLists::readIn(std::string const& fName) {
         deque<string> const& dq = elem.second;
         for(auto const& res:dq) {
             wr->insert(res);
-            LOGS(_log, LOG_LVL_INFO, "&&& wName=" << wName << " res=" << res);
         }
     }
 }
diff --git a/core/modules/proto/worker.proto b/core/modules/proto/worker.proto
index 5c04d990d7..7d32d88dcc 100644
--- a/core/modules/proto/worker.proto
+++ b/core/modules/proto/worker.proto
@@ -83,7 +83,9 @@ message TaskMsg {
 message UberJobMsg {
     required uint64 queryid = 1;
     required uint32 czarid = 2;
-    repeated TaskMsg taskMsgs = 3;
+    required uint32 uberjobid = 3;
+    repeated TaskMsg taskmsgs = 4;
+    required uint32 magicnumber = 5;
 }

 // Result message received from worker
diff --git a/core/modules/qdisp/Executive.cc b/core/modules/qdisp/Executive.cc
index 502df157ce..514f1b90b3 100644
--- a/core/modules/qdisp/Executive.cc
+++ b/core/modules/qdisp/Executive.cc
@@ -292,7 +292,7 @@ bool Executive::join() {
 }

 void Executive::markCompleted(int jobId, bool success) {
-    LOGS(_log, LOG_LVL_WARN, "&&& Executive::markCompleted jobId=" << jobId << " success=" << success);
+    //LOGS(_log, LOG_LVL_WARN, "&&& Executive::markCompleted jobId=" << jobId << " success=" << success);
     ResponseHandler::Error err;
     string idStr = QueryIdHelper::makeIdStr(_id, jobId);
     LOGS(_log, LOG_LVL_DEBUG, "Executive::markCompleted " << success);
@@ -571,7 +571,7 @@ void Executive::_waitAllUntilEmpty() {
 void Executive::_addToChunkJobMap(JobQuery::Ptr const& job) {
     int chunkId = job->getDescription()->resource().chunk();
     auto entry = pair(chunkId, job.get());
-    LOGS(_log, LOG_LVL_WARN, "&&& _addToChunkJobMap chunkId=" << chunkId);
+    //LOGS(_log, LOG_LVL_WARN, "&&& _addToChunkJobMap chunkId=" << chunkId);
     lock_guard lck(_chunkToJobMapMtx);
     if (_chunkToJobMapInvalid) {
         throw Bug("map insert FAILED, map is already invalid");
     }
@@ -611,7 +611,7 @@ bool Executive::startUberJob(UberJob::Ptr const& uJob) {

     // Construct a temporary
resource object to pass to ProcessRequest(). // Affinity should be meaningless here as there should only be one instance of each worker. XrdSsiResource::Affinity affinity = XrdSsiResource::Affinity::Default; - LOGS(_log, LOG_LVL_INFO, "&&& uJob->workerResource=" << uJob->getWorkerResource()); + LOGS(_log, LOG_LVL_INFO, "&&& startUberJob uJob->workerResource=" << uJob->getWorkerResource()); XrdSsiResource uJobResource(uJob->getWorkerResource(), "", uJob->getIdStr(), "", 0, affinity); // Now construct the actual query request and tie it to the jobQuery. The diff --git a/core/modules/qdisp/QdispPool.cc b/core/modules/qdisp/QdispPool.cc index 15f7c60e8e..a6dcce4490 100644 --- a/core/modules/qdisp/QdispPool.cc +++ b/core/modules/qdisp/QdispPool.cc @@ -29,6 +29,7 @@ // Qserv headers #include "util/common.h" +#include "util/InstanceCount.h" //&&& namespace { LOG_LOGGER _log = LOG_GET("lsst.qserv.qdisp.QdispPool"); @@ -70,6 +71,7 @@ void PriorityQueue::queCmd(util::Command::Ptr const& cmd) { void PriorityQueue::queCmd(PriorityCommand::Ptr const& cmd, int priority) { { + //util::InstanceCount ic("PQ:queCmd&&&"); std::lock_guard lock(_mtx); auto iter = _queues.find(priority); if (iter == _queues.end()) { @@ -82,17 +84,20 @@ void PriorityQueue::queCmd(PriorityCommand::Ptr const& cmd, int priority) { } } cmd->_priority = priority; + //LOGS (_log, LOG_LVL_INFO, "&&&priQue p=" << priority << _statsStr()); iter->second->queCmd(cmd); LOGS (_log, LOG_LVL_DEBUG, "priQue p=" << priority << _statsStr()); _changed = true; } _cv.notify_one(); + _cv.notify_one(); //&&& } std::atomic localLogLimiter(0); util::Command::Ptr PriorityQueue::getCmd(bool wait){ + //util::InstanceCount ic("PQ:getCmd&&&"); util::Command::Ptr ptr; std::unique_lock uLock(_mtx); while (true) { @@ -100,7 +105,7 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){ ++localLogLimiter; // Log this every once in while to INFO so there's some idea of system // load without generating crushing amounts of log messages. - if (localLogLimiter % 500 == 0) { + if (true) { //&&& localLogLimiter % 500 == 0) { // &&& revert LOGS(_log, LOG_LVL_INFO, "priQueGet " << _statsStr()); } else { LOGS(_log, LOG_LVL_DEBUG, "priQueGet " << _statsStr()); @@ -115,6 +120,8 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){ if (que->running < que->getMinRunning()) { ptr = que->getCmd(false); // no wait if (ptr != nullptr) { + _changed = true; + _cv.notify_one(); return ptr; } } @@ -130,6 +137,7 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){ if (ptr != nullptr) { _changed = true; _cv.notify_one(); + _cv.notify_one(); //&&& return ptr; } } @@ -137,6 +145,7 @@ util::Command::Ptr PriorityQueue::getCmd(bool wait){ // If nothing was found, wait or return nullptr. if (wait) { + //util::InstanceCount icWait("PQ:getCmd&&&WAIT"); LOGS (_log, LOG_LVL_DEBUG, "getCmd wait " << _statsStr()); _cv.wait(uLock, [this](){ return _changed; }); } else { diff --git a/core/modules/qdisp/QueryRequest.cc b/core/modules/qdisp/QueryRequest.cc index 02ad058b3f..30100771c6 100644 --- a/core/modules/qdisp/QueryRequest.cc +++ b/core/modules/qdisp/QueryRequest.cc @@ -79,9 +79,12 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { void action(util::CmdData *data) override { // If everything is ok, call GetResponseData to have XrdSsi ask the worker for the data. 
QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); + //util::InstanceCount ica("AskFor_action&&&"); util::Timer tWaiting; util::Timer tTotal; + //LOGS(_log, LOG_LVL_INFO, "&&& AskForResponse start"); { + util::InstanceCount icb("AskFor_A&&&"); tTotal.start(); auto jq = _jBase.lock(); auto qr = _qRequest.lock(); @@ -99,21 +102,25 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { return; } vector& buffer = *_bufPtr; - LOGS(_log, LOG_LVL_TRACE, "AskForResp GetResponseData size=" << buffer.size()); + LOGS(_log, LOG_LVL_INFO, "AskForResp GetResponseData size=" << buffer.size()); // &&& revert to debug tWaiting.start(); qr->GetResponseData(&buffer[0], buffer.size()); + LOGS(_log, LOG_LVL_INFO, "&&& AskForResp GetResponseData called"); } // Wait for XrdSsi to call ProcessResponseData with the data, // which will notify this wait with a call to receivedProcessResponseDataParameters. { + //util::InstanceCount icc("AskFor_B&&&"); unique_lock uLock(_mtx); // TODO: make timed wait, check for wedged, if weak pointers dead, log and give up. - // Hoping for _state == DATAREADY1, + // Hoping for _state == DATAREADY1, waiting for call to QueryRequest::ProcessResponseData + util::InstanceCount icc1("AskFor_B1&&&"); _cv.wait(uLock, [this](){ return _state != State::STARTED0; }); + util::InstanceCount icc2("AskFor_B2&&&"); tWaiting.stop(); // _mtx is locked at this point. - LOGS(_log, LOG_LVL_TRACE, "AskForResp should be DATAREADY1 " << (int)_state); + LOGS(_log, LOG_LVL_INFO, "AskForResp should be DATAREADY1 " << (int)_state); // &&& revert to TRACE if (_state == State::DONE2) { // There was a problem. End the stream associated auto qr = _qRequest.lock(); @@ -129,6 +136,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { // If more data needs to be sent, _processData will make a new AskForResponseDataCmd // object and queue it. { + //util::InstanceCount icd("AskFor_C&&&"); auto jq = _jBase.lock(); auto qr = _qRequest.lock(); if (jq == nullptr || qr == nullptr) { @@ -136,12 +144,14 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { LOGS(_log, LOG_LVL_WARN, "AskForResp null before processData"); return; } + LOGS(_log, LOG_LVL_INFO, "&&& AskForResp _processData a"); qr->_processData(jq, _blen, _last); // _processData will have created another AskForResponseDataCmd object if was needed. + LOGS(_log, LOG_LVL_INFO, "&&& AskForResp _processData b"); tTotal.stop(); } _setState(State::DONE2); - LOGS(_log, LOG_LVL_DEBUG, "Ask data is done wait=" << tWaiting.getElapsed() << + LOGS(_log, LOG_LVL_INFO, "Ask data is done wait=" << tWaiting.getElapsed() << // &&& revert to debug " total=" << tTotal.getElapsed()); } @@ -152,6 +162,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { _last = last; _state = State::DATAREADY1; } + LOGS(_log, LOG_LVL_INFO, "&&& notifyDataSuccess blen=" << blen << " last=" << last); _cv.notify_all(); } @@ -187,7 +198,7 @@ class QueryRequest::AskForResponseDataCmd : public PriorityCommand { int _blen = -1; bool _last = true; - util::InstanceCount _instCount{"AskForResponseDataCmd"}; + //&&&util::InstanceCount _instCount{"AskForResponseDataCmd"}; }; @@ -300,6 +311,7 @@ bool QueryRequest::ProcessResponse(XrdSsiErrInfo const& eInfo, XrdSsiRespInfo c /// Retrieve and process results in using the XrdSsi stream mechanism /// Uses a copy of JobQuery::Ptr instead of _jobQuery as a call to cancel() would reset _jobQuery. 
bool QueryRequest::_importStream(JobBase::Ptr const& jq) { + //util::InstanceCount ic("QR:_importStream&&&"); if (_askForResponseDataCmd != nullptr) { LOGS(_log, LOG_LVL_ERROR, "_importStream There's already an _askForResponseDataCmd object!!"); // Keep the previous object from wedging the pool. @@ -320,7 +332,9 @@ bool QueryRequest::_importStream(JobBase::Ptr const& jq) { bool largeResult = false; int nextBufSize = 0; bool last = false; + //LOGS(_log, LOG_LVL_INFO, "&&& flush meta start"); bool flushOk = jq->getRespHandler()->flush(len, bufPtr, last, largeResult, nextBufSize); + //LOGS(_log, LOG_LVL_INFO, "&&& flush meta end"); if (!flushOk) { LOGS(_log, LOG_LVL_ERROR, "_importStream not flushOk"); @@ -328,6 +342,7 @@ bool QueryRequest::_importStream(JobBase::Ptr const& jq) { } if (!last) { + LOGS(_log, LOG_LVL_INFO, "&&& _askForResponseDataCmd nextBufSize=" << nextBufSize); _askForResponseDataCmd = make_shared(shared_from_this(), jq, nextBufSize); _queueAskForResponse(_askForResponseDataCmd, jq, true); } else { @@ -381,6 +396,7 @@ void QueryRequest::_setHoldState(HoldState state) { void QueryRequest::ProcessResponseData(XrdSsiErrInfo const& eInfo, char *buff, int blen, bool last) { // Step 7 QSERV_LOGCONTEXT_QUERY_JOB(_qid, _jobid); + util::InstanceCount ic("QR::ProcessResponseData&&&"); // buff is ignored here. It points to jq->getDescription()->respHandler()->_mBuf, which // is accessed directly by the respHandler. _mBuf is a member of MergingHandler. LOGS(_log, LOG_LVL_DEBUG, "ProcessResponseData with buflen=" << blen @@ -436,6 +452,7 @@ void QueryRequest::ProcessResponseData(XrdSsiErrInfo const& eInfo, void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) { + //util::InstanceCount ic("QR::_processData&&&"); // It's possible jq and _jobQuery differ, so need to use jq. if (jq->isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, "QueryRequest::_processData job was cancelled."); @@ -465,7 +482,9 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) nextHeaderBufPtr = make_shared>(bufPtr->begin() + respSize, bufPtr->end()); // Read the result // Values for last, largeResult, and nextBufSize filled in by flush + //LOGS(_log, LOG_LVL_INFO, "&&& flush a start"); flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize); + //LOGS(_log, LOG_LVL_INFO, "&&& flush a end"); if (last) { // Last should only be true when the header is read, not the result. throw Bug("_processData result had 'last' true, which cannot be allowed."); @@ -483,7 +502,9 @@ void QueryRequest::_processData(JobBase::Ptr const& jq, int blen, bool xrdLast) // Read the next header // Values for last, largeResult, and nextBufSize filled in by flush + //LOGS(_log, LOG_LVL_INFO, "&&& flush b start"); flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, largeResult, nextBufSize); + LOGS(_log, LOG_LVL_INFO, "&&& flush b end"); if (largeResult) { if (!_largeResult) LOGS(_log, LOG_LVL_DEBUG, "holdState largeResult set to true"); _largeResult = true; // Once the worker indicates it's a large result, it stays that way. 
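Note: the control flow in AskForResponseDataCmd::action() above is easier to follow
without the XrdSsi plumbing and the &&& instrumentation. action() asks XrdSsi for data
via GetResponseData(), then blocks on a condition variable until the
ProcessResponseData callback delivers the buffer (or an error) and moves the state off
STARTED0. A minimal standalone sketch of that handshake, reusing the state names from
the patch — everything else here is illustrative, not Qserv code:

    #include <condition_variable>
    #include <mutex>

    class ResponseWaiter {
    public:
        enum class State { STARTED0, DATAREADY1, DONE2 };

        // Callback side: store the result and wake the waiting thread.
        // Mirrors notifyDataSuccess() in the patch.
        void notifyDataSuccess(int blen, bool last) {
            {
                std::lock_guard<std::mutex> lg(_mtx);
                _blen = blen;
                _last = last;
                _state = State::DATAREADY1;
            }
            _cv.notify_all();
        }

        // Failure path: jump straight to DONE2 so the waiter still wakes up.
        void notifyFailed() {
            {
                std::lock_guard<std::mutex> lg(_mtx);
                _state = State::DONE2;
            }
            _cv.notify_all();
        }

        // Worker side: block until the callback has moved the state off STARTED0.
        State waitForData() {
            std::unique_lock<std::mutex> uLock(_mtx);
            _cv.wait(uLock, [this]() { return _state != State::STARTED0; });
            return _state;
        }

    private:
        std::mutex _mtx;
        std::condition_variable _cv;
        State _state = State::STARTED0;
        int _blen = -1;
        bool _last = true;
    };

Waiting on the predicate "_state != STARTED0" (rather than on DATAREADY1 alone) is what
lets the DONE2 error path unblock the same wait, which is why action() checks for DONE2
immediately after waking.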
diff --git a/core/modules/qdisp/UberJob.cc b/core/modules/qdisp/UberJob.cc index 6cf555040f..a58e8e1a28 100644 --- a/core/modules/qdisp/UberJob.cc +++ b/core/modules/qdisp/UberJob.cc @@ -89,6 +89,9 @@ bool UberJob::runUberJob() { proto::UberJobMsg* ujMsg = google::protobuf::Arena::CreateMessage(&arena); ujMsg->set_queryid(getQueryId()); ujMsg->set_czarid(_czarId); + ujMsg->set_uberjobid(_uberJobId); + ujMsg->set_magicnumber(UberJob::getMagicNumber()); + LOGS(_log, LOG_LVL_INFO, "&&& runUberJob sz=" << _jobs.size()); for (auto&& job:_jobs) { proto::TaskMsg* tMsg = ujMsg->add_taskmsgs(); job->getDescription()->fillTaskMsg(tMsg); diff --git a/core/modules/qdisp/UberJob.h b/core/modules/qdisp/UberJob.h index 46b2bd85bb..3621da7807 100644 --- a/core/modules/qdisp/UberJob.h +++ b/core/modules/qdisp/UberJob.h @@ -41,6 +41,8 @@ class UberJob : public JobBase { public: using Ptr = std::shared_ptr; + static uint32_t getMagicNumber() { return 93452; } + static Ptr create(Executive::Ptr const& executive, std::shared_ptr const& respHandler, int queryId, int uberJobId, qmeta::CzarId czarId, std::string const& workerResource); diff --git a/core/modules/qproc/TaskMsgFactory.cc b/core/modules/qproc/TaskMsgFactory.cc index 79efc91a0d..4911aed3c7 100644 --- a/core/modules/qproc/TaskMsgFactory.cc +++ b/core/modules/qproc/TaskMsgFactory.cc @@ -70,7 +70,7 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const& taskMsg->set_jobid(jobId); taskMsg->set_attemptcount(attemptCount); taskMsg->set_czarid(czarId); - LOGS(_log, LOG_LVL_INFO, "&&& _makeMsg ses=" << _session << " db=" << chunkQuerySpec.db << " qId=" << queryId << " jId=" << jobId << " att=" << attemptCount << " cz=" << czarId); + //LOGS(_log, LOG_LVL_INFO, "&&& _makeMsg ses=" << _session << " db=" << chunkQuerySpec.db << " qId=" << queryId << " jId=" << jobId << " att=" << attemptCount << " cz=" << czarId); // scanTables (for shared scans) // check if more than 1 db in scanInfo std::string db; @@ -87,7 +87,6 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const& taskMsg->set_scanpriority(chunkQuerySpec.scanInfo.scanRating); taskMsg->set_scaninteractive(chunkQuerySpec.scanInteractive); - LOGS(_log, LOG_LVL_WARN, "&&& _makeMsg scanR=" << chunkQuerySpec.scanInfo.scanRating << " scanI=" << chunkQuerySpec.scanInteractive << " chunkId=" << chunkQuerySpec.chunkId); // per-chunk taskMsg->set_chunkid(chunkQuerySpec.chunkId); @@ -114,7 +113,6 @@ bool TaskMsgFactory::fillTaskMsg(proto::TaskMsg* taskMsg, ChunkQuerySpec const& _addFragment(*taskMsg, resultTable, chunkQuerySpec.subChunkTables, chunkQuerySpec.subChunkIds, chunkQuerySpec.queries); } - LOGS(_log, LOG_LVL_WARN, "&&& _makeMsg end chunkId=" << chunkQuerySpec.chunkId); return true; } diff --git a/core/modules/util/InstanceCount.cc b/core/modules/util/InstanceCount.cc index a860ad56c1..0c5fc17d98 100644 --- a/core/modules/util/InstanceCount.cc +++ b/core/modules/util/InstanceCount.cc @@ -46,7 +46,7 @@ void InstanceCount::_increment(std::string const& source) { auto ret = _instances.insert(entry); auto iter = ret.first; iter->second += 1; - LOGS(_log, LOG_LVL_DEBUG, "InstanceCount " << source + LOGS(_log, LOG_LVL_WARN, "InstanceCount " << source // &&& revert to debug << " " << iter->first << "=" << iter->second); } @@ -56,7 +56,7 @@ InstanceCount::~InstanceCount() { auto iter = _instances.find(_className); if (iter != _instances.end()) { iter->second -= 1; - LOGS(_log, LOG_LVL_DEBUG, "~InstanceCount " << iter->first << "=" << iter->second << " : " << 
*this);
+        LOGS(_log, LOG_LVL_WARN, "~InstanceCount " << iter->first << "=" << iter->second << " : " << *this); // &&& revert to debug
     } else {
         LOGS(_log, LOG_LVL_ERROR, "~InstanceCount " << _className << " was not found! : " << *this);
     }
diff --git a/core/modules/wbase/SendChannel.cc b/core/modules/wbase/SendChannel.cc
index 071ceb08c7..cd1398b3bd 100644
--- a/core/modules/wbase/SendChannel.cc
+++ b/core/modules/wbase/SendChannel.cc
@@ -216,5 +216,11 @@ bool SendChannel::setMetadata(const char *buf, int blen) {
 }


+uint64_t SendChannel::getSeq() const {
+    if (_ssiRequest == nullptr) return 0;
+    return _ssiRequest->getSeq();
+}
+
+
 }}} // namespace lsst::qserv::wbase
diff --git a/core/modules/wbase/SendChannel.h b/core/modules/wbase/SendChannel.h
index 238080859d..6bc9465402 100644
--- a/core/modules/wbase/SendChannel.h
+++ b/core/modules/wbase/SendChannel.h
@@ -98,6 +98,8 @@ class SendChannel {
     /// Set just before destorying this object to prevent pointless error messages.
     void setDestroying() { _destroying = true; }

+    uint64_t getSeq() const;
+
 protected:
     std::function<void()> _release = [](){;}; ///< Function to release resources.
diff --git a/core/modules/wbase/SendChannelShared.cc b/core/modules/wbase/SendChannelShared.cc
index 4f8240e6d4..f624599506 100644
--- a/core/modules/wbase/SendChannelShared.cc
+++ b/core/modules/wbase/SendChannelShared.cc
@@ -27,6 +27,7 @@
 // Qserv headers
 #include "global/LogContext.h"
 #include "proto/ProtoHeaderWrap.h"
+#include "util/InstanceCount.h" // &&&
 #include "util/Timer.h"

 // LSST headers
@@ -42,9 +43,11 @@
 namespace lsst { namespace qserv { namespace wbase {

+uint64_t idSeq = 0;
+
 SendChannelShared::Ptr SendChannelShared::create(SendChannel::Ptr const& sendChannel,
                                                  wcontrol::TransmitMgr::Ptr const& transmitMgr) {
-    auto scs = shared_ptr<SendChannelShared>(new SendChannelShared(sendChannel, transmitMgr));
+    auto scs = shared_ptr<SendChannelShared>(new SendChannelShared(idSeq++, sendChannel, transmitMgr));
     return scs;
 }
@@ -64,17 +67,22 @@ void SendChannelShared::incrTaskCountBy(int partialCount) {
 }

-bool SendChannelShared::transmitTaskLast(StreamGuard sLock, bool inLast) {
-    /// _caller must have locked _streamMutex before calling this.
+bool SendChannelShared::transmitTaskLast(bool inLast) {
+    // Locking of _lastCount is now handled internally via _lastCMtx.
+    //LOGS(_log, LOG_LVL_INFO, "&&& transmitTaskLast A channel=" << _id << " inLast=" << inLast << " lastC="
+    //     << _lastCount << " taskC=" << _taskCount);
     if (not inLast) return false; // This wasn't the last message buffer for this task, so it doesn't matter.
+    lock_guard lg(_lastCMtx);
     ++_lastCount;
     bool lastTaskDone = _lastCount >= _taskCount;
+    LOGS(_log, LOG_LVL_INFO, "&&& transmitTaskLast A channel=" << _id << " inLast=" << inLast << " lastC="
+         << _lastCount << " taskC=" << _taskCount << " lastTaskDone=" << lastTaskDone);
     return lastTaskDone;
 }

 bool SendChannelShared::kill(StreamGuard sLock, std::string const& note) {
-    LOGS(_log, LOG_LVL_DEBUG, "SendChannelShared::kill() called " << note);
+    LOGS(_log, LOG_LVL_DEBUG, "channel=" << _id << " SendChannelShared::kill() called " << note);
     bool ret = _sendChannel->kill(note);
     _lastRecvd = true;
     return ret;
 }
@@ -92,51 +100,82 @@ bool SendChannelShared::addTransmit(bool cancelled, bool erred, bool last, bool
     assert(tData != nullptr);

     // This lock may be held for a very long time.
-    std::unique_lock qLock(_queueMtx);
-    _transmitQueue.push(tData);
-
-    // If _lastRecvd is true, the last message has already been transmitted and
-    // this SendChannel is effectively dead.
- bool reallyLast = _lastRecvd; - string idStr(makeIdStr(qId, jId)); - - // If something bad already happened, just give up. - if (reallyLast || isDead()) { - // If there's been some kind of error, make sure that nothing hangs waiting - // for this. - LOGS(_log, LOG_LVL_WARN, "addTransmit getting messages after isDead or reallyLast " << idStr); - _lastRecvd = true; - return false; - } + //LOGS(_log, LOG_LVL_WARN, "&&& scs::addTransmit A channel=" << _id << " last" << last); { - lock_guard streamLock(streamMutex); - reallyLast = transmitTaskLast(streamLock, last); - } - - if (reallyLast || erred || cancelled) { - _lastRecvd = true; + util::InstanceCount icA("SCS::addTransmitA&&&"); + std::lock_guard qLock(_queueMtx); + util::InstanceCount icB("SCS::addTransmitB&&&"); + //LOGS(_log, LOG_LVL_WARN, "&&& scs::addTransmit B channel=" << _id); + _transmitQueue.push(tData); + + // If _lastRecvd is true, the last message has already been transmitted and + // this SendChannel is effectively dead. + bool reallyLast = _lastRecvd; + string idStr(makeIdStr(qId, jId)); + + // If something bad already happened, just give up. + if (reallyLast || isDead()) { + // If there's been some kind of error, make sure that nothing hangs waiting + // for this. + LOGS(_log, LOG_LVL_WARN, "addTransmit getting messages after isDead or reallyLast " << idStr << " channel=" << _id); + _lastRecvd = true; + return false; + } + { + /* &&& + util::InstanceCount icSA("SCS::addTransmit streamLockA&&&"); + lock_guard streamLock(streamMutex); + util::InstanceCount icSB("SCS::addTransmit streamLockB&&&"); + */ + reallyLast = transmitTaskLast(last); //&&& POSSIBLE IMPORTANT FIX + if (reallyLast || erred || cancelled) { + _lastRecvd = true; + } + } } + /* &&& // If this is reallyLast or at least 2 items are in the queue, the transmit can happen if (_lastRecvd || _transmitQueue.size() >= 2) { bool scanInteractive = tData->scanInteractive; // If there was an error, give this high priority. if (erred || cancelled) scanInteractive = true; int czarId = tData->czarId; - return _transmit(erred, scanInteractive, largeResult, czarId); + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::addTrasmit _transmit start _lastRecvd=" << _lastRecvd << " _transmitQueue.size=" << _transmitQueue.size()); + auto result = _transmit(erred, scanInteractive, largeResult, czarId); + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::addTrasmit _transmit done _lastRecvd=" << _lastRecvd << " _transmitQueue.size=" << _transmitQueue.size()); + return result; } else { // Not enough information to transmit. Maybe there will be with the next call // to addTransmit. } + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::addTrasmit end outside of if _lastRecvd=" << _lastRecvd << " _transmitQueue.size=" << _transmitQueue.size()); return true; + */ + + util::InstanceCount icAT("SCS::addTransmit_tr&&& seq=" + to_string(getSeq())); + LOGS(_log, LOG_LVL_INFO, "&&& calling _transmit"); + bool scanInteractive = tData->scanInteractive; + if (erred || cancelled) scanInteractive = true; + int czarId = tData->czarId; + return _transmit(erred, scanInteractive, largeResult, czarId); } bool SendChannelShared::_transmit(bool erred, bool scanInteractive, bool largeResult, qmeta::CzarId czarId) { string idStr = "QID?"; + util::InstanceCount icA("SCS::_Transmit A&&&"); + std::unique_lock qLock(_queueMtx); + util::InstanceCount icB("SCS::_Transmit B&&&"); + // keep looping until nothing more can be transmitted. 
while(_transmitQueue.size() >= 2 || _lastRecvd) { + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit a _lastRecvd=" << _lastRecvd); + if (_transmitQueue.size() == 0) { + LOGS(_log, LOG_LVL_INFO, "&&& messages returned by another _transmit"); // &&& change log level. + return true; + } TransmitData::Ptr thisTransmit = _transmitQueue.front(); _transmitQueue.pop(); if (thisTransmit == nullptr) { @@ -168,39 +207,58 @@ bool SendChannelShared::_transmit(bool erred, bool scanInteractive, bool largeRe // The first message needs to put its header data in metadata as there's // no previous message it could attach its header to. - if (_firstTransmit.exchange(false)) { - // Put the header for the first message in metadata - // _metaDataBuf must remain valid until Finished() is called. - proto::ProtoHeader* thisPHdr = thisTransmit->header; - string thisHeaderString; - thisPHdr->SerializeToString(&thisHeaderString); - _metadataBuf = proto::ProtoHeaderWrap::wrap(thisHeaderString); - lock_guard streamLock(streamMutex); - bool metaSet = _sendChannel->setMetadata(_metadataBuf.data(), _metadataBuf.size()); - if (!metaSet) { - LOGS(_log, LOG_LVL_ERROR, "Failed to setMeta " << idStr); - kill(streamLock, "metadata"); - return false; - } - } - - // Put the data for the transmit in a StreamBuffer and send it. - auto streamBuf = xrdsvc::StreamBuffer::createWithMove(thisTransmit->dataMsg); + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit m"); { - // Limit the number of concurrent transmits. See xrdssu.cnf [transmits] maxtransmits. - wcontrol::TransmitLock transmitLock(*_transmitMgr, scanInteractive, largeResult, czarId); - lock_guard streamLock(streamMutex); - bool sent = _sendBuf(streamLock, streamBuf, reallyLast, "transmitLoop " + idStr); - if (!sent) { - LOGS(_log, LOG_LVL_ERROR, "Failed to send " << idStr); - kill(streamLock, "SendChannelShared::_transmit b"); - return false; + util::InstanceCount icSA("SCS::_Transmit streamLock A&&&"); + lock_guard streamLock(streamMutex); // &&& IMPORTANT FIX + util::InstanceCount icSB("SCS::_Transmit streamLock B&&&"); + if (_firstTransmit.exchange(false)) { + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit m1"); + // Put the header for the first message in metadata + // _metaDataBuf must remain valid until Finished() is called. + proto::ProtoHeader* thisPHdr = thisTransmit->header; + string thisHeaderString; + thisPHdr->SerializeToString(&thisHeaderString); + _metadataBuf = proto::ProtoHeaderWrap::wrap(thisHeaderString); + bool metaSet = _sendChannel->setMetadata(_metadataBuf.data(), _metadataBuf.size()); + if (!metaSet) { + LOGS(_log, LOG_LVL_ERROR, "Failed to setMeta " << idStr); + kill(streamLock, "metadata"); + return false; + } + } + qLock.unlock(); // Unlock so new messages can be added to the queue + + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit p"); + // Put the data for the transmit in a StreamBuffer and send it. + auto streamBuf = xrdsvc::StreamBuffer::createWithMove(thisTransmit->dataMsg); + { + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit q"); + // Limit the number of concurrent transmits. See xrdssu.cnf [transmits] maxtransmits. 
+ //wcontrol::TransmitLock transmitLock(*_transmitMgr, scanInteractive, largeResult, czarId); // &&& re-enable + util::InstanceCount icSBA("SCS::_Transmit sendBuf A&&&"); + bool sent = _sendBuf(streamLock, streamBuf, reallyLast, "transmitLoop " + idStr); + util::InstanceCount icSBB("SCS::_Transmit sendBuf B&&&"); + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit q1"); + if (!sent) { + LOGS(_log, LOG_LVL_ERROR, "Failed to send " << idStr); + kill(streamLock, "SendChannelShared::_transmit b"); + return false; + } } } // If that was the last message, break the loop. - if (reallyLast) return true; + if (reallyLast) { + LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit reallylast return true"); + return true; + } + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit loop end"); + util::InstanceCount icQA("SCS::_Transmit qlock end A&&&"); + qLock.lock(); // relock after releasing streamLock, but before while() test. + util::InstanceCount icQB("SCS::_Transmit qlock end B&&&"); } + //LOGS(_log, LOG_LVL_INFO, "&&&SCS::_transmit out of loop end"); return true; } @@ -211,6 +269,7 @@ util::TimerHistogram transmitHisto("transmit Hist", {0.1, 1, 5, 10, 20, 40}); bool SendChannelShared::_sendBuf(lock_guard const& streamLock, xrdsvc::StreamBuffer::Ptr& streamBuf, bool last, string const& note) { + util::InstanceCount ic("SCS:_sendBuff0&&&"); bool sent = _sendChannel->sendStream(streamBuf, last); if (!sent) { LOGS(_log, LOG_LVL_ERROR, "Failed to transmit " << note << "!"); @@ -218,11 +277,15 @@ bool SendChannelShared::_sendBuf(lock_guard const& streamLock, } else { util::Timer t; t.start(); - LOGS(_log, LOG_LVL_DEBUG, "_sendbuf wait start"); + LOGS(_log, LOG_LVL_DEBUG, "_sendbuf wait start " << note); + util::InstanceCount icId("SCS:_sendBuffA&&&ch=" + to_string(_id) + " seq=" + to_string(getSeq())); + LOGS(_log, LOG_LVL_INFO, "&&& calling waitForDoneWithThis"); + util::InstanceCount icA("SCS:_sendBuffA&&&"); streamBuf->waitForDoneWithThis(); // Block until this buffer has been sent. + util::InstanceCount icB("SCS:_sendBuffb&&&"); t.stop(); auto logMsg = transmitHisto.addTime(t.getElapsed(), note); - LOGS(_log, LOG_LVL_DEBUG, logMsg); + LOGS(_log, LOG_LVL_DEBUG, "_sendbuf wait end->" << logMsg); } return sent; } diff --git a/core/modules/wbase/SendChannelShared.h b/core/modules/wbase/SendChannelShared.h index 4cfbcc4a36..873685f938 100644 --- a/core/modules/wbase/SendChannelShared.h +++ b/core/modules/wbase/SendChannelShared.h @@ -111,8 +111,7 @@ class SendChannelShared { /// /// @return true if inLast is true and this is the last task to call this /// with inLast == true. - /// The calling Thread must hold 'streamMutex' before calling this. - bool transmitTaskLast(StreamGuard sLock, bool inLast); + bool transmitTaskLast(bool inLast); /// streamMutex is used to protect _lastCount and messages that are sent /// using SendChannelShared. @@ -121,10 +120,16 @@ class SendChannelShared { /// Return a normalized id string. std::string makeIdStr(int qId, int jId); + uint64_t getId() const { return _id; } + int getTaskCount() const { return _taskCount; } + int getLastCount() const { return _lastCount; } + + uint64_t getSeq() const { return _sendChannel->getSeq(); } + private: /// Private constructor to protect shared pointer integrity. 
-    SendChannelShared(SendChannel::Ptr const& sendChannel, wcontrol::TransmitMgr::Ptr const& transmitMgr)
-        : _transmitMgr(transmitMgr), _sendChannel(sendChannel) {
+    SendChannelShared(uint64_t id, SendChannel::Ptr const& sendChannel, wcontrol::TransmitMgr::Ptr const& transmitMgr)
+        : _id(id), _transmitMgr(transmitMgr), _sendChannel(sendChannel) {
         if (_sendChannel == nullptr) {
             throw Bug("SendChannelShared constructor given nullptr");
         }
@@ -146,14 +151,20 @@ class SendChannelShared {
                   xrdsvc::StreamBuffer::Ptr& streamBuf, bool last,
                   std::string const& note);

+    uint64_t const _id; ///< id number for this send channel shared.
+
     std::queue<TransmitData::Ptr> _transmitQueue; ///< Queue of data to be encoded and sent.
     std::mutex _queueMtx; ///< protects _transmitQueue, _taskCount, _lastCount

     /// metadata buffer. Once set, it cannot change until after Finish() has been called.
     std::string _metadataBuf;

+    /// The number of tasks to be sent over this SendChannel. This must be set to the final value
+    /// before any tasks are processed to avoid race conditions.
     std::atomic<int> _taskCount{0}; ///< The number of tasks to be sent over this SendChannel.
-    int _lastCount = 0; ///< Then number of 'last' buffers received.
+    std::atomic<int> _lastCount{0}; ///< The number of 'last' buffers received.
+    std::mutex _lastCMtx; ///< Protects _lastCount and the validity of transmitTaskLast(bool inLast).
+
     std::atomic<bool> _lastRecvd{false}; ///< The truly 'last' transmit message is in the queue.
     std::atomic<bool> _firstTransmit{true}; ///< True until the first transmit has been sent.
diff --git a/core/modules/wbase/Task.cc b/core/modules/wbase/Task.cc
index 99ec755790..c673484480 100644
--- a/core/modules/wbase/Task.cc
+++ b/core/modules/wbase/Task.cc
@@ -166,7 +166,7 @@ std::vector<Task::Ptr> Task::createTasks(proto::TaskMsg const& taskMsg,
             vect.push_back(task);
         }
     } else {
-        LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks queryStr=" << queryStr);
+        //LOGS(_log, LOG_LVL_INFO, "&&& Task::createTasks queryStr=" << queryStr);
         auto task = std::make_shared<Task>(taskMsg, queryStr, fragNum, sendChannel, gArena, rmLock);
         //TODO: Maybe? Is it better to move fragment info from
         //      ChunkResource getResourceFragment(int i) to here???
diff --git a/core/modules/wcontrol/SqlConnMgr.h b/core/modules/wcontrol/SqlConnMgr.h
index 950eb4befd..8fb443b68e 100644
--- a/core/modules/wcontrol/SqlConnMgr.h
+++ b/core/modules/wcontrol/SqlConnMgr.h
@@ -31,6 +31,7 @@
 #include

 // Qserv headers
+#include "util/InstanceCount.h" //&&&

 namespace lsst {
@@ -100,6 +101,8 @@ class SqlConnLock {
 private:
     SqlConnMgr& _sqlConnMgr;
+
+    util::InstanceCount _ic{"SqlConnLock&&&"};
 };
diff --git a/core/modules/wdb/QueryRunner.cc b/core/modules/wdb/QueryRunner.cc
index 83466f1fce..dcea525a16 100644
--- a/core/modules/wdb/QueryRunner.cc
+++ b/core/modules/wdb/QueryRunner.cc
@@ -188,9 +188,11 @@ bool QueryRunner::runQuery() {
         // Put the error from _initConnection in _transmitData via _multiError.
         _buildDataMsg(0, 0);
         // Since there's an error, this will be the last transmit from this QueryRunner.
+ //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit err start"); if (!_transmit(true)) { LOGS(_log, LOG_LVL_WARN, " Could not report error to czar as sendChannel not accepting msgs."); } + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit err end"); return false; } @@ -206,7 +208,7 @@ bool QueryRunner::runQuery() { } else { throw UnsupportedError(_task->getIdStr() + " QueryRunner: Expected protocol > 1 in TaskMsg"); } - LOGS(_log, LOG_LVL_DEBUG, "QueryRunner::runQuery() END"); + LOGS(_log, LOG_LVL_INFO, "QueryRunner::runQuery() END"); //&&& revert to DEBUG return false; } @@ -279,7 +281,9 @@ bool QueryRunner::_transmit(bool lastIn) { int qId = _task->getQueryId(); int jId = _task->getJobId(); + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_transmit start lastIn=" << lastIn); bool success = _task->sendChannel->addTransmit(_cancelled, erred, lastIn, _largeResult, _transmitData, qId, jId); + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_transmit end lastIn=" << lastIn); // Large results get priority, but new large results should not get priority until // after they have started transmitting. @@ -462,6 +466,7 @@ bool QueryRunner::_dispatchChannel() { // (see pull-request for DM-216) // Now get rows... while (!_fillRows(res, numFields, rowCount, tSize)) { + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch loop start"); if (tSize > proto::ProtoHeaderWrap::PROTOBUFFER_HARD_LIMIT) { LOGS_ERROR("Message single row too large to send using protobuffer"); erred = true; @@ -469,13 +474,16 @@ bool QueryRunner::_dispatchChannel() { } LOGS(_log, LOG_LVL_TRACE, "Splitting message size=" << tSize << ", rowCount=" << rowCount); _buildDataMsg(rowCount, tSize); + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit false start"); if (!_transmit(false)) { LOGS(_log, LOG_LVL_ERROR, "Could not transmit intermediate results."); return false; } + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit false end"); rowCount = 0; tSize = 0; _initTransmit(); // reset _transmitData + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch loop end"); } } @@ -491,12 +499,15 @@ bool QueryRunner::_dispatchChannel() { _mysqlConn->freeResult(); } if (!_cancelled) { + //util::InstanceCount icTransmit("_dispatchChannel::Transmit&&&"); // Send results. This needs to happen after the error check. _buildDataMsg(rowCount, tSize); + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit true start"); if (!_transmit(true)) { // All remaining rows/errors for this QueryRunner should be in this transmit. LOGS(_log, LOG_LVL_ERROR, "Could not transmit last results."); return false; } + //LOGS(_log, LOG_LVL_INFO, "&&&QR::_dispatch transmit true end"); } else { erred = true; // Set poison error, no point in sending. 
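Note: QueryRunner::_transmit() above hands each buffer to
SendChannelShared::addTransmit(), and SendChannelShared::_transmit() drains the shared
queue. The locking discipline the patch is converging on: _queueMtx guards only the
queue, streamMutex serializes the actual sends, and the queue lock is released while a
buffer is on the wire so other QueryRunners sharing the channel can keep appending. A
simplified sketch of that pattern — illustrative only, with send() standing in for
_sendBuf():

    #include <mutex>
    #include <queue>
    #include <string>

    class TransmitLoop {
    public:
        // Producer side (QueryRunner threads): only the queue lock is needed.
        void add(std::string msg) {
            std::lock_guard<std::mutex> qLock(_queueMtx);
            _queue.push(std::move(msg));
        }

        // Consumer side: pop under the queue lock, send under the stream lock,
        // and drop the queue lock while the buffer is being transmitted.
        bool drain() {
            std::unique_lock<std::mutex> qLock(_queueMtx);
            while (!_queue.empty()) {
                std::string msg = std::move(_queue.front());
                _queue.pop();
                {
                    std::lock_guard<std::mutex> streamLock(_streamMtx);
                    qLock.unlock(); // producers may append during the send
                    if (!send(msg)) return false;
                }
                qLock.lock(); // re-lock before re-testing the queue
            }
            return true;
        }

    private:
        // Stands in for _sendBuf(); a real channel would block until the buffer
        // has been handed off, as StreamBuffer::waitForDoneWithThis() does.
        bool send(std::string const& msg) { return !msg.empty(); }

        std::mutex _queueMtx;  // guards _queue only
        std::mutex _streamMtx; // serializes sends on the shared channel
        std::queue<std::string> _queue;
    };

Because streamLock is acquired while qLock is still held, but no code path takes the
locks in the opposite order, the two mutexes cannot deadlock; the unlock/relock of
qLock around the send is what keeps producers from stalling behind a slow transmit.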
diff --git a/core/modules/xrdsvc/ChannelStream.cc b/core/modules/xrdsvc/ChannelStream.cc index bacfbe78b6..424b12a5c4 100644 --- a/core/modules/xrdsvc/ChannelStream.cc +++ b/core/modules/xrdsvc/ChannelStream.cc @@ -64,8 +64,8 @@ void ChannelStream::append(StreamBuffer::Ptr const& streamBuffer, bool last) { if (_closed) { throw Bug("ChannelStream::append: Stream closed, append(...,last=true) already received"); } - LOGS(_log, LOG_LVL_DEBUG, "seq=" << to_string(_seq) << " ChannelStream::append last=" << last - << " " << util::prettyCharBuf(streamBuffer->data, streamBuffer->getSize(), 5)); + //&&&LOGS(_log, LOG_LVL_DEBUG, "seq=" << to_string(_seq) << " ChannelStream::append last=" << last + //&&& << " " << util::prettyCharBuf(streamBuffer->data, streamBuffer->getSize(), 5)); { unique_lock lock(_mutex); LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) << " Trying to append message (flowing)"); @@ -79,15 +79,16 @@ void ChannelStream::append(StreamBuffer::Ptr const& streamBuffer, bool last) { /// Pull out a data packet as a Buffer object (called by XrdSsi code) XrdSsiStream::Buffer* ChannelStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bool &last) { + util::InstanceCount ic("ChannelStream::GetBuff&&& a"); unique_lock lock(_mutex); while(_msgs.empty() && !_closed) { // No msgs, but we aren't done // wait. - LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) << " Waiting, no data ready"); + LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) << " GetBuff Waiting, no data ready"); _hasDataCondition.wait(lock); } if (_msgs.empty() && _closed) { // It's closed and no more msgs are available. - LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) << " Not waiting, but closed"); + LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) << " GetBuff Not waiting, but closed"); dlen = 0; eInfo.Set("Not an active stream", EOPNOTSUPP); return 0; @@ -98,7 +99,7 @@ XrdSsiStream::Buffer* ChannelStream::GetBuff(XrdSsiErrInfo &eInfo, int &dlen, bo _msgs.pop_front(); last = _closed && _msgs.empty(); LOGS(_log, LOG_LVL_INFO, "seq=" << to_string(_seq) - << " returning buffer (" << dlen << ", " << (last ? "(last)" : "(more)") << ")"); + << " GetBuff returning buffer (" << dlen << ", " << (last ? "(last)" : "(more)") << ")"); return sb.get(); } diff --git a/core/modules/xrdsvc/ChannelStream.h b/core/modules/xrdsvc/ChannelStream.h index f636967d03..267f376a53 100644 --- a/core/modules/xrdsvc/ChannelStream.h +++ b/core/modules/xrdsvc/ChannelStream.h @@ -59,6 +59,8 @@ class ChannelStream : public XrdSsiStream { bool closed() const { return _closed; } + uint64_t getSeq() const { return _seq; } + private: bool _closed; ///< Closed to new append() calls? // Can keep a deque of (buf, bufsize) to reduce copying, if needed. diff --git a/core/modules/xrdsvc/SsiRequest.cc b/core/modules/xrdsvc/SsiRequest.cc index a4e1a2d157..3385f62119 100644 --- a/core/modules/xrdsvc/SsiRequest.cc +++ b/core/modules/xrdsvc/SsiRequest.cc @@ -40,6 +40,7 @@ #include "global/ResourceUnit.h" #include "proto/FrameBuffer.h" #include "proto/worker.pb.h" +#include "qdisp/UberJob.h" #include "qmeta/types.h" #include "util/Timer.h" #include "wbase/MsgProcessor.h" @@ -212,6 +213,14 @@ void SsiRequest::execute(XrdSsiRequest& req) { // to actually do something once everything is actually setup. 
} + +uint64_t SsiRequest::getSeq() const { + if (_stream == nullptr) return 0; + return _stream->getSeq(); +} + + + wbase::WorkerCommand::Ptr SsiRequest::parseWorkerCommand(char const* reqData, int reqSize) { wbase::SendChannel::Ptr const sendChannel = @@ -437,7 +446,7 @@ bool SsiRequest::replyFile(int fd, long long fSize) { bool SsiRequest::replyStream(StreamBuffer::Ptr const& sBuf, bool last) { - LOGS(_log, LOG_LVL_DEBUG, "replyStream, checking stream size=" << sBuf->getSize() << " last=" << last); + LOGS(_log, LOG_LVL_INFO, "replyStream, checking stream size=" << sBuf->getSize() << " last=" << last); // &&& debug // Normally, XrdSsi would call Recycle() when it is done with sBuf, but if this function // returns false, then it must call Recycle(). Otherwise, the scheduler will likely @@ -464,6 +473,7 @@ bool SsiRequest::replyStream(StreamBuffer::Ptr const& sBuf, bool last) { sBuf->Recycle(); return false; } + LOGS(_log, LOG_LVL_WARN, "&&& SsiRequest::replyStream _stream seq=" << getSeq()); // XrdSsi or Finished() will call Recycle(). _stream->append(sBuf, last); return true; @@ -501,8 +511,16 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg, // Check the purpose of _finMutex, as it is locked before this is called. qmeta::CzarId czarId = uberJobMsg->czarid(); QueryId qId = uberJobMsg->queryid(); + uint32_t uberJobId = uberJobMsg->uberjobid(); + uint32_t magicNumber = uberJobMsg->magicnumber(); + + if (magicNumber != qdisp::UberJob::getMagicNumber()) { + throw Bug("UberJob incorrect magic number"); + } LOGS(_log, LOG_LVL_INFO, "&&& _handleUberJob qId=" << qId << " czarId=" << czarId); + std::vector jobIds; + int tSize = uberJobMsg->taskmsgs_size(); if (tSize == 0) { return; @@ -515,7 +533,7 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg, vector tasks; LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ tSize=" << tSize); for (int j=0; j < tSize; ++j) { - LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ j=" << j); + //LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ j=" << j); proto::TaskMsg const& taskMsg = uberJobMsg->taskmsgs(j); if (!taskMsg.has_db() || !taskMsg.has_chunkid()) { @@ -523,11 +541,12 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg, " chunkId=" + to_string(taskMsg.chunkid())); return; } + int jobId = taskMsg.jobid(); + jobIds.push_back(jobId); string db = taskMsg.db(); int chunkId = taskMsg.chunkid(); - //&&&string resourcePath = "/" + db + "/" + to_string(chunkId); string resourcePath = ResourceUnit::makePath(chunkId, db); - LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ resourcePath=" << resourcePath); + //LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ resourcePath=" << resourcePath); ResourceUnit ru(resourcePath); if (ru.db() != db || ru.chunk() != chunkId) { throw Bug("resource path didn't match ru"); @@ -535,13 +554,31 @@ void SsiRequest::_handleUberJob(proto::UberJobMsg* uberJobMsg, auto resourceLock = std::make_shared(*(_resourceMonitor.get()), resourcePath); // If the query uses subchunks, the taskMsg will return multiple Tasks. Otherwise, one task. 
-    LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ creating tasks");
+    //LOGS(_log, LOG_LVL_INFO, "&&& SsiRequest::_hUJ creating tasks");
         auto nTasks = wbase::Task::createTasks(taskMsg, sendChannel, gArena, resourceLock);
         // Move nTasks into tasks
         tasks.insert(tasks.end(), std::make_move_iterator(nTasks.begin()),
                      std::make_move_iterator(nTasks.end()));
     }

+    // Make a log message showing which jobs are in the UberJob.
+    string qIdStr = to_string(qId);
+    string logMsg = "uberJob={QID," + qIdStr + "#" + to_string(uberJobId);
+    logMsg += "} channel=" + to_string(sendChannel->getId());
+    logMsg += " taskC=" + to_string(sendChannel->getTaskCount());
+    logMsg += " lastC=" + to_string(sendChannel->getLastCount());
+    logMsg += " jobs=(";
+    bool firstMsg = true;
+    for (auto const& jobId:jobIds) {
+        if (!firstMsg) {
+            logMsg += ",";
+        }
+        firstMsg = false;
+        logMsg += "{QID," + qIdStr + "#" + to_string(jobId) + "}";
+    }
+    logMsg += ")";
+    LOGS(_log, LOG_LVL_INFO, logMsg);
+
     _processor->processTasks(tasks); // Queues tasks to be run later.
 }
diff --git a/core/modules/xrdsvc/SsiRequest.h b/core/modules/xrdsvc/SsiRequest.h
index 9c5efd8611..dedd3de90b 100644
--- a/core/modules/xrdsvc/SsiRequest.h
+++ b/core/modules/xrdsvc/SsiRequest.h
@@ -122,6 +122,8 @@ class SsiRequest

     bool sendMetadata(const char *buf, int blen);

+    uint64_t getSeq() const;
+
     /// Call this to allow object to die after it truly is no longer needed.
     /// i.e. It is know Finish() will not be called.
     /// NOTE: It is important that any non-static SsiRequest member
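Note: the magicnumber field added to UberJobMsg in worker.proto is validated at the top
of SsiRequest::_handleUberJob(); a mismatch means the czar and worker disagree on the
UberJob protocol revision, and the worker throws rather than run mismatched tasks. A
sketch of that guard in isolation — the constant comes from UberJob::getMagicNumber()
in the patch, while Msg and validateUberJobMsg() are hypothetical stand-ins for the
protobuf-generated type and the worker-side check:

    #include <cstdint>
    #include <stdexcept>

    // Value of UberJob::getMagicNumber() in the patch.
    constexpr std::uint32_t kUberJobMagic = 93452;

    // Hypothetical stand-in for the protobuf-generated UberJobMsg accessor.
    struct Msg { std::uint32_t magicnumber; };

    // Reject a message whose sender disagrees about the UberJob protocol.
    void validateUberJobMsg(Msg const& msg) {
        if (msg.magicnumber != kUberJobMagic) {
            throw std::runtime_error("UberJob incorrect magic number");
        }
    }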