Skip to content

Commit

Permalink
Refactored and extended worker configuration service
Browse files Browse the repository at this point in the history
The service is now available to the client code via shared pointer
that can be stored and used by classes as needed. The change reduces
the number of parameters which are sent around the code. In the new
version of the code only the shared pointer to the service is put
to where the configuration parameters are consumed.

Also added new parameters to support the file-based result delivery,
including: a location of the results folder, the number of BOOST ASIO
threads to run the QHTTP server at worker, and a selector for the desired
results delivery protocol.
  • Loading branch information
iagaponenko committed Jul 19, 2023
1 parent ae9a111 commit 27ab504
Show file tree
Hide file tree
Showing 8 changed files with 272 additions and 162 deletions.
18 changes: 18 additions & 0 deletions src/admin/templates/xrootd/etc/xrdssi.cf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -85,5 +85,23 @@ maxtransmits = 50
maxalreadytransmitting = 10

[results]

# The name of a folder where query results will be stored.
dirname = {{ results_dirname }}

# The port number of the worker XROOTD service for serving files.
# NOTE: the hardcoded value may need to be replaced with a template
xrootd_port = 1094

# The number of the BOOST ASIO threads for HTTP requests
num_http_threads = 4

# Result delivery protocol. Allowed options:
# SSI - XROOTD/SSI stream (the default mode if no specific choice is proided)
# XROOT - XROOT file protocol
# HTTP - HTTP protocol
protocol = SSI

# Set to any value but 0 if result files (if any) left after the previous run of
# the worker had to be deleted from the corresponding folder.
clean_up_on_start = 1
96 changes: 95 additions & 1 deletion src/wconfig/WorkerConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,10 @@

// System headers
#include <sstream>
#include <stdexcept>

// Third party headers
#include <boost/algorithm/string/predicate.hpp>

// LSST headers
#include "lsst/log/Log.h"
Expand All @@ -35,14 +39,99 @@
#include "util/ConfigStoreError.h"
#include "wsched/BlendScheduler.h"

using namespace lsst::qserv::wconfig;

namespace {

LOG_LOGGER _log = LOG_GET("lsst.qserv.wconfig.WorkerConfig");

WorkerConfig::ResultDeliveryProtocol parseResultDeliveryProtocol(std::string const& str) {
// Using BOOST's 'iequals' for case-insensitive comparisons.
if (str.empty() || boost::iequals(str, "SSI")) {
return WorkerConfig::ResultDeliveryProtocol::SSI;
} else if (boost::iequals(str, "XROOT")) {
return WorkerConfig::ResultDeliveryProtocol::XROOT;
} else if (boost::iequals(str, "HTTP")) {
return WorkerConfig::ResultDeliveryProtocol::HTTP;
}
throw std::invalid_argument("WorkerConfig::" + std::string(__func__) + " unsupported method '" + str +
"'.");
}
} // namespace

namespace lsst::qserv::wconfig {

std::mutex WorkerConfig::_mtxOnInstance;

std::shared_ptr<WorkerConfig> WorkerConfig::_instance;

std::shared_ptr<WorkerConfig> WorkerConfig::create(std::string const& configFileName) {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
_instance = std::shared_ptr<WorkerConfig>(
configFileName.empty() ? new WorkerConfig()
: new WorkerConfig(util::ConfigStore(configFileName)));
}
return _instance;
}

std::shared_ptr<WorkerConfig> WorkerConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("WorkerConfig::" + std::string(__func__) + ": instance has not been created.");
}
return _instance;
}

std::string WorkerConfig::protocol2str(ResultDeliveryProtocol const& p) {
switch (p) {
case WorkerConfig::ResultDeliveryProtocol::SSI:
return "SSI";
case WorkerConfig::ResultDeliveryProtocol::XROOT:
return "XROOT";
case WorkerConfig::ResultDeliveryProtocol::HTTP:
return "HTTP";
}
throw std::invalid_argument("WorkerConfig::" + std::string(__func__) + ": unknown protocol " +
std::to_string(static_cast<int>(p)));
}

WorkerConfig::WorkerConfig()
: _memManClass("MemManReal"),
_memManSizeMb(1000),
_memManLocation("/qserv/data/mysql"),
_threadPoolSize(wsched::BlendScheduler::getMinPoolSize()),
_maxPoolThreads(5000),
_maxGroupSize(1),
_requiredTasksCompleted(25),
_prioritySlow(2),
_prioritySnail(1),
_priorityMed(3),
_priorityFast(4),
_maxReserveSlow(2),
_maxReserveSnail(2),
_maxReserveMed(2),
_maxReserveFast(2),
_maxActiveChunksSlow(2),
_maxActiveChunksSnail(1),
_maxActiveChunksMed(4),
_maxActiveChunksFast(4),
_scanMaxMinutesFast(60),
_scanMaxMinutesMed(60 * 8),
_scanMaxMinutesSlow(60 * 12),
_scanMaxMinutesSnail(60 * 24),
_maxTasksBootedPerUserQuery(5),
_maxSqlConnections(800),
_ReservedInteractiveSqlConnections(50),
_bufferMaxTotalGB(41),
_maxTransmits(40),
_maxPerQid(3),
_resultsDirname("/qserv/data/results"),
_resultsXrootdPort(1094),
_resultsNumHttpThreads(1),
_resultDeliveryProtocol(ResultDeliveryProtocol::SSI),
_resultsCleanUpOnStart(true) {}

WorkerConfig::WorkerConfig(const util::ConfigStore& configStore)
: _memManClass(configStore.get("memman.class", "MemManReal")),
_memManSizeMb(configStore.getInt("memman.memory", 1000)),
Expand Down Expand Up @@ -74,7 +163,12 @@ WorkerConfig::WorkerConfig(const util::ConfigStore& configStore)
configStore.getInt("sqlconnections.reservedinteractivesqlconn", 50)),
_bufferMaxTotalGB(configStore.getInt("transmit.buffermaxtotalgb", 41)),
_maxTransmits(configStore.getInt("transmit.maxtransmits", 40)),
_maxPerQid(configStore.getInt("transmit.maxperqid", 3)) {
_maxPerQid(configStore.getInt("transmit.maxperqid", 3)),
_resultsDirname(configStore.get("results.dirname", "/qserv/data/results")),
_resultsXrootdPort(configStore.getInt("results.xrootd_port", 1094)),
_resultsNumHttpThreads(configStore.getInt("results.num_http_threads", 1)),
_resultDeliveryProtocol(::parseResultDeliveryProtocol(configStore.get("results.protocol", "SSI"))),
_resultsCleanUpOnStart(configStore.getInt("results.clean_up_on_start", 1) != 0) {
int mysqlPort = configStore.getInt("mysql.port");
std::string mysqlSocket = configStore.get("mysql.socket");
if (mysqlPort == 0 && mysqlSocket.empty()) {
Expand Down
Loading

0 comments on commit 27ab504

Please sign in to comment.