Skip to content

Commit

Permalink
Extended Protobuf definition to return file resource locations
Browse files Browse the repository at this point in the history
Also elminated deprecated protobuf schema and large result attributes
  • Loading branch information
iagaponenko committed Jul 19, 2023
1 parent 27ab504 commit fab0082
Show file tree
Hide file tree
Showing 16 changed files with 36 additions and 127 deletions.
9 changes: 3 additions & 6 deletions src/ccontrol/MergingHandler.cc
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,7 @@ const char* MergingHandler::getStateStr(MsgState const& state) {
return "unknown";
}

bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& largeResult, int& nextBufSize,
int& resultRows) {
bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, int& nextBufSize, int& resultRows) {
LOGS(_log, LOG_LVL_DEBUG,
"From:" << _wName << " flush state=" << getStateStr(_state) << " blen=" << bLen << " last=" << last);
resultRows = 0;
Expand All @@ -112,7 +111,6 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar

{
nextBufSize = _response->protoHeader.size();
largeResult = _response->protoHeader.largeresult();
bool endNoData = _response->protoHeader.endnodata();
int seq = -1;
int scsSeq = -1;
Expand All @@ -123,9 +121,8 @@ bool MergingHandler::flush(int bLen, BufPtr const& bufPtr, bool& last, bool& lar
scsSeq = _response->protoHeader.scsseq();
}
LOGS(_log, LOG_LVL_DEBUG,
"HEADER_WAIT: From:" << _wName << " nextBufSize=" << nextBufSize
<< " largeResult=" << largeResult << " endNoData=" << endNoData
<< " seq=" << seq << " scsseq=" << scsSeq);
"HEADER_WAIT: From:" << _wName << " nextBufSize=" << nextBufSize << " endNoData="
<< endNoData << " seq=" << seq << " scsseq=" << scsSeq);

_state = MsgState::RESULT_WAIT;
if (endNoData || nextBufSize == 0) {
Expand Down
3 changes: 1 addition & 2 deletions src/ccontrol/MergingHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,8 +72,7 @@ class MergingHandler : public qdisp::ResponseHandler {
/// Flush the retrieved buffer where bLen bytes were set. If last==true,
/// then no more buffer() and flush() calls should occur.
/// @return true if successful (no error)
bool flush(int bLen, BufPtr const& bufPtr, bool& last, bool& largeResult, int& nextBufSize,
int& resultRows) override;
bool flush(int bLen, BufPtr const& bufPtr, bool& last, int& nextBufSize, int& resultRows) override;

/// Signal an unrecoverable error condition. No further calls are expected.
void errorFlush(std::string const& msg, int code) override;
Expand Down
1 change: 0 additions & 1 deletion src/proto/FakeProtocolFixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,6 @@ class FakeProtocolFixture {
p->set_protocol(2);
p->set_size(500);
p->set_md5(std::string("1234567890abcdef0"));
p->set_largeresult(false);
p->set_endnodata(false);
return p;
}
Expand Down
10 changes: 8 additions & 2 deletions src/proto/worker.proto
Original file line number Diff line number Diff line change
Expand Up @@ -90,28 +90,32 @@ message ProtoHeader {
optional sfixed32 size = 2; // protobufs discourages messages > megabytes
optional bytes md5 = 3;
optional string wname = 4;
optional bool largeresult = 5;
optional bool largeresult = 5; // DEPRECATED
optional bool endnodata = 6; // True if this header is the end, no more data. size should be 0.
optional uint32 seq = 7; // sequence number from SendChannel
optional int32 scsseq = 8; // sequence number from SendChannelShared, can be -1
}

// DEPRECATED
message ColumnSchema {
optional string name = 1; // Optional to allow type-only transmission
optional string sqltype = 2;
optional int32 mysqltype = 3;
}

// DEPRECATED
message RowSchema {
repeated ColumnSchema columnschema = 1;
}

message RowBundle {
repeated bytes column = 1; // bytes to allow BLOB encoding
repeated bool isnull = 2; // Flag to allow sending nulls.
}

message Result {
optional int64 session = 1;
optional RowSchema rowschema = 2;
optional RowSchema rowschema = 2; // DEPRECATED
optional int32 errorcode = 3;
optional string errormsg = 4;
repeated RowBundle row = 5;
Expand All @@ -120,6 +124,8 @@ message Result {
optional uint32 rowcount = 8;
optional uint64 transmitsize = 9;
optional int32 attemptcount = 10;
optional string fileresource_xroot = 11; /// XROOTD url for the result file
optional string fileresource_http = 12; /// HTTP url for the result file
}

// Result protocol 2:
Expand Down
1 change: 0 additions & 1 deletion src/qdisp/Executive.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ class QuerySession;
namespace qdisp {

class JobQuery;
class LargeResultMgr;
class MessageStore;
class PseudoFifo;

Expand Down
19 changes: 5 additions & 14 deletions src/qdisp/QueryRequest.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,12 +357,10 @@ bool QueryRequest::_importStream(JobQuery::Ptr const& jq) {
ResponseHandler::BufPtr bufPtr = make_shared<vector<char>>(buff, buff + len);

// Use `flush()` to read the buffer and extract the header.
bool largeResult = false;
int nextBufSize = 0;
bool last = false;
int resultRows = 0;
bool flushOk = jq->getDescription()->respHandler()->flush(len, bufPtr, last, largeResult, nextBufSize,
resultRows);
bool flushOk = jq->getDescription()->respHandler()->flush(len, bufPtr, last, nextBufSize, resultRows);

if (!flushOk) {
LOGS(_log, LOG_LVL_ERROR, "_importStream not flushOk");
Expand Down Expand Up @@ -512,7 +510,6 @@ void QueryRequest::_processData(JobQuery::Ptr const& jq, int blen, bool xrdLast)
ResponseHandler::BufPtr nextHeaderBufPtr;

// Values for these variables to be filled in by flush() calls.
bool largeResult = false;
int nextBufSize = 0;
int resultRows = 0;
bool last = false;
Expand All @@ -525,8 +522,8 @@ void QueryRequest::_processData(JobQuery::Ptr const& jq, int blen, bool xrdLast)
int respSize = blen - protoHeaderSize;
nextHeaderBufPtr = make_shared<vector<char>>(bufPtr->begin() + respSize, bufPtr->end());
// Read the result
// Values for last, largeResult, and nextBufSize filled in by flush
flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, largeResult, nextBufSize, resultRows);
// Values for last, and nextBufSize filled in by flush
flushOk = jq->getRespHandler()->flush(respSize, bufPtr, last, nextBufSize, resultRows);
if (last) {
// Last should only be true when the header is read, not the result.
throw util::Bug(ERR_LOC, "_processData result had 'last' true, which cannot be allowed.");
Expand All @@ -545,15 +542,9 @@ void QueryRequest::_processData(JobQuery::Ptr const& jq, int blen, bool xrdLast)
}

// Read the next header
// Values for last, largeResult, and nextBufSize filled in by flush
// Values for last, and nextBufSize filled in by flush
// resultRows is ignored in headers, and should always be 0.
flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, largeResult, nextBufSize,
resultRows);

if (largeResult) {
if (!_largeResult) LOGS(_log, LOG_LVL_DEBUG, "holdState largeResult set to true");
_largeResult = true; // Once the worker indicates it's a large result, it stays that way.
}
flushOk = jq->getRespHandler()->flush(protoHeaderSize, nextHeaderBufPtr, last, nextBufSize, resultRows);

if (flushOk) {
if (last != xrdLast) {
Expand Down
3 changes: 1 addition & 2 deletions src/qdisp/QueryRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this<Q
bool initialRequest);
void _flushError(JobQuery::Ptr const& jq);

/// _holdState indicates the data is being held by SSI for a large response using LargeResultMgr.
/// _holdState indicates the data is being held by SSI for a large response.
/// If the state is NOT NO_HOLD0, then this instance has decremented the shared semaphore and it
/// must increment the semaphore before going away.
enum HoldState { NO_HOLD0 = 0, GET_DATA1 = 1, MERGE2 = 2 };
Expand Down Expand Up @@ -161,7 +161,6 @@ class QueryRequest : public XrdSsiRequest, public std::enable_shared_from_this<Q

std::atomic<bool> _finishedCalled{false};

bool _largeResult{false}; ///< True if the worker flags this job as having a large result.
QdispPool::Ptr _qdispPool;
std::shared_ptr<AskForResponseDataCmd> _askForResponseDataCmd;

Expand Down
6 changes: 2 additions & 4 deletions src/qdisp/ResponseHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,13 +55,11 @@ class ResponseHandler {
/// Flush the retrieved buffer where bLen bytes were set. If last==true,
/// then no more buffer() and flush() calls should occur.
/// @return true if successful (no error)
/// last, largeResult, nextBufSize, and resultRows are set in flush.
/// last, nextBufSize, and resultRows are set in flush.
/// last - true if no more messages for this job.
/// largeResult - true if there is more than 1 message in the result.
/// nextBufSize - size of the next buffer
/// resultRows - number of result rows in this result.
virtual bool flush(int bLen, BufPtr const& bufPtr, bool& last, bool& largeResult, int& nextBufSize,
int& resultRows) = 0;
virtual bool flush(int bLen, BufPtr const& bufPtr, bool& last, int& nextBufSize, int& resultRows) = 0;

/// Signal an unrecoverable error condition. No further calls are expected.
virtual void errorFlush(std::string const& msg, int code) = 0;
Expand Down
1 change: 0 additions & 1 deletion src/qdisp/XrdSsiMocks.cc
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,6 @@ class Agent : public XrdSsiResponder, public XrdSsiStream {
ph->set_size(0);
ph->set_md5(std::string("d41d8cd98f00b204e9800998ecf8427"));
ph->set_wname("localhost");
ph->set_largeresult(false);
ph->set_endnodata(true);
std::string pHdrString;
ph->SerializeToString(&pHdrString);
Expand Down
6 changes: 2 additions & 4 deletions src/rproc/InfileMerger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -206,15 +206,13 @@ bool InfileMerger::merge(std::shared_ptr<proto::WorkerResponse> const& response)
}
// TODO: Check session id (once session id mgmt is implemented)
if (not(response->result.has_jobid() && response->result.has_rowcount() &&
response->result.has_transmitsize() && response->result.has_attemptcount() &&
response->result.has_rowschema())) {
response->result.has_transmitsize() && response->result.has_attemptcount())) {
LOGS(_log, LOG_LVL_ERROR,
"merge response missing required field"
<< " jobid:" << response->result.has_jobid()
<< " rowcount:" << response->result.has_rowcount()
<< " transmitsize:" << response->result.has_transmitsize()
<< " attemptcount:" << response->result.has_attemptcount()
<< " rowschema:" << response->result.has_rowschema());
<< " attemptcount:" << response->result.has_attemptcount());
return false;
}
int const jobId = response->result.jobid();
Expand Down
4 changes: 0 additions & 4 deletions src/rproc/InfileMerger.h
Original file line number Diff line number Diff line change
Expand Up @@ -177,10 +177,6 @@ class InfileMerger {

std::string engineToStr(InfileMerger::DbEngine engine);

/// Create the shared thread pool and/or change its size.
// @return the size of the large result thread pool.
static int setLargeResultPoolSize(int size);

/// Merge a worker response, which contains:
/// Size of ProtoHeader message
/// ProtoHeader message
Expand Down
38 changes: 1 addition & 37 deletions src/rproc/ProtoRowBuffer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ ProtoRowBuffer::ProtoRowBuffer(proto::Result& res, int jobId, std::string const&
_jobIdSqlType(jobIdSqlType),
_jobIdMysqlType(jobIdMysqlType) {
_jobIdStr = std::string("'") + std::to_string(jobId) + "'";
_initSchema();
if (_result.row_size() > 0) {
_initCurrentRow();
}
Expand All @@ -103,43 +102,8 @@ unsigned ProtoRowBuffer::fetch(char* buffer, unsigned bufLen) {
return fetched;
}

/// Import schema from the proto message into a Schema object
void ProtoRowBuffer::_initSchema() {
_schema.columns.clear();

// Set jobId and attemptCount
sql::ColSchema jobIdCol;
jobIdCol.name = _jobIdColName;
jobIdCol.colType.sqlType = _jobIdSqlType;
jobIdCol.colType.mysqlType = _jobIdMysqlType;
_schema.columns.push_back(jobIdCol);

proto::RowSchema const& prs = _result.rowschema();
for (int i = 0, e = prs.columnschema_size(); i != e; ++i) {
proto::ColumnSchema const& pcs = prs.columnschema(i);
sql::ColSchema cs;
if (pcs.has_name()) {
cs.name = pcs.name();
}
if (not pcs.has_sqltype()) {
throw util::Bug(ERR_LOC, "_initSchema _result missing sqltype");
}
cs.colType.sqlType = pcs.sqltype();
if (pcs.has_mysqltype()) {
cs.colType.mysqlType = pcs.mysqltype();
}
_schema.columns.push_back(cs);
}
}

std::string ProtoRowBuffer::dump() const {
std::string str("ProtoRowBuffer schema(");
for (auto sCol : _schema.columns) {
str += "(Name=" + sCol.name;
str += ",colType=" + sCol.colType.sqlType + ":" + std::to_string(sCol.colType.mysqlType) + ")";
}
str += ") ";
str += "Row " + std::to_string(_rowIdx) + "(";
std::string str("ProtoRowBuffer Row " + std::to_string(_rowIdx) + "(");
str += printCharVect(_currentRow);
str += ")";
return str;
Expand Down
3 changes: 0 additions & 3 deletions src/rproc/ProtoRowBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
// Qserv headers
#include "mysql/RowBuffer.h"
#include "proto/worker.pb.h"
#include "sql/Schema.h"

namespace lsst::qserv::rproc {

Expand Down Expand Up @@ -125,7 +124,6 @@ class ProtoRowBuffer : public mysql::RowBuffer {

private:
void _initCurrentRow();
void _initSchema();
void _readNextRow();
// Copy a row bundle into a destination STL char container
template <typename T>
Expand All @@ -149,7 +147,6 @@ class ProtoRowBuffer : public mysql::RowBuffer {
std::string _nullToken; ///< Null indicator (e.g. \N)
proto::Result& _result; ///< Ref to Resultmessage

sql::Schema _schema; ///< Schema object
int _rowIdx; ///< Row index
int _rowTotal; ///< Total row count
std::vector<char> _currentRow; ///< char buffer representing current row.
Expand Down
18 changes: 5 additions & 13 deletions src/wbase/TransmitData.cc
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
#include "global/LogContext.h"
#include "proto/ProtoHeaderWrap.h"
#include "util/Bug.h"
#include "util/InstanceCount.h"
#include "util/MultiError.h"
#include "util/StringHash.h"
#include "wbase/Task.h"
Expand Down Expand Up @@ -71,7 +72,6 @@ proto::ProtoHeader* TransmitData::_createHeader() {
hdr->set_size(0);
hdr->set_md5(util::StringHash::getMd5("", 0));
hdr->set_wname(getHostname());
hdr->set_largeresult(false);
hdr->set_endnodata(true);
return hdr;
}
Expand Down Expand Up @@ -146,23 +146,17 @@ xrdsvc::StreamBuffer::Ptr TransmitData::getStreamBuffer(Task::Ptr const& task) {
return xrdsvc::StreamBuffer::createWithMove(_dataMsg, task);
}

void TransmitData::_buildHeader(bool largeResult) {
void TransmitData::_buildHeader(lock_guard<mutex> const& lock) {
LOGS(_log, LOG_LVL_DEBUG, _idStr << "TransmitData::_buildHeader");

// The size of the dataMsg must include space for the header for the next dataMsg.
_header->set_size(_dataMsg.size() + proto::ProtoHeaderWrap::getProtoHeaderSize());
// The md5 hash must not include the header for the next dataMsg.
_header->set_md5(util::StringHash::getMd5(_dataMsg.data(), _dataMsg.size()));
_header->set_largeresult(largeResult);
_header->set_endnodata(false);
}

void TransmitData::buildDataMsg(Task const& task, bool largeResult, util::MultiError& multiErr) {
lock_guard<mutex> lock(_trMtx);
_buildDataMsg(task, largeResult, multiErr);
}

void TransmitData::_buildDataMsg(Task const& task, bool largeResult, util::MultiError& multiErr) {
void TransmitData::buildDataMsg(Task const& task, util::MultiError& multiErr) {
QSERV_LOGCONTEXT_QUERY_JOB(task.getQueryId(), task.getJobId());
LOGS(_log, LOG_LVL_INFO,
_idStr << "TransmitData::_buildDataMsg rowCount=" << _rowCount << " tSize=" << _tSize);
Expand All @@ -181,16 +175,14 @@ void TransmitData::_buildDataMsg(Task const& task, bool largeResult, util::Multi
_result->SerializeToString(&_dataMsg);
// Build the header for this message, but this message can't be transmitted until the
// next header has been built and appended to _transmitData->dataMsg. That happens
// later in SendChannelShared.
_buildHeader(largeResult);
// later in ChannelShared.
_buildHeader(lock);
}

void TransmitData::initResult(Task& task, std::vector<SchemaCol>& schemaCols) {
lock_guard<mutex> lock(_trMtx);
_result->set_queryid(task.getQueryId());
_result->set_jobid(task.getJobId());
_result->mutable_rowschema();

if (task.getSession() >= 0) {
_result->set_session(task.getSession());
}
Expand Down
Loading

0 comments on commit fab0082

Please sign in to comment.