Skip to content

Commit

Permalink
Moderate refactoring and code consolidation in the worker requests
Browse files Browse the repository at this point in the history
Simplified error reporting. Reduced code duplication.
  • Loading branch information
iagaponenko committed Aug 17, 2023
1 parent 42b7db6 commit bc0ca76
Show file tree
Hide file tree
Showing 44 changed files with 314 additions and 772 deletions.
171 changes: 38 additions & 133 deletions src/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -148,67 +148,45 @@ message Result {
// sending this message carrying an identifier of a command. Specific
// commands may require additional parameters which should be sent as
// separate messages (of the corresponding types).
//
message WorkerCommandH {

// Types of commands
enum Command {

// Return back a value passed as an input parameters
// The command is meant for testing the protocol
TEST_ECHO = 1;

// Add a group of collocated chunks
ADD_CHUNK_GROUP = 2;

// Remove a group of collocated chunks
REMOVE_CHUNK_GROUP = 3;

// Update (rebuild and/or reload) the list of available chunks
UPDATE_CHUNK_LIST = 4;

// Return a list of chunks known to a worker
GET_CHUNK_LIST = 5;

// Set a new list of chunks
SET_CHUNK_LIST = 6;

// Return various status info on a worker
GET_STATUS = 7;
TEST_ECHO = 1; // Return back a value sent to the command processor.
ADD_CHUNK_GROUP = 2; // Add a group of collocated chunks.
REMOVE_CHUNK_GROUP = 3; // Remove a group of collocated chunks.
UPDATE_CHUNK_LIST = 4; // Update (rebuild and/or reload) the list of available chunks.
GET_CHUNK_LIST = 5; // Return a list of chunks known to a worker.
SET_CHUNK_LIST = 6; // Set a new list of chunks.
GET_STATUS = 7; // Return various status info on a worker.
}
required Command command = 1;
}

// The completion status to be sent back with responses to the worker commands.
message WorkerCommandStatus {
enum Code {
SUCCESS = 1; // The sccessful completion of a request.
INVALID = 2; // Invalid parameters of the request.
IN_USE = 3; // The request is rejected because one of the chunks is in use.
ERROR = 4; // An error occurred during command execution.
}
optional Code code = 1 [default = SUCCESS];
optional string error = 2 [default = ""]; // Optional error message (depends on the code)
}

// This message must be sent after the command header to provide
// a service with a value to be echoed back in response to
// the 'TEST_ECHO' command.
//
message WorkerCommandTestEchoM {

// An input string to be returned back by the service
required string value = 1;
required string value = 1; // The input string to be returned back by the service
}

// The message to be sent back in response to the 'TEST_ECHO' command
//
message WorkerCommandTestEchoR {

// Completion status of the operation
enum Status {
SUCCESS = 1; // successful completion of a request
ERROR = 2; // an error occurred during command execution
}
required Status status = 1;

// Optional error message (depending on the status)
optional string error = 2 [default = ""];

// The original value returned by the operation
required string value = 3;
required WorkerCommandStatus status = 4; // Completion status of the operation
required string value = 3; // The original value returned by the operation
}

// The message type embedded into the relevant contexts below
//
message WorkerCommandChunk {
required string db = 1;
required uint32 chunk = 2;
Expand All @@ -218,119 +196,57 @@ message WorkerCommandChunk {
// This message must be sent after the command header for the 'ADD_CHUNK_GROUP'
// or 'REMOVE_CHUNK_GROUP' command to tell the service which chunks needs to be
// added or removed.
//
message WorkerCommandChunkGroupM {

required uint32 chunk = 1;
repeated string dbs = 2;

optional bool force = 3 [ default = false];
}

// The message to be sent back in response to the 'ADD_CHUNK_GROUP'
// or 'REMOVE_CHUNK_GROUP' commands.
//
message WorkerCommandChunkGroupR {

// Completion status of the operation
enum Status {
SUCCESS = 1; // successful completion of a request
INVALID = 2; // invalid parameters of the request
IN_USE = 3; // request is rejected because one of the chunks is in use
ERROR = 4; // an error occurred during command execution
}
required Status status = 1;

// Optional error message (depending on the status)
optional string error = 2 [default = ""];
required WorkerCommandStatus status = 3; // Completion status of the operation
}

// This message must be sent after the command header for the 'UPDATE_CHUNK_LIST'
// command
//
// command.
message WorkerCommandUpdateChunkListM {

// Rebuild the list from actual tables existing in the database
required bool rebuild = 1;

// Reload the new list into a worker
required bool reload = 2;
required bool rebuild = 1; // Rebuild the list from actual tables existing in the database
required bool reload = 2; // Reload the new list into a worker
}

// The message to be sent back in response to the 'UPDATE_CHUNK_LIST'
// command.
//
message WorkerCommandUpdateChunkListR {

// Completion status of the operation
enum Status {
SUCCESS = 1; // successful completion of a request
ERROR = 2; // an error occurred during command execution
}
required Status status = 1;

// Optional error message (depending on the status)
optional string error = 2 [default = ""];

repeated WorkerCommandChunk added = 3; // chunks which have been added
repeated WorkerCommandChunk removed = 4; // chunks which have been removed
required WorkerCommandStatus status = 5; // Completion status of the operation
repeated WorkerCommandChunk added = 3; // Chunks that were added
repeated WorkerCommandChunk removed = 4; // Chunks that were removed
}

// The message to be sent back in response to the 'GET_CHUNK_LIST'
// command.
//
message WorkerCommandGetChunkListR {

// Completion status of the operation
enum Status {
SUCCESS = 1; // successful completion of a request
ERROR = 2; // an error occurred during command execution
}
required Status status = 1;

// Optional error message (depending on the status)
optional string error = 2 [default = ""];

required WorkerCommandStatus status = 4; // Completion status of the operation
repeated WorkerCommandChunk chunks = 3;
}

// This message must be sent after the command header for the 'SET_CHUNK_LIST'
// to tell the service which chunks needs to be set.
//
message WorkerCommandSetChunkListM {

repeated WorkerCommandChunk chunks = 1;

optional bool force = 2 [ default = false];

// The operation involves databases which are listed below
repeated string databases = 3;
repeated string databases = 3; // The operation involves databases listed here
}

// The message to be sent back in response to the 'SET_CHUNK_LIST'
// command.
//
message WorkerCommandSetChunkListR {

// Completion status of the operation
enum Status {
SUCCESS = 1; // successful completion of a request
INVALID = 2; // invalid parameters of the request
IN_USE = 3; // request is rejected because one of the chunks is in use
ERROR = 4; // an error occurred during command execution
}
required Status status = 1;

// Optional error message (depending on the status)
optional string error = 2 [default = ""];

// The previous list of chunks
repeated WorkerCommandChunk chunks = 3;
required WorkerCommandStatus status = 4; // Completion status of the operation
repeated WorkerCommandChunk chunks = 3; // The previous list of chunks
}

// This message must be sent after the command header for the 'GET_STATUS'
// to customize a scope of the request.
//
message WorkerCommandGetStatusM {
// Include detailed info on the tasks
optional bool include_tasks = 1 [ default = false];
Expand All @@ -353,14 +269,11 @@ message WorkerCommandGetStatusM {
}

// The message to be sent back in response to the 'GET_STATUS' command
//
message WorkerCommandGetStatusR {

// Status info serialized from a JSON object
required string info = 1;
required WorkerCommandStatus status = 2; // Completion status of the operation
required string info = 1; // Status info serialized from a JSON object
}


/////////////////////////////////////////////////////////////////
// Protocol definition for the query management requests. These
// requests do not require any response messages to be explicitly
Expand All @@ -372,18 +285,10 @@ message WorkerCommandGetStatusR {
////////////////////////////////////////////////////////////////

message QueryManagement {

// Supported operations
enum Operation {

// Cancel older queries before the specified query (excluding that one).
CANCEL_AFTER_RESTART = 1;

// Cancel a specific query.
CANCEL = 2;

// Notify workers on the completion of the specified query.
COMPLETE = 3;
CANCEL_AFTER_RESTART = 1; // Cancel older queries before the specified query (excluding that one).
CANCEL = 2; // Cancel a specific query.
COMPLETE = 3; // Notify workers on the completion of the specified query.
}
required Operation op = 1;
required uint64 query_id = 2;
Expand Down
24 changes: 9 additions & 15 deletions src/replica/AddReplicaQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@

// Qserv headers
#include "global/ResourceUnit.h"
#include "proto/worker.pb.h"
#include "replica/Configuration.h"
#include "replica/ServiceProvider.h"

Expand Down Expand Up @@ -74,35 +75,28 @@ void AddReplicaQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<AddReplicaQservMgtRequest>();

_qservRequest = xrdreq::AddChunkGroupQservRequest::create(
chunk(), databases(),
[request](xrdreq::ChunkGroupQservRequest::Status status, string const& error) {
chunk(), databases(), [request](proto::WorkerCommandStatus::Code code, string const& error) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case xrdreq::ChunkGroupQservRequest::Status::SUCCESS:
switch (code) {
case proto::WorkerCommandStatus::SUCCESS:
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case xrdreq::ChunkGroupQservRequest::Status::INVALID:
case proto::WorkerCommandStatus::INVALID:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_BAD, error);
break;

case xrdreq::ChunkGroupQservRequest::Status::IN_USE:
case proto::WorkerCommandStatus::IN_USE:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_CHUNK_IN_USE, error);
break;

case xrdreq::ChunkGroupQservRequest::Status::ERROR:
case proto::WorkerCommandStatus::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

default:
throw logic_error("AddReplicaQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
xrdreq::ChunkGroupQservRequest::status2str(status));
" unhandled request completion code: " +
proto::WorkerCommandStatus_Code_Name(code));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
21 changes: 8 additions & 13 deletions src/replica/GetReplicasQservMgtRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@

// Qserv headers
#include "global/ResourceUnit.h"
#include "proto/worker.pb.h"
#include "replica/Configuration.h"
#include "replica/ServiceProvider.h"

Expand Down Expand Up @@ -112,30 +113,24 @@ void GetReplicasQservMgtRequest::startImpl(replica::Lock const& lock) {
auto const request = shared_from_base<GetReplicasQservMgtRequest>();

_qservRequest = xrdreq::GetChunkListQservRequest::create(
inUseOnly(), [request](xrdreq::GetChunkListQservRequest::Status status, string const& error,
inUseOnly(), [request](proto::WorkerCommandStatus::Code code, string const& error,
xrdreq::GetChunkListQservRequest::ChunkCollection const& collection) {
if (request->state() == State::FINISHED) return;

replica::Lock lock(request->_mtx, request->context() + string(__func__) + "[callback]");

if (request->state() == State::FINISHED) return;

switch (status) {
case xrdreq::GetChunkListQservRequest::Status::SUCCESS:

switch (code) {
case proto::WorkerCommandStatus::SUCCESS:
request->_setReplicas(lock, collection);
request->finish(lock, QservMgtRequest::ExtendedState::SUCCESS);
break;

case xrdreq::GetChunkListQservRequest::Status::ERROR:

case proto::WorkerCommandStatus::ERROR:
request->finish(lock, QservMgtRequest::ExtendedState::SERVER_ERROR, error);
break;

default:
throw logic_error("GetReplicasQservMgtRequest::" + string(__func__) +
" unhandled server status: " +
xrdreq::GetChunkListQservRequest::status2str(status));
throw logic_error(
"GetReplicasQservMgtRequest::" + string(__func__) +
" unhandled server status: " + proto::WorkerCommandStatus_Code_Name(code));
}
});
XrdSsiResource resource(ResourceUnit::makeWorkerPath(worker()));
Expand Down
Loading

0 comments on commit bc0ca76

Please sign in to comment.