Skip to content

Commit

Permalink
Added missing files.
Browse files Browse the repository at this point in the history
  • Loading branch information
jgates108 committed Dec 13, 2019
1 parent 91805e2 commit fb83e32
Show file tree
Hide file tree
Showing 31 changed files with 154 additions and 375 deletions.
18 changes: 18 additions & 0 deletions admin/templates/configuration/etc/log4cxx.index_master.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
#
# Configuration file for log4cxx
# can be used for unit test
# by launching next command before unit tests:
# export LSST_LOG_CONFIG=$HOME/.lsst/log4cxx.unittest.properties
#

#log4j.rootLogger=INFO, CONSOLE
log4j.rootLogger=DEBUG, CONSOLE
#log4j.rootLogger=WARN, CONSOLE

log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
#log4j.appender.CONSOLE.layout.ConversionPattern=[%d{yyyy-MM-ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n
log4j.appender.CONSOLE.layout.ConversionPattern=[%d{ddTHH:mm:ss.SSSZ}] [%t] %-5p %c{2} (%F:%L) - %m%n

# Tune log at the module level
#log4j.logger.lsst.qserv.util=DEBUG
8 changes: 4 additions & 4 deletions core/modules/loader/BufferUdp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,

// If there's something in the buffer already, get it and return.
// This can happen when the previous read of socket read multiple elements.
MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket&&&" + note);
MsgElement::Ptr msgElem = _safeRetrieve("1readFromSocket" + note);
if (msgElem != nullptr) {
return msgElem;
}
Expand All @@ -69,7 +69,7 @@ MsgElement::Ptr BufferUdp::readFromSocket(boost::asio::ip::tcp::socket& socket,

/// Try to retrieve an element (there's no guarantee that an entire element got read in a single read.
// Store original cursor positions so they can be restored if the read fails.
msgElem = _safeRetrieve("2readFromSocket&&&" + note);
msgElem = _safeRetrieve("2readFromSocket" + note);
if (msgElem != nullptr) {
return msgElem;
}
Expand Down Expand Up @@ -117,11 +117,11 @@ void BufferUdp::advanceReadCursor(size_t len) {
}


std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve(std::string const& note) { // &&& delete note, maybe
std::shared_ptr<MsgElement> BufferUdp::_safeRetrieve(std::string const& note) {
auto wCursorOriginal = _wCursor;
auto rCursorOriginal = _rCursor;
// throwOnMissing=false since missing data is possible with TCP.
MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve &&&", false);
MsgElement::Ptr msgElem = MsgElement::retrieve(*this, note + " _safeRetrieve", false);
if (msgElem != nullptr) {
return msgElem;
} else {
Expand Down
2 changes: 1 addition & 1 deletion core/modules/loader/BufferUdp.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class MsgElement;

/// A buffer for reading and writing. Nothing can be read from the buffer until
/// something has been written to it.
/// TODO: rename BufferUdp is not really accurate anymore. &&&
/// TODO: rename BufferUdp is not really accurate anymore.
class BufferUdp {
public:
using Ptr = std::shared_ptr<BufferUdp>;
Expand Down
1 change: 0 additions & 1 deletion core/modules/loader/Central.cc
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,6 @@ void Central::_checkDoList() {
while(_loop) {
// Run and then sleep for a second. TODO A more advanced timer should be used
doList->checkList();
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
usleep(_loopSleepTime);
}
}
Expand Down
32 changes: 6 additions & 26 deletions core/modules/loader/CentralClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,36 +49,18 @@ namespace lsst {
namespace qserv {
namespace loader {

/* &&&
CentralClient::CentralClient(boost::asio::io_service& ioService_,
std::string const& hostName, ClientConfig const& cfg)
: Central(ioService_, cfg.getMasterHost(), cfg.getMasterPortUdp(), cfg.getThreadPoolSize(), cfg.getLoopSleepTime(), cfg.getIOThreads()),
_hostName(hostName), _udpPort(cfg.getClientPortUdp()),
_defWorkerHost(cfg.getDefWorkerHost()),
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
_doListMaxLookups(cfg.getMaxLookups()),
_doListMaxInserts(cfg.getMaxInserts()),
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
}
*/

CentralClient::CentralClient(boost::asio::io_service& ioService_,
std::string const& hostName, ClientConfig const& cfg)
: CentralFollower(ioService_, hostName, cfg.getMasterHost(), cfg.getMasterPortUdp(),
cfg.getThreadPoolSize(),cfg.getLoopSleepTime(), cfg.getIOThreads(), cfg.getClientPortUdp()),
// &&& _hostName(hostName),
// &&& _udpPort(cfg.getClientPortUdp()),
_defWorkerHost(cfg.getDefWorkerHost()),
_defWorkerPortUdp(cfg.getDefWorkerPortUdp()),
_doListMaxLookups(cfg.getMaxLookups()),
_doListMaxInserts(cfg.getMaxInserts()),
_maxRequestSleepTime(cfg.getMaxRequestSleepTime()) {
}

/* &&&
void CentralClient::start() {
_server = std::make_shared<ClientServer>(ioService, _hostName, _udpPort, this);
}
*/

void CentralClient::startService() {
_server = std::make_shared<ClientServer>(ioService, _hostName, _udpPort, this);
Expand All @@ -92,7 +74,7 @@ CentralClient::~CentralClient() {
void CentralClient::handleKeyLookup(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyLookup");

auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup&&& "));
auto const sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyLookup "));
if (sData == nullptr) {
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyLookup Failed to parse list");
return;
Expand Down Expand Up @@ -128,14 +110,14 @@ void CentralClient::_handleKeyLookup(LoaderMsg const& inMsg, std::unique_ptr<pro
_waitingKeyLookupMap.erase(iter);
}
keyLookupOneShot->keyInfoComplete(key, chunkInfo.chunk, chunkInfo.subchunk, protoData->success());
LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_LOOKUP key=" << key << " " << chunkInfo);
LOGS(_log, LOG_LVL_INFO, "Successful KEY_LOOKUP key=" << key << " " << chunkInfo);
}


void CentralClient::handleKeyInsertComplete(LoaderMsg const& inMsg, BufferUdp::Ptr const& data) {
LOGS(_log, LOG_LVL_DEBUG, "CentralClient::handleKeyInsertComplete");

auto sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete&&& "));
auto sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralClient::handleKeyInsertComplete "));
if (sData == nullptr) {
LOGS(_log, LOG_LVL_WARN, "CentralClient::handleKeyInsertComplete Failed to retrieve element");
return;
Expand Down Expand Up @@ -173,7 +155,7 @@ void CentralClient::_handleKeyInsertComplete(LoaderMsg const& inMsg, std::unique
mapSize = _waitingKeyInsertMap.size();
}
keyInsertOneShot->keyInsertComplete();
LOGS(_log, LOG_LVL_WARN, "&&&INFO Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo <<
LOGS(_log, LOG_LVL_INFO, "Successful KEY_INSERT_COMPLETE key=" << key << " " << chunkInfo <<
" mapSize=" << mapSize);
}

Expand All @@ -199,11 +181,10 @@ KeyInfoData::Ptr CentralClient::keyInsertReq(CompositeKey const& key, int chunk,
size_t sz = _waitingKeyInsertMap.size();
lck.unlock();
if (loopCount % 100 == 0) {
LOGS(_log, LOG_LVL_INFO, "keyInsertReq waiting key=" << key <<
LOGS(_log, LOG_LVL_DEBUG, "keyInsertReq waiting key=" << key <<
"size=" << sz << " loopCount=" << loopCount);
}
// Let the CPU do something else while waiting for some requests to finish.
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
usleep(_maxRequestSleepTime);
++loopCount;
lck.lock();
Expand Down Expand Up @@ -294,7 +275,6 @@ KeyInfoData::Ptr CentralClient::keyLookupReq(CompositeKey const& key) {
"size=" << sz << " loopCount=" << loopCount);
}
// Let the CPU do something else while waiting for some requests to finish.
LOGS(_log, LOG_LVL_INFO, "&&& SLEEP");
usleep(_maxRequestSleepTime);
sleptForMicroSec += _maxRequestSleepTime;
++loopCount;
Expand Down
10 changes: 0 additions & 10 deletions core/modules/loader/CentralClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,20 +67,17 @@ class KeyInfoData : public util::Tracker {
/// so replies to its request can be sent directly back to it.
/// 'Central' provides access to the master and a DoList for handling requests.
/// TODO Maybe base this on CentralWorker or have a common base class?
// &&& class CentralClient : public Central {
class CentralClient : public CentralFollower {
public:
/// The client needs to know the master's IP and its own IP.
CentralClient(boost::asio::io_service& ioService_,
std::string const& hostName, ClientConfig const& cfg);

// &&&void start();
void startService() override;

~CentralClient() override;

std::string const& getHostName() const { return _hostName; }
// &&& int getUdpPort() const { return _udpPort; }
int getTcpPort() const { return 0; } ///< No tcp port at this time.

/// @return the default worker's host name.
Expand Down Expand Up @@ -161,17 +158,10 @@ class CentralClient : public CentralFollower {
CentralClient* central;
};


/// TODO The worker IP becomes default worker as it should be able to get
/// that information from the master in the future. DM-16555
// &&& const std::string _hostName;
// &&& const int _udpPort;

// If const is removed, these will need mutex protection.
const std::string _defWorkerHost; ///< Default worker host
const int _defWorkerPortUdp; ///< Default worker UDP port


size_t const _doListMaxLookups = 1000; ///< Maximum number of concurrent lookups in DoList, set by config
size_t const _doListMaxInserts = 1000; ///< Maximum number of concurrent inserts in DoList, set by config
/// Time to sleep between checking requests when at max length, set by config
Expand Down
2 changes: 1 addition & 1 deletion core/modules/loader/CentralFollower.cc
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ void CentralFollower::startMonitoring() {

bool CentralFollower::workerInfoReceive(BufferUdp::Ptr const& data) {
// Open the data protobuffer and add it to our list.
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, " CentralFollower::workerInfoReceive&&& "));
StringElement::Ptr sData = std::dynamic_pointer_cast<StringElement>(MsgElement::retrieve(*data, "CentralFollower::workerInfoReceive"));
if (sData == nullptr) {
LOGS(_log, LOG_LVL_WARN, "CentralFollower::workerInfoRecieve Failed to parse list");
return false;
Expand Down
36 changes: 4 additions & 32 deletions core/modules/loader/CentralFollower.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,9 +47,6 @@ class WorkerKeysInfo;

namespace loader {


// &&& class CentralWorkerDoListItem;

/// This class is used a base central class for servers that need to get
/// lists of of worker from the master.
/// CentralFollower provides no service on its own. The derived classes must:
Expand All @@ -61,20 +58,6 @@ class CentralFollower : public Central {
public:
typedef std::pair<CompositeKey, ChunkSubchunk> CompKeyPair;

/* &&&
enum SocketStatus {
VOID0 = 0,
STARTING1,
ESTABLISHED2
};
enum Direction {
NONE0 = 0,
TORIGHT1,
FROMRIGHT2
};
*/

CentralFollower(boost::asio::io_service& ioService,
std::string const& hostName_, std::string const& masterHost, int masterPortUdp,
int threadPoolSize, int loopSleepTime, int ioThreads, int fPortUdp)
Expand Down Expand Up @@ -107,34 +90,23 @@ class CentralFollower : public Central {

std::string getOurLogId() const override { return "CentralFollower"; }

// &&& friend CentralWorkerDoListItem;

protected: // &&& make some or all private again

/// Real workers need to check this for initial ranges.
virtual void checkForThisWorkerValues(uint32_t wId, std::string const& ip,
int portUdp, int portTcp, KeyRange& strRange) {};
/// &&& This function is needed to fill the map. On real workers, CentralWorker
/// needs to do additional work to set its id.
/// This function is needed to fill the map. On real workers, CentralWorker
/// needs to do additional work to set its own id.
void _workerInfoReceive(std::unique_ptr<proto::WorkerListItem>& protoBuf);

/// See workerWorkerKeysInfoReq(...)
// &&& void _workerWorkerKeysInfoReq(LoaderMsg const& inMsg);

const std::string _hostName;
const int _udpPort;
// &&& const int _tcpPort;

WWorkerList::Ptr _wWorkerList{std::make_shared<WWorkerList>(this)}; ///< Maps of workers.
/// Maps of workers with their key ranges.
WWorkerList::Ptr _wWorkerList{std::make_shared<WWorkerList>(this)};

/// The DoListItem that makes sure _monitor() is run. &&& needs to ask master for worker map occasionally
// &&& replace with item to refresh _wWorkerList (see CentralWorker::_startMonitoring)// std::shared_ptr<CentralWorkerDoListItem> _centralWorkerDoListItem;
};





}}} // namespace lsst::qserv::loader

#endif // LSST_QSERV_LOADER_CENTRAL_FOLLOWER_H
Expand Down
5 changes: 0 additions & 5 deletions core/modules/loader/CentralMaster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,6 @@ namespace lsst {
namespace qserv {
namespace loader {

/* &&&
void CentralMaster::start() {
_server = std::make_shared<MasterServer>(ioService, getMasterHostName(), getMasterPort(), this);
}
*/
void CentralMaster::startService() {
_server = std::make_shared<MasterServer>(ioService, getMasterHostName(), getMasterPort(), this);
}
Expand Down
1 change: 0 additions & 1 deletion core/modules/loader/CentralMaster.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,6 @@ class CentralMaster : public Central {
_maxKeysPerWorker(cfg.getMaxKeysPerWorker()) {}

/// Open the UDP port. This can throw boost::system::system_error.
// &&&void start();
void startService() override;

~CentralMaster() override { _mWorkerList.reset(); }
Expand Down
Loading

0 comments on commit fb83e32

Please sign in to comment.