-
Notifications
You must be signed in to change notification settings - Fork 18
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
U/jgates/loader #426
base: main
Are you sure you want to change the base?
U/jgates/loader #426
Conversation
core/modules/loader/BufferUdp.h
Outdated
|
||
/// If the buffer already contains valid data, advanceWriteCursor() | ||
/// must be called with the length of the valid data. Otherwise, | ||
/// BufferUdp thinks it has an empty buffer. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this seems hostile to the user. Why not call the function inside the body of this ctor?
core/modules/loader/BufferUdp.h
Outdated
void advanceWriteCursor(size_t len) { | ||
_wCursor += len; | ||
if (not isAppendSafe(0)) { | ||
throw new std::overflow_error("BufferUdp advanceWriteCursor beyond buffer len=" + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are you throwing an exception pointer instead of an exception instance? AFAIK this is not conventional.
core/modules/loader/BufferUdp.h
Outdated
_rCursor += len; | ||
if (_rCursor > _end) { | ||
throw new std::overflow_error("BufferUdp advanceReadCursor beyond buffer len=" + | ||
std::to_string(len)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you want to be sure the read cursor does not advance to or beyond the write cursor?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If everything has been passed properly to asio, it shouldn't go outside the buffer. But if it has, this lets us know.
// EOF is only a problem if no MsgElement was retrieved. | ||
// ??? This is definitely the case in UDP, EOF as nothing more will show up. | ||
// ??? But TCP is another issue as EOF is returned when the connection is still live but | ||
// ??? there's no data (len=0). Why does read_some set error to eof before the tcp connection is closed? |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yeah; according to the docs eof
is because the peer closed the connection. You're sure the peer didn't close it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm pretty sure I've seen the warning below show up in the log, but it has been a little while since I went looking for it.
|
||
/// A buffer for reading and writing. Nothing can be read from the buffer until | ||
/// something has written to it. | ||
class BufferUdp { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
are there unit tests for this class? it seems like it would be valuable to have some.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There are test for UdpBuffer at the beginning of udpTest.cc. I think there's some room for discussion for the best way to setup unit tests for this.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You mentioned in that file that it's an integration test. There is a lot of code here without testing at the 'unit' granularity, which I think is an issue.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I made a separate ticket for creating unit tests DM-16649
core/modules/loader/Central.h
Outdated
} | ||
|
||
|
||
virtual std::string getOurLogId() { return "baseclass"; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
maybe an explanation of the name "baseclass" could go here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the method needs to be declared const
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should really be removed. Digging through the log was getting confusing as everything wound up in the same log and it was sometimes difficult to discern what class or object wrote the log statement.
core/modules/loader/Central.h
Outdated
boost::asio::io_service& _ioService; | ||
|
||
/// Initialization order is important. | ||
DoList _doList{*this}; ///< List of items to be checked at regular intervals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
passing this
to another object during initialization is dangerous.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree with that!
Especially after reading the comment above: Initialization order is important
.
One alternative would be to dynamically create this object from the body of the class's c-tor after finishing all implicit & explicit initializations of the class's members. The other one would be to extend class DoList
with a special setCentral(Central*)
method for late binding?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What is there is safe, since the this pointer passed to doList wont be used until the last member '_checkDoListThread' is defined. This is somewhat fragile in that moving the _checkDoListThread declaration can cause a race condition.
The alternative is to break construction into two parts. The regular constructor is called and then a second step, I'll call go(), is called, which sets this pointer and creates the thread and moves in into _checkDoListThread. I'm ok with either. Factory functions don't really work well with base classes, since the child constructors wont call them.
core/modules/loader/Central.h
Outdated
|
||
std::atomic<uint64_t> _sequence{1}; | ||
|
||
util::CommandQueue::Ptr _queue{new util::CommandQueue()}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
prefer make_shared
to new
core/modules/loader/Central.h
Outdated
std::atomic<uint64_t> _sequence{1}; | ||
|
||
util::CommandQueue::Ptr _queue{new util::CommandQueue()}; | ||
util::ThreadPool::Ptr _pool{util::ThreadPool::newThreadPool(10, _queue)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it's easy to miss that _queue
is new'ed in braces above, and then used in the braces init here, and if the order of these 2 lines gets reversed that could lead to crashes. A comment when doing this kind of thing can be helpful.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the "magic number" 10
? Could this be put into some configuration service external to teh class Central
or be passed into the constructor of class Central
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It goes back to the initialization order being important comment. The choice being between construction order being important, but hidden in the base class. Or, having the complexity of multistage construction that is passed to all the child classes and cannot be hidden.
core/modules/loader/Central.cc
Outdated
while(_loop) { | ||
// Run and then sleep for a second. A more advanced timer should be used | ||
_doList.checkList(); | ||
usleep(100000); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why not use a condition variable instead of a timer?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Because it isn't really waiting for anything to happen. The sleep is mostly there to keep this from wasting CPU cycles if _doList is empty, and most of the items on the list have waits (from rate limiting) far longer that 0.1 seconds, so going through the list can easily result in no DoList items actually doing anything.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
but there should be a way a member of the list can trigger the condition variable so when there is work to do the loop runs.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The loop always run except for the brief period of time it sleeps (0.1 seconds). Most of the items on the list create a command that is then run in the thread pool. The list itself is not low latency, but the commands that are run by the list can be low latency. All the DoListItems just create commands and add them to the pool, which is pretty quick. Synchronization exists within the running commands, so they can be responsive. The DoList itself does not need to be responsive.
std::mutex _listMtx; ///< Protects _list (lock this one first) | ||
|
||
std::list<DoListItem::Ptr> _addList; | ||
std::mutex _addListMtx; ///< Protects _addList (lock this one second) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thank you for the comments and the narrow scope of the mutex. Big applause!
core/modules/loader/DoList.h
Outdated
public: | ||
using Ptr = std::shared_ptr<DoListItem>; | ||
|
||
DoListItem() = default; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Igor recommended to me a pattern that I found to be useful when using enable_shared_from_this
: make the constructors private and write a static factory function makeNewDoListItem()
that returns a populated DoListItem::Ptr
. This prevents anyone from creating a DoListItem
that is not owned by a shared_ptr
, and since calling shared_from_this
on a non-pointer-owned-object leads to a misleading error ("bad weak_ptr" if I remember correctly), this prevents you from having to debug that issue & captures it at compile time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have used that technique and I like it. I think making the constructor private causes problems with child classes, so forcing the use of a factory function becomes tricky. Maybe protected would work, but the child classes would also need factories.
_list.splice(_list.end(), _addList); | ||
} | ||
for (auto iter = _list.begin(); iter != _list.end(); ++iter){ | ||
DoListItem::Ptr const& item = *iter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
you can replace both these lines with
for (auto&& item : _list) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(item will be a DoListItem::Ptr
)
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's going to be DoListItem::Ptr&
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think I need the iterator to erase elements without having to do a second lookup.
core/modules/loader/DoList.cc
Outdated
LOGS(_log, LOG_LVL_DEBUG, "queuing command"); | ||
_central.queueCmd(cmd); | ||
} else { | ||
if (item->removeFromList()) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
is removeFromList
really more like shouldRemoveFromList
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
as it is, the name reads like a command to me. But from its definition and the usage here, I think it's actually a question...?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Comments for class BufferUdp
.
core/modules/loader/ServerUdpBase.h
Outdated
boost::asio::io_service& _ioService; | ||
boost::asio::ip::udp::socket _socket; | ||
boost::asio::ip::udp::endpoint _senderEndpoint; | ||
// char _data[MAX_MSG_SIZE]; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this still needed?
core/modules/loader/BufferUdp.cc
Outdated
// Class header | ||
#include "BufferUdp.h" | ||
|
||
// system headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment needs to be delete because no system headers are explicitly included here
core/modules/loader/BufferUdp.cc
Outdated
|
||
|
||
// Third-party headers | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This comment needs to be delete because no third-party headers are explicitly included here
core/modules/loader/BufferUdp.h
Outdated
|
||
/// The absolute largest UDP message we would send. | ||
/// Usually, they should be much smaller. | ||
#define MAX_MSG_SIZE 6000 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've looked through the code, and I'm not seeing a case requiring this to be defined as a macro. Perhaps it could be moved as a static data members of class BufferUdp
?
#include <arpa/inet.h> | ||
#include <cstring> | ||
#include <stdexcept> | ||
#include <memory> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This header needs to be moved one line up
core/modules/loader/BufferUdp.cc
Outdated
namespace qserv { | ||
namespace loader { | ||
|
||
/// Repeatedly read a socket until a valid MsgElement is read, eof, or an error occurs. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Normally this comment is expected in the header file.
core/modules/loader/BufferUdp.cc
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
*/ | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line?
core/modules/loader/BufferUdp.cc
Outdated
// qserv headers | ||
#include "loader/LoaderMsg.h" | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line?
core/modules/loader/BufferUdp.cc
Outdated
// LSST headers | ||
#include "lsst/log/Log.h" | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished reviewing class Central
core/modules/loader/Central.h
Outdated
|
||
// system headers | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers belong to a dedicated group of:
// The third-party headers
core/modules/loader/Central.h
Outdated
#include "util/ThreadPool.h" | ||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line
core/modules/loader/Central.h
Outdated
ChunkSubchunk(int chunk_, int subchunk_) : chunk(chunk_), subchunk(subchunk_) {} | ||
int const chunk; | ||
int const subchunk; | ||
friend std::ostream& operator<<(std::ostream& os, ChunkSubchunk csc); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Three problems with this operator:
- it doe not need to be declared as
friend
because both data members of structChunkSubchunk
ar public. - its second parameter needs to be declared as
ChunkSubchunk const& csc
- the operator needs to be moved out of the struct, probably right after the declaration of the struct
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the friend designation is harmless and the definition location helps associate the function with the struct.
core/modules/loader/Central.h
Outdated
boost::asio::io_service& _ioService; | ||
|
||
/// Initialization order is important. | ||
DoList _doList{*this}; ///< List of items to be checked at regular intervals. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
100% agree with that!
Especially after reading the comment above: Initialization order is important
.
One alternative would be to dynamically create this object from the body of the class's c-tor after finishing all implicit & explicit initializations of the class's members. The other one would be to extend class DoList
with a special setCentral(Central*)
method for late binding?
core/modules/loader/Central.h
Outdated
std::atomic<uint64_t> _sequence{1}; | ||
|
||
util::CommandQueue::Ptr _queue{new util::CommandQueue()}; | ||
util::ThreadPool::Ptr _pool{util::ThreadPool::newThreadPool(10, _queue)}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why the "magic number" 10
? Could this be put into some configuration service external to teh class Central
or be passed into the constructor of class Central
?
|
||
|
||
namespace { | ||
LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.Central"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LSST Logger is not used by the implementation of the class
#include "loader/MasterServer.h" | ||
#include "loader/MWorkerList.h" | ||
#include "loader/WWorkerList.h" | ||
#include "loader/WorkerServer.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This header needs to be moved one line up
|
||
void run(); | ||
|
||
std::string getMasterHostName() const { return _masterAddr.ip; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should tis method return std::string const&
instead?
core/modules/loader/Central.h
Outdated
} | ||
|
||
|
||
virtual std::string getOurLogId() { return "baseclass"; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
And the method needs to be declared const
|
||
protected: | ||
/// Repeatedly check the items on the _doList. | ||
void _checkDoList(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why tis method seats in the protected
area instead of the private
one? My understanding is that a thread created in the class's constructor is the only client of the method.
And, by the way, members which are declared as protected
should NOT begin with symbol _
(as per the LSST Style Guide).
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
" protected should NOT begin with symbol _" bleh, so much for being able to easily discern member variables in class functions.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class CentralClient
* You should have received a copy of the LSST License Statement and | ||
* the GNU General Public License along with this program. If not, | ||
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line
core/modules/loader/CentralClient.h
Outdated
|
||
// system headers | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers qualify as the "Third-party headers".
Also, this header needs to be "promoted" one line up.
namespace qserv { | ||
namespace loader { | ||
|
||
class KeyInfoData : public util::Tracker { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class. It's role, contexts in which it's used and how it's going to be used.
int subchunk; | ||
bool success{false}; | ||
|
||
friend std::ostream& operator<<(std::ostream& os, KeyInfoData const& data); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not to be friend
because all members of the class are public
. Besides, the operator may be moved outside the class.
}; | ||
|
||
/// TODO Maybe base this one CentralWorker or have a common base class? | ||
class CentralClient : public Central { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class.
core/modules/loader/CentralClient.cc
Outdated
void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { | ||
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInsertComplete"); | ||
|
||
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data)); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
auto
?
core/modules/loader/CentralClient.cc
Outdated
// Insert a oneShot DoListItem to keep trying to add the key until | ||
// we get word that it has been added successfully. | ||
LOGS(_log, LOG_LVL_INFO, "Trying to insert key=" << key << " chunk=" << chunk << | ||
" subchunk=" <<subchunk); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missing space after <<
?
... <<subchunk);
core/modules/loader/CentralClient.cc
Outdated
/// Returns a pointer to a Tracker object that can be used to track job | ||
// completion and the status of the job. keyInsertOneShot will call | ||
// _keyInsertReq until it knows the task was completed, via a call | ||
// to _handleKeyInsertComplete |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The method documentation needs to be in the header file.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class CentralMaster
core/modules/loader/CentralMaster.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_CENTRALMASTER_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition. It should be:
LSST_QSERV_LOADER_CENTRALMASTER_H
* You should have received a copy of the LSST License Statement and | ||
* the GNU General Public License along with this program. If not, | ||
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line
core/modules/loader/CentralMaster.h
Outdated
|
||
// system headers | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Promote this header one line up
core/modules/loader/CentralMaster.h
Outdated
#define LSST_QSERV_LOADER_CENTRALMASTER_H_ | ||
|
||
// system headers | ||
#include <boost/bind.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers are the "Third party" headers, not the "system" ones
|
||
// Qserv headers | ||
#include "loader/Central.h" | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra line
core/modules/loader/CentralMaster.cc
Outdated
throw LoaderMsgErr(funcName + " Multiple rightMost workers " + | ||
" name=" + std::to_string(rightMostName) + | ||
" name=" + std::to_string(item->getName()), | ||
__FILE__, __LINE__); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is a nice exception class which captures a context where exception need to report the one:
util::Issue
core/modules/loader/CentralMaster.cc
Outdated
// TODO Make a better algorithm, insert workers at busiest worker. | ||
// TODO maybe rate limit this check. | ||
std::string funcName("_assignNeighborIfNeeded"); | ||
if (_addingWorker) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It feels like that check be better done after acquiring a lock on a mutex. Otherwise there is a chance of running into a race condition.
core/modules/loader/CentralMaster.cc
Outdated
auto pair = _mWorkerList->getActiveInactiveWorkerLists(); | ||
std::vector<MWorkerListItem::Ptr>& activeList = pair.first; | ||
std::vector<MWorkerListItem::Ptr>& inactiveList = pair.second; | ||
if (inactiveList.empty() || _addingWorker) { return; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The _addingWorker
condition was already checked earlier
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The check on _addingWorker (which is atomic) earlier is meant to avoid trying to trying to lock _assignMtx, but as you said there is a race condition, so this second check is needed.
Unfortunately, it looks like _addingWorker is is never set back to false, which will take some effort.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
core/modules/loader/CentralMaster.cc
Outdated
if (inactiveList.empty() || _addingWorker) { return; } | ||
double sum = 0.0; | ||
int max = 0; | ||
uint32_t maxName = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This variable should be named as workerNumber
core/modules/loader/CentralMaster.cc
Outdated
} | ||
} | ||
|
||
MWorkerListItem::Ptr CentralMaster::getWorkerNamed(uint32_t name) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That name
is not consistent with its numeric type. Should be workerNumber
. And the same applies to the name of the method.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed classes CentralWorker and ClientServer
core/modules/loader/CentralMaster.cc
Outdated
auto pair = _mWorkerList->getActiveInactiveWorkerLists(); | ||
std::vector<MWorkerListItem::Ptr>& activeList = pair.first; | ||
std::vector<MWorkerListItem::Ptr>& inactiveList = pair.second; | ||
if (inactiveList.empty() || _addingWorker) { return; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
|
||
// system headers | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers are the "Third-party headers"
core/modules/loader/CentralWorker.h
Outdated
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_CENTRAL_WORKER_H_ | ||
#define LSST_QSERV_LOADER_CENTRAL_WORKER_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra '_' in the end of the macro definition
|
||
namespace proto { | ||
class WorkerKeysInfo; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Usually we add a comment to the namespace closing parenthesis
} // proto
core/modules/loader/ClientServer.cc
Outdated
|
||
|
||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Too many empty lines in the end of the file
core/modules/loader/ClientServer.cc
Outdated
std::unique_ptr<proto::LdrMsgReceived> protoBuf; | ||
if (success) { | ||
protoBuf = seData->protoParse<proto::LdrMsgReceived>(); | ||
if (protoBuf == nullptr) { success = false; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curly brackets are not needed here.
core/modules/loader/ClientServer.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_CLIENTSERVER_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro-definition
#include <cstdlib> | ||
#include <iostream> | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers are the third-party headers
class CentralClient; | ||
|
||
|
||
class ClientServer : public ServerUdpBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document the class and its methods!
Please, add deleted:
ClientServer(ClientServer const&) = delete;
ClientServer& operator=(ClientServer const&) = delete;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class DoList and other classes in its header
_list.splice(_list.end(), _addList); | ||
} | ||
for (auto iter = _list.begin(); iter != _list.end(); ++iter){ | ||
DoListItem::Ptr const& item = *iter; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it's going to be DoListItem::Ptr&
core/modules/loader/DoList.cc
Outdated
// System headers | ||
#include <iostream> | ||
|
||
// Third-party headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, remove unused comment
if (item == nullptr) return false; | ||
if (item->isAlreadyOnList()) return false; // fast atomic test | ||
{ | ||
std::lock_guard<std::mutex> lock(_addListMtx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The way this method locks the mutex contradicts to the comments to the data members. My understanding is that _listMtx
should be always locked first like it's done by method DoList::checkList()
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you have to lock both of them, you must lock _list first. If you know only one will be locked, you only need to lock one of them.
core/modules/loader/DoList.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_DOLIST_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition
core/modules/loader/DoList.h
Outdated
using TimePoint = std::chrono::system_clock::time_point; | ||
using Clock = std::chrono::system_clock; | ||
|
||
TimeOut(std::chrono::milliseconds timeOut) : _timeOut(timeOut) {} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is implicit conversion construction (from std::chrono::milliseconds
to TimeOut
) expected here?
core/modules/loader/DoList.h
Outdated
private: | ||
/// Lock _mtx before calling. | ||
bool _isOneShotDone() { | ||
return (!_needInfo && _oneShot); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Parentheses are not needed here
core/modules/loader/DoList.h
Outdated
virtual util::CommandTracked::Ptr createCommand()=0; | ||
|
||
protected: | ||
std::atomic<bool> _addedToList{false}; ///< True when added to a DoList |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think protected members should NOT begin with _
. Here is the link to the relevant section of the Style guide:
https://developer.lsst.io/cpp/style.html#the-parts-of-a-class-must-be-sorted-public-protected-and-private
And this is for private
members:
https://developer.lsst.io/cpp/style.html#private-class-variables-and-methods-must-have-underscore-prefix
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class LoaderMsg.
core/modules/loader/LoaderMsg.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_LOADERMSG_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition.
core/modules/loader/LoaderMsg.h
Outdated
|
||
/* &&& | ||
// TODO Add more information | ||
class LoaderMsgErr : public std::exception { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you still need the older version of this class after switching its base to util::Issue
?
core/modules/loader/LoaderMsg.h
Outdated
}; | ||
*/ | ||
|
||
class LoaderMsgErr : public util::Issue { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
👍
core/modules/loader/LoaderMsg.h
Outdated
using Ptr = std::shared_ptr<MsgElement>; | ||
enum { | ||
NOTHING = 0, | ||
STRING_ELEM = 1, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you need to explicitly enumerate the rest? Are these numbers related to some enum
in the ProtoBuf schema?
core/modules/loader/LoaderMsg.h
Outdated
static bool equal(MsgElement* a, MsgElement* b) { | ||
if (a == b) return true; | ||
if (a == nullptr || b == nullptr) return false; | ||
if (a->_elementType != b->_elementType) { return false; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curly brackets are not needed.
Same applies to the next line.
core/modules/loader/LoaderMsg.h
Outdated
if (not success) { | ||
return nullptr; | ||
} | ||
return protoItem; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If you like it to be compact then the last 5 lines could be done as:
return proto::ProtoImporter<T>::setMsgFrom(*protoItem, element.data(), element.length()) ?
protoItem : nullptr;
core/modules/loader/LoaderMsg.h
Outdated
// | ||
class LoaderMsg { | ||
public: | ||
enum { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What does it define? States of in the communication FSA? Or just a bag of constance which would be a VERY BAD idea :) In that (the later) case I would recommend splitting that "bag" into a few named enum
types grouping constants accordingly.
Also, why it's anonymous?
I always find it useful to complement these enum
types with a static function:
enum States {
IDLE = 0,
IN_PROGRESS,
...
};
static std::string state2str(State state);
This is very handy if you need to dump enums into the log streams.
core/modules/loader/LoaderMsg.h
Outdated
SHIFT_FROM_RIGHT_RECEIVED | ||
}; | ||
|
||
enum { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This type should be given some name.
StringElement::Ptr senderHost; | ||
UInt32Element::Ptr senderPort; | ||
|
||
friend std::ostream& operator<<(std::ostream& os, LoaderMsg const& loaderMsg); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
No need to be a friend
because all data members of the class are public.
// The message contains the message kind and the address of the entity sending | ||
// the message. | ||
// | ||
class LoaderMsg { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document methods and data members of the class!
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished reviewing class MWorkerList
#include <mutex> | ||
|
||
// Qserv headers | ||
#include "loader/Updateable.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, move this header at the bottom of the sorted list of headers in this group!
core/modules/loader/MWorkerList.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_MWORKERLIST_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition.
core/modules/loader/MWorkerList.h
Outdated
} | ||
|
||
uint32_t getId() const { | ||
std::lock_guard<std::mutex> lck(_mtx); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think you need a lock for primitive types if their values are set 'in stone' by class's constructors.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
While you will never get just part of a primitive type, if there is another function setting its value, I'd rather wait until that function is done before reading it and that is best done with a mutex. While this isn't the best example of that being an issue, I'd rather be safe than sorry when it comes to synchronization errors, and optimization can come later.
core/modules/loader/MWorkerList.h
Outdated
|
||
|
||
/// Standard information for a single worker, IP address, key range, timeouts. | ||
class MWorkerListItem : public std::enable_shared_from_this<MWorkerListItem> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document methods of the class, including their parameters!
core/modules/loader/MWorkerList.h
Outdated
_tcpAddress(new NetworkAddress(tcpAddress)), | ||
_central(central) {} | ||
|
||
uint32_t _wId; ///< Worker Id |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make this const
:
uint32_t const _wId; ///< Worker Id
os << " " << *elem.second << "\n"; | ||
} | ||
os << "MWorkerList ip:\n"; | ||
for (auto elem:_ipMap) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do (as suggested for a loop above):
for (auto&& elem:_ipMap) {
|
||
void MWorkerListItem::flagNeedToSendList() { | ||
auto slw = _sendListToWorker; | ||
if (slw != nullptr) { slw->setNeedInfo(); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Curly brackets are not needed here.
|
||
|
||
void MWorkerListItem::flagNeedToSendList() { | ||
auto slw = _sendListToWorker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this thread safe?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Making a copy of the shared_ptr is a way to make sure the shared_ptr is not modified (reset, or reassigned) while it is in use. As far as I can tell the pointer object is threadsafe (while it does not make any guarantees about the pointed-to object) so this should be thread safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
However, I can't tell that this copying is necessary, since once an object is assigned to the ptr, that ptr is never changed, so unless I'm missing something I think you could just say if (_sendListToWorker != nullptr) { _sendListToWorker->setNeedInfo(); }
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is just a check to make sure there isn't a segfault.
Once _sendListToWorker has been set, it shouldn't be changed unless this entire MWorkerListItem is removed and all calls like this become moot.
I don't thing there's a way to make this obvious in the code but I'll add some comments.
|
||
|
||
void MWorkerListItem::sendListToWorkerInfoReceived() { | ||
auto slw = _sendListToWorker; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this thread safe?
core/modules/loader/MWorkerList.cc
Outdated
|
||
|
||
std::ostream& operator<<(std::ostream& os, MWorkerListItem const& item) { | ||
os << "name=" << item._wId << " address=" << *item._udpAddress << " range(" << item._range << ")"; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this thread safe? My understanding is that worker's ranges may change, and the range is ot a trivial data type.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
other places that use _range take a lock. That should be done here too I think?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class MasterServer
core/modules/loader/MasterServer.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_MASTERSERVER_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition
#include <cstdlib> | ||
#include <iostream> | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers are the Third-party headers, not the system ones
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also promote the asio
one line up
|
||
// Qserv headers | ||
#include "loader/ServerUdpBase.h" | ||
#include "loader/MWorkerList.h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Promote this header one line up
class LoaderMsg; | ||
class CentralMaster; | ||
|
||
class MasterServer : public ServerUdpBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class and its methods!
int status, std::string const& msgTxt); // TODO shows up in both MasterServer and WorkerServer | ||
|
||
private: | ||
CentralMaster* _centralMaster; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be:
CentralMaster* const _centralMaster;
core/modules/loader/MasterServer.cc
Outdated
LOGS(_log, LOG_LVL_INFO, "MasterServer::parseMsg sender " << senderEndpoint << | ||
" kind=" << inMsg.msgKind->element << " data length=" << data->getAvailableWriteLength()); | ||
switch (inMsg.msgKind->element) { | ||
case LoaderMsg::MSG_RECEIVED: |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
case
should be indented 4 spaces from a position of switch
sendBufferTo(inMsg.senderHost->element, inMsg.senderPort->element, *reply); | ||
return nullptr; | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Perhaps extra line?
BufferUdp::Ptr MasterServer::workerKeysInfo(LoaderMsg const& inMsg, BufferUdp::Ptr const& data, | ||
boost::asio::ip::udp::endpoint const& senderEndpoint) { | ||
|
||
std::string funcName("MasterServer::workerKeysInfo"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Could be:
std::string const funcName(...);
LOGS(_log, LOG_LVL_INFO, funcName << " Master got wId=" << workerId); | ||
|
||
/// Find the worker name in the map. | ||
auto workerItem = _centralMaster->getWorkerWithId(protoItem->wid()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why not to use workerId
which you got 2 lines above?
auto workerItem = _centralMaster->getWorkerWithId(workerId);
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Alternatively, just don't cache workerItem
and use protoItem->wid()
everywhere, especially if this operation is cheap.
core/modules/loader/MasterServer.cc
Outdated
// Send the response to the worker that asked for it. | ||
_centralMaster->sendBufferTo(requestorAddr->ip, requestorAddr->port, sendBuf); | ||
|
||
} catch (LoaderMsgErr &msgErr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LoaderMsgErr const& msgErr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed class NetworkAddress
core/modules/loader/Neighbor.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_NEIGHBOR_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition
#define LSST_QSERV_LOADER_NEIGHBOR_H_ | ||
|
||
// system headers | ||
#include <boost/bind.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers belong to the "Third party" headers
|
||
// system headers | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move this header up
Neighbor() = delete; | ||
explicit Neighbor(Type t) : _type(t) {} | ||
|
||
std::string getTypeStr() { return _type == LEFT ? "LEFT" : "RIGHT"; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be const
method
_addressTcp.reset(new NetworkAddress(addr)); | ||
} | ||
|
||
NetworkAddress getAddressTcp() { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be const
method
StringElement::Ptr data = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*bufData)); | ||
|
||
if (data == nullptr) { | ||
LOGS(_log, LOG_LVL_WARN, "NetworkAddress::create data==nullptr " + note); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be LOG_LVL_ERROR
instead?
Or perhaps an exception thrown?
core/modules/loader/NetworkAddress.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_NETWORKADDRESS_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra_
in the end of the macro definition
core/modules/loader/NetworkAddress.h
Outdated
|
||
|
||
|
||
friend std::ostream& operator<<(std::ostream& os, NetworkAddress const& adr); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Does not need to be friend
because all data members are public
|
||
friend std::ostream& operator<<(std::ostream& os, NetworkAddress const& adr); | ||
}; | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line
|
||
}}} // namespace lsst::qserv::loader | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
extra line
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Finished reviewing files ServerTcpBase.(h.cc)
# Harvest special binary products - files starting with the package's name | ||
# followed by underscore: | ||
# | ||
# qserv-<something>.cc |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Fix the comment to:
# udp<something>.cc
core/modules/loader/ServerTcpBase.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_SERVER_TCP_BASE_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition
|
||
// system headers | ||
#include <boost/asio.hpp> | ||
#include <boost/bind.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Boost headers are the third-party headers
|
||
class ServerTcpBase; | ||
|
||
class TcpBaseConnection : public std::enable_shared_from_this<TcpBaseConnection> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class and its methods.
Also, move inline implementations into the CC files. This will make the header more observable and easier to compile.
Explicitly delete:
TcpBaseConnection() = delete;
TcpBaseConnection(TcpBaseConnection const&) = delete;
TcpBaseConnection& operator=(TcpBaseConnection const&) = delete;
TcpBaseConnection(boost::asio::io_context& io_context, ServerTcpBase* tcpBase) : | ||
_socket(io_context), _serverTcpBase(tcpBase) {} | ||
|
||
void _readKind(const boost::system::error_code&, size_t /*bytes_transferred*/); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move const
after the type and give a name to the parameter:
boost::system::error_code const& ec
} | ||
// Extract key pairs from the protobuffer | ||
int keyCount = protoKeyList->keycount(); // TODO delete keycount from KeyList | ||
int sz = protoKeyList->keypair_size(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's safer to declare it const
:
int const sz = protoKeyList->keypair_size();
Just in case.
core/modules/loader/ServerTcpBase.cc
Outdated
ServerTcpBase::writeData(_socket, _buf); | ||
LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " << | ||
_serverTcpBase->getCentralWorker()->dumpKeysStr(2)); | ||
} catch (LoaderMsgErr &msgErr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LoaderMsgErr const& msgErr
core/modules/loader/ServerTcpBase.cc
Outdated
return; | ||
} | ||
boost::system::error_code ecode; | ||
_readKind(ecode, 0); // get next message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already made a comment earlier on calling this method explicitly. Consider refactoring _readKind
core/modules/loader/ServerTcpBase.cc
Outdated
return; | ||
} | ||
boost::system::error_code ecode; | ||
_readKind(ecode, 0); // get next message |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've already made a comment earlier on calling this method explicitly. Consider refactoring _readKind
core/modules/loader/ServerTcpBase.cc
Outdated
_serverTcpBase->getCentralWorker()->finishShiftFromRight(); | ||
LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " << | ||
_serverTcpBase->getCentralWorker()->dumpKeysStr(2)); | ||
} catch (LoaderMsgErr &msgErr) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LoaderMsgErr const& msgErr
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've reviewed class ServerUdpBase. And I'm finding the implementation of method ServerUdpBase::sendBufferTo as potentially dangerous. Could you, please, revisit it?
core/modules/loader/ServerUdpBase.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_SERVER_UDP_BASE_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition.
namespace loader { | ||
|
||
|
||
class ServerUdpBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class and its methods.
Consider deriving this class from std::enable_shared_from_this
. The add a static factory method and made the constructor private.
// char _data[MAX_MSG_SIZE]; | ||
BufferUdp::Ptr _data; ///< data buffer for receiving | ||
BufferUdp::Ptr _sendData; ///< data buffer for sending. | ||
std::string _hostName; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
std::string const _hostName;
BufferUdp::Ptr _data; ///< data buffer for receiving | ||
BufferUdp::Ptr _sendData; ///< data buffer for sending. | ||
std::string _hostName; | ||
int _port; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Make it const
boost::asio::ip::udp::endpoint resolve(std::string const& hostName, int port); | ||
|
||
protected: | ||
std::atomic<uint32_t> _errCount{0}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove _
which is reserved for private
members only
#include <iostream> | ||
#include <unistd.h> | ||
|
||
// Third-party headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused comment
_receivePrepare(); | ||
} | ||
} else { | ||
LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::_receiveCallback got empty message, ignoring"); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That's not technically true, because this also cover a case of a error in the communication. Hence the right logic should be:
if (error) {
# extract error message for the error and report it.
} else if (bytesRecvd == 0) {
# complain about 0-length message like you do
} else {
# normal processing
}
Please, revisit the rest of your code to see if you have the same problem in there as well.
core/modules/loader/ServerUdpBase.cc
Outdated
std::condition_variable cv; | ||
bool done = false; | ||
|
||
auto callbackFunc = [&mtx, &cv, &done](const boost::system::error_code& error, std::size_t bytesTransferred) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is VERY DANGEROUS!!! And this could be a BUG in the application. What you're doing is passing a reference onto a stack variable into a function which is going to be called asynchronously from some other thread at a time when this method will finish and its stack variable will disappear.
You may solve his problem by making these variables data members of the class.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In this case it is ok. This function waits for the response before it returns, so all of the references are valid for the life of the callback. I will add a comment.
core/modules/loader/ServerUdpBase.cc
Outdated
}; | ||
|
||
ip::udp::endpoint dest(boost::asio::ip::address::from_string(hostName), port); | ||
_socket.async_send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why ASYNC I/O if you're going to wait on the CV below?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I need to use async io since _socket is not thread safe. Asio will order async writes so they don't cause problems. I'm not sure asio lets you mix synchronous and async use of the socket for this but I was having communication issues, and this appears to work.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmmm... Really should be the same as send_to
... This extra complication worries me, and makes me wonder if this "fixed" the problem, or just kicked it down the road 🤔 If you haven't tried send_to
here instead recently, it might be worth another try in case you addressed the issue you were seeing in the meantime elsewhere?
But if you stick with async_send_to
, a style suggestion (YMMV): I'd just go ahead and put the lambda directly inline in the call to async_send_...
(per comment above) in a situation like this. You are only "using" it once, and nesting it inside the async_send_to
in my mind makes it more clear when reading the code when it is going to be invoked.
} | ||
|
||
|
||
void ServerUdpBase::_sendCallback(const boost::system::error_code& error, size_t bytes_sent) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move const
to the right:
boost::system::error_code const& error
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I've reviewed files StringRange.(h,cc), and I think it needs to be redesigned.
#include <cstdlib> | ||
#include <iostream> | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move Boost headers into the third-party category.
Sort them alphabetically.
core/modules/loader/StringRange.h
Outdated
|
||
/// Return true if other functionally equivalent. | ||
bool equal(StringRange const& other) const { | ||
if (_valid != other._valid) { return false; } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
According to the LSST Style Guide, the curly brackets are not needed in this line and in other if
statements in this and other methods.
core/modules/loader/StringRange.h
Outdated
namespace loader { | ||
|
||
/// Class for storing the range of a single worker. | ||
/// This is likely to become a template class, hence lots in the header. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In my view, a problem with this intent (to carry a specific numeric, string, or even a composite (like struct) type as a template parameter) has certain implications - you would need to pass that specific type around your application Range<int64_t>
. This will essentially make a half of your code which depends on this type, moves values of that range around, makes them or produces them (to be sent further) very messy and template(-ized). I've already seen many uses of the StringRange type in various classes of this code. Besides I've notices a lot of dynamic type casts into the string type, which is already a precursor of troubles in the future.
My recommendation here is to model indexes as a hierarchy:
-
some base interface
Index
which would declarepure virtual
comparison methods (not operators) likeeqal
,less
, etc. -
a family of derived classes (possibly with the help of templates) which would be storing actual values of the indexes and implement the above defined
virtual
methods.
Then you can write your class Range
as a regular (non-template) class implemented i terms of the base interface Index
. And most of your code which moves values of indexes around and operates on ranges (objects of class Range
would be totally independent of an underlining type (a subclass of Index
.
The only other area where you will have to deal with specific subtypes would be some small code which will be serializing those objects into the Protobuf structures and deserializing protobuf structures into the objects deriving from Index
(a factory).
You may also need to think about how to implement these types in the Protobuf w/o adding extra complexity (you may have already thought about this problem) in order to support a spectrum of index types w/o recoding the application each time a new index is added.
This will make your code observable, easy to compile (remember ugly compilation errors at a presence of templates), and yet type safe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
StringRange is used as the key to a map and it needs to follow certain rules to do so, such as having operator<() defined. Much of what you suggest is probably still possible (I don't think I've tried a pure virtual operator before). This class is meant primarily to get things up and running and will be replaced by templates or baseclass and children, or some mixture of the two at some point. At this point, I have a better idea of what the limitations are.
protobuffs are not terribly friendly for this sort of thing, either.
This is not really in the immediate scope of problems I want to tackle, but I am thinking about it.
core/modules/loader/StringRange.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_STRINGRANGE_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition.
@@ -0,0 +1,126 @@ | |||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think the protocol needs to be refined to avoid depending on optional choice of the string
versus uint64
index types. There should be an explicit negotiation at some index setup/startup time between clients, master and workers before the index gets created from scratch.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It gets worse than that. It's difficult to get away from chunk + subchunk as values. A problem for another day but worth thinking about.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: clean up indentation to 4 spaces consistently throughout?
@@ -11,5 +11,9 @@ env.Protoc(File("replication.proto"), | |||
PROTOC_PATH='.', | |||
PROTOC_CCOUT='.', | |||
PROTOC_PYOUT='.',) | |||
env.Protoc(File("loader.proto"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is the third Protobuf file then we may need to add a rule into our SConsUtils which would automatically inject this type of statements. Probably not for this ticket.
core/modules/loader/Updateable.h
Outdated
* see <http://www.lsstcorp.org/LegalNotices/>. | ||
* | ||
*/ | ||
#ifndef LSST_QSERV_LOADER_UPDATABLE_H_ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Extra _
in the end of the macro definition
#include <list> | ||
#include <memory> | ||
|
||
// Qserv headers |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove unused comment
|
||
// Qserv headers | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Remove extra line or two
#include <cstdlib> | ||
#include <iostream> | ||
#include <boost/bind.hpp> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This header to be promoted one line up
|
||
class CentralWorker; | ||
|
||
class WorkerServer : public ServerUdpBase { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please, document this class and its methods
void _msgRecieved(LoaderMsg const& inMsg, BufferUdp::Ptr const& data, | ||
boost::asio::ip::udp::endpoint const& senderEndpoint); | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One extra empty line?
CentralWorker* _centralWorker; | ||
}; | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
One extra empty line?
core/modules/loader/udpTest.cc
Outdated
|
||
// System headers | ||
#include <iostream> | ||
#include <boost/asio.hpp> |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
// The third-party header
core/modules/loader/StringRange.cc
Outdated
|
||
|
||
std::string StringRange::decrementString(std::string const& str, char minChar) { | ||
if (str == "") { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think str.empty()
is a little better than comparing with an empty string.
core/modules/loader/StringRange.cc
Outdated
|
||
std::string StringRange::decrementString(std::string const& str, char minChar) { | ||
if (str == "") { | ||
return ""; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: returning string()
might be more optimal.
Code cleaned up, system tested with 100k inserts and lookups.
6f944aa
to
6f0ba51
Compare
No description provided.