diff --git a/src/ccontrol/MergingHandler.cc b/src/ccontrol/MergingHandler.cc index aaa940049..637ff1b32 100644 --- a/src/ccontrol/MergingHandler.cc +++ b/src/ccontrol/MergingHandler.cc @@ -269,9 +269,7 @@ shared_ptr const& MergingHandler::_getHttpConnPool() { } MergingHandler::MergingHandler(std::shared_ptr merger, std::string const& tableName) - : _infileMerger{merger}, _tableName{tableName} { - _initState(); -} + : _infileMerger{merger}, _tableName{tableName} {} MergingHandler::~MergingHandler() { LOGS(_log, LOG_LVL_DEBUG, __func__ << " " << _tableName); } @@ -293,23 +291,6 @@ std::ostream& MergingHandler::print(std::ostream& os) const { return os << "MergingRequester(" << _tableName << ", flushed=" << (_flushed ? "true)" : "false)"); } -void MergingHandler::_initState() { _setError(0, ""); } - -bool MergingHandler::_merge(proto::ResponseSummary const& responseSummary, - proto::ResponseData const& responseData, - shared_ptr const& jobQuery) { - if (_flushed) { - throw util::Bug(ERR_LOC, "already flushed"); - } - bool success = _infileMerger->merge(responseSummary, responseData, jobQuery); - if (!success) { - LOGS(_log, LOG_LVL_WARN, __func__ << " failed"); - util::Error const& err = _infileMerger->getError(); - _setError(ccontrol::MSG_RESULT_ERROR, err.getMsg()); - } - return success; -} - bool MergingHandler::_mergeHttp(shared_ptr const& uberJob, proto::ResponseData const& responseData) { if (_flushed) { @@ -325,7 +306,7 @@ bool MergingHandler::_mergeHttp(shared_ptr const& uberJob, } void MergingHandler::_setError(int code, std::string const& msg) { - LOGS(_log, LOG_LVL_DEBUG, "_setErr: code: " << code << ", message: " << msg); + LOGS(_log, LOG_LVL_DEBUG, "_setError: code: " << code << ", message: " << msg); std::lock_guard lock(_errorMutex); _error = Error(code, msg); } diff --git a/src/ccontrol/MergingHandler.h b/src/ccontrol/MergingHandler.h index 6868abb16..a34a547ae 100644 --- a/src/ccontrol/MergingHandler.h +++ b/src/ccontrol/MergingHandler.h @@ -95,13 +95,6 @@ class MergingHandler : public qdisp::ResponseHandler { void prepScrubResults(int jobId, int attempt) override; private: - /// Prepare for first call to flush(). - void _initState(); - - // &&& delete - bool _merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, - std::shared_ptr const& jobQuery); - /// Call InfileMerger to do the work of merging this data to the result. bool _mergeHttp(std::shared_ptr const& uberJob, proto::ResponseData const& responseData); diff --git a/src/ccontrol/UserQuerySelect.cc b/src/ccontrol/UserQuerySelect.cc index 186359768..46264c210 100644 --- a/src/ccontrol/UserQuerySelect.cc +++ b/src/ccontrol/UserQuerySelect.cc @@ -167,7 +167,7 @@ void UserQuerySelect::kill() { // make a copy of executive pointer to keep it alive and avoid race // with pointer being reset in discard() method if (exec != nullptr) { - exec->squash(); + exec->squash("UserQuerySelect::kill"); } } catch (UserQueryError const& e) { // Silence merger discarding errors, because this object is being @@ -296,6 +296,7 @@ void UserQuerySelect::submit() { return; } dbName = cs->db; + _queryDbName = dbName; dbNameSet = true; } @@ -308,13 +309,9 @@ void UserQuerySelect::submit() { ++sequence; } - if (dbNameSet) { - _queryDbName = dbName; - } - /// At this point the executive has a map of all jobs with the chunkIds as the key. // This is needed to prevent Czar::_monitor from starting things before they are ready. - exec->setReadyToExecute(); + exec->setAllJobsCreated(); buildAndSendUberJobs(); LOGS(_log, LOG_LVL_DEBUG, "total jobs in query=" << sequence); @@ -341,7 +338,8 @@ void UserQuerySelect::buildAndSendUberJobs() { LOGS(_log, LOG_LVL_ERROR, funcN << " called with null exec " << getQueryIdString()); return; } - if (!exec->isReadyToExecute()) { + + if (!exec->isAllJobsCreated()) { LOGS(_log, LOG_LVL_INFO, funcN << " executive isn't ready to generate UberJobs."); return; } @@ -406,6 +404,7 @@ void UserQuerySelect::buildAndSendUberJobs() { // numerical order. The workers run shared scans in numerical order of chunkId numbers. // Numerical order keeps the number of partially complete UberJobs running on a worker to a minimum, // and should minimize the time for the first UberJob on the worker to complete. + LOGS(_log, LOG_LVL_WARN, " &&&d " << funcN << " start assigning"); for (auto const& [chunkId, jqPtr] : unassignedChunksInQuery) { bool const increaseAttemptCount = true; jqPtr->getDescription()->incrAttemptCount(exec, increaseAttemptCount); diff --git a/src/czar/CzarChunkMap.cc b/src/czar/CzarChunkMap.cc index 6df9936c9..23c5aa816 100644 --- a/src/czar/CzarChunkMap.cc +++ b/src/czar/CzarChunkMap.cc @@ -428,7 +428,7 @@ void CzarFamilyMap::insertIntoMaps(std::shared_ptr const& newFami CzarChunkMap::SizeT sz) { // Get the CzarChunkMap for this family auto familyName = getFamilyNameFromDbName(dbName); - LOGS(_log, LOG_LVL_INFO, + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " familyInsrt{w=" << workerId << " fN=" << familyName << " dbN=" << dbName << " tblN=" << tableName << " chunk=" << chunkIdNum << " sz=" << sz << "}"); auto& nfMap = *newFamilyMap; diff --git a/src/czar/HttpCzarWorkerModule.cc b/src/czar/HttpCzarWorkerModule.cc index f0a05388b..a833e8f2b 100644 --- a/src/czar/HttpCzarWorkerModule.cc +++ b/src/czar/HttpCzarWorkerModule.cc @@ -99,12 +99,13 @@ json HttpCzarWorkerModule::_workerCzarComIssue() { json HttpCzarWorkerModule::_handleJobError(string const& func) { LOGS(_log, LOG_LVL_DEBUG, "HttpCzarWorkerModule::_handleJobError start"); + LOGS(_log, LOG_LVL_WARN, "&&& HttpCzarWorkerModule::_handleJobError start " << body().objJson); // Metadata-only responses for the file-based protocol should not have any data // Parse and verify the json message and then kill the UberJob. json jsRet = {{"success", 0}, {"errortype", "unknown"}, {"note", "initialized"}}; try { - // See qdisp::UberJob::runUberJob() for json message construction. &&& + // TODO:UJ see wbase::UberJobData::responseError for message construction string const targetWorkerId = body().required("workerid"); string const czarName = body().required("czar"); qmeta::CzarId const czarId = body().required("czarid"); @@ -147,7 +148,7 @@ json HttpCzarWorkerModule::_handleJobReady(string const& func) { try { // &&& TODO:UJ file response - move construction and parsing // &&& TODO:UJ to a class so it can be added to WorkerCzarComIssue - // See qdisp::UberJob::runUberJob() for json message construction. &&& + // See wbase::UberJobData::responseFileReady string const targetWorkerId = body().required("workerid"); string const czarName = body().required("czar"); qmeta::CzarId const czarId = body().required("czarid"); diff --git a/src/qdisp/Executive.cc b/src/qdisp/Executive.cc index 41754bc00..e414e986a 100644 --- a/src/qdisp/Executive.cc +++ b/src/qdisp/Executive.cc @@ -196,20 +196,22 @@ JobQuery::Ptr Executive::add(JobDescription::Ptr const& jobDesc) { QSERV_LOGCONTEXT_QUERY_JOB(jobQuery->getQueryId(), jobQuery->getJobId()); { - lock_guard lock(_cancelled.getMutex()); - if (_cancelled) { - LOGS(_log, LOG_LVL_DEBUG, - "Executive already cancelled, ignoring add(" << jobDesc->id() << ")"); - return nullptr; + { + lock_guard lock(_cancelled.getMutex()); + if (_cancelled) { + LOGS(_log, LOG_LVL_DEBUG, + "Executive already cancelled, ignoring add(" << jobDesc->id() << ")"); + return nullptr; + } } - if (!_addJobToMap(jobQuery)) { - LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate job add"); + if (!_track(jobQuery->getJobId(), jobQuery)) { + LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate track add"); return jobQuery; } - if (!_track(jobQuery->getJobId(), jobQuery)) { - LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate track add"); + if (!_addJobToMap(jobQuery)) { + LOGS(_log, LOG_LVL_ERROR, "Executive ignoring duplicate job add"); return jobQuery; } @@ -240,7 +242,7 @@ void Executive::addAndQueueUberJob(shared_ptr const& uj) { lock_guard lck(_uberJobsMapMtx); UberJobId ujId = uj->getUjId(); _uberJobsMap[ujId] = uj; - LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " ujId=" << ujId << " uj.sz=" << uj->getJobCount()); + LOGS(_log, LOG_LVL_TRACE, cName(__func__) << " ujId=" << ujId << " uj.sz=" << uj->getJobCount()); } auto runUberJobFunc = [uj](util::CmdData*) { uj->runUberJob(); }; @@ -415,18 +417,19 @@ void Executive::markCompleted(JobId jobId, bool success) { LOGS(_log, logLvl, "Executive: requesting squash, cause: " << " failed (code=" << err.getCode() << " " << err.getMsg() << ")"); - squash(); // ask to squash + squash(string("markComplete error ") + err.getMsg()); // ask to squash } } -void Executive::squash() { +void Executive::squash(string const& note) { bool alreadyCancelled = _cancelled.exchange(true); if (alreadyCancelled) { LOGS(_log, LOG_LVL_DEBUG, "Executive::squash() already cancelled! refusing. qid=" << getId()); return; } - LOGS(_log, LOG_LVL_INFO, "Executive::squash Trying to cancel all queries... qid=" << getId()); + LOGS(_log, LOG_LVL_WARN, + "Executive::squash Trying to cancel all queries... qid=" << getId() << " " << note); deque jobsToCancel; { lock_guard lockJobMap(_jobMapMtx); @@ -670,6 +673,7 @@ void Executive::_waitAllUntilEmpty() { int moreDetailThreshold = 10; int complainCount = 0; const chrono::seconds statePrintDelay(5); + // Loop until all jobs have completed and all jobs have been created. while (!_incompleteJobs.empty()) { count = _incompleteJobs.size(); if (count != lastCount) { @@ -769,9 +773,10 @@ void Executive::checkResultFileSize(uint64_t fileSize) { cName(__func__) << "recheck total=" << total << " max=" << maxResultTableSizeBytes); if (total > maxResultTableSizeBytes) { LOGS(_log, LOG_LVL_ERROR, "Executive: requesting squash, result file size too large " << total); - ResponseHandler::Error err(0, string("Incomplete result already too large ") + to_string(total)); + ResponseHandler::Error err(util::ErrorCode::CZAR_RESULT_TOO_LARGE, + string("Incomplete result already too large ") + to_string(total)); _multiError.push_back(err); - squash(); + squash("czar, file too large"); } } } diff --git a/src/qdisp/Executive.h b/src/qdisp/Executive.h index e72216474..48e64e3dd 100644 --- a/src/qdisp/Executive.h +++ b/src/qdisp/Executive.h @@ -147,7 +147,7 @@ class Executive : public std::enable_shared_from_this { void markCompleted(JobId refNum, bool success); /// Squash all the jobs. - void squash(); + void squash(std::string const& note); bool getEmpty() { return _empty; } @@ -210,13 +210,13 @@ class Executive : public std::enable_shared_from_this { // The below value should probably be based on the user query, with longer sleeps for slower queries. int getAttemptSleepSeconds() const { return 15; } // As above or until added to config file. - int getMaxAttempts() const { return 5; } // Should be set by config + int getMaxAttempts() const { return 50; } // TODO:UJ Should be set by config - /// Calling this indicates the executive is ready to create and execute UberJobs. - void setReadyToExecute() { _readyToExecute = true; } + /// Calling this indicates all Jobs for this user query have been created. + void setAllJobsCreated() { _allJobsCreated = true; } - /// Returns true if the executive is ready to create and execute UberJobs. - bool isReadyToExecute() { return _readyToExecute; } + /// Returns true if all jobs have been created. + bool isAllJobsCreated() { return _allJobsCreated; } /// Send a message to all workers to cancel this query. /// @param deleteResults - If true, delete all result files for this query on the workers. @@ -346,8 +346,8 @@ class Executive : public std::enable_shared_from_this { /// Weak pointer to the UserQuerySelect object for this query. std::weak_ptr _userQuerySelect; - /// Flag that is set to true when ready to create and run UberJobs. - std::atomic _readyToExecute{false}; + /// Flag that is set to true when all jobs have been created. + std::atomic _allJobsCreated{false}; protojson::ScanInfo::Ptr _scanInfo; ///< Scan rating and tables. diff --git a/src/qdisp/JobDescription.cc b/src/qdisp/JobDescription.cc index 660e57330..fdd29f3d9 100644 --- a/src/qdisp/JobDescription.cc +++ b/src/qdisp/JobDescription.cc @@ -65,14 +65,12 @@ bool JobDescription::incrAttemptCount(std::shared_ptr const& exec, bo if (increase) { ++_attemptCount; } - if (_attemptCount >= MAX_JOB_ATTEMPTS) { - LOGS(_log, LOG_LVL_ERROR, "attemptCount greater than maximum number of retries " << _attemptCount); - return false; - } if (exec != nullptr) { int maxAttempts = exec->getMaxAttempts(); - LOGS(_log, LOG_LVL_INFO, "JoQDescription::" << __func__ << " attempts=" << _attemptCount); + if (_attemptCount > 0) { + LOGS(_log, LOG_LVL_INFO, "JoBDescription::" << __func__ << " attempts=" << _attemptCount); + } if (_attemptCount > maxAttempts) { LOGS(_log, LOG_LVL_ERROR, "JoQDescription::" << __func__ << " attempts(" << _attemptCount << ") > maxAttempts(" @@ -80,10 +78,16 @@ bool JobDescription::incrAttemptCount(std::shared_ptr const& exec, bo exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "max attempts reached " + to_string(_attemptCount) + " " + _qIdStr, util::ErrorCode::INTERNAL); - exec->squash(); + exec->squash(string("incrAttemptCount ") + to_string(_attemptCount)); return false; } } + + if (_attemptCount >= MAX_JOB_ATTEMPTS) { + LOGS(_log, LOG_LVL_ERROR, "attemptCount greater than maximum number of retries " << _attemptCount); + return false; + } + return true; } diff --git a/src/qdisp/JobQuery.cc b/src/qdisp/JobQuery.cc index 71d9f19ec..b8f05034d 100644 --- a/src/qdisp/JobQuery.cc +++ b/src/qdisp/JobQuery.cc @@ -61,7 +61,8 @@ JobQuery::~JobQuery() { /// Cancel response handling. Return true if this is the first time cancel has been called. bool JobQuery::cancel(bool superfluous) { QSERV_LOGCONTEXT_QUERY_JOB(getQueryId(), getJobId()); - LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel()"); + LOGS(_log, LOG_LVL_DEBUG, "JobQuery::cancel() " << superfluous); + LOGS(_log, LOG_LVL_WARN, "&&&JobQuery::cancel() " << superfluous); if (_cancelled.exchange(true) == false) { VMUTEX_NOT_HELD(_jqMtx); lock_guard lock(_jqMtx); diff --git a/src/qdisp/UberJob.cc b/src/qdisp/UberJob.cc index 07ccd6875..00c4d11bd 100644 --- a/src/qdisp/UberJob.cc +++ b/src/qdisp/UberJob.cc @@ -106,7 +106,7 @@ util::HistogramRolling histoUJSerialize("&&&uj histoUJSerialize", {0.1, 1.0, 10. void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelled LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " start"); - LOGS(_log, LOG_LVL_ERROR, cName(__func__) << "&&&uj runuj start"); + LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " &&&uj runuj start"); // Build the uberjob payload for each job. nlohmann::json uj; unique_lock jobsLock(_jobsMtx); @@ -119,7 +119,7 @@ void UberJob::runUberJob() { // &&& TODO:UJ this should probably check cancelle vector const headers = {"Content-Type: application/json"}; auto const& czarConfig = cconfig::CzarConfig::instance(); - int maxTableSizeMB = czarConfig->getMaxTableSizeMB(); + uint64_t maxTableSizeMB = czarConfig->getMaxTableSizeMB(); auto czInfo = protojson::CzarContactInfo::create( czarConfig->name(), czarConfig->id(), czarConfig->replicationHttpPort(), util::get_current_host_fqdn(), czar::Czar::czarStartupTime); @@ -220,7 +220,7 @@ void UberJob::_unassignJobs() { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " could not unassign job=" << jid << " cancelling"); exec->addMultiError(qmeta::JobStatus::RETRY_ERROR, "unable to re-assign " + jid, util::ErrorCode::INTERNAL); - exec->squash(); + exec->squash("_unassignJobs failure"); return; } LOGS(_log, LOG_LVL_DEBUG, @@ -292,14 +292,9 @@ void UberJob::callMarkCompleteFunc(bool success) { util::HistogramRolling histoQueImp("&&&uj histoQueImp", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); -/// Retrieve and process a result file using the file-based protocol -/// Uses a copy of JobQuery::Ptr instead of _jobQuery as a call to cancel() would reset _jobQuery. json UberJob::importResultFile(string const& fileUrl, uint64_t rowCount, uint64_t fileSize) { LOGS(_log, LOG_LVL_DEBUG, cName(__func__) << " fileUrl=" << fileUrl << " rowCount=" << rowCount << " fileSize=" << fileSize); - LOGS(_log, LOG_LVL_WARN, - cName(__func__) << "&&& fileUrl=" << fileUrl << " rowCount=" << rowCount - << " fileSize=" << fileSize); if (isQueryCancelled()) { LOGS(_log, LOG_LVL_WARN, cName(__func__) << " import job was cancelled."); @@ -398,7 +393,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) { // TODO:UJ see if recoverable errors can be detected on the workers, or // maybe allow a single retry before sending the error back to the user? bool recoverableError = false; - recoverableError = true; // TODO:UJ delete after testing + if (recoverableError) { // The czar should have new maps before the the new UberJob(s) for // these Jobs are created. (see Czar::_monitor) @@ -408,7 +403,7 @@ json UberJob::workerError(int errorCode, string const& errorMsg) { int errState = util::ErrorCode::MYSQLEXEC; getRespHandler()->flushHttpError(errorCode, errorMsg, errState); exec->addMultiError(errorCode, errorMsg, errState); - exec->squash(); + exec->squash(string("UberJob::workerError ") + errorMsg); } string errType = to_string(errorCode) + ":" + errorMsg; @@ -427,7 +422,7 @@ json UberJob::_importResultError(bool shouldCancel, string const& errorType, str if (shouldCancel) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " failing jobs"); callMarkCompleteFunc(false); // all jobs failed, no retry - exec->squash(); + exec->squash(string("_importResultError shouldCancel")); } else { /// - each JobQuery in _jobs needs to be flagged as needing to be /// put in an UberJob and it's attempt count increased and checked @@ -465,7 +460,7 @@ void UberJob::_importResultFinish(uint64_t resultRows) { if (!statusSet) { LOGS(_log, LOG_LVL_ERROR, cName(__func__) << " failed to set status, squashing " << getIdStr()); // Something has gone very wrong - exec->squash(); + exec->squash("UberJob::_importResultFinish couldn't set status"); return; } diff --git a/src/qdisp/UberJob.h b/src/qdisp/UberJob.h index c1ead8b24..cc2a32a31 100644 --- a/src/qdisp/UberJob.h +++ b/src/qdisp/UberJob.h @@ -106,6 +106,8 @@ class UberJob : public std::enable_shared_from_this { czar::CzarChunkMap::WorkerChunksData::Ptr getWorkerData() { return _workerData; } /// Queue the lambda function to collect and merge the results from the worker. + /// @return a json message indicating success unless the query has been + /// cancelled, limit row complete, or similar. nlohmann::json importResultFile(std::string const& fileUrl, uint64_t rowCount, uint64_t fileSize); /// Handle an error from the worker. diff --git a/src/qdisp/testQDisp.cc b/src/qdisp/testQDisp.cc index 59299d1c5..deee865d0 100644 --- a/src/qdisp/testQDisp.cc +++ b/src/qdisp/testQDisp.cc @@ -204,6 +204,7 @@ std::shared_ptr addMockRequests(qdisp::Executive::Ptr const& ex qdisp::JobDescription::Ptr job = makeMockJobDescription(ex, sequence.incr(), ru, msg, rv[j]); jobQuery = ex->add(job); } + ex->setAllJobsCreated(); return jobQuery; } @@ -377,7 +378,7 @@ BOOST_AUTO_TEST_CASE(ExecutiveCancel) { // squash SequentialInt sequence(0); tEnv.jqTest = executiveTest(tEnv.ex, sequence, chunkId, tEnv.qrMsg, 1); - tEnv.ex->squash(); + tEnv.ex->squash("test"); usleep(250000); // Give mock threads a quarter second to complete. tEnv.ex->join(); BOOST_CHECK(tEnv.jqTest->isQueryCancelled() == true); @@ -390,9 +391,9 @@ BOOST_AUTO_TEST_CASE(ExecutiveCancel) { // squash SequentialInt sequence(0); executiveTest(tEnv.ex, sequence, chunkId, tEnv.qrMsg, 20); - tEnv.ex->squash(); - tEnv.ex->squash(); // check that squashing twice doesn't cause issues. - usleep(250000); // Give mock threads a quarter second to complete. + tEnv.ex->squash("test"); + tEnv.ex->squash("test"); // check that squashing twice doesn't cause issues. + usleep(250000); // Give mock threads a quarter second to complete. tEnv.ex->join(); } } diff --git a/src/qproc/QuerySession.cc b/src/qproc/QuerySession.cc index 9bd643265..969409a4d 100644 --- a/src/qproc/QuerySession.cc +++ b/src/qproc/QuerySession.cc @@ -391,7 +391,6 @@ std::vector QuerySession::_buildChunkQueries(query::QueryTemplate:: } for (auto&& queryTemplate : queryTemplates) { - LOGS(_log, LOG_LVL_WARN, "&&&uj QuerySession::_buildChunkQueries qt=" << queryTemplate.dump()); std::string str = _context->queryMapping->apply(chunkSpec, queryTemplate); chunkQueries.push_back(std::move(str)); } diff --git a/src/rproc/InfileMerger.cc b/src/rproc/InfileMerger.cc index b192f6c0f..e44383b71 100644 --- a/src/rproc/InfileMerger.cc +++ b/src/rproc/InfileMerger.cc @@ -220,118 +220,6 @@ void InfileMerger::mergeCompleteFor(int jobId) { _totalResultSize += _perJobResultSize[jobId]; // TODO:UJ this can probably be simplified } -bool InfileMerger::merge(proto::ResponseSummary const& responseSummary, - proto::ResponseData const& responseData, - std::shared_ptr const& jq) { - JobId const jobId = responseSummary.jobid(); - std::string queryIdJobStr = QueryIdHelper::makeIdStr(responseSummary.queryid(), jobId); - if (!_queryIdStrSet) { - _setQueryIdStr(QueryIdHelper::makeIdStr(responseSummary.queryid())); - } - - // Nothing to do if size is zero. - if (responseData.row_size() == 0) { - return true; - } - - // Do nothing if the query got cancelled for any reason. - if (jq->isQueryCancelled()) { - return true; - } - auto executive = jq->getExecutive(); - if (executive == nullptr || executive->getCancelled() || executive->isRowLimitComplete()) { - return true; - } - - std::unique_ptr semaLock; - if (_dbEngine != MYISAM) { - // needed for parallel merging with INNODB and MEMORY - semaLock.reset(new util::SemaLock(*_semaMgrConn)); - } - - TimeCountTracker::CALLBACKFUNC cbf = [](TIMEPOINT start, TIMEPOINT end, double bytes, - bool success) { - if (!success) return; - if (std::chrono::duration const seconds = end - start; seconds.count() > 0) { - qdisp::CzarStats::get()->addXRootDSSIRecvRate(bytes / seconds.count()); - } - }; - auto tct = make_shared>(cbf); - - bool ret = false; - // Add columns to rows in virtFile. - util::Timer virtFileT; - virtFileT.start(); - int resultJobId = makeJobIdAttempt(responseSummary.jobid(), responseSummary.attemptcount()); - ProtoRowBuffer::Ptr pRowBuffer = std::make_shared( - responseData, resultJobId, _jobIdColName, _jobIdSqlType, _jobIdMysqlType); - std::string const virtFile = _infileMgr.prepareSrc(pRowBuffer); - std::string const infileStatement = sql::formLoadInfile(_mergeTable, virtFile); - virtFileT.stop(); - - // If the job attempt is invalid, exit without adding rows. - // It will wait here if rows need to be deleted. - if (_invalidJobAttemptMgr.incrConcurrentMergeCount(resultJobId)) { - return true; - } - - size_t const resultSize = responseData.transmitsize(); - size_t tResultSize; - { - std::lock_guard resultSzLock(_mtxResultSizeMtx); - _perJobResultSize[jobId] += resultSize; - tResultSize = _totalResultSize + _perJobResultSize[jobId]; - } - if (tResultSize > _maxResultTableSizeBytes) { - std::ostringstream os; - os << queryIdJobStr << " cancelling the query, queryResult table " << _mergeTable - << " is too large at " << tResultSize << " bytes, max allowed size is " << _maxResultTableSizeBytes - << " bytes"; - LOGS(_log, LOG_LVL_ERROR, os.str()); - _error = util::Error(-1, os.str(), -1); - return false; - } - - tct->addToValue(resultSize); - tct->setSuccess(); - tct.reset(); // stop transmit recieve timer before merging happens. - - qdisp::CzarStats::get()->addTotalBytesRecv(resultSize); - qdisp::CzarStats::get()->addTotalRowsRecv(responseData.rowcount()); - - // Stop here (if requested) after collecting stats on the amount of data collected - // from workers. - if (_config.debugNoMerge) { - return true; - } - - auto start = std::chrono::system_clock::now(); - switch (_dbEngine) { - case MYISAM: - ret = _applyMysqlMyIsam(infileStatement, resultSize); - break; - case INNODB: // Fallthrough - case MEMORY: - ret = _applyMysqlInnoDb(infileStatement, resultSize); - break; - default: - throw std::invalid_argument("InfileMerger::_dbEngine is unknown =" + engineToStr(_dbEngine)); - } - auto end = std::chrono::system_clock::now(); - auto mergeDur = std::chrono::duration_cast(end - start); - LOGS(_log, LOG_LVL_DEBUG, - "mergeDur=" << mergeDur.count() << " sema(total=" << _semaMgrConn->getTotalCount() - << " used=" << _semaMgrConn->getUsedCount() << ")"); - if (not ret) { - LOGS(_log, LOG_LVL_ERROR, "InfileMerger::merge mysql applyMysql failure"); - } - _invalidJobAttemptMgr.decrConcurrentMergeCount(); - - LOGS(_log, LOG_LVL_DEBUG, "virtFileT=" << virtFileT.getElapsed() << " mergeDur=" << mergeDur.count()); - - return ret; -} - uint32_t histLimitCount = 0; util::HistogramRolling histoInfileBuild("&&&uj histoInfileBuild", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); util::HistogramRolling histoMergeSecs("&&&uj histoMergeSecs", {0.1, 1.0, 10.0, 100.0, 1000.0}, 1h, 10000); @@ -440,7 +328,7 @@ bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::Response } auto end = CLOCK::now(); auto mergeDur = std::chrono::duration_cast(end - start); - LOGS(_log, LOG_LVL_DEBUG, + LOGS(_log, LOG_LVL_TRACE, "mergeDur=" << mergeDur.count() << " sema(total=" << _semaMgrConn->getTotalCount() << " used=" << _semaMgrConn->getUsedCount() << ")"); std::chrono::duration secs = end - start; // &&& @@ -457,7 +345,7 @@ bool InfileMerger::mergeHttp(qdisp::UberJob::Ptr const& uberJob, proto::Response } _invalidJobAttemptMgr.decrConcurrentMergeCount(); - LOGS(_log, LOG_LVL_DEBUG, "virtFileT=" << virtFileT.getElapsed() << " mergeDur=" << mergeDur.count()); + LOGS(_log, LOG_LVL_TRACE, "virtFileT=" << virtFileT.getElapsed() << " mergeDur=" << mergeDur.count()); return ret; } diff --git a/src/rproc/InfileMerger.h b/src/rproc/InfileMerger.h index 3091246ca..14ab9b395 100644 --- a/src/rproc/InfileMerger.h +++ b/src/rproc/InfileMerger.h @@ -162,13 +162,6 @@ class InfileMerger { std::string engineToStr(InfileMerger::DbEngine engine); - /// Merge a worker response, which contains a single ResponseData message - /// Using job query info for early termination of the merge if needed. - /// @return true if merge was successfully imported. - // &&& delete - bool merge(proto::ResponseSummary const& responseSummary, proto::ResponseData const& responseData, - std::shared_ptr const& jq); - /// Merge the result data collected over Http. bool mergeHttp(std::shared_ptr const& uberJob, proto::ResponseData const& responseData); diff --git a/src/util/Error.h b/src/util/Error.h index c95ec76b0..825594ce6 100644 --- a/src/util/Error.h +++ b/src/util/Error.h @@ -61,6 +61,7 @@ struct ErrorCode { MYSQLCONNECT, MYSQLEXEC, INTERNAL, + CZAR_RESULT_TOO_LARGE, // Worker errors: WORKER_RESULT_TOO_LARGE }; diff --git a/src/wbase/FileChannelShared.cc b/src/wbase/FileChannelShared.cc index f51052a1b..338771488 100644 --- a/src/wbase/FileChannelShared.cc +++ b/src/wbase/FileChannelShared.cc @@ -381,7 +381,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrgetIdStr() << " bytesT=" << bytesTransmitted - << " _tsz=" << _transmitsize); + __func__ << " " << task->getIdStr() << " bytesT=" << _bytesWritten << " _tsz=" << _transmitsize); bufferFillT.stop(); bufferFillSecs += bufferFillT.getElapsed(); - int64_t const maxTableSize = task->getMaxTableSize(); + uint64_t const maxTableSize = task->getMaxTableSize(); // Fail the operation if the amount of data in the result set exceeds the requested // "large result" limit (in case one was specified). - if (maxTableSize > 0 && bytesTransmitted > maxTableSize) { - string const err = "The result set size " + to_string(bytesTransmitted) + + LOGS(_log, LOG_LVL_TRACE, "bytesWritten=" << _bytesWritten << " max=" << maxTableSize); + if (maxTableSize > 0 && _bytesWritten > maxTableSize) { + string const err = "The result set size " + to_string(_bytesWritten) + " of a job exceeds the requested limit of " + to_string(maxTableSize) + " bytes, task: " + task->getIdStr(); multiErr.push_back(util::Error(util::ErrorCode::WORKER_RESULT_TOO_LARGE, err)); LOGS(_log, LOG_LVL_ERROR, err); erred = true; - break; + //&&&task->cancel(); + //&&&buildAndTransmitError(multiErr, task, cancelled); + return erred; } int const ujRowLimit = task->getRowLimit(); @@ -472,7 +475,7 @@ bool FileChannelShared::buildAndTransmitResult(MYSQL_RES* mResult, shared_ptrgetIdStr()); } else { - qStats->addTaskTransmit(timeSeconds, bytesTransmitted, rowsTransmitted, bufferFillSecs); + qStats->addTaskTransmit(timeSeconds, taskBytesWritten, rowsTransmitted, bufferFillSecs); LOGS(_log, LOG_LVL_TRACE, "TaskTransmit time=" << timeSeconds << " bufferFillSecs=" << bufferFillSecs); } diff --git a/src/wbase/FileChannelShared.h b/src/wbase/FileChannelShared.h index 348eb3cb3..b1fb26a1a 100644 --- a/src/wbase/FileChannelShared.h +++ b/src/wbase/FileChannelShared.h @@ -305,6 +305,8 @@ class FileChannelShared { /// much faster to answer the query without scanning all 1000 chunks. std::atomic _rowLimitComplete; std::atomic _dead{false}; ///< Set to true when the contents of the file are no longer useful. + + std::atomic _bytesWritten{0}; ///< Total bytes written. }; } // namespace lsst::qserv::wbase diff --git a/src/wbase/Task.cc b/src/wbase/Task.cc index 33b24f39e..2fa6b3ce0 100644 --- a/src/wbase/Task.cc +++ b/src/wbase/Task.cc @@ -123,7 +123,7 @@ atomic taskSequence{0}; ///< Unique identifier source for Task. /// the util::CommandThreadPool is not called here. Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, size_t templateId, bool hasSubchunks, int subchunkId, string const& db, - protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, int maxTableSize, + protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, //&&& int maxTableSize, vector const& fragSubTables, vector const& fragSubchunkIds, shared_ptr const& sc, std::shared_ptr const& queryStats_, uint16_t resultsHttpPort) @@ -143,7 +143,7 @@ Task::Task(UberJobData::Ptr const& ujData, int jobId, int attemptCount, int chun _scanInfo(scanInfo), _scanInteractive(scanInteractive), _queryStats(queryStats_), - _maxTableSize(maxTableSize * ::MB_SIZE_BYTES), + //&&&_maxTableSize(maxTableSize * ::MB_SIZE_BYTES), _rowLimit(ujData->getRowLimit()), _ujData(ujData), _idStr(ujData->getIdStr() + " jId=" + to_string(_jId) + " sc=" + to_string(_subchunkId)) { @@ -235,7 +235,7 @@ std::vector Task::createTasksFromUberJobMsg( auto jobSubQueryTempMap = ujMsg->getJobSubQueryTempMap(); auto jobDbTablesMap = ujMsg->getJobDbTablesMap(); auto jobMsgVect = ujMsg->getJobMsgVect(); - int maxTableSizeMb = ujMsg->getMaxTableSizeMb(); + //&&& int maxTableSizeMb = ujMsg->getMaxTableSizeMb(); auto scanInfo = ujMsg->getScanInfo(); for (auto const& jobMsg : *jobMsgVect) { @@ -276,7 +276,7 @@ std::vector Task::createTasksFromUberJobMsg( int const subchunkId = -1; auto task = Task::Ptr(new Task( ujData, jobId, attemptCount, chunkId, fragmentNumber, templateId, noSubchunks, - subchunkId, chunkQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, + subchunkId, chunkQuerySpecDb, scanInfo, scanInteractive, //&&& maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, queryStats, resultsHttpPort)); vect.push_back(task); @@ -285,7 +285,7 @@ std::vector Task::createTasksFromUberJobMsg( bool const hasSubchunks = true; auto task = Task::Ptr(new Task(ujData, jobId, attemptCount, chunkId, fragmentNumber, templateId, hasSubchunks, subchunkId, chunkQuerySpecDb, - scanInfo, scanInteractive, maxTableSizeMb, + scanInfo, scanInteractive, //&&&maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, queryStats, resultsHttpPort)); vect.push_back(task); @@ -384,8 +384,9 @@ std::vector Task::createTasksForUnitTest( int const subchunkId = -1; auto task = Task::Ptr(new Task(ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, 0, noSubchunks, subchunkId, jdQuerySpecDb, scanInfo, - scanInteractive, maxTableSizeMb, fragSubTables, - fragSubchunkIds, sendChannel, nullptr, 0)); + //&&&scanInteractive, maxTableSizeMb, fragSubTables, + scanInteractive, fragSubTables, fragSubchunkIds, + sendChannel, nullptr, 0)); vect.push_back(task); } else { @@ -393,7 +394,7 @@ std::vector Task::createTasksForUnitTest( bool const hasSubchunks = true; auto task = Task::Ptr(new Task( ujData, jdJobId, jdAttemptCount, jdChunkId, fragmentNumber, 0, hasSubchunks, - subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, maxTableSizeMb, + subchunkId, jdQuerySpecDb, scanInfo, scanInteractive, //&&& maxTableSizeMb, fragSubTables, fragSubchunkIds, sendChannel, nullptr, 0)); vect.push_back(task); @@ -637,7 +638,8 @@ nlohmann::json Task::getJson() const { js["attemptId"] = _attemptCount; js["sequenceId"] = _tSeq; js["scanInteractive"] = _scanInteractive; - js["maxTableSize"] = _maxTableSize; + //&&&js["maxTableSize"] = _maxTableSize; + js["maxTableSize"] = _ujData->getMaxTableSizeBytes(); js["cancelled"] = to_string(_cancelled); js["state"] = static_cast(_state.load()); js["createTime_msec"] = util::TimeUtils::tp2ms(_createTime); @@ -655,6 +657,8 @@ nlohmann::json Task::getJson() const { return js; } +int64_t Task::getMaxTableSize() const { return _ujData->getMaxTableSizeBytes(); } + ostream& operator<<(ostream& os, Task const& t) { os << "Task: " << "msg: " << t.getIdStr() << " chunk=" << t._chunkId << " db=" << t._db << " " << t.getQueryString(); diff --git a/src/wbase/Task.h b/src/wbase/Task.h index 9f9d30b88..b6586f5d2 100644 --- a/src/wbase/Task.h +++ b/src/wbase/Task.h @@ -159,7 +159,7 @@ class Task : public util::CommandForThreadPool { // Unfortunately, this will be much easier if it is done after xrootd method is removed. Task(std::shared_ptr const& ujData, int jobId, int attemptCount, int chunkId, int fragmentNumber, size_t templateId, bool hasSubchunks, int subchunkId, std::string const& db, - protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, int maxTableSizeMb, + protojson::ScanInfo::Ptr const& scanInfo, bool scanInteractive, //&&&int maxTableSizeMb, std::vector const& fragSubTables, std::vector const& fragSubchunkIds, std::shared_ptr const& sc, std::shared_ptr const& queryStats_, uint16_t resultsHttpPort = 8080); @@ -237,7 +237,9 @@ class Task : public util::CommandForThreadPool { int getJobId() const { return _jId; } int getAttemptCount() const { return _attemptCount; } bool getScanInteractive() { return _scanInteractive; } - int64_t getMaxTableSize() const { return _maxTableSize; } + //&&&int64_t getMaxTableSize() const { return _maxTableSize; } + int64_t getMaxTableSize() const; + protojson::ScanInfo::Ptr getScanInfo() { return _scanInfo; } void setOnInteractive(bool val) { _onInteractive = val; } bool getOnInteractive() { return _onInteractive; } @@ -366,7 +368,7 @@ class Task : public util::CommandForThreadPool { /// Stores information on the query's resource usage. std::weak_ptr const _queryStats; - int64_t _maxTableSize = 0; + //&&&int64_t _maxTableSize = 0; std::atomic _memHandle{memman::MemMan::HandleType::INVALID}; memman::MemMan::Ptr _memMan; diff --git a/src/wbase/UberJobData.cc b/src/wbase/UberJobData.cc index b782e645a..5743354fb 100644 --- a/src/wbase/UberJobData.cc +++ b/src/wbase/UberJobData.cc @@ -56,8 +56,8 @@ namespace lsst::qserv::wbase { UberJobData::UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string czarHost, int czarPort, uint64_t queryId, int rowLimit, - std::string const& workerId, std::shared_ptr const& foreman, - std::string const& authKey) + uint64_t maxTableSizeBytes, std::string const& workerId, + std::shared_ptr const& foreman, std::string const& authKey) : _uberJobId(uberJobId), _czarName(czarName), _czarId(czarId), @@ -65,6 +65,7 @@ UberJobData::UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta _czarPort(czarPort), _queryId(queryId), _rowLimit(rowLimit), + _maxTableSizeBytes(maxTableSizeBytes), _workerId(workerId), _authKey(authKey), _foreman(foreman), diff --git a/src/wbase/UberJobData.h b/src/wbase/UberJobData.h index 2634e0325..a16960311 100644 --- a/src/wbase/UberJobData.h +++ b/src/wbase/UberJobData.h @@ -66,10 +66,10 @@ class UberJobData : public std::enable_shared_from_this { static Ptr create(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string const& czarHost, int czarPort, uint64_t queryId, int rowLimit, - std::string const& workerId, std::shared_ptr const& foreman, - std::string const& authKey) { + uint64_t maxTableSizeBytes, std::string const& workerId, + std::shared_ptr const& foreman, std::string const& authKey) { return Ptr(new UberJobData(uberJobId, czarName, czarId, czarHost, czarPort, queryId, rowLimit, - workerId, foreman, authKey)); + maxTableSizeBytes, workerId, foreman, authKey)); } /// Set file channel for this UberJob void setFileChannelShared(std::shared_ptr const& fileChannelShared); @@ -82,6 +82,7 @@ class UberJobData : public std::enable_shared_from_this { int getCzarPort() const { return _czarPort; } uint64_t getQueryId() const { return _queryId; } std::string getWorkerId() const { return _workerId; } + uint64_t getMaxTableSizeBytes() const { return _maxTableSizeBytes; } /// Add the tasks defined in the UberJob to this UberJobData object. void addTasks(std::vector> const& tasks) { @@ -112,8 +113,9 @@ class UberJobData : public std::enable_shared_from_this { private: UberJobData(UberJobId uberJobId, std::string const& czarName, qmeta::CzarId czarId, std::string czarHost, - int czarPort, uint64_t queryId, int rowLimit, std::string const& workerId, - std::shared_ptr const& foreman, std::string const& authKey); + int czarPort, uint64_t queryId, int rowLimit, uint64_t maxTableSizeBytes, + std::string const& workerId, std::shared_ptr const& foreman, + std::string const& authKey); /// Queue the response to be sent to the originating czar. void _queueUJResponse(http::Method method_, std::vector const& headers_, @@ -127,6 +129,7 @@ class UberJobData : public std::enable_shared_from_this { int const _czarPort; QueryId const _queryId; int const _rowLimit; ///< If > 0, only read this many rows before return the results. + uint64_t const _maxTableSizeBytes; std::string const _workerId; std::string const _authKey; diff --git a/src/wdb/testQueryRunner.cc b/src/wdb/testQueryRunner.cc index de9aebaab..5f7612dab 100644 --- a/src/wdb/testQueryRunner.cc +++ b/src/wdb/testQueryRunner.cc @@ -163,7 +163,7 @@ BOOST_AUTO_TEST_CASE(Simple) { auto const queries = queriesAndChunks(); auto ujData = lsst::qserv::wbase::UberJobData::create( mInfo.uberJobId, mInfo.czarName, mInfo.czarId, mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, - mInfo.rowLimit, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); + mInfo.rowLimit, mInfo.maxTableSize, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); auto scanInfo = lsst::qserv::protojson::ScanInfo::create(); scanInfo->scanRating = mInfo.scanRating; scanInfo->infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); @@ -188,7 +188,7 @@ BOOST_AUTO_TEST_CASE(Output) { auto const queries = queriesAndChunks(); auto ujData = lsst::qserv::wbase::UberJobData::create( mInfo.uberJobId, mInfo.czarName, mInfo.czarId, mInfo.czarHostName, mInfo.czarPort, mInfo.queryId, - mInfo.rowLimit, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); + mInfo.rowLimit, mInfo.maxTableSize, mInfo.targWorkerId, mInfo.foreman, mInfo.authKey); auto scanInfo = lsst::qserv::protojson::ScanInfo::create(); scanInfo->scanRating = mInfo.scanRating; scanInfo->infoTables.emplace_back(mInfo.db, mInfo.table, mInfo.lockInMemory, mInfo.scanRating); diff --git a/src/xrdsvc/HttpWorkerCzarModule.cc b/src/xrdsvc/HttpWorkerCzarModule.cc index a672b740a..0e915a673 100644 --- a/src/xrdsvc/HttpWorkerCzarModule.cc +++ b/src/xrdsvc/HttpWorkerCzarModule.cc @@ -120,11 +120,17 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { QueryId ujQueryId = uberJobMsg->getQueryId(); int ujRowLimit = uberJobMsg->getRowLimit(); auto targetWorkerId = uberJobMsg->getWorkerId(); + uint64_t maxTableSizeMb = uberJobMsg->getMaxTableSizeMb(); + uint64_t const MB_SIZE_BYTES = 1024 * 1024; + uint64_t maxTableSizeBytes = maxTableSizeMb * MB_SIZE_BYTES; // Get or create QueryStatistics and UserQueryInfo instances. auto queryStats = foreman()->getQueriesAndChunks()->addQueryId(ujQueryId, ujCzInfo->czId); auto userQueryInfo = queryStats->getUserQueryInfo(); LOGS(_log, LOG_LVL_WARN, uberJobMsg->getIdStr() << " &&& added to stats"); + LOGS(_log, LOG_LVL_WARN, + uberJobMsg->getIdStr() << " &&& bytesWritten added to stats maxTableSizeMb=" << maxTableSizeMb + << " maxTableSizeBytes=" << maxTableSizeBytes); if (userQueryInfo->getCancelledByCzar()) { throw wbase::TaskException( @@ -136,8 +142,8 @@ json HttpWorkerCzarModule::_handleQueryJob(string const& func) { } auto ujData = wbase::UberJobData::create(ujId, ujCzInfo->czName, ujCzInfo->czId, ujCzInfo->czHostName, - ujCzInfo->czPort, ujQueryId, ujRowLimit, targetWorkerId, - foreman(), authKey()); + ujCzInfo->czPort, ujQueryId, ujRowLimit, maxTableSizeBytes, + targetWorkerId, foreman(), authKey()); LOGS(_log, LOG_LVL_WARN, uberJobMsg->getIdStr() << " &&& ujData created"); // Find the entry for this queryId, create a new one if needed.