Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DM-47985; Master Controller's option to manage replica synchronization at Qserv workers #882

Merged
merged 1 commit into from
Dec 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/replica/apps/MasterControllerHttpApp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ struct {
unsigned int const workerReconfigTimeoutSec = 600;

bool const purge = false;
bool const disableQservSync = false;
bool const forceQservSync = false;
bool const permanentDelete = false;

Expand Down Expand Up @@ -111,6 +112,7 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
_qservSyncTimeoutSec(::defaultOptions.qservSyncTimeoutSec),
_workerReconfigTimeoutSec(::defaultOptions.workerReconfigTimeoutSec),
_purge(::defaultOptions.purge),
_disableQservSync(::defaultOptions.disableQservSync),
_forceQservSync(::defaultOptions.forceQservSync),
_permanentDelete(::defaultOptions.permanentDelete),
_qservCzarDbUrl(Configuration::qservCzarDbUrl()) {
Expand Down Expand Up @@ -155,6 +157,8 @@ MasterControllerHttpApp::MasterControllerHttpApp(int argc, char* argv[])
" would override the corresponding parameter specified"
" in the Configuration.",
_workerReconfigTimeoutSec);
parser().flag("qserv-sync-disable", "The flag which disables replica synchroization at Qserv workers.",
_disableQservSync);
parser().flag("qserv-sync-force",
"The flag which would force Qserv workers to update their list of replicas"
" even if some of the chunk replicas were still in use by on-going queries."
Expand Down Expand Up @@ -212,7 +216,7 @@ int MasterControllerHttpApp::runImpl() {

_replicationTask = ReplicationTask::create(
_controller, [self](Task::Ptr const& ptr) { self->_isFailed.fail(); }, _qservSyncTimeoutSec,
_forceQservSync, _replicationIntervalSec, _purge);
_disableQservSync, _forceQservSync, _replicationIntervalSec, _purge);
_replicationTask->start();

_healthMonitorTask = HealthMonitorTask::create(
Expand Down
1 change: 1 addition & 0 deletions src/replica/apps/MasterControllerHttpApp.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ class MasterControllerHttpApp : public Application {
unsigned int _workerReconfigTimeoutSec;

bool _purge;
bool _disableQservSync;
bool _forceQservSync;
bool _permanentDelete;

Expand Down
22 changes: 12 additions & 10 deletions src/replica/contr/ReplicationTask.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,11 @@ namespace lsst::qserv::replica {

ReplicationTask::Ptr ReplicationTask::create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, forceQservSync,
replicationIntervalSec, purge));
unsigned int qservSyncTimeoutSec, bool disableQservSync,
bool forceQservSync, unsigned int replicationIntervalSec,
bool purge) {
return Ptr(new ReplicationTask(controller, onTerminated, qservSyncTimeoutSec, disableQservSync,
forceQservSync, replicationIntervalSec, purge));
}

bool ReplicationTask::onRun() {
Expand All @@ -49,21 +50,21 @@ bool ReplicationTask::onRun() {
serviceProvider()->config()->get<int>("controller", "catalog-management-priority-level");

launch<FindAllJob>(priority, saveReplicaInfo, allWorkers);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (!_disableQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

launch<FixUpJob>(priority);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (!_disableQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

launch<ReplicateJob>(priority, numReplicas);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (!_disableQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

bool const estimateOnly = false;
launch<RebalanceJob>(priority, estimateOnly);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (!_disableQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);

if (_purge) {
launch<PurgeJob>(priority, numReplicas);
sync(_qservSyncTimeoutSec, _forceQservSync);
if (!_disableQservSync) sync(_qservSyncTimeoutSec, _forceQservSync);
}

// Keep on getting calls on this method after a wait time
Expand All @@ -72,10 +73,11 @@ bool ReplicationTask::onRun() {

ReplicationTask::ReplicationTask(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool disableQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge)
: Task(controller, "REPLICATION-THREAD ", onTerminated, replicationIntervalSec),
_qservSyncTimeoutSec(qservSyncTimeoutSec),
_disableQservSync(disableQservSync),
_forceQservSync(forceQservSync),
_purge(purge) {}

Expand Down
10 changes: 6 additions & 4 deletions src/replica/contr/ReplicationTask.h
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ class ReplicationTask : public Task {
* of the task. Set it to 'nullptr' if no call back should be made.
* @param qservSyncTimeoutSec The maximum number of seconds to be waited before giving
* up on the Qserv synchronization requests.
* @param disableQservSync Disable replica synchronization at Qserv workers if 'true'.
* @param forceQservSync Force chunk removal at worker resource collections if 'true'.
* @param replicationIntervalSec The number of seconds to wait in the end of each
* iteration loop before to begin the new one.
Expand All @@ -62,7 +63,7 @@ class ReplicationTask : public Task {
*/
static Ptr create(Controller::Ptr const& controller,
Task::AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool disableQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge);

protected:
Expand All @@ -72,15 +73,16 @@ class ReplicationTask : public Task {
private:
/// @see ReplicationTask::create()
ReplicationTask(Controller::Ptr const& controller, AbnormalTerminationCallbackType const& onTerminated,
unsigned int qservSyncTimeoutSec, bool forceQservSync,
unsigned int qservSyncTimeoutSec, bool disableQservSync, bool forceQservSync,
unsigned int replicationIntervalSec, bool purge);

/// The maximum number of seconds to be waited before giving up
/// on the Qserv synchronization requests.
unsigned int const _qservSyncTimeoutSec;

bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
bool const _purge; ///< Purge excess replicas if 'true'.
bool const _disableQservSync; ///< Disable replica synchroization at Qserv workers if 'true'.
bool const _forceQservSync; ///< Force removal at worker resource collections if 'true'.
bool const _purge; ///< Purge excess replicas if 'true'.
};

} // namespace lsst::qserv::replica
Expand Down
Loading