diff --git a/admin/templates/configuration/etc/log4cxx.index_master.properties b/admin/templates/configuration/etc/log4cxx.index_master.properties new file mode 100644 index 0000000000..0a3b0c6da5 --- /dev/null +++ b/admin/templates/configuration/etc/log4cxx.index_master.properties @@ -0,0 +1,18 @@ +# +# Configuration file for log4cxx +# can be used for unit test +# by launching next command before unit tests: +# export LSST_LOG_CONFIG=$HOME/.lsst/log4cxx.unittest.properties +# + +#log4j.rootLogger=INFO, CONSOLE +log4j.rootLogger=DEBUG, CONSOLE +#log4j.rootLogger=WARN, CONSOLE + +log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender +log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout +#log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n +log4j.appender.CONSOLE.layout.ConversionPattern=[%d{ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n + +# Tune log at the module level +#log4j.logger.lsst.qserv.util=DEBUG diff --git a/core/modules/loader/BufferUdp.cc b/core/modules/loader/BufferUdp.cc index 4c57493894..29d463fee2 100644 --- a/core/modules/loader/BufferUdp.cc +++ b/core/modules/loader/BufferUdp.cc @@ -48,7 +48,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket, // If there's something in the buffer already, get it and return. // This can happen when the previous read of socket read multiple elements. - MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket&&&" + note); + MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket" + note); if (msgElem != nullptr) { return msgElem; } @@ -69,7 +69,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket, /// Try to retrieve an element (there's no guarantee that an entire element got read in a single read. // Store original cursor positions so they can be restored if the read fails. - msgElem = _safeRetrieve("2readFromSocket&&&" + note); + msgElem = _safeRetrieve("2readFromSocket" + note); if (msgElem != nullptr) { return msgElem; } @@ -117,11 +117,11 @@ void BufferUdp::advanceReadCursor(size_t len) { } -std::shared_ptr BufferUdp::_safeRetrieve(std::string const& note) { // &&& delete note, maybe +std::shared_ptr BufferUdp::_safeRetrieve(std::string const& note) { auto wCursorOriginal = _wCursor; auto rCursorOriginal = _rCursor; // throwOnMissing=false since missing data is possible with TCP. - MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve &&&", false); + MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve", false); if (msgElem != nullptr) { return msgElem; } else { diff --git a/core/modules/loader/BufferUdp.h b/core/modules/loader/BufferUdp.h index 2ef7980991..66e3730750 100644 --- a/core/modules/loader/BufferUdp.h +++ b/core/modules/loader/BufferUdp.h @@ -46,7 +46,7 @@ class MsgElement; /// A buffer for reading and writing. Nothing can be read from the buffer until /// something has been written to it. -/// TODO: rename BufferUdp is not really accurate anymore. &&& +/// TODO: rename BufferUdp is not really accurate anymore. class BufferUdp { public: using Ptr = std::shared_ptr; diff --git a/core/modules/loader/Central.cc b/core/modules/loader/Central.cc index 2192644578..6791c081e3 100644 --- a/core/modules/loader/Central.cc +++ b/core/modules/loader/Central.cc @@ -77,7 +77,6 @@ void Central::_checkDoList() { while(_loop) { // Run and then sleep for a second. TODO A more advanced timer should be used doList->checkList(); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_loopSleepTime); } } diff --git a/core/modules/loader/CentralClient.cc b/core/modules/loader/CentralClient.cc index 03d9e04476..5b8cea4b01 100644 --- a/core/modules/loader/CentralClient.cc +++ b/core/modules/loader/CentralClient.cc @@ -49,24 +49,11 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -CentralClient::CentralClient(boost::asio::io_service& ioService_, - std::string const& hostName, ClientConfig const& cfg) - : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), - _hostName(hostName), _udpPort(cfg.getClientPortUdp()), - _defWorkerHost(cfg.getDefWorkerHost()), - _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), - _doListMaxLookups(cfg.getMaxLookups()), - _doListMaxInserts(cfg.getMaxInserts()), - _maxRequestSleepTime(cfg.getMaxRequestSleepTime()) { -} -*/ + CentralClient::CentralClient(boost::asio::io_service& ioService_, std::string const& hostName, ClientConfig const& cfg) : CentralFollower(ioService_, hostName, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(),cfg.getLoopSleepTime(), cfg.getIOThreads(), cfg.getClientPortUdp()), - // &&& _hostName(hostName), - // &&& _udpPort(cfg.getClientPortUdp()), _defWorkerHost(cfg.getDefWorkerHost()), _defWorkerPortUdp(cfg.getDefWorkerPortUdp()), _doListMaxLookups(cfg.getMaxLookups()), @@ -74,11 +61,6 @@ CentralClient::CentralClient(boost::asio::io_service& ioService_, _maxRequestSleepTime(cfg.getMaxRequestSleepTime()) { } -/* &&& -void CentralClient::start() { - _server = std::make_shared(ioService, _hostName, _udpPort, this); -} -*/ void CentralClient::startService() { _server = std::make_shared(ioService, _hostName, _udpPort, this); @@ -92,7 +74,7 @@ CentralClient::~CentralClient() { void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup"); - auto const sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup&&& ")); + auto const sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup ")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list"); return; @@ -128,14 +110,14 @@ void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptrkeyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success()); - LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_LOOKUP key=" << key << " " << chunkInfo); + LOGS(_log, LOG_LVL_INFO, "Successful KEY_LOOKUP key=" << key << " " << chunkInfo); } void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInsertComplete"); - auto sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete&&& ")); + auto sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete ")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to retrieve element"); return; @@ -173,7 +155,7 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique mapSize = _waitingKeyInsertMap.size(); } keyInsertOneShot->keyInsertComplete(); - LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo << + LOGS(_log, LOG_LVL_INFO, "Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo << " mapSize=" << mapSize); } @@ -199,11 +181,10 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk, size_t sz = _waitingKeyInsertMap.size(); lck.unlock(); if (loopCount % 100 == 0) { - LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key << + LOGS(_log, LOG_LVL_DEBUG, "keyInsertReq waiting key=" << key << "size=" << sz << " loopCount=" << loopCount); } // Let the CPU do something else while waiting for some requests to finish. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_maxRequestSleepTime); ++loopCount; lck.lock(); @@ -294,7 +275,6 @@ KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) { "size=" << sz << " loopCount=" << loopCount); } // Let the CPU do something else while waiting for some requests to finish. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(_maxRequestSleepTime); sleptForMicroSec += _maxRequestSleepTime; ++loopCount; diff --git a/core/modules/loader/CentralClient.h b/core/modules/loader/CentralClient.h index 779b1d21fd..eed867d310 100644 --- a/core/modules/loader/CentralClient.h +++ b/core/modules/loader/CentralClient.h @@ -67,20 +67,17 @@ class KeyInfoData : public util::Tracker { /// so replies to its request can be sent directly back to it. /// 'Central' provides access to the master and a DoList for handling requests. /// TODO Maybe base this on CentralWorker or have a common base class? -// &&& class CentralClient : public Central { class CentralClient : public CentralFollower { public: /// The client needs to know the master's IP and its own IP. CentralClient(boost::asio::io_service& ioService_, std::string const& hostName, ClientConfig const& cfg); - // &&&void start(); void startService() override; ~CentralClient() override; std::string const& getHostName() const { return _hostName; } - // &&& int getUdpPort() const { return _udpPort; } int getTcpPort() const { return 0; } ///< No tcp port at this time. /// @return the default worker's host name. @@ -161,17 +158,10 @@ class CentralClient : public CentralFollower { CentralClient* central; }; - - /// TODO The worker IP becomes default worker as it should be able to get - /// that information from the master in the future. DM-16555 - // &&& const std::string _hostName; - // &&& const int _udpPort; - // If const is removed, these will need mutex protection. const std::string _defWorkerHost; ///< Default worker host const int _defWorkerPortUdp; ///< Default worker UDP port - size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config /// Time to sleep between checking requests when at max length, set by config diff --git a/core/modules/loader/CentralFollower.cc b/core/modules/loader/CentralFollower.cc index e51c07bc5e..8a38e94f2e 100644 --- a/core/modules/loader/CentralFollower.cc +++ b/core/modules/loader/CentralFollower.cc @@ -65,7 +65,7 @@ void CentralFollower::startMonitoring() { bool CentralFollower::workerInfoReceive(BufferUdp::Ptr const& data) { // Open the data protobuffer and add it to our list. - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " CentralFollower::workerInfoReceive&&& ")); + StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "CentralFollower::workerInfoReceive")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, "CentralFollower::workerInfoRecieve Failed to parse list"); return false; diff --git a/core/modules/loader/CentralFollower.h b/core/modules/loader/CentralFollower.h index c30b467c4a..61157f8580 100644 --- a/core/modules/loader/CentralFollower.h +++ b/core/modules/loader/CentralFollower.h @@ -47,9 +47,6 @@ class WorkerKeysInfo; namespace loader { - -// &&& class CentralWorkerDoListItem; - /// This class is used a base central class for servers that need to get /// lists of of worker from the master. /// CentralFollower provides no service on its own. The derived classes must: @@ -61,20 +58,6 @@ class CentralFollower : public Central { public: typedef std::pair CompKeyPair; - /* &&& - enum SocketStatus { - VOID0 = 0, - STARTING1, - ESTABLISHED2 - }; - - enum Direction { - NONE0 = 0, - TORIGHT1, - FROMRIGHT2 - }; - */ - CentralFollower(boost::asio::io_service& ioService, std::string const& hostName_, std::string const& masterHost, int masterPortUdp, int threadPoolSize, int loopSleepTime, int ioThreads, int fPortUdp) @@ -107,34 +90,23 @@ class CentralFollower : public Central { std::string getOurLogId() const override { return "CentralFollower"; } - // &&& friend CentralWorkerDoListItem; - protected: // &&& make some or all private again /// Real workers need to check this for initial ranges. virtual void checkForThisWorkerValues(uint32_t wId, std::string const& ip, int portUdp, int portTcp, KeyRange& strRange) {}; - /// &&& This function is needed to fill the map. On real workers, CentralWorker - /// needs to do additional work to set its id. + /// This function is needed to fill the map. On real workers, CentralWorker + /// needs to do additional work to set its own id. void _workerInfoReceive(std::unique_ptr& protoBuf); - /// See workerWorkerKeysInfoReq(...) - // &&& void _workerWorkerKeysInfoReq(LoaderMsg const& inMsg); - const std::string _hostName; const int _udpPort; - // &&& const int _tcpPort; - WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; ///< Maps of workers. + /// Maps of workers with their key ranges. + WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; - /// The DoListItem that makes sure _monitor() is run. &&& needs to ask master for worker map occasionally - // &&& replace with item to refresh _wWorkerList (see CentralWorker::_startMonitoring)// std::shared_ptr _centralWorkerDoListItem; }; - - - - }}} // namespace lsst::qserv::loader #endif // LSST_QSERV_LOADER_CENTRAL_FOLLOWER_H diff --git a/core/modules/loader/CentralMaster.cc b/core/modules/loader/CentralMaster.cc index 86f385e8c3..a75487247d 100644 --- a/core/modules/loader/CentralMaster.cc +++ b/core/modules/loader/CentralMaster.cc @@ -43,11 +43,6 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -void CentralMaster::start() { - _server = std::make_shared(ioService, getMasterHostName(), getMasterPort(), this); -} -*/ void CentralMaster::startService() { _server = std::make_shared(ioService, getMasterHostName(), getMasterPort(), this); } diff --git a/core/modules/loader/CentralMaster.h b/core/modules/loader/CentralMaster.h index 84e7e37e5d..36b639bcd3 100644 --- a/core/modules/loader/CentralMaster.h +++ b/core/modules/loader/CentralMaster.h @@ -59,7 +59,6 @@ class CentralMaster : public Central { _maxKeysPerWorker(cfg.getMaxKeysPerWorker()) {} /// Open the UDP port. This can throw boost::system::system_error. - // &&&void start(); void startService() override; ~CentralMaster() override { _mWorkerList.reset(); } diff --git a/core/modules/loader/CentralWorker.cc b/core/modules/loader/CentralWorker.cc index b3b27cec17..ebcb69cc70 100644 --- a/core/modules/loader/CentralWorker.cc +++ b/core/modules/loader/CentralWorker.cc @@ -51,20 +51,6 @@ namespace lsst { namespace qserv { namespace loader { -/* &&& -CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::io_context& io_context_, - std::string const& hostName_, WorkerConfig const& cfg) - : Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), - cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()), - _hostName(hostName_), - _udpPort(cfg.getWPortUdp()), - _tcpPort(cfg.getWPortTcp()), - _ioContext(io_context_), - _recentAddLimit(cfg.getRecentAddLimit()), - _thresholdNeighborShift(cfg.getThresholdNeighborShift()), - _maxKeysToShift(cfg.getMaxKeysToShift()) { -} -*/ CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::io_context& io_context_, std::string const& hostName_, WorkerConfig const& cfg) : CentralFollower(ioService_, hostName_, cfg.getMasterHost(), cfg.getMasterPortUdp(), @@ -76,14 +62,7 @@ CentralWorker::CentralWorker(boost::asio::io_service& ioService_, boost::asio::i _maxKeysToShift(cfg.getMaxKeysToShift()) { } -/* &&& -void CentralWorker::start() { - _server = std::make_shared(ioService, _hostName, _udpPort, this); - _tcpServer = std::make_shared(_ioContext, _tcpPort, this); - _tcpServer->runThread(); - _startMonitoring(); -} -*/ + void CentralWorker::startService() { _server = std::make_shared(ioService, _hostName, _udpPort, this); _tcpServer = std::make_shared(_ioContext, _tcpPort, this); @@ -91,11 +70,10 @@ void CentralWorker::startService() { } - CentralWorker::~CentralWorker() { // Members that contain pointers to this. Deleting while this != null. - // &&&_wWorkerList.reset(); - // TODO: wait for reference count to drop to one or less. + // TODO: Wait for reference count to drop to one or less, + // although CentralWorker is never really shutdown. _tcpServer.reset(); } @@ -111,7 +89,6 @@ void CentralWorker::startMonitoring() { CentralFollower::startMonitoring(); // Add _workerList to _doList so it starts checking new entries. _centralWorkerDoListItem = std::make_shared(this); - // &&& doList->addItem(_wWorkerList); doList->addItem(_centralWorkerDoListItem); } @@ -123,11 +100,9 @@ void CentralWorker::_monitor() { if (_isOurIdInvalid()) { _registerWithMaster(); // Give the master a half second to answer. - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); usleep(500000); return; } - LOGS(_log, LOG_LVL_WARN, "&&& CentralWorker::_monitor A"); // If data gets shifted, check everything again as ranges will have // changed and there may be a lot more data to shift. @@ -193,7 +168,6 @@ void CentralWorker::_monitor() { _sendWorkerKeysInfo(masterAddr, getNextMsgId()); } } while (dataShifted); - LOGS(_log, LOG_LVL_WARN, "&&& CentralWorker::_monitor Z"); } @@ -248,7 +222,7 @@ bool CentralWorker::_determineRange() { // Must send the number of bytes in the message so TCP server knows how many bytes to read. bytesInMsg.appendToData(data); strElem.appendToData(data); - ServerTcpBase::writeData(*_rightSocket, data, "detRange"); + ServerTcpBase::writeData(*_rightSocket, data); } // Get back their basic info { @@ -389,7 +363,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { bytesInMsg.appendToData(data); keyShiftReq.appendToData(data); LOGS(_log, LOG_LVL_INFO, fName << " FROMRIGHT " << keysToShift); - ServerTcpBase::writeData(*_rightSocket, data, "_shift FROMRIGHT"); + ServerTcpBase::writeData(*_rightSocket, data); } // Wait for the KeyList response { @@ -420,7 +394,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { data.reset(); UInt32Element elem(LoaderMsg::SHIFT_FROM_RIGHT_RECEIVED); elem.appendToData(data); - ServerTcpBase::writeData(*_rightSocket, data, "shift SHIFT_FROM_RIGHT_RECEIVED"); + ServerTcpBase::writeData(*_rightSocket, data); LOGS(_log, LOG_LVL_INFO, fName << " direction=" << direction << " keys=" << keysToShift); } else if (direction == TORIGHT1) { @@ -463,7 +437,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { LOGS(_log, LOG_LVL_ERROR, errMsg); // This will keep getting thrown and never work, but at least it will show up // in the logs. - // &&& create new exception, catch it and halve the number of keys to shift ??? + // TODO Maybe create new exception, catch it and halve the number of keys to shift? throw LoaderMsgErr(ERR_LOC, errMsg); } kindShiftRight.appendToData(data); @@ -471,7 +445,7 @@ void CentralWorker::_shift(Direction direction, int keysToShift) { keyList.appendToData(data); LOGS(_log, LOG_LVL_INFO, fName << " TORIGHT sending keys"); - ServerTcpBase::writeData(*_rightSocket, data, "shift TORIGHT sending keys"); + ServerTcpBase::writeData(*_rightSocket, data); // read back LoaderMsg::SHIFT_TO_RIGHT_KEYS_RECEIVED data.reset(); @@ -843,7 +817,7 @@ void CentralWorker::_forwardKeyInsertRequest(NetworkAddress const& targetAddr, L LOGS(_log, LOG_LVL_WARN, "Too many hops, dropping insert request hops=" << hops << " key=" << key); return; } - LOGS(_log, LOG_LVL_WARN, "&&&INFO Forwarding key insert hops=" << hops << " key=" << key); + LOGS(_log, LOG_LVL_INFO, "Forwarding key insert hops=" << hops << " key=" << key); LoaderMsg msg(LoaderMsg::KEY_INSERT_REQ, inMsg.msgId->element, getHostName(), getUdpPort()); BufferUdp msgData; msg.appendToData(msgData); @@ -946,7 +920,7 @@ void CentralWorker::_workerKeyInfoReq(LoaderMsg const& inMsg, std::unique_ptr(msgElem); if (neighborName == nullptr) { return false; @@ -960,7 +934,7 @@ bool CentralWorker::workerWorkerSetRightNeighbor(LoaderMsg const& inMsg, BufferU bool CentralWorker::workerWorkerSetLeftNeighbor(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) { - auto msgElem = MsgElement::retrieve(*data, " CentralWorker::workerWorkerSetLeftNeighbor&&& "); + auto msgElem = MsgElement::retrieve(*data, "CentralWorker::workerWorkerSetLeftNeighbor"); UInt32Element::Ptr neighborName = std::dynamic_pointer_cast(msgElem); if (neighborName == nullptr) { return false; diff --git a/core/modules/loader/CentralWorker.h b/core/modules/loader/CentralWorker.h index d8baaa3a20..14d8360ad8 100644 --- a/core/modules/loader/CentralWorker.h +++ b/core/modules/loader/CentralWorker.h @@ -40,19 +40,11 @@ namespace lsst { namespace qserv { - -namespace proto { -// &&& class WorkerKeysInfo; -// &&& class WorkerListItem; -} - namespace loader { - class CentralWorkerDoListItem; -// &&& class CentralWorker : public Central { class CentralWorker : public CentralFollower { public: typedef std::pair CompKeyPair; @@ -73,16 +65,11 @@ class CentralWorker : public CentralFollower { std::string const& hostName_, WorkerConfig const& cfg); /// Open the UDP and TCP ports and start monitoring. This can throw boost::system::system_error. - // &&&void start(); void startService() override; void startMonitoring() override; ~CentralWorker() override; - // &&& WWorkerList::Ptr getWorkerList() const { return _wWorkerList; } - - // &&& std::string const& getHostName() const { return _hostName; } - // &&& int getUdpPort() const { return _udpPort; } int getTcpPort() const override { return _tcpPort; } uint32_t getOurId() const { @@ -114,9 +101,6 @@ class CentralWorker : public CentralFollower { /// @returns what it thinks the range of the left neighbor should be. KeyRange updateRangeWithLeftData(KeyRange const& strRange); - /// Receive our name from the master. Returns true if successful. - // &&& bool workerInfoReceive(BufferUdp::Ptr const& data) override; - /// Receive a request to insert a key value pair. /// If the key value pair could not be inserted, it tries to forward the request appropriately. /// @Returns true if the request could be parsed. @@ -160,7 +144,6 @@ class CentralWorker : public CentralFollower { friend CentralWorkerDoListItem; protected: - // &&& void _workerInfoReceive(std::unique_ptr& protoBuf) override; ///< see workerInfoReceive() // &&& need this to work properly with new class void checkForThisWorkerValues(uint32_t wId, std::string const& ip, int portUdp, int portTcp, KeyRange& strRange) override; private: @@ -223,23 +206,18 @@ class CentralWorker : public CentralFollower { /// Connect to the right neighbor. Must hold _rightMtx in the lock. void _rightConnect(std::lock_guard const& rightMtxLG); ///< Disconnect from the right neighbor. Must hold _rightMtx in the lock. - void _rightDisconnect(std::lock_guard const& rightMtxLG, std::string const& note); // &&& remove note ?? + void _rightDisconnect(std::lock_guard const& rightMtxLG, std::string const& note); void _cancelShiftsWithRightNeighbor(); ///< Cancel shifts to/from the right neighbor. void _finishShiftToRight(); ///< The shift to the right neighbor is complete, cleanup. - // &&& const std::string _hostName; - // &&& const int _udpPort; const int _tcpPort; boost::asio::io_context& _ioContext; - // &&& WWorkerList::Ptr _wWorkerList{std::make_shared(this)}; ///< Maps of workers. - bool _ourIdInvalid{true}; ///< true until our id has been set by the master. std::atomic _ourId{0}; ///< id given by the master, 0 is invalid id. mutable std::mutex _ourIdMtx; ///< protects _ourIdInvalid, _ourId - KeyRange _keyRange; ///< range for this worker std::atomic _rangeChanged{false}; std::map _keyValueMap; diff --git a/core/modules/loader/ClientServer.cc b/core/modules/loader/ClientServer.cc index 9779381413..704e73b556 100644 --- a/core/modules/loader/ClientServer.cc +++ b/core/modules/loader/ClientServer.cc @@ -82,8 +82,6 @@ BufferUdp::Ptr ClientServer::parseMsg(BufferUdp::Ptr const& data, // following not expected by client case LoaderMsg::KEY_INSERT_REQ: // This is what this client should send out case LoaderMsg::KEY_LOOKUP_REQ: // This is what this client should send out - // &&& case LoaderMsg::MAST_WORKER_INFO: - // &&& case LoaderMsg::MAST_WORKER_LIST: // TODO having the client know would be useful. case LoaderMsg::MAST_INFO: case LoaderMsg::MAST_INFO_REQ: case LoaderMsg::MAST_WORKER_LIST_REQ: @@ -137,7 +135,8 @@ void ClientServer::_msgRecievedHandler(LoaderMsg const& inMsg, BufferUdp::Ptr co bool success = true; // This is only really expected for parsing errors. Most responses to // requests come in as normal messages. - StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "ClientServer::_msgRecievedHandler&&& ")); + StringElement::Ptr seData = + std::dynamic_pointer_cast(MsgElement::retrieve(*data, "ClientServer::_msgRecievedHandler")); if (seData == nullptr) { success = false; } diff --git a/core/modules/loader/DoListItem.cc b/core/modules/loader/DoListItem.cc new file mode 100644 index 0000000000..36d79f7e98 --- /dev/null +++ b/core/modules/loader/DoListItem.cc @@ -0,0 +1,74 @@ +// -*- LSST-C++ -*- +/* + * LSST Data Management System + * Copyright 2019 LSST. + * + * This product includes software developed by the + * LSST Project (http://www.lsst.org/). + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU General Public License as published by + * the Free Software Foundation, either version 3 of the License, or + * (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. + * + * You should have received a copy of the LSST License Statement and + * the GNU General Public License along with this program. If not, + * see . + */ + + +// Class header +#include "loader/DoListItem.h" + +// System headers +#include + +// Qserv headers +#include "loader/Central.h" +#include "loader/LoaderMsg.h" +#include "proto/loader.pb.h" + +// LSST headers +#include "lsst/log/Log.h" + +namespace { + +LOG_LOGGER _log = LOG_GET("lsst.qserv.loader.DoListItem"); + +} + +namespace lsst { +namespace qserv { +namespace loader { + + +util::CommandTracked::Ptr DoListItem::runIfNeeded(TimeOut::TimePoint now) { + std::lock_guard lock(_mtx); + if (_command == nullptr) { + if (_isOneShotDone()) return nullptr; + if ((_needInfo || _timeOut.due(now)) && _timeRateLimit.due(now)) { + _timeRateLimit.triggered(); + // Randomly vary the next rate limit timeout + int rand = (std::rand()/(RAND_MAX/1000)); // 0 to 1000 + rand += std::min(_commandsCreated * 10000, 120000); + auto rateLimitRandom = now + std::chrono::milliseconds(rand); + _timeRateLimit.triggered(rateLimitRandom); + _command = createCommand(); + if (_oneShot) ++_commandsCreated; + LOGS(_log, LOG_LVL_DEBUG, "cCreated=" << _commandsCreated << " rand=" << rand); + return _command; + } + } else if (_command->isFinished()) { + _command.reset(); // Allow the command to be sent again later. + } + return nullptr; + } + + +}}} // namespace lsst:qserv::loader + diff --git a/core/modules/loader/DoListItem.h b/core/modules/loader/DoListItem.h index 685d6f8701..bb192d7dae 100644 --- a/core/modules/loader/DoListItem.h +++ b/core/modules/loader/DoListItem.h @@ -99,27 +99,6 @@ class DoListItem : public std::enable_shared_from_this { virtual ~DoListItem() = default; util::CommandTracked::Ptr runIfNeeded(TimeOut::TimePoint now); - /* &&& - util::CommandTracked::Ptr runIfNeeded(TimeOut::TimePoint now) { - std::lock_guard lock(_mtx); - if (_command == nullptr) { - if (_isOneShotDone()) return nullptr; - if ((_needInfo || _timeOut.due(now)) && _timeRateLimit.due(now)) { - _timeRateLimit.triggered(); - // Randomly vary the next rate limit timeout - int rand = (std::rand()/(RAND_MAX/100)); // 0 to 500 - auto rateLimitRandom = now + std::chrono::milliseconds(rand); - _timeRateLimit.triggered(rateLimitRandom); - _command = createCommand(); - if (_oneShot) ++_commandsCreated; - return _command; - } - } else if (_command->isFinished()) { - _command.reset(); // Allow the command to be sent again later. - } - return nullptr; - } - */ bool isAlreadyOnList() { return _addedToList; } diff --git a/core/modules/loader/LoaderMsg.cc b/core/modules/loader/LoaderMsg.cc index d81643a984..87f3a8deab 100644 --- a/core/modules/loader/LoaderMsg.cc +++ b/core/modules/loader/LoaderMsg.cc @@ -49,28 +49,28 @@ LoaderMsg::LoaderMsg(uint16_t kind, uint64_t id, std::string const& host, uint32 void LoaderMsg::parseFromData(BufferUdp& data) { - MsgElement::Ptr elem = MsgElement::retrieve(data, " 1parseFromData&&& "); + MsgElement::Ptr elem = MsgElement::retrieve(data, "1parseFromData"); msgKind = std::dynamic_pointer_cast(elem); if (msgKind == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for msgKind:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 2parseFromData&&& "); + elem = MsgElement::retrieve(data, "2parseFromData"); msgId = std::dynamic_pointer_cast(elem); if (msgId == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for msgId:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 3parseFromData&&& "); + elem = MsgElement::retrieve(data, "3parseFromData"); senderHost = std::dynamic_pointer_cast(elem); if (senderHost == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for senderHost:" + MsgElement::getStringVal(elem)); } - elem = MsgElement::retrieve(data, " 4parseFromData&&& "); + elem = MsgElement::retrieve(data, "4parseFromData"); senderPort = std::dynamic_pointer_cast(elem); if (senderPort == nullptr) { throw LoaderMsgErr(ERR_LOC, "LoaderMsg::parseMsg wrong type for senderPort:" + diff --git a/core/modules/loader/MasterServer.cc b/core/modules/loader/MasterServer.cc index eae9bed4f9..a93bca8509 100644 --- a/core/modules/loader/MasterServer.cc +++ b/core/modules/loader/MasterServer.cc @@ -73,7 +73,7 @@ BufferUdp::Ptr MasterServer::parseMsg(BufferUdp::Ptr const& data, case LoaderMsg::MSG_RECEIVED: // TODO: locate msg id in send messages and take appropriate action break; - case LoaderMsg::MAST_INFO_REQ: + case LoaderMsg::MAST_INFO_REQ: // &&& TODO delete this enum ??? // TODO: sendData = masterInfoRequest(inMsg, data, senderEndpoint); &&& break; case LoaderMsg::MAST_WORKER_LIST_REQ: diff --git a/core/modules/loader/MsgElement.cc b/core/modules/loader/MsgElement.cc index ae9c31ccc4..b8be45e8b9 100644 --- a/core/modules/loader/MsgElement.cc +++ b/core/modules/loader/MsgElement.cc @@ -45,7 +45,7 @@ bool MsgElement::retrieveType(BufferUdp &data, char& elemType) { } -MsgElement::Ptr MsgElement::retrieve(BufferUdp& data, std::string const& note, bool throwOnMissing) { // &&& delete note, maybe, the thrown error is pretty useful. +MsgElement::Ptr MsgElement::retrieve(BufferUdp& data, std::string const& note, bool throwOnMissing) { char elemT; if (not retrieveType(data, elemT)) { LOGS(_log, LOG_LVL_INFO, note << "no type retrieved "); diff --git a/core/modules/loader/MsgElement.h b/core/modules/loader/MsgElement.h index 903cfb9a2a..8682b20e20 100644 --- a/core/modules/loader/MsgElement.h +++ b/core/modules/loader/MsgElement.h @@ -300,7 +300,7 @@ class StringElement : public MsgElement { /// This the case with UDP, and boost asio async reads that return after X bytes read. template static std::unique_ptr protoParse(BufferUdp& data) { - StringElement::Ptr itemData = std::dynamic_pointer_cast(MsgElement::retrieve(data, "protoParse &&&")); + StringElement::Ptr itemData = std::dynamic_pointer_cast(MsgElement::retrieve(data, "protoParse")); if (itemData == nullptr) { return nullptr; } return itemData->protoParse(); } diff --git a/core/modules/loader/NetworkAddress.cc b/core/modules/loader/NetworkAddress.cc index cdb8a78144..db31494545 100644 --- a/core/modules/loader/NetworkAddress.cc +++ b/core/modules/loader/NetworkAddress.cc @@ -50,7 +50,7 @@ namespace loader { NetworkAddress::UPtr NetworkAddress::create(BufferUdp::Ptr const& bufData, int& tcpPort, std::string const& note) { - StringElement::Ptr data = std::dynamic_pointer_cast(MsgElement::retrieve(*bufData, " NetworkAddress::create&&& ")); + StringElement::Ptr data = std::dynamic_pointer_cast(MsgElement::retrieve(*bufData, "NetworkAddress::create")); if (data == nullptr) { LOGS(_log, LOG_LVL_WARN, "NetworkAddress::create data==nullptr " + note); diff --git a/core/modules/loader/ServerTcpBase.cc b/core/modules/loader/ServerTcpBase.cc index 36663ea3f8..81b468dcdb 100644 --- a/core/modules/loader/ServerTcpBase.cc +++ b/core/modules/loader/ServerTcpBase.cc @@ -68,9 +68,8 @@ void ServerTcpBase::_startAccept() { } -bool ServerTcpBase::writeData(AsioTcp::socket& socket, BufferUdp& data, std::string note) { +bool ServerTcpBase::writeData(AsioTcp::socket& socket, BufferUdp& data) { while (data.getBytesLeftToRead() > 0) { - LOGS(_log, LOG_LVL_INFO, note << " &&& write data bytesLeft=" << data.getBytesLeftToRead()); // Read cursor advances (manually in this case) as data is read from the buffer. auto res = boost::asio::write(socket, boost::asio::buffer(data.getReadCursor(), data.getBytesLeftToRead())); @@ -115,7 +114,7 @@ bool ServerTcpBase::testConnect() { kind.appendToData(data); UInt32Element bytes(1234); // dummy value bytes.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); // send back our name and left neighbor message. data.reset(); @@ -125,7 +124,7 @@ bool ServerTcpBase::testConnect() { ourName.appendToData(data); UInt64Element valuePairCount(testNewNodeValuePairCount); valuePairCount.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); // Get back left neighbor information auto msgKind = std::dynamic_pointer_cast( @@ -159,7 +158,7 @@ bool ServerTcpBase::testConnect() { data.reset(); UInt32Element verified(LoaderMsg::NEIGHBOR_VERIFIED); verified.appendToData(data); - writeData(socket, data, "tc"); + writeData(socket, data); boost::system::error_code ec; socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec); @@ -239,14 +238,14 @@ void TcpBaseConnection::_recvKind(const boost::system::error_code& ec, size_t by } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " 1TcpBaseConnection::_recvKind&&& "); // &&& should all tcp stuff be safe retrieve? + auto msgElem = MsgElement::retrieve(_buf, "1TcpBaseConnection::_recvKind"); auto msgKind = std::dynamic_pointer_cast(msgElem); if (msgKind == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_recvKind unexpected type of msg"); _freeConnect(); return; } - msgElem = MsgElement::retrieve(_buf, " 2TcpBaseConnection::_recvKind&&& "); + msgElem = MsgElement::retrieve(_buf, "2TcpBaseConnection::_recvKind"); auto msgBytes = std::dynamic_pointer_cast(msgElem); if (msgBytes == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_recvKind missing bytes"); @@ -310,11 +309,11 @@ void TcpBaseConnection::_handleTest2(const boost::system::error_code& ec, size_t } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + auto msgElem = MsgElement::retrieve(_buf, "_handleTest2_a"); auto msgKind = std::dynamic_pointer_cast(msgElem); - msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + msgElem = MsgElement::retrieve(_buf, "_handleTest2_b"); auto msgName = std::dynamic_pointer_cast(msgElem); - msgElem = MsgElement::retrieve(_buf, " _handleTest2&&& "); + msgElem = MsgElement::retrieve(_buf, " _handleTest2_c"); auto msgKeys = std::dynamic_pointer_cast(msgElem); // TODO move most of this to CentralWorker @@ -373,7 +372,7 @@ void TcpBaseConnection::_handleTest2c(const boost::system::error_code& ec, size_ } // Fix the buffer with the information given. _buf.advanceWriteCursor(bytesTrans); - auto msgElem = MsgElement::retrieve(_buf, " _handleTest2c&&& "); + auto msgElem = MsgElement::retrieve(_buf, "_handleTest2c"); if (msgElem == nullptr) { LOGS(_log, LOG_LVL_ERROR, "_handleTest2b Kind nullptr error"); _freeConnect(); @@ -466,7 +465,7 @@ void TcpBaseConnection::_handleImYourLNeighbor1(boost::system::error_code const& // Send the number of bytes in the message so TCP client knows how many bytes to read. bytesInMsg.appendToData(_buf); strWKI.appendToData(_buf); - ServerTcpBase::writeData(_socket, _buf, "shift _handleImYourLNeighbor1"); + ServerTcpBase::writeData(_socket, _buf); LOGS(_log, LOG_LVL_INFO, funcName << " done"); } catch (LoaderMsgErr const& ex) { LOGS(_log, LOG_LVL_ERROR, funcName << " Buffer failed " << ex.what()); @@ -540,7 +539,7 @@ void TcpBaseConnection::_handleShiftToRight1(boost::system::error_code const& ec _buf.reset(); UInt32Element elem(LoaderMsg::SHIFT_TO_RIGHT_RECEIVED); elem.appendToData(_buf); - ServerTcpBase::writeData(_socket, _buf, "_shift SHIFT_TO_RIGHT_RECEIVED"); + ServerTcpBase::writeData(_socket, _buf); LOGS(_log, LOG_LVL_INFO, funcName << " done dumpKeys " << _serverTcpBase->getCentralWorker()->dumpKeysStr(2)); } catch (LoaderMsgErr const& ex) { @@ -595,7 +594,7 @@ void TcpBaseConnection::_handleShiftFromRight1(boost::system::error_code const& } // Extract keysToShift from the protobuffer int keyShiftReq = protoKeyShiftReq->keystoshift(); - LOGS(_log, LOG_LVL_INFO, fName << " &&& keystoshift=" << keyShiftReq); + LOGS(_log, LOG_LVL_INFO, fName << " keystoshift=" << keyShiftReq); if (keyShiftReq < 1) { throw LoaderMsgErr(ERR_LOC, " KeyShiftRequest for < 1 key"); } @@ -610,11 +609,11 @@ void TcpBaseConnection::_handleShiftFromRight1(boost::system::error_code const& LOGS(_log, LOG_LVL_ERROR, errMsg); // This will keep getting thrown and never work, but at least it will show up // in the logs. - // &&& create new exception, catch it and halve the number of keys to shift ??? + // TODO create new exception, catch it and halve the number of keys to shift ??? throw LoaderMsgErr(ERR_LOC, errMsg); } keyList->appendToData(data); - ServerTcpBase::writeData(_socket, data, std::string(" &&& _handleShiftFromRight1 " + data.dumpStr(false))); + ServerTcpBase::writeData(_socket, data); // Wait for the SHIFT_FROM_RIGHT_KEYS_RECEIVED response back. _buf.reset(); diff --git a/core/modules/loader/ServerTcpBase.h b/core/modules/loader/ServerTcpBase.h index 6be3acb9d0..dd0f1b2901 100644 --- a/core/modules/loader/ServerTcpBase.h +++ b/core/modules/loader/ServerTcpBase.h @@ -147,7 +147,7 @@ class ServerTcpBase { CentralWorker* getCentralWorker() const { return _centralWorker; } - static bool writeData(AsioTcp::socket& socket, BufferUdp& data, std::string note); // &&& delete note + static bool writeData(AsioTcp::socket& socket, BufferUdp& data); private: void _startAccept(); diff --git a/core/modules/loader/ServerUdpBase.cc b/core/modules/loader/ServerUdpBase.cc index 96f806a318..0eab1316e8 100644 --- a/core/modules/loader/ServerUdpBase.cc +++ b/core/modules/loader/ServerUdpBase.cc @@ -74,7 +74,6 @@ void ServerUdpBase::_receiveCallback(boost::system::error_code const& error, siz void ServerUdpBase::_sendResponse() { - LOGS(_log, LOG_LVL_INFO, "&&&udp:_sendResponse"); _socket.async_send_to(boost::asio::buffer(_sendData->getReadCursor(), _sendData->getBytesLeftToRead()), _senderEndpoint, [this](boost::system::error_code const& ec, std::size_t bytes_transferred) { @@ -95,9 +94,9 @@ void ServerUdpBase::sendBufferTo(std::string const& hostName, int port, BufferUd dest = resolve(hostName, port); // may throw boost::system::system_error _resolvMap[addr] = dest; } else { + // TODO if the entry is old, call resolv to freshen. dest = iter->second; } - // &&& ip::udp::endpoint dest = resolve(hostName, port); _socket.send_to(buffer(sendBuf.getReadCursor(), sendBuf.getBytesLeftToRead()), dest); } catch (boost::system::system_error const& e) { LOGS(_log, LOG_LVL_ERROR, "ServerUdpBase::sendBufferTo boost system_error=" << e.what() << @@ -124,7 +123,6 @@ void ServerUdpBase::_sendCallback(const boost::system::error_code& error, size_t } void ServerUdpBase::_receivePrepare() { - LOGS(_log, LOG_LVL_INFO, "&&&udp:_receivePrepare"); _data = std::make_shared(); // New buffer for next response, the old buffer // may still be in use elsewhere. _socket.async_receive_from(boost::asio::buffer(_data->getWriteCursor(), @@ -141,10 +139,9 @@ boost::asio::ip::udp::endpoint ServerUdpBase::resolve(std::string const& hostNam using namespace boost::asio; // Resolver returns an iterator. This uses the first item only. // Failure to resolve anything throws a boost::system::error. - LOGS(_log, LOG_LVL_INFO, "&&&udp:resolve a"); + // There's a 5 second timeout, which is extremely painful and frequent. ip::udp::endpoint dest = *_resolver.resolve(ip::udp::v4(), hostName, std::to_string(port)).begin(); - LOGS(_log, LOG_LVL_INFO, "&&&udp:resolve b"); return dest; } diff --git a/core/modules/loader/Util.cc b/core/modules/loader/Util.cc index dd01b61d87..4a906be992 100644 --- a/core/modules/loader/Util.cc +++ b/core/modules/loader/Util.cc @@ -77,7 +77,7 @@ std::vector split(std::string const& in, std::function } -/// Test to be put in unit tests &&& +/// TODO Test to be put in unit tests bool splitTest() { auto out = split("www.github.com", [](char c) {return c == '.';}); auto test = (out[0] == "www" && out[1] == "github" && out[2] == "com"); diff --git a/core/modules/loader/WWorkerList.cc b/core/modules/loader/WWorkerList.cc index f60e25b7c0..3b82c6b042 100644 --- a/core/modules/loader/WWorkerList.cc +++ b/core/modules/loader/WWorkerList.cc @@ -110,7 +110,7 @@ bool WWorkerList::workerListReceive(BufferUdp::Ptr const& data) { std::string const funcName("WWorkerList::workerListReceive"); LOGS(_log, LOG_LVL_INFO, funcName << " data=" << data->dumpStr()); // Open the data protobuffer and add it to our list. - StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " WWorkerList::workerListReceive&&& ")); + StringElement::Ptr sData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "WWorkerList::workerListReceive")); if (sData == nullptr) { LOGS(_log, LOG_LVL_WARN, funcName << " Failed to parse list"); return false; @@ -142,7 +142,7 @@ bool WWorkerList::workerListReceive(BufferUdp::Ptr const& data) { strNames += std::to_string(wId) + ","; item->addDoListItems(_central); } - // TODO: Should this call updateEntry() to fill in the information for the worker? &&& + // TODO: Should this call updateEntry() to fill in the information for the worker? } sizeChange = _wIdMap.size() - initialSize; if (sizeChange > 0) { diff --git a/core/modules/loader/WWorkerList.h b/core/modules/loader/WWorkerList.h index 6bb14b0f6e..bcd9b2bcf9 100644 --- a/core/modules/loader/WWorkerList.h +++ b/core/modules/loader/WWorkerList.h @@ -41,7 +41,6 @@ namespace qserv { namespace loader { class CentralFollower; -//class CentralWorker; // &&& class LoaderMsg; diff --git a/core/modules/loader/WorkerServer.cc b/core/modules/loader/WorkerServer.cc index 6c8325b9c5..92923fbd67 100644 --- a/core/modules/loader/WorkerServer.cc +++ b/core/modules/loader/WorkerServer.cc @@ -137,7 +137,7 @@ void WorkerServer::_msgRecieved(LoaderMsg const& inMsg, BufferUdp::Ptr const& da bool success = true; // This is only really expected for parsing errors. Most responses to // requests come in as normal messages. - StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, " WorkerServer::_msgRecieved&&& ")); + StringElement::Ptr seData = std::dynamic_pointer_cast(MsgElement::retrieve(*data, "WorkerServer::_msgRecieved")); if (seData == nullptr) { success = false; } diff --git a/core/modules/loader/appClientNum.cc b/core/modules/loader/appClientNum.cc index 6498ff3f6a..0afdc23f9a 100644 --- a/core/modules/loader/appClientNum.cc +++ b/core/modules/loader/appClientNum.cc @@ -223,7 +223,6 @@ int main(int argc, char* argv[]) { if (waitForKeysCount > maxWaitCount) maxWaitCount = waitForKeysCount; while (!keyInsertListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for inserts to finish count=" << count); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(1); ++count; } @@ -272,7 +271,6 @@ int main(int argc, char* argv[]) { // About 1 second per 1000 keys) while (!keyLookupListClean(kList, successCount, failedCount) && count < waitForKeysCount) { LOGS(_log, LOG_LVL_INFO, "waiting for lookups to finish count=" << count); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(1); ++count; } @@ -303,7 +301,7 @@ int main(int argc, char* argv[]) { std::chrono::duration_cast(lookupEnd - insertEnd).count()); ioService.stop(); LOGS(_log, LOG_LVL_WARN, "client DONE"); - while(true) sleep(100); // &&& keep kubernetes from restarting this + while(true) sleep(100); // prevent kubernetes from restarting this TODO: make this program run as a job. return 0; } diff --git a/core/modules/loader/appMaster.cc b/core/modules/loader/appMaster.cc index a5242eb760..f141f70b59 100644 --- a/core/modules/loader/appMaster.cc +++ b/core/modules/loader/appMaster.cc @@ -47,7 +47,6 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "masterCfg=" << mCfgFile); - //std::string const ourHost = boost::asio::ip::host_name(); &&& std::string const ourHost = getOurHostName(0); LOGS(_log, LOG_LVL_INFO, "ourHost=" << ourHost); boost::asio::io_service ioService; @@ -64,7 +63,6 @@ int main(int argc, char* argv[]) { bool loop = true; while(loop) { - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(10); } ioService.stop(); diff --git a/core/modules/loader/appTest.cc b/core/modules/loader/appTest.cc index 00d96bc2de..29dc27a143 100644 --- a/core/modules/loader/appTest.cc +++ b/core/modules/loader/appTest.cc @@ -220,7 +220,6 @@ int main(int argc, char* argv[]) { server.testConnect(); LOGS(_log, LOG_LVL_INFO, "ServTcpBase e"); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(5); } catch (std::exception const& e) { @@ -373,7 +372,6 @@ int main(int argc, char* argv[]) { auto originalErrCount = wCentral1.getErrCount(); LOGS(_log, LOG_LVL_INFO, "1TSTAGE testSendBadMessage start"); wCentral1.testSendBadMessage(); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(2); // TODO handshaking instead of sleep if (originalErrCount == wCentral1.getErrCount()) { @@ -383,7 +381,6 @@ int main(int argc, char* argv[]) { } LOGS(_log, LOG_LVL_INFO, "sleeping"); - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(5); // TODO change to 20 second timeout with a check every 0.1 seconds. // The workers should agree on the worker list, and it should have 2 elements. if (wCentral1.getWorkerList()->getIdMapSize() == 0) { diff --git a/core/modules/loader/appWorker.cc b/core/modules/loader/appWorker.cc index 0528fa0bfd..bd2300d7d2 100644 --- a/core/modules/loader/appWorker.cc +++ b/core/modules/loader/appWorker.cc @@ -24,9 +24,6 @@ // System headers #include #include -//#include // &&& -//#include // &&& -//#include // &&& // qserv headers #include "loader/CentralWorker.h" @@ -53,169 +50,7 @@ int main(int argc, char* argv[]) { boost::asio::io_service ioService; boost::asio::io_context ioContext; - /* &&& - if (!splitTest()) { - LOGS(_log, LOG_LVL_ERROR, "split test failed! &&&"); - exit(1); - } - - - std::string const ourHost = boost::asio::ip::host_name(); - std::string ourHostIp; - LOGS(_log, LOG_LVL_INFO, "ourHost=" << ourHost); - boost::asio::io_service ioService; - boost::asio::io_context ioContext; - - { - char hostbuffer[256]; - char *IPbuffer; - struct hostent *host_entry; - int hostname; - hostname = gethostname(hostbuffer, sizeof(hostbuffer)); - - // To retrieve host information - //host_entry = gethostbyname(hostbuffer); - host_entry = gethostbyname(ourHost.c_str()); - - // To convert an Internet network - // address into ASCII string - IPbuffer = inet_ntoa(*((struct in_addr*) // &&& replace with inet_ntop - host_entry->h_addr_list[0])); - LOGS(_log, LOG_LVL_ERROR, "hostname=" << hostname << " buf=" << hostbuffer); - LOGS(_log, LOG_LVL_ERROR, "host_entry=" << host_entry); - LOGS(_log, LOG_LVL_ERROR, "IPbuffer=" << IPbuffer); - ourHostIp = IPbuffer; - - //gethostbyaddr(); &&& - hostent *he; - in_addr ipv4addr; - //in6_addr ipv6addr; - - inet_pton(AF_INET, ourHostIp.c_str(), &ipv4addr); - he = gethostbyaddr(&ipv4addr, sizeof ipv4addr, AF_INET); - if (he == nullptr) { - printf("he == nullptr\n"); - } else { - printf("Host name: %s\n", he->h_name); - LOGS(_log, LOG_LVL_INFO, " host name=" << he->h_name); // *** this gets the correct full name - - for(int i=0; he->h_aliases[i] != NULL; ++i) { - LOGS(_log, LOG_LVL_INFO, std::to_string(i) << " host=" << he->h_aliases[i]); - } - } - - //inet_pton(AF_INET6, "2001:db8:63b3:1::beef", &ipv6addr); - //he = gethostbyaddr(&ipv6addr, sizeof ipv6addr, AF_INET6); - //printf("Host name: %s\n", he->h_name); - - } - - - { - addrinfo hints, *info, *p; - int gai_result; - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - //std::string hostname("127.0.0.1"); - printf("hostname0: %s\n", hostname); - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - //hints.ai_socktype = SOCK_STREAM; - hints.ai_flags = AI_CANONNAME; - - //if ((gai_result = getaddrinfo(hostname, "http", &hints, &info)) != 0) { - if ((gai_result = getaddrinfo(hostname, NULL, &hints, &info)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gai_result)); - exit(1); - } - - for(p = info; p != NULL; p = p->ai_next) { - printf("hostname1: %s\n", p->ai_canonname); // ** correct - LOGS(_log, LOG_LVL_INFO, "*a*hostname1: " << p->ai_canonname); - } - - freeaddrinfo(info); - - } - - { - addrinfo hints, *info, *p; - int gai_result; - - char hostname[1024]; - hostname[1023] = '\0'; - gethostname(hostname, 1023); - //std::string hostname("127.0.0.1"); - printf("hostname0: %s\n", hostname); - - memset(&hints, 0, sizeof hints); - hints.ai_family = AF_UNSPEC; - //hints.ai_socktype = SOCK_STREAM; - //hints.ai_flags = AI_CANONNAME; - - //if ((gai_result = getaddrinfo(hostname, "http", &hints, &info)) != 0) { - if ((gai_result = getaddrinfo(hostname, NULL, &hints, &info)) != 0) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(gai_result)); - exit(1); - } - - for(p = info; p != NULL; p = p->ai_next) { - printf("hostname1: %s\n", p->ai_canonname); - LOGS(_log, LOG_LVL_INFO, "*b*hostname1: " << p->ai_canonname); - } - - freeaddrinfo(info); - - } - - { - addrinfo hints; - addrinfo *infoptr; - hints.ai_family = AF_INET; // AF_INET means IPv4 only addresses - - //int result = getaddrinfo("www.bbc.com", NULL, &hints, &infoptr); - //int result = getaddrinfo("127.0.0.1", NULL, &hints, &infoptr); // results in hostname "localhost" - int result = getaddrinfo(ourHostIp.c_str(), NULL, &hints, &infoptr); - if (result) { - fprintf(stderr, "getaddrinfo: %s\n", gai_strerror(result)); - exit(1); - } - - - struct addrinfo *p; - char host[1024]; - - for (p = infoptr; p != NULL; p = p->ai_next) { - //getnameinfo(p->ai_addr, p->ai_addrlen, host, sizeof (host), NULL, 0, NI_NUMERICHOST); - getnameinfo(p->ai_addr, p->ai_addrlen, host, sizeof (host), NULL, 0, 0); - LOGS(_log, LOG_LVL_INFO, "getnameinfo host=" << host << " addr=" << p->ai_addr); // ** correct - } - - freeaddrinfo(infoptr); - - - - } - - - auto hostN2 = getOurHostName(2); - LOGS(_log, LOG_LVL_INFO, "hostN2=" << hostN2); - auto hostN1 = getOurHostName(1); - LOGS(_log, LOG_LVL_INFO, "hostN1=" << hostN1); - auto hostN0 = getOurHostName(0); - LOGS(_log, LOG_LVL_INFO, "hostN0=" << hostN0); - auto hostN10 = getOurHostName(10); - LOGS(_log, LOG_LVL_INFO, "hostN10=" << hostN10); - auto hostN4 = getOurHostName(4); - LOGS(_log, LOG_LVL_INFO, "hostN4=" << hostN4); - - //exit(1); -*/ - - std::string ourHostName = getOurHostName(0); // change to return shortest name that resolves. &&& + std::string ourHostName = getOurHostName(0); LOGS(_log, LOG_LVL_INFO, "ourHostName=" << ourHostName); WorkerConfig wCfg(wCfgFile); @@ -230,7 +65,6 @@ int main(int argc, char* argv[]) { bool loop = true; while(loop) { - LOGS(_log, LOG_LVL_INFO, "&&& SLEEP"); sleep(10); } ioService.stop(); // this doesn't seem to work cleanly