Skip to content

Commit

Permalink
Migrated Czar management protocol to use the REST API
Browse files Browse the repository at this point in the history
  • Loading branch information
iagaponenko committed Dec 15, 2023
1 parent 33321ef commit a10694d
Show file tree
Hide file tree
Showing 12 changed files with 618 additions and 197 deletions.
110 changes: 7 additions & 103 deletions src/ccontrol/UserQueryQservManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@

// Qserv headers
#include "cconfig/CzarConfig.h"
#include "qdisp/CzarStats.h"
#include "qdisp/MessageStore.h"
#include "sql/SqlBulkInsert.h"
#include "sql/SqlConnection.h"
#include "sql/SqlConnectionFactory.h"
#include "util/String.h"
#include "wconfig/WorkerConfig.h"

using namespace std;
using json = nlohmann::json;
Expand Down Expand Up @@ -71,45 +68,11 @@ void UserQueryQservManager::submit() {
auto const czarConfig = cconfig::CzarConfig::instance();
auto const resultDbConn = sql::SqlConnectionFactory::make(czarConfig->getMySqlResultConfig());

// Remove quotes around a value of the input parameter. Also parse the command.
// Some commands may have optional parameters.
// Note that (single or double) quotes are required by SQL when calling
// the stored procedure. The quotes are preserved AS-IS by the Qserv query parser.
string command;
vector<string> params;
if (_value.size() > 2) {
string const space = " ";
string const quotesRemoved = _value.substr(1, _value.size() - 2);
for (auto&& str : util::String::split(quotesRemoved, space)) {
// This is just in case if the splitter won't recognise consequtive spaces.
if (str.empty() || (str == space)) continue;
if (command.empty()) {
command = str;
} else {
params.push_back(str);
}
}
}

// Create the table as per the command.
string createTable;
// Create the table.
string const createTable = "CREATE TABLE " + _resultTableName + "(`result` BLOB)";
vector<string> resColumns; // This must match the schema in the CREATE TABLE statement.
if (command == "config") {
createTable = "CREATE TABLE " + _resultTableName + "(`config` BLOB)";
resColumns.push_back("config");
} else if (command == "query_proc_stats") {
createTable = "CREATE TABLE " + _resultTableName + "(`stats` BLOB)";
resColumns.push_back("stats");
} else if (command == "query_info") {
createTable = "CREATE TABLE " + _resultTableName +
"(`queryId` BIGINT NOT NULL, `timestamp_ms` BIGINT NOT NULL, `num_jobs` INT NOT NULL)";
resColumns.push_back("queryId");
resColumns.push_back("timestamp_ms");
resColumns.push_back("num_jobs");
} else {
createTable = "CREATE TABLE " + _resultTableName + "(`result` BLOB)";
resColumns.push_back("result");
}
resColumns.push_back("result");

LOGS(_log, LOG_LVL_TRACE, "creating result table: " << createTable);
sql::SqlErrorObject errObj;
if (!resultDbConn->runQuery(createTable, errObj)) {
Expand All @@ -120,69 +83,10 @@ void UserQueryQservManager::submit() {
return;
}

// Prepare data for the command.
// note that the output string(s) should be quoted.
auto const stats = qdisp::CzarStats::get();
// Return a value of the original input (which includeds quotes).
list<vector<string>> rows;
if (command == "config") {
json const result = cconfig::CzarConfig::instance()->toJson();
vector<string> row = {"'" + result.dump() + "'"};
rows.push_back(move(row));
} else if (command == "query_proc_stats") {
json const result = json::object({{"qdisp_stats", stats->getQdispStatsJson()},
{"transmit_stats", stats->getTransmitStatsJson()}});
vector<string> row = {"'" + result.dump() + "'"};
rows.push_back(move(row));
} else if (command == "query_info") {
// The optonal query identifier and the number of the last seconds in a history
// of queries may be provided to narrow a scope of the operation:
//
// query_info
// query_info <qid>
// query_info <qid> <seconds>
//
// Where any value may be set to 0 to indicate the default behavior. Any extra
// parameters will be ignored.
//
QueryId selectQueryId = 0; // any query
unsigned int lastSeconds = 0; // any timestamps
try {
if (params.size() > 0) selectQueryId = stoull(params[0]);
if (params.size() > 1) lastSeconds = stoul(params[1]);
} catch (exception const& ex) {
string const message =
"failed to parse values of parameter from " + _value + ", ex: " + string(ex.what());
LOGS(_log, LOG_LVL_ERROR, message);
_messageStore->addMessage(-1, "SQL", 1051, message, MessageSeverity::MSG_ERROR);
_qState = ERROR;
return;
}

// The original order of timestams within queries will be preserved as if
// the following query was issued:
//
// SELECT
// `queryId`,
// `timestamp_ms`,
// `num_jobs`
// FROM
// `table`
// ORDER BY
// `queryId`,
// `timestamp_ms` ASC
//
for (auto&& [queryId, history] : stats->getQueryProgress(selectQueryId, lastSeconds)) {
string const queryIdStr = to_string(queryId);
for (auto&& point : history) {
vector<string> row = {queryIdStr, to_string(point.timestampMs), to_string(point.numJobs)};
rows.push_back(move(row));
}
}
} else {
// Return a value of the original command (which includeds quotes).
vector<string> row = {_value};
rows.push_back(move(row));
}
vector<string> row = {_value};
rows.push_back(move(row));

// Ingest row(s) into the table.
bool success = true;
Expand Down
3 changes: 3 additions & 0 deletions src/replica/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,11 @@ target_sources(replica PRIVATE
FixUpJob.cc
GetReplicasQservMgtRequest.cc
GetDbStatusQservMgtRequest.cc
GetConfigQservCzarMgtRequest.cc
GetConfigQservMgtRequest.cc
GetQueryProgressQservCzarMgtRequest.cc
GetResultFilesQservMgtRequest.cc
GetStatusQservCzarMgtRequest.cc
GetStatusQservMgtRequest.cc
HealthMonitorTask.cc
HttpAsyncReqApp.cc
Expand Down
60 changes: 60 additions & 0 deletions src/replica/GetConfigQservCzarMgtRequest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/GetConfigQservCzarMgtRequest.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetConfigQservCzarMgtRequest");

} // namespace

namespace lsst::qserv::replica {

shared_ptr<GetConfigQservCzarMgtRequest> GetConfigQservCzarMgtRequest::create(
shared_ptr<ServiceProvider> const& serviceProvider, string const& czarName,
GetConfigQservCzarMgtRequest::CallbackType const& onFinish) {
return shared_ptr<GetConfigQservCzarMgtRequest>(
new GetConfigQservCzarMgtRequest(serviceProvider, czarName, onFinish));
}

GetConfigQservCzarMgtRequest::GetConfigQservCzarMgtRequest(
shared_ptr<ServiceProvider> const& serviceProvider, string const& czarName,
GetConfigQservCzarMgtRequest::CallbackType const& onFinish)
: QservCzarMgtRequest(serviceProvider, "QSERV_CZAR_GET_CONFIG", czarName), _onFinish(onFinish) {}

void GetConfigQservCzarMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const service = "/config";
createHttpReq(lock, service);
}

void GetConfigQservCzarMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_TRACE, context() << __func__);
notifyDefaultImpl<GetConfigQservCzarMgtRequest>(lock, _onFinish);
}

} // namespace lsst::qserv::replica
88 changes: 88 additions & 0 deletions src/replica/GetConfigQservCzarMgtRequest.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/
#ifndef LSST_QSERV_REPLICA_GETCONFIGQSERVCZARMGTREQUEST_H
#define LSST_QSERV_REPLICA_GETCONFIGQSERVCZARMGTREQUEST_H

// System headers
#include <memory>
#include <string>

// Qserv headers
#include "replica/QservCzarMgtRequest.h"

namespace lsst::qserv::replica {
class ServiceProvider;
} // namespace lsst::qserv::replica

// This header declarations
namespace lsst::qserv::replica {

/**
* Class GetConfigQservCzarMgtRequest is a request for obtaining configuration parameters
* of the Qserv Czar.
*/
class GetConfigQservCzarMgtRequest : public QservCzarMgtRequest {
public:
typedef std::shared_ptr<GetConfigQservCzarMgtRequest> Ptr;

/// The function type for notifications on the completion of the request
typedef std::function<void(Ptr)> CallbackType;

GetConfigQservCzarMgtRequest() = delete;
GetConfigQservCzarMgtRequest(GetConfigQservCzarMgtRequest const&) = delete;
GetConfigQservCzarMgtRequest& operator=(GetConfigQservCzarMgtRequest const&) = delete;

virtual ~GetConfigQservCzarMgtRequest() override = default;

/**
* Static factory method is needed to prevent issues with the lifespan
* and memory management of instances created otherwise (as values or via
* low-level pointers).
* @param serviceProvider A reference to a provider of services for accessing
* Configuration, saving the request's persistent state to the database.
* @param czarName The name of a Czar to send the request to.
* @param onFinish (optional) callback function to be called upon request completion.
* @return A pointer to the created object.
*/
static std::shared_ptr<GetConfigQservCzarMgtRequest> create(
std::shared_ptr<ServiceProvider> const& serviceProvider, std::string const& czarName,
CallbackType const& onFinish = nullptr);

protected:
/// @see QservMgtRequest::createHttpReqImpl()
virtual void createHttpReqImpl(replica::Lock const& lock) override;

/// @see QservMgtRequest::notify()
virtual void notify(replica::Lock const& lock) override;

private:
/// @see GetConfigQservCzarMgtRequest::create()
GetConfigQservCzarMgtRequest(std::shared_ptr<ServiceProvider> const& serviceProvider,
std::string const& czarName, CallbackType const& onFinish);

// Input parameters

CallbackType _onFinish; ///< This callback is reset after finishing the request.
};

} // namespace lsst::qserv::replica

#endif // LSST_QSERV_REPLICA_GETCONFIGQSERVCZARMGTREQUEST_H
71 changes: 71 additions & 0 deletions src/replica/GetQueryProgressQservCzarMgtRequest.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* LSST Data Management System
*
* This product includes software developed by the
* LSST Project (http://www.lsst.org/).
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU General Public License as published by
* the Free Software Foundation, either version 3 of the License, or
* (at your option) any later version.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*/

// Class header
#include "replica/GetQueryProgressQservCzarMgtRequest.h"

// Qserv headers
#include "util/String.h"

// LSST headers
#include "lsst/log/Log.h"

using namespace std;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.replica.GetQueryProgressQservCzarMgtRequest");

} // namespace

namespace lsst::qserv::replica {

shared_ptr<GetQueryProgressQservCzarMgtRequest> GetQueryProgressQservCzarMgtRequest::create(
shared_ptr<ServiceProvider> const& serviceProvider, string const& czarName,
vector<QueryId> const& queryIds, unsigned int lastSeconds,
GetQueryProgressQservCzarMgtRequest::CallbackType const& onFinish) {
return shared_ptr<GetQueryProgressQservCzarMgtRequest>(new GetQueryProgressQservCzarMgtRequest(
serviceProvider, czarName, queryIds, lastSeconds, onFinish));
}

GetQueryProgressQservCzarMgtRequest::GetQueryProgressQservCzarMgtRequest(
shared_ptr<ServiceProvider> const& serviceProvider, string const& czarName,
vector<QueryId> const& queryIds, unsigned int lastSeconds,
GetQueryProgressQservCzarMgtRequest::CallbackType const& onFinish)
: QservCzarMgtRequest(serviceProvider, "QSERV_CZAR_GET_QUERY_PROGRESS", czarName),
_queryIds(queryIds),
_lastSeconds(lastSeconds),
_onFinish(onFinish) {}

void GetQueryProgressQservCzarMgtRequest::createHttpReqImpl(replica::Lock const& lock) {
string const service = "/query-progress";
string query;
query += "?query_ids=" + util::String::toString(_queryIds);
query += "&last_seconds=" + to_string(_lastSeconds);
createHttpReq(lock, service, query);
}

void GetQueryProgressQservCzarMgtRequest::notify(replica::Lock const& lock) {
LOGS(_log, LOG_LVL_TRACE, context() << __func__);
notifyDefaultImpl<GetQueryProgressQservCzarMgtRequest>(lock, _onFinish);
}

} // namespace lsst::qserv::replica
Loading

0 comments on commit a10694d

Please sign in to comment.