Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

U/jgates/loader #426

Open
wants to merge 49 commits into
base: main
Choose a base branch
from
Open

U/jgates/loader #426

wants to merge 49 commits into from

Conversation

jgates108
Copy link
Contributor

No description provided.


/// If the buffer already contains valid data, advanceWriteCursor()
/// must be called with the length of the valid data. Otherwise,
/// BufferUdp thinks it has an empty buffer.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this seems hostile to the user. Why not call the function inside the body of this ctor?

void advanceWriteCursor(size_t len) {
_wCursor += len;
if (not isAppendSafe(0)) {
throw new std::overflow_error("BufferUdp advanceWriteCursor beyond buffer len=" +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are you throwing an exception pointer instead of an exception instance? AFAIK this is not conventional.

_rCursor += len;
if (_rCursor > _end) {
throw new std::overflow_error("BufferUdp advanceReadCursor beyond buffer len=" +
std::to_string(len));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do you want to be sure the read cursor does not advance to or beyond the write cursor?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If everything has been passed properly to asio, it shouldn't go outside the buffer. But if it has, this lets us know.

// EOF is only a problem if no MsgElement was retrieved.
// ??? This is definitely the case in UDP, EOF as nothing more will show up.
// ??? But TCP is another issue as EOF is returned when the connection is still live but
// ??? there's no data (len=0). Why does read_some set error to eof before the tcp connection is closed?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah; according to the docs eof is because the peer closed the connection. You're sure the peer didn't close it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm pretty sure I've seen the warning below show up in the log, but it has been a little while since I went looking for it.


/// A buffer for reading and writing. Nothing can be read from the buffer until
/// something has written to it.
class BufferUdp {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

are there unit tests for this class? it seems like it would be valuable to have some.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are test for UdpBuffer at the beginning of udpTest.cc. I think there's some room for discussion for the best way to setup unit tests for this.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You mentioned in that file that it's an integration test. There is a lot of code here without testing at the 'unit' granularity, which I think is an issue.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made a separate ticket for creating unit tests DM-16649

}


virtual std::string getOurLogId() { return "baseclass"; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe an explanation of the name "baseclass" could go here?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the method needs to be declared const

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should really be removed. Digging through the log was getting confusing as everything wound up in the same log and it was sometimes difficult to discern what class or object wrote the log statement.

boost::asio::io_service& _ioService;

/// Initialization order is important.
DoList _doList{*this}; ///< List of items to be checked at regular intervals.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

passing this to another object during initialization is dangerous.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree with that!
Especially after reading the comment above: Initialization order is important.

One alternative would be to dynamically create this object from the body of the class's c-tor after finishing all implicit & explicit initializations of the class's members. The other one would be to extend class DoList with a special setCentral(Central*) method for late binding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is there is safe, since the this pointer passed to doList wont be used until the last member '_checkDoListThread' is defined. This is somewhat fragile in that moving the _checkDoListThread declaration can cause a race condition.

The alternative is to break construction into two parts. The regular constructor is called and then a second step, I'll call go(), is called, which sets this pointer and creates the thread and moves in into _checkDoListThread. I'm ok with either. Factory functions don't really work well with base classes, since the child constructors wont call them.


std::atomic<uint64_t> _sequence{1};

util::CommandQueue::Ptr _queue{new util::CommandQueue()};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

prefer make_shared to new

std::atomic<uint64_t> _sequence{1};

util::CommandQueue::Ptr _queue{new util::CommandQueue()};
util::ThreadPool::Ptr _pool{util::ThreadPool::newThreadPool(10, _queue)};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's easy to miss that _queue is new'ed in braces above, and then used in the braces init here, and if the order of these 2 lines gets reversed that could lead to crashes. A comment when doing this kind of thing can be helpful.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the "magic number" 10? Could this be put into some configuration service external to teh class Central or be passed into the constructor of class Central?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It goes back to the initialization order being important comment. The choice being between construction order being important, but hidden in the base class. Or, having the complexity of multistage construction that is passed to all the child classes and cannot be hidden.

while(_loop) {
// Run and then sleep for a second. A more advanced timer should be used
_doList.checkList();
usleep(100000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not use a condition variable instead of a timer?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because it isn't really waiting for anything to happen. The sleep is mostly there to keep this from wasting CPU cycles if _doList is empty, and most of the items on the list have waits (from rate limiting) far longer that 0.1 seconds, so going through the list can easily result in no DoList items actually doing anything.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

but there should be a way a member of the list can trigger the condition variable so when there is work to do the loop runs.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The loop always run except for the brief period of time it sleeps (0.1 seconds). Most of the items on the list create a command that is then run in the thread pool. The list itself is not low latency, but the commands that are run by the list can be low latency. All the DoListItems just create commands and add them to the pool, which is pretty quick. Synchronization exists within the running commands, so they can be responsive. The DoList itself does not need to be responsive.

std::mutex _listMtx; ///< Protects _list (lock this one first)

std::list<DoListItem::Ptr> _addList;
std::mutex _addListMtx; ///< Protects _addList (lock this one second)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thank you for the comments and the narrow scope of the mutex. Big applause!

public:
using Ptr = std::shared_ptr<DoListItem>;

DoListItem() = default;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Igor recommended to me a pattern that I found to be useful when using enable_shared_from_this: make the constructors private and write a static factory function makeNewDoListItem() that returns a populated DoListItem::Ptr. This prevents anyone from creating a DoListItem that is not owned by a shared_ptr, and since calling shared_from_this on a non-pointer-owned-object leads to a misleading error ("bad weak_ptr" if I remember correctly), this prevents you from having to debug that issue & captures it at compile time.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have used that technique and I like it. I think making the constructor private causes problems with child classes, so forcing the use of a factory function becomes tricky. Maybe protected would work, but the child classes would also need factories.

_list.splice(_list.end(), _addList);
}
for (auto iter = _list.begin(); iter != _list.end(); ++iter){
DoListItem::Ptr const& item = *iter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can replace both these lines with
for (auto&& item : _list) {

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(item will be a DoListItem::Ptr)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's going to be DoListItem::Ptr&

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I need the iterator to erase elements without having to do a second lookup.

LOGS(_log, LOG_LVL_DEBUG, "queuing command");
_central.queueCmd(cmd);
} else {
if (item->removeFromList()) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is removeFromList really more like shouldRemoveFromList?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as it is, the name reads like a command to me. But from its definition and the usage here, I think it's actually a question...?

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Comments for class BufferUdp.

boost::asio::io_service& _ioService;
boost::asio::ip::udp::socket _socket;
boost::asio::ip::udp::endpoint _senderEndpoint;
// char _data[MAX_MSG_SIZE];
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this still needed?

// Class header
#include "BufferUdp.h"

// system headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs to be delete because no system headers are explicitly included here



// Third-party headers

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment needs to be delete because no third-party headers are explicitly included here


/// The absolute largest UDP message we would send.
/// Usually, they should be much smaller.
#define MAX_MSG_SIZE 6000
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've looked through the code, and I'm not seeing a case requiring this to be defined as a macro. Perhaps it could be moved as a static data members of class BufferUdp?

#include <arpa/inet.h>
#include <cstring>
#include <stdexcept>
#include <memory>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header needs to be moved one line up

namespace qserv {
namespace loader {

/// Repeatedly read a socket until a valid MsgElement is read, eof, or an error occurs.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Normally this comment is expected in the header file.

* see <http://www.lsstcorp.org/LegalNotices/>.
*/


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line?

// qserv headers
#include "loader/LoaderMsg.h"


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line?

// LSST headers
#include "lsst/log/Log.h"


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line?

core/modules/loader/BufferUdp.cc Show resolved Hide resolved
Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished reviewing class Central


// system headers
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers belong to a dedicated group of:

// The third-party headers

#include "util/ThreadPool.h"



Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line

ChunkSubchunk(int chunk_, int subchunk_) : chunk(chunk_), subchunk(subchunk_) {}
int const chunk;
int const subchunk;
friend std::ostream& operator<<(std::ostream& os, ChunkSubchunk csc);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Three problems with this operator:

  1. it doe not need to be declared as friend because both data members of struct ChunkSubchunk ar public.
  2. its second parameter needs to be declared as ChunkSubchunk const& csc
  3. the operator needs to be moved out of the struct, probably right after the declaration of the struct

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the friend designation is harmless and the definition location helps associate the function with the struct.

boost::asio::io_service& _ioService;

/// Initialization order is important.
DoList _doList{*this}; ///< List of items to be checked at regular intervals.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

100% agree with that!
Especially after reading the comment above: Initialization order is important.

One alternative would be to dynamically create this object from the body of the class's c-tor after finishing all implicit & explicit initializations of the class's members. The other one would be to extend class DoList with a special setCentral(Central*) method for late binding?

std::atomic<uint64_t> _sequence{1};

util::CommandQueue::Ptr _queue{new util::CommandQueue()};
util::ThreadPool::Ptr _pool{util::ThreadPool::newThreadPool(10, _queue)};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why the "magic number" 10? Could this be put into some configuration service external to teh class Central or be passed into the constructor of class Central?



namespace {
LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.Central");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LSST Logger is not used by the implementation of the class

#include "loader/MasterServer.h"
#include "loader/MWorkerList.h"
#include "loader/WWorkerList.h"
#include "loader/WorkerServer.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header needs to be moved one line up


void run();

std::string getMasterHostName() const { return _masterAddr.ip; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should tis method return std::string const& instead?

}


virtual std::string getOurLogId() { return "baseclass"; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And the method needs to be declared const


protected:
/// Repeatedly check the items on the _doList.
void _checkDoList();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why tis method seats in the protected area instead of the private one? My understanding is that a thread created in the class's constructor is the only client of the method.

And, by the way, members which are declared as protected should NOT begin with symbol _ (as per the LSST Style Guide).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

" protected should NOT begin with symbol _" bleh, so much for being able to easily discern member variables in class functions.

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class CentralClient

* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line


// system headers
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers qualify as the "Third-party headers".
Also, this header needs to be "promoted" one line up.

namespace qserv {
namespace loader {

class KeyInfoData : public util::Tracker {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class. It's role, contexts in which it's used and how it's going to be used.

int subchunk;
bool success{false};

friend std::ostream& operator<<(std::ostream& os, KeyInfoData const& data);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not to be friend because all members of the class are public. Besides, the operator may be moved outside the class.

};

/// TODO Maybe base this one CentralWorker or have a common base class?
class CentralClient : public Central {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class.

void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInsertComplete");

StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

auto?

core/modules/loader/CentralClient.cc Show resolved Hide resolved
core/modules/loader/CentralClient.cc Outdated Show resolved Hide resolved
// Insert a oneShot DoListItem to keep trying to add the key until
// we get word that it has been added successfully.
LOGS(_log, LOG_LVL_INFO, "Trying to insert key=" << key << " chunk=" << chunk <<
" subchunk=" <<subchunk);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing space after << ?

... <<subchunk);

/// Returns a pointer to a Tracker object that can be used to track job
// completion and the status of the job. keyInsertOneShot will call
// _keyInsertReq until it knows the task was completed, via a call
// to _handleKeyInsertComplete
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The method documentation needs to be in the header file.

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class CentralMaster

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_CENTRALMASTER_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition. It should be:

LSST_QSERV_LOADER_CENTRALMASTER_H

* You should have received a copy of the LSST License Statement and
* the GNU General Public License along with this program. If not,
* see <http://www.lsstcorp.org/LegalNotices/>.
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line


// system headers
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promote this header one line up

#define LSST_QSERV_LOADER_CENTRALMASTER_H_

// system headers
#include <boost/bind.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers are the "Third party" headers, not the "system" ones


// Qserv headers
#include "loader/Central.h"

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra line

throw LoaderMsgErr(funcName + " Multiple rightMost workers " +
" name=" + std::to_string(rightMostName) +
" name=" + std::to_string(item->getName()),
__FILE__, __LINE__);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a nice exception class which captures a context where exception need to report the one:

util::Issue

// TODO Make a better algorithm, insert workers at busiest worker.
// TODO maybe rate limit this check.
std::string funcName("_assignNeighborIfNeeded");
if (_addingWorker) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It feels like that check be better done after acquiring a lock on a mutex. Otherwise there is a chance of running into a race condition.

auto pair = _mWorkerList->getActiveInactiveWorkerLists();
std::vector<MWorkerListItem::Ptr>& activeList = pair.first;
std::vector<MWorkerListItem::Ptr>& inactiveList = pair.second;
if (inactiveList.empty() || _addingWorker) { return; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The _addingWorker condition was already checked earlier

Copy link
Contributor Author

@jgates108 jgates108 Nov 9, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The check on _addingWorker (which is atomic) earlier is meant to avoid trying to trying to lock _assignMtx, but as you said there is a race condition, so this second check is needed.

Unfortunately, it looks like _addingWorker is is never set back to false, which will take some effort.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

if (inactiveList.empty() || _addingWorker) { return; }
double sum = 0.0;
int max = 0;
uint32_t maxName = 0;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This variable should be named as workerNumber

}
}

MWorkerListItem::Ptr CentralMaster::getWorkerNamed(uint32_t name) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That name is not consistent with its numeric type. Should be workerNumber. And the same applies to the name of the method.

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed classes CentralWorker and ClientServer

core/modules/loader/CentralClient.cc Show resolved Hide resolved
auto pair = _mWorkerList->getActiveInactiveWorkerLists();
std::vector<MWorkerListItem::Ptr>& activeList = pair.first;
std::vector<MWorkerListItem::Ptr>& inactiveList = pair.second;
if (inactiveList.empty() || _addingWorker) { return; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍


// system headers
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers are the "Third-party headers"

*
*/
#ifndef LSST_QSERV_LOADER_CENTRAL_WORKER_H_
#define LSST_QSERV_LOADER_CENTRAL_WORKER_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra '_' in the end of the macro definition


namespace proto {
class WorkerKeysInfo;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Usually we add a comment to the namespace closing parenthesis

} // proto





Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Too many empty lines in the end of the file

std::unique_ptr<proto::LdrMsgReceived> protoBuf;
if (success) {
protoBuf = seData->protoParse<proto::LdrMsgReceived>();
if (protoBuf == nullptr) { success = false; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curly brackets are not needed here.

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_CLIENTSERVER_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro-definition

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers are the third-party headers

class CentralClient;


class ClientServer : public ServerUdpBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document the class and its methods!
Please, add deleted:

ClientServer(ClientServer const&) = delete;
ClientServer& operator=(ClientServer const&) = delete;

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class DoList and other classes in its header

_list.splice(_list.end(), _addList);
}
for (auto iter = _list.begin(); iter != _list.end(); ++iter){
DoListItem::Ptr const& item = *iter;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's going to be DoListItem::Ptr&

// System headers
#include <iostream>

// Third-party headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, remove unused comment

if (item == nullptr) return false;
if (item->isAlreadyOnList()) return false; // fast atomic test
{
std::lock_guard<std::mutex> lock(_addListMtx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The way this method locks the mutex contradicts to the comments to the data members. My understanding is that _listMtx should be always locked first like it's done by method DoList::checkList()

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you have to lock both of them, you must lock _list first. If you know only one will be locked, you only need to lock one of them.

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_DOLIST_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition

using TimePoint = std::chrono::system_clock::time_point;
using Clock = std::chrono::system_clock;

TimeOut(std::chrono::milliseconds timeOut) : _timeOut(timeOut) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is implicit conversion construction (from std::chrono::milliseconds to TimeOut) expected here?

private:
/// Lock _mtx before calling.
bool _isOneShotDone() {
return (!_needInfo && _oneShot);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Parentheses are not needed here

virtual util::CommandTracked::Ptr createCommand()=0;

protected:
std::atomic<bool> _addedToList{false}; ///< True when added to a DoList
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think protected members should NOT begin with _. Here is the link to the relevant section of the Style guide:
https://developer.lsst.io/cpp/style.html#the-parts-of-a-class-must-be-sorted-public-protected-and-private

And this is for private members:
https://developer.lsst.io/cpp/style.html#private-class-variables-and-methods-must-have-underscore-prefix

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class LoaderMsg.

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_LOADERMSG_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition.


/* &&&
// TODO Add more information
class LoaderMsgErr : public std::exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you still need the older version of this class after switching its base to util::Issue?

};
*/

class LoaderMsgErr : public util::Issue {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍

using Ptr = std::shared_ptr<MsgElement>;
enum {
NOTHING = 0,
STRING_ELEM = 1,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you need to explicitly enumerate the rest? Are these numbers related to some enum in the ProtoBuf schema?

static bool equal(MsgElement* a, MsgElement* b) {
if (a == b) return true;
if (a == nullptr || b == nullptr) return false;
if (a->_elementType != b->_elementType) { return false; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curly brackets are not needed.
Same applies to the next line.

if (not success) {
return nullptr;
}
return protoItem;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you like it to be compact then the last 5 lines could be done as:

return proto::ProtoImporter<T>::setMsgFrom(*protoItem, element.data(), element.length()) ?
       protoItem : nullptr;

//
class LoaderMsg {
public:
enum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does it define? States of in the communication FSA? Or just a bag of constance which would be a VERY BAD idea :) In that (the later) case I would recommend splitting that "bag" into a few named enum types grouping constants accordingly.

Also, why it's anonymous?

I always find it useful to complement these enum types with a static function:

enum States {
  IDLE = 0,
  IN_PROGRESS,
  ...
};
static std::string state2str(State state);

This is very handy if you need to dump enums into the log streams.

SHIFT_FROM_RIGHT_RECEIVED
};

enum {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This type should be given some name.

StringElement::Ptr senderHost;
UInt32Element::Ptr senderPort;

friend std::ostream& operator<<(std::ostream& os, LoaderMsg const& loaderMsg);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No need to be a friend because all data members of the class are public.

// The message contains the message kind and the address of the entity sending
// the message.
//
class LoaderMsg {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document methods and data members of the class!

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished reviewing class MWorkerList

#include <mutex>

// Qserv headers
#include "loader/Updateable.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, move this header at the bottom of the sorted list of headers in this group!

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_MWORKERLIST_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition.

}

uint32_t getId() const {
std::lock_guard<std::mutex> lck(_mtx);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think you need a lock for primitive types if their values are set 'in stone' by class's constructors.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While you will never get just part of a primitive type, if there is another function setting its value, I'd rather wait until that function is done before reading it and that is best done with a mutex. While this isn't the best example of that being an issue, I'd rather be safe than sorry when it comes to synchronization errors, and optimization can come later.



/// Standard information for a single worker, IP address, key range, timeouts.
class MWorkerListItem : public std::enable_shared_from_this<MWorkerListItem> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document methods of the class, including their parameters!

_tcpAddress(new NetworkAddress(tcpAddress)),
_central(central) {}

uint32_t _wId; ///< Worker Id
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make this const:

uint32_t const _wId; ///< Worker Id

os << " " << *elem.second << "\n";
}
os << "MWorkerList ip:\n";
for (auto elem:_ipMap) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do (as suggested for a loop above):

for (auto&& elem:_ipMap) {


void MWorkerListItem::flagNeedToSendList() {
auto slw = _sendListToWorker;
if (slw != nullptr) { slw->setNeedInfo(); }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Curly brackets are not needed here.



void MWorkerListItem::flagNeedToSendList() {
auto slw = _sendListToWorker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread safe?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Making a copy of the shared_ptr is a way to make sure the shared_ptr is not modified (reset, or reassigned) while it is in use. As far as I can tell the pointer object is threadsafe (while it does not make any guarantees about the pointed-to object) so this should be thread safe.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

However, I can't tell that this copying is necessary, since once an object is assigned to the ptr, that ptr is never changed, so unless I'm missing something I think you could just say if (_sendListToWorker != nullptr) { _sendListToWorker->setNeedInfo(); }

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is just a check to make sure there isn't a segfault.
Once _sendListToWorker has been set, it shouldn't be changed unless this entire MWorkerListItem is removed and all calls like this become moot.
I don't thing there's a way to make this obvious in the code but I'll add some comments.



void MWorkerListItem::sendListToWorkerInfoReceived() {
auto slw = _sendListToWorker;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread safe?



std::ostream& operator<<(std::ostream& os, MWorkerListItem const& item) {
os << "name=" << item._wId << " address=" << *item._udpAddress << " range(" << item._range << ")";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this thread safe? My understanding is that worker's ranges may change, and the range is ot a trivial data type.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

other places that use _range take a lock. That should be done here too I think?

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class MasterServer

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_MASTERSERVER_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers are the Third-party headers, not the system ones

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Also promote the asio one line up


// Qserv headers
#include "loader/ServerUdpBase.h"
#include "loader/MWorkerList.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promote this header one line up

class LoaderMsg;
class CentralMaster;

class MasterServer : public ServerUdpBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class and its methods!

int status, std::string const& msgTxt); // TODO shows up in both MasterServer and WorkerServer

private:
CentralMaster* _centralMaster;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be:

CentralMaster* const _centralMaster;

LOGS(_log, LOG_LVL_INFO, "MasterServer::parseMsg sender " << senderEndpoint <<
" kind=" << inMsg.msgKind->element << " data length=" << data->getAvailableWriteLength());
switch (inMsg.msgKind->element) {
case LoaderMsg::MSG_RECEIVED:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

case should be indented 4 spaces from a position of switch

sendBufferTo(inMsg.senderHost->element, inMsg.senderPort->element, *reply);
return nullptr;
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Perhaps extra line?

BufferUdp::Ptr MasterServer::workerKeysInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data,
boost::asio::ip::udp::endpoint const& senderEndpoint) {

std::string funcName("MasterServer::workerKeysInfo");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could be:

std::string const funcName(...);

LOGS(_log, LOG_LVL_INFO, funcName << " Master got wId=" << workerId);

/// Find the worker name in the map.
auto workerItem = _centralMaster->getWorkerWithId(protoItem->wid());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not to use workerId which you got 2 lines above?

auto workerItem = _centralMaster->getWorkerWithId(workerId);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Alternatively, just don't cache workerItem and use protoItem->wid() everywhere, especially if this operation is cheap.

// Send the response to the worker that asked for it.
_centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf);

} catch (LoaderMsgErr &msgErr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LoaderMsgErr const& msgErr

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reviewed class NetworkAddress

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_NEIGHBOR_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition

#define LSST_QSERV_LOADER_NEIGHBOR_H_

// system headers
#include <boost/bind.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers belong to the "Third party" headers


// system headers
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move this header up

Neighbor() = delete;
explicit Neighbor(Type t) : _type(t) {}

std::string getTypeStr() { return _type == LEFT ? "LEFT" : "RIGHT"; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be const method

_addressTcp.reset(new NetworkAddress(addr));
}

NetworkAddress getAddressTcp() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should be const method

StringElement::Ptr data = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*bufData));

if (data == nullptr) {
LOGS(_log, LOG_LVL_WARN, "NetworkAddress::create data==nullptr " + note);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be LOG_LVL_ERROR instead?
Or perhaps an exception thrown?

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_NETWORKADDRESS_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra_ in the end of the macro definition




friend std::ostream& operator<<(std::ostream& os, NetworkAddress const& adr);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does not need to be friend because all data members are public


friend std::ostream& operator<<(std::ostream& os, NetworkAddress const& adr);
};

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra line


}}} // namespace lsst::qserv::loader


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

extra line

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Finished reviewing files ServerTcpBase.(h.cc)

# Harvest special binary products - files starting with the package's name
# followed by underscore:
#
# qserv-<something>.cc
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix the comment to:

#   udp<something>.cc

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_SERVER_TCP_BASE_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition


// system headers
#include <boost/asio.hpp>
#include <boost/bind.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Boost headers are the third-party headers


class ServerTcpBase;

class TcpBaseConnection : public std::enable_shared_from_this<TcpBaseConnection> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class and its methods.
Also, move inline implementations into the CC files. This will make the header more observable and easier to compile.

Explicitly delete:

TcpBaseConnection() = delete;
TcpBaseConnection(TcpBaseConnection const&) = delete;
TcpBaseConnection& operator=(TcpBaseConnection const&) = delete;

TcpBaseConnection(boost::asio::io_context& io_context, ServerTcpBase* tcpBase) :
_socket(io_context), _serverTcpBase(tcpBase) {}

void _readKind(const boost::system::error_code&, size_t /*bytes_transferred*/);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move const after the type and give a name to the parameter:

boost::system::error_code const& ec

}
// Extract key pairs from the protobuffer
int keyCount = protoKeyList->keycount(); // TODO delete keycount from KeyList
int sz = protoKeyList->keypair_size();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's safer to declare it const:

int const sz = protoKeyList->keypair_size();

Just in case.

ServerTcpBase::writeData(_socket, _buf);
LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " <<
_serverTcpBase->getCentralWorker()->dumpKeysStr(2));
} catch (LoaderMsgErr &msgErr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LoaderMsgErr const& msgErr

return;
}
boost::system::error_code ecode;
_readKind(ecode, 0); // get next message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already made a comment earlier on calling this method explicitly. Consider refactoring _readKind

return;
}
boost::system::error_code ecode;
_readKind(ecode, 0); // get next message
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've already made a comment earlier on calling this method explicitly. Consider refactoring _readKind

_serverTcpBase->getCentralWorker()->finishShiftFromRight();
LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " <<
_serverTcpBase->getCentralWorker()->dumpKeysStr(2));
} catch (LoaderMsgErr &msgErr) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LoaderMsgErr const& msgErr

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed class ServerUdpBase. And I'm finding the implementation of method ServerUdpBase::sendBufferTo as potentially dangerous. Could you, please, revisit it?

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_SERVER_UDP_BASE_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition.

namespace loader {


class ServerUdpBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class and its methods.
Consider deriving this class from std::enable_shared_from_this. The add a static factory method and made the constructor private.

// char _data[MAX_MSG_SIZE];
BufferUdp::Ptr _data; ///< data buffer for receiving
BufferUdp::Ptr _sendData; ///< data buffer for sending.
std::string _hostName;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

std::string const _hostName;

BufferUdp::Ptr _data; ///< data buffer for receiving
BufferUdp::Ptr _sendData; ///< data buffer for sending.
std::string _hostName;
int _port;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Make it const

boost::asio::ip::udp::endpoint resolve(std::string const& hostName, int port);

protected:
std::atomic<uint32_t> _errCount{0};
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove _ which is reserved for private members only

#include <iostream>
#include <unistd.h>

// Third-party headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused comment

_receivePrepare();
}
} else {
LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::_receiveCallback got empty message, ignoring");
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's not technically true, because this also cover a case of a error in the communication. Hence the right logic should be:

if (error) {
  # extract error message for the error and report it.
} else if (bytesRecvd == 0) {
  # complain about 0-length message like you do
} else {
  # normal processing
}

Please, revisit the rest of your code to see if you have the same problem in there as well.

std::condition_variable cv;
bool done = false;

auto callbackFunc = [&mtx, &cv, &done](const boost::system::error_code& error, std::size_t bytesTransferred) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is VERY DANGEROUS!!! And this could be a BUG in the application. What you're doing is passing a reference onto a stack variable into a function which is going to be called asynchronously from some other thread at a time when this method will finish and its stack variable will disappear.
You may solve his problem by making these variables data members of the class.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In this case it is ok. This function waits for the response before it returns, so all of the references are valid for the life of the callback. I will add a comment.

};

ip::udp::endpoint dest(boost::asio::ip::address::from_string(hostName), port);
_socket.async_send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why ASYNC I/O if you're going to wait on the CV below?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I need to use async io since _socket is not thread safe. Asio will order async writes so they don't cause problems. I'm not sure asio lets you mix synchronous and async use of the socket for this but I was having communication issues, and this appears to work.

Copy link
Contributor

@fritzm fritzm Dec 6, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmmm... Really should be the same as send_to... This extra complication worries me, and makes me wonder if this "fixed" the problem, or just kicked it down the road 🤔 If you haven't tried send_to here instead recently, it might be worth another try in case you addressed the issue you were seeing in the meantime elsewhere?

But if you stick with async_send_to, a style suggestion (YMMV): I'd just go ahead and put the lambda directly inline in the call to async_send_... (per comment above) in a situation like this. You are only "using" it once, and nesting it inside the async_send_to in my mind makes it more clear when reading the code when it is going to be invoked.

}


void ServerUdpBase::_sendCallback(const boost::system::error_code& error, size_t bytes_sent) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move const to the right:

boost::system::error_code const& error

Copy link
Contributor

@iagaponenko iagaponenko left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed files StringRange.(h,cc), and I think it needs to be redesigned.

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Move Boost headers into the third-party category.
Sort them alphabetically.


/// Return true if other functionally equivalent.
bool equal(StringRange const& other) const {
if (_valid != other._valid) { return false; }
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

According to the LSST Style Guide, the curly brackets are not needed in this line and in other if statements in this and other methods.

namespace loader {

/// Class for storing the range of a single worker.
/// This is likely to become a template class, hence lots in the header.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In my view, a problem with this intent (to carry a specific numeric, string, or even a composite (like struct) type as a template parameter) has certain implications - you would need to pass that specific type around your application Range<int64_t>. This will essentially make a half of your code which depends on this type, moves values of that range around, makes them or produces them (to be sent further) very messy and template(-ized). I've already seen many uses of the StringRange type in various classes of this code. Besides I've notices a lot of dynamic type casts into the string type, which is already a precursor of troubles in the future.

My recommendation here is to model indexes as a hierarchy:

  1. some base interface Index which would declare pure virtual comparison methods (not operators) like eqal, less, etc.

  2. a family of derived classes (possibly with the help of templates) which would be storing actual values of the indexes and implement the above defined virtual methods.

Then you can write your class Range as a regular (non-template) class implemented i terms of the base interface Index. And most of your code which moves values of indexes around and operates on ranges (objects of class Range would be totally independent of an underlining type (a subclass of Index.

The only other area where you will have to deal with specific subtypes would be some small code which will be serializing those objects into the Protobuf structures and deserializing protobuf structures into the objects deriving from Index (a factory).

You may also need to think about how to implement these types in the Protobuf w/o adding extra complexity (you may have already thought about this problem) in order to support a spectrum of index types w/o recoding the application each time a new index is added.

This will make your code observable, easy to compile (remember ugly compilation errors at a presence of templates), and yet type safe.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

StringRange is used as the key to a map and it needs to follow certain rules to do so, such as having operator<() defined. Much of what you suggest is probably still possible (I don't think I've tried a pure virtual operator before). This class is meant primarily to get things up and running and will be replaced by templates or baseclass and children, or some mixture of the two at some point. At this point, I have a better idea of what the limitations are.
protobuffs are not terribly friendly for this sort of thing, either.

This is not really in the immediate scope of problems I want to tackle, but I am thinking about it.

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_STRINGRANGE_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition.

@@ -0,0 +1,126 @@
/*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think the protocol needs to be refined to avoid depending on optional choice of the string versus uint64 index types. There should be an explicit negotiation at some index setup/startup time between clients, master and workers before the index gets created from scratch.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It gets worse than that. It's difficult to get away from chunk + subchunk as values. A problem for another day but worth thinking about.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: clean up indentation to 4 spaces consistently throughout?

@@ -11,5 +11,9 @@ env.Protoc(File("replication.proto"),
PROTOC_PATH='.',
PROTOC_CCOUT='.',
PROTOC_PYOUT='.',)
env.Protoc(File("loader.proto"),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is the third Protobuf file then we may need to add a rule into our SConsUtils which would automatically inject this type of statements. Probably not for this ticket.

* see <http://www.lsstcorp.org/LegalNotices/>.
*
*/
#ifndef LSST_QSERV_LOADER_UPDATABLE_H_
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Extra _ in the end of the macro definition

#include <list>
#include <memory>

// Qserv headers
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove unused comment


// Qserv headers


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra line or two

#include <cstdlib>
#include <iostream>
#include <boost/bind.hpp>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This header to be promoted one line up


class CentralWorker;

class WorkerServer : public ServerUdpBase {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please, document this class and its methods

void _msgRecieved(LoaderMsg const& inMsg, BufferUdp::Ptr const& data,
boost::asio::ip::udp::endpoint const& senderEndpoint);


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One extra empty line?

CentralWorker* _centralWorker;
};


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One extra empty line?


// System headers
#include <iostream>
#include <boost/asio.hpp>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

// The third-party header



std::string StringRange::decrementString(std::string const& str, char minChar) {
if (str == "") {
Copy link
Contributor

@n8pease n8pease Nov 16, 2018

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I think str.empty() is a little better than comparing with an empty string.


std::string StringRange::decrementString(std::string const& str, char minChar) {
if (str == "") {
return "";
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: returning string() might be more optimal.

Code cleaned up, system tested with 100k inserts and lookups.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants