Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tickets/dm 45548 #884

Draft
wants to merge 22 commits into
base: tickets/DM-43715
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ add_subdirectory(mysql)
add_subdirectory(parser)
add_subdirectory(partition)
add_subdirectory(proto)
add_subdirectory(protojson)
add_subdirectory(proxy)
add_subdirectory(qana)
add_subdirectory(qdisp)
Expand All @@ -89,7 +90,6 @@ add_subdirectory(wpublish)
add_subdirectory(wsched)
add_subdirectory(www)
add_subdirectory(xrdlog)
add_subdirectory(xrdreq)
add_subdirectory(xrdsvc)

#-----------------------------------------------------------------------------
Expand All @@ -103,6 +103,7 @@ target_link_libraries(qserv_common PUBLIC
mysql
sql
util
protojson
)

install(
Expand Down Expand Up @@ -143,7 +144,6 @@ target_link_libraries(qserv_czar PUBLIC
rproc
qserv_css
qserv_meta
xrdreq
)

install(
Expand Down
2 changes: 0 additions & 2 deletions src/admin/templates/http/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,6 @@ largestPriority = 3
vectRunSizes = 50:50:50:50
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299

[replication]

Expand Down
10 changes: 4 additions & 6 deletions src/admin/templates/proxy/etc/qserv-czar.cnf.jinja
Original file line number Diff line number Diff line change
Expand Up @@ -98,21 +98,19 @@ notifyWorkersOnCzarRestart = 1
#[debug]
#chunkLimit = -1

# Please see qdisp/QdispPool.h QdispPool::QdispPool for more information
# Please see util/QdispPool.h QdispPool::QdispPool for more information
[qdisppool]
#size of the pool
poolSize = 50
poolSize = 1000
# Low numbers are higher priority. Largest priority 3 creates 4 priority queues 0, 1, 2, 3
# Must be greater than 0.
largestPriority = 3
# Maximum number of threads running for each queue. No spaces. Values separated by ':'
# Using largestPriority = 2 and vectRunSizes = 3:5:8
# queue 0 would have runSize 3, queue 1 would have runSize 5, and queue 2 would have runSize 8.
vectRunSizes = 50:50:50:50
vectRunSizes = 800:800:500:500
# Minimum number of threads running for each queue. No spaces. Values separated by ':'
vectMinRunningSizes = 0:1:3:3
# Maximum number of QueryRequests allowed to be running at one time.
qReqPseudoFifoMaxRunning = 299
vectMinRunningSizes = 0:3:3:3

[replication]

Expand Down
7 changes: 3 additions & 4 deletions src/cconfig/CzarConfig.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,18 +62,17 @@ namespace lsst::qserv::cconfig {

std::mutex CzarConfig::_mtxOnInstance;

std::shared_ptr<CzarConfig> CzarConfig::_instance;
CzarConfig::Ptr CzarConfig::_instance;

std::shared_ptr<CzarConfig> CzarConfig::create(std::string const& configFileName,
std::string const& czarName) {
/// Return the shared CzarConfig instance, constructing it on the first call.
/// The instance is built from a util::ConfigStore made from `configFileName`, plus `czarName`.
/// If an instance already exists, both arguments are ignored and the existing
/// instance is returned. Access to `_instance` is serialized by `_mtxOnInstance`.
CzarConfig::Ptr CzarConfig::create(std::string const& configFileName, std::string const& czarName) {
    std::lock_guard<std::mutex> const guard(_mtxOnInstance);
    if (!_instance) {
        // Explicit `new` is kept as in the original construction expression.
        _instance.reset(new CzarConfig(util::ConfigStore(configFileName), czarName));
    }
    return _instance;
}

std::shared_ptr<CzarConfig> CzarConfig::instance() {
CzarConfig::Ptr CzarConfig::instance() {
std::lock_guard<std::mutex> const lock(_mtxOnInstance);
if (_instance == nullptr) {
throw std::logic_error("CzarConfig::" + std::string(__func__) + ": instance has not been created.");
Expand Down
54 changes: 48 additions & 6 deletions src/cconfig/CzarConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ namespace lsst::qserv::cconfig {
*/
class CzarConfig {
public:
using Ptr = std::shared_ptr<CzarConfig>;
/**
* Create an instance of CzarConfig and load parameters from the specified file.
* @note One has to call this method at least once before trying to obtain
Expand All @@ -63,15 +64,15 @@ class CzarConfig {
* @param czarName - the unique name of Czar.
* @return the shared pointer to the configuration object
*/
static std::shared_ptr<CzarConfig> create(std::string const& configFileName, std::string const& czarName);
static Ptr create(std::string const& configFileName, std::string const& czarName);

/**
* Get a pointer to an instance that was created by the last call to
* the method 'create'.
* @return the shared pointer to the configuration object
* @throws std::logic_error when attempting to call the method before creating an instance.
*/
static std::shared_ptr<CzarConfig> instance();
static Ptr instance();

CzarConfig() = delete;
CzarConfig(CzarConfig const&) = delete;
Expand Down Expand Up @@ -117,7 +118,7 @@ class CzarConfig {
*/
std::string const& getXrootdFrontendUrl() const { return _xrootdFrontendUrl->getVal(); }

/* Get the maximum number of threads for xrootd to use.
/* Get the maximum number of threads for xrootd to use. // TODO:UJ delete
*
* @return the maximum number of threads for xrootd to use.
*/
Expand Down Expand Up @@ -198,6 +199,28 @@ class CzarConfig {
/// the OOM situation.
unsigned int czarStatsRetainPeriodSec() const { return _czarStatsRetainPeriodSec->getVal(); }

/// A worker is considered fully ALIVE if the last update from the worker has been
/// heard in less than _activeWorkerTimeoutAliveSecs seconds.
int getActiveWorkerTimeoutAliveSecs() const { return _activeWorkerTimeoutAliveSecs->getVal(); }

/// A worker is considered DEAD if it hasn't been heard from in more than
/// _activeWorkerTimeoutDeadSecs.
int getActiveWorkerTimeoutDeadSecs() const { return _activeWorkerTimeoutDeadSecs->getVal(); }

/// Max lifetime of a message to be sent to an active worker. If the czar has been
/// trying to send a message to a worker and has failed for this many seconds,
/// it gives up at this point, removing elements of the message to save memory.
int getActiveWorkerMaxLifetimeSecs() const { return _activeWorkerMaxLifetimeSecs->getVal(); }

/// The maximum number of chunks (basically Jobs) allowed in a single UberJob.
int getUberJobMaxChunks() const { return _uberJobMaxChunks->getVal(); }

/// Return the maximum number of http connections to use for czar commands.
int getCommandMaxHttpConnections() const { return _commandMaxHttpConnections->getVal(); }

/// Return the sleep time (in milliseconds) between messages sent to active workers.
int getMonitorSleepTimeMilliSec() const { return _monitorSleepTimeMilliSec->getVal(); }

// Parameters of the Czar management service

std::string const& replicationInstanceId() const { return _replicationInstanceId->getVal(); }
Expand Down Expand Up @@ -293,7 +316,7 @@ class CzarConfig {
CVTIntPtr _resultMaxConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxconnections", notReq, 40);
CVTIntPtr _resultMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 8192);
util::ConfigValTInt::create(_configValMap, "resultdb", "maxhttpconnections", notReq, 2000);
CVTIntPtr _oldestResultKeptDays =
util::ConfigValTInt::create(_configValMap, "resultdb", "oldestResultKeptDays", notReq, 30);

Expand Down Expand Up @@ -344,10 +367,11 @@ class CzarConfig {
CVTIntPtr _qdispMaxPriority =
util::ConfigValTInt::create(_configValMap, "qdisppool", "largestPriority", notReq, 2);
CVTStrPtr _qdispVectRunSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "50:50:50:50");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectRunSizes", notReq, "800:800:500:50");
CVTStrPtr _qdispVectMinRunningSizes =
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:1:3:3");
util::ConfigValTStr::create(_configValMap, "qdisppool", "vectMinRunningSizes", notReq, "0:3:3:3");

// TODO:UJ delete xrootd specific entries.
CVTIntPtr _xrootdSpread = util::ConfigValTInt::create(_configValMap, "tuning", "xrootdSpread", notReq, 4);
CVTIntPtr _qMetaSecsBetweenChunkCompletionUpdates = util::ConfigValTInt::create(
_configValMap, "tuning", "qMetaSecsBetweenChunkCompletionUpdates", notReq, 60);
Expand Down Expand Up @@ -385,6 +409,24 @@ class CzarConfig {
util::ConfigValTInt::create(_configValMap, "replication", "http_port", notReq, 0);
CVTUIntPtr _replicationNumHttpThreads =
util::ConfigValTUInt::create(_configValMap, "replication", "num_http_threads", notReq, 2);

// Active Worker
CVTIntPtr _activeWorkerTimeoutAliveSecs = // 5min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutAliveSecs", notReq, 60 * 5);
CVTIntPtr _activeWorkerTimeoutDeadSecs = // 10min
util::ConfigValTInt::create(_configValMap, "activeworker", "timeoutDeadSecs", notReq, 60 * 10);
CVTIntPtr _activeWorkerMaxLifetimeSecs = // 1hr
util::ConfigValTInt::create(_configValMap, "activeworker", "maxLifetimeSecs", notReq, 60 * 60);
CVTIntPtr _monitorSleepTimeMilliSec = util::ConfigValTInt::create(
_configValMap, "activeworker", "monitorSleepTimeMilliSec", notReq, 15'000);

// UberJobs
CVTIntPtr _uberJobMaxChunks =
util::ConfigValTInt::create(_configValMap, "uberjob", "maxChunks", notReq, 1000);

/// This may impact `_resultMaxHttpConnections` as too many connections may cause kernel memory issues.
CVTIntPtr _commandMaxHttpConnections =
util::ConfigValTInt::create(_configValMap, "uberjob", "commandMaxHttpConnections", notReq, 2000);
};

} // namespace lsst::qserv::cconfig
Expand Down
2 changes: 0 additions & 2 deletions src/ccontrol/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,6 @@ target_link_libraries(ccontrol PUBLIC
parser
replica
sphgeom
xrdreq
XrdCl
)

Expand All @@ -51,7 +50,6 @@ FUNCTION(ccontrol_tests)
qserv_meta
query
rproc
xrdreq
Boost::unit_test_framework
Threads::Threads
)
Expand Down
Loading
Loading