Skip to content

Commit

Permalink
Refactoring WDT code to remove reference global options and also intr…
Browse files Browse the repository at this point in the history
…oducing thread context

Summary:
1) Removing reference to WdtOptions::get
2) Putting all thread local information in ThreadCtx structure
3) Removing SocketUtils and moving all the code to WdtSocket

Only fb progress reporter is now directly using FbWdtOptions::get().

Reviewed By: ldemailly

Differential Revision: D2841825

fb-gh-sync-id: 9948b5b7e79ca7ffb263d7e392d72e5b05c33a21
  • Loading branch information
uddipta authored and ldemailly committed Jan 22, 2016
1 parent e43f21b commit 89214ec
Show file tree
Hide file tree
Showing 54 changed files with 980 additions and 946 deletions.
15 changes: 9 additions & 6 deletions ByteSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
*/
#pragma once

#include <wdt/Reporting.h>
#include <wdt/util/CommonImpl.h>
#include <wdt/Protocol.h>

#include <string>
Expand Down Expand Up @@ -63,9 +63,6 @@ class ByteSource {
/// @return number of bytes in this source
virtual int64_t getSize() const = 0;

/// @return size of buffer
virtual int64_t getBufferSize() const = 0;

/// @return offset from which to start reading
virtual int64_t getOffset() const = 0;

Expand Down Expand Up @@ -97,8 +94,14 @@ class ByteSource {
/// Advances ByteSource offset by numBytes
virtual void advanceOffset(int64_t numBytes) = 0;

/// open the source for reading
virtual ErrorCode open() = 0;
/**
* open the source for reading
*
* @param threadCtx context of the calling thread
*
* @return error code
*/
virtual ErrorCode open(ThreadCtx *threadCtx) = 0;

/// close the source for reading
virtual void close() = 0;
Expand Down
12 changes: 5 additions & 7 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,11 @@ set(CMAKE_RUNTIME_OUTPUT_DIRECTORY "_bin/wdt")

# Check that we have the Folly source tree
set(FOLLY_SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}/../folly" CACHE path
"Folly source tree (folly/ThreadLocal.h should be reachable from there")
"Folly source tree (folly/Conv.h should be reachable from there")
# Check for folly - TODO: this doesn't work well for relative paths
# (because of relative to build dir vs relative to source tree for -I)
if(NOT EXISTS "${FOLLY_SOURCE_DIR}/folly/ThreadLocal.h")
MESSAGE(FATAL_ERROR "${FOLLY_SOURCE_DIR}/folly/ThreadLocal.h not found
if(NOT EXISTS "${FOLLY_SOURCE_DIR}/folly/Conv.h")
MESSAGE(FATAL_ERROR "${FOLLY_SOURCE_DIR}/folly/Conv.h not found
Fix using:
(in a sister directory of the wdt source tree - same level:)
git clone https://github.com/facebook/folly.git
Expand All @@ -68,7 +68,6 @@ set (FOLLY_CPP_SRC
"${FOLLY_SOURCE_DIR}/folly/Conv.cpp"
"${FOLLY_SOURCE_DIR}/folly/Demangle.cpp"
"${FOLLY_SOURCE_DIR}/folly/Checksum.cpp"
"${FOLLY_SOURCE_DIR}/folly/detail/ThreadLocalDetail.cpp"
)

# WDT's library proper - comes from: ls -1 *.cpp | grep -iv test
Expand All @@ -91,14 +90,14 @@ util/ThreadTransferHistory.cpp
SenderThread.cpp
Sender.cpp
util/ServerSocket.cpp
util/SocketUtils.cpp
Throttler.cpp
WdtOptions.cpp
util/FileWriter.cpp
util/TransferLogManager.cpp
util/SerializationUtil.cpp
WdtBase.cpp
WdtResourceController.cpp
util/CommonImpl.cpp
)
add_library(wdtlib
util/WdtFlags.cpp
Expand Down Expand Up @@ -225,8 +224,7 @@ if (BUILD_TESTING)

# Extra code that we use in tests
add_library(wdt4tests_min
"${FOLLY_SOURCE_DIR}/folly/FileUtil.cpp" # used by Random used by tests
"${FOLLY_SOURCE_DIR}/folly/Random.cpp" # used indirectly by tests
test/TestCommon.cpp
)

include(ExternalProject)
Expand Down
17 changes: 5 additions & 12 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
#include <wdt/Receiver.h>
#include <wdt/util/FileWriter.h>
#include <wdt/util/ServerSocket.h>
#include <wdt/util/SocketUtils.h>
#include <wdt/util/EncryptionUtils.h>

#include <folly/Conv.h>
Expand Down Expand Up @@ -44,7 +43,8 @@ std::vector<Checkpoint> Receiver::getNewCheckpoints(int startIndex) {
return checkpoints;
}

Receiver::Receiver(const WdtTransferRequest &transferRequest) {
Receiver::Receiver(const WdtTransferRequest &transferRequest)
: transferLogManager_(options_) {
LOG(INFO) << "WDT Receiver " << Protocol::getFullVersion();
transferRequest_ = transferRequest;
}
Expand All @@ -55,7 +55,7 @@ Receiver::Receiver(int port, int numSockets, const std::string &destDir)

void Receiver::traverseDestinationDir(
std::vector<FileChunksInfo> &fileChunksInfo) {
DirectorySourceQueue dirQueue(destDir_, &abortCheckerCallback_);
DirectorySourceQueue dirQueue(options_, destDir_, &abortCheckerCallback_);
dirQueue.buildQueueSynchronously();
auto &discoveredFilesInfo = dirQueue.getDiscoveredFilesMetaData();
for (auto &fileInfo : discoveredFilesInfo) {
Expand Down Expand Up @@ -114,20 +114,13 @@ const WdtTransferRequest &Receiver::init() {
<< transferRequest_.getLogSafeString();
return transferRequest_;
}
checkAndUpdateBufferSize();
backlog_ = options_.backlog;
bufferSize_ = options_.buffer_size;
if (getTransferId().empty()) {
setTransferId(WdtBase::generateTransferId());
}
setProtocolVersion(transferRequest_.protocolVersion);
setDir(transferRequest_.directory);
if (bufferSize_ < Protocol::kMaxHeader) {
// round up to even k
bufferSize_ = 2 * 1024 * ((Protocol::kMaxHeader - 1) / (2 * 1024) + 1);
LOG(INFO) << "Specified -buffer_size " << options_.buffer_size
<< " smaller than " << Protocol::kMaxHeader << " using "
<< bufferSize_ << " instead";
}
auto numThreads = transferRequest_.ports.size();
// This creates the destination directory (which is needed for transferLogMgr)
fileCreator_.reset(
Expand Down Expand Up @@ -317,7 +310,7 @@ std::unique_ptr<TransferReport> Receiver::finish() {
progressReporter_->end(report);
}
if (options_.enable_perf_stat_collection) {
PerfStatReport globalPerfReport;
PerfStatReport globalPerfReport(options_);
for (auto &receiverThread : receiverThreads_) {
globalPerfReport += receiverThread->getPerfReport();
}
Expand Down
3 changes: 0 additions & 3 deletions Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,6 @@ class Receiver : public WdtBase {
/// Marks when a new transfer has started
std::atomic<bool> hasNewTransferStarted_{false};

/// Buffer size used by this receiver
int64_t bufferSize_;

/// Backlog used by the sockets
int backlog_;
};
Expand Down
Loading

0 comments on commit 89214ec

Please sign in to comment.