Skip to content

Commit

Permalink
wdt cygwin found problems
Browse files Browse the repository at this point in the history
Summary: fixes facebook#114

Reviewed By: uddipta

Differential Revision: D2999244

fb-gh-sync-id: 1a9fca5778aaf42c359326fad4a3857de025e2f4
shipit-source-id: 1a9fca5778aaf42c359326fad4a3857de025e2f4
  • Loading branch information
ldemailly committed Mar 2, 2016
1 parent a1a2ba0 commit 9bde9b6
Show file tree
Hide file tree
Showing 15 changed files with 102 additions and 46 deletions.
1 change: 1 addition & 0 deletions .clang-format
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,5 @@ AllowShortFunctionsOnASingleLine: false
#IndentWidth: 4
UseTab: Never
Standard: Cpp11
SortIncludes: false
...
11 changes: 8 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -308,15 +308,20 @@ wdt_max_send_test.sh
(facebook only:)
Make sure to do the following, before "arc diff":
```
(cd wdt ; ./build/clangformat.sh ; ./build/version_update.tcl )
(cd wdt ; ./build/clangformat.sh )
# if you changed the minor version of the protocol (in CMakeLists.txt)
# run (cd wdt ; ./build/version_update.tcl ) to sync with fbcode's WdtConfig.h
fbconfig --clang --with-project-version clang:dev -r wdt
fbconfig --clang --sanitize=address -r wdt
fbmake runtests --run-disabled --extended-tests
# Optionally: opt build
fbmake runtests_opt
fbmake opt
# Sender max speed test
wdt/test/wdt_max_send_test.sh
# Check buck build
buck build wdt/...
```

and check the output of the last step to make sure one of the 3 runs is
Expand Down
18 changes: 12 additions & 6 deletions ReceiverThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,12 +81,18 @@ int64_t readAtMost(ServerSocket &s, char *buf, int64_t max, int64_t atMost) {
}

const ReceiverThread::StateFunction ReceiverThread::stateMap_[] = {
&ReceiverThread::listen, &ReceiverThread::acceptFirstConnection,
&ReceiverThread::acceptWithTimeout, &ReceiverThread::sendLocalCheckpoint,
&ReceiverThread::readNextCmd, &ReceiverThread::processFileCmd,
&ReceiverThread::processSettingsCmd, &ReceiverThread::processDoneCmd,
&ReceiverThread::processSizeCmd, &ReceiverThread::sendFileChunks,
&ReceiverThread::sendGlobalCheckpoint, &ReceiverThread::sendDoneCmd,
&ReceiverThread::listen,
&ReceiverThread::acceptFirstConnection,
&ReceiverThread::acceptWithTimeout,
&ReceiverThread::sendLocalCheckpoint,
&ReceiverThread::readNextCmd,
&ReceiverThread::processFileCmd,
&ReceiverThread::processSettingsCmd,
&ReceiverThread::processDoneCmd,
&ReceiverThread::processSizeCmd,
&ReceiverThread::sendFileChunks,
&ReceiverThread::sendGlobalCheckpoint,
&ReceiverThread::sendDoneCmd,
&ReceiverThread::sendAbortCmd,
&ReceiverThread::waitForFinishOrNewCheckpoint,
&ReceiverThread::finishWithError};
Expand Down
18 changes: 15 additions & 3 deletions Reporting.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -313,9 +313,21 @@ void ProgressReporter::logProgress(int64_t effectiveDataBytes, int progress,
}

const std::string PerfStatReport::statTypeDescription_[] = {
"Socket Read", "Socket Write", "File Open", "File Close", "File Read",
"File Write", "Sync File Range", "fsync", "File Seek", "Throttler Sleep",
"Receiver Wait Sleep", "Directory creation", "Ioctl", "Unlink", "Fadvise"};
"Socket Read",
"Socket Write",
"File Open",
"File Close",
"File Read",
"File Write",
"Sync File Range",
"fsync",
"File Seek",
"Throttler Sleep",
"Receiver Wait Sleep",
"Directory creation",
"Ioctl",
"Unlink",
"Fadvise"};

PerfStatReport::PerfStatReport(const WdtOptions& options) {
static_assert(
Expand Down
2 changes: 1 addition & 1 deletion Reporting.h
Original file line number Diff line number Diff line change
Expand Up @@ -511,7 +511,7 @@ class PerfStatReport {
FILE_READ,
FILE_WRITE,
SYNC_FILE_RANGE,
FSYNC,
FSYNC_STATS, // just 'FSYNC' is defined on Windows/conflicts
FILE_SEEK,
THROTTLER_SLEEP,
RECEIVER_WAIT_SLEEP, // receiver sleep duration between sending wait cmd to
Expand Down
3 changes: 2 additions & 1 deletion Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -373,7 +373,8 @@ ErrorCode Sender::start() {
dirQueue_->enableFileDeletion();
} else {
LOG(WARNING) << "Turning off extra file deletion on the receiver side "
"because of protocol version " << protocolVersion_;
"because of protocol version "
<< protocolVersion_;
}
}
dirThread_ = dirQueue_->buildQueueAsynchronously();
Expand Down
18 changes: 9 additions & 9 deletions SenderThread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ std::ostream &operator<<(std::ostream &os, const SenderThread &senderThread) {
}

const SenderThread::StateFunction SenderThread::stateMap_[] = {
&SenderThread::connect, &SenderThread::readLocalCheckPoint,
&SenderThread::sendSettings, &SenderThread::sendBlocks,
&SenderThread::sendDoneCmd, &SenderThread::sendSizeCmd,
&SenderThread::checkForAbort, &SenderThread::readFileChunks,
&SenderThread::connect, &SenderThread::readLocalCheckPoint,
&SenderThread::sendSettings, &SenderThread::sendBlocks,
&SenderThread::sendDoneCmd, &SenderThread::sendSizeCmd,
&SenderThread::checkForAbort, &SenderThread::readFileChunks,
&SenderThread::readReceiverCmd, &SenderThread::processDoneCmd,
&SenderThread::processWaitCmd, &SenderThread::processErrCmd,
&SenderThread::processWaitCmd, &SenderThread::processErrCmd,
&SenderThread::processAbortCmd, &SenderThread::processVersionMismatch};

std::unique_ptr<ClientSocket> SenderThread::connectToReceiver(
Expand Down Expand Up @@ -500,8 +500,8 @@ SenderState SenderThread::readFileChunks() {
// fileChunksInfoList. Number of chunks we decode should match with the
// number mentioned in the Chunks cmd.
LOG(ERROR) << "Number of file chunks received is more than the number "
"mentioned in CHUNKS_CMD " << numFileChunks << " "
<< numFiles;
"mentioned in CHUNKS_CMD "
<< numFileChunks << " " << numFiles;
threadStats_.setLocalErrorCode(PROTOCOL_ERROR);
return END;
}
Expand Down Expand Up @@ -588,8 +588,8 @@ ErrorCode SenderThread::readNextReceiverCmd() {
return SOCKET_READ_ERROR;
}
LOG(INFO) << "Read receiver command failed, but number of unacked "
"bytes decreased, retrying socket read " << numUnackedBytes
<< " " << curUnackedBytes;
"bytes decreased, retrying socket read "
<< numUnackedBytes << " " << curUnackedBytes;
numUnackedBytes = curUnackedBytes;
}
// we are assuming that sender and receiver tcp buffer sizes are same. So, we
Expand Down
3 changes: 3 additions & 0 deletions build/BUILD.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,9 @@ See also [README.md](../README.md#dependencies) Dependencies section
Checkout the .travis.yml and travis_linux.sh and travis_osx.sh
for build bootstrap without root/sudo requirement

We don't have yet a native Windows port (please contribute!) but it does
build and runs with Cygwin64 using the Linux instructions (and static linking)

# Notes:
On Ubuntu 14.04 - to get g++ 4.9
```
Expand Down
4 changes: 2 additions & 2 deletions util/DirectorySourceQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,8 @@ void DirectorySourceQueue::setFollowSymlinks(const bool followSymlinks) {
}
}

std::vector<SourceMetaData *> &
DirectorySourceQueue::getDiscoveredFilesMetaData() {
std::vector<SourceMetaData *>
&DirectorySourceQueue::getDiscoveredFilesMetaData() {
return sharedFileData_;
}

Expand Down
3 changes: 2 additions & 1 deletion util/DirectorySourceQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,8 @@ class DirectorySourceQueue : public SourceQueue {
*/
std::priority_queue<std::unique_ptr<ByteSource>,
std::vector<std::unique_ptr<ByteSource>>,
SourceComparator> sourceQueue_;
SourceComparator>
sourceQueue_;

/// Transfer stats for sources which are not transferred
std::vector<TransferStats> failedSourceStats_;
Expand Down
6 changes: 4 additions & 2 deletions util/FileCreator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,10 @@ int FileCreator::openForBlocks(ThreadCtx &threadCtx,
auto it = fileStatusMap_.find(blockDetails->seqId);
if (blockDetails->allocationStatus == EXISTS_CORRECT_SIZE &&
it == fileStatusMap_.end()) {
it = fileStatusMap_.insert(std::make_pair(blockDetails->seqId,
FileCreator::ALLOCATED)).first;
it =
fileStatusMap_
.insert(std::make_pair(blockDetails->seqId, FileCreator::ALLOCATED))
.first;
}
if (it == fileStatusMap_.end()) {
// allocation has not started for this file
Expand Down
2 changes: 1 addition & 1 deletion util/FileWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ ErrorCode FileWriter::write(char *buf, int64_t size) {
<< " for file " << blockDetails_->fileName;
bool finished = ((totalWritten_ + size) == blockDetails_->dataSize);
if (finished && options.isLogBasedResumption()) {
PerfStatCollector statCollector(threadCtx_, PerfStatReport::FSYNC);
PerfStatCollector statCollector(threadCtx_, PerfStatReport::FSYNC_STATS);
if (fsync(fd_) != 0) {
PLOG(ERROR) << "fsync failed for " << blockDetails_->fileName
<< " offset " << blockDetails_->offset << " file-size "
Expand Down
4 changes: 2 additions & 2 deletions util/ThreadTransferHistory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,8 @@ ErrorCode ThreadTransferHistory::validateCheckpoint(
}
if (checkpoint.numBlocks < lastCheckpoint_->numBlocks) {
LOG(ERROR) << "Current checkpoint must be higher than previous checkpoint, "
"Last checkpoint: " << *lastCheckpoint_
<< ", Current checkpoint: " << checkpoint;
"Last checkpoint: "
<< *lastCheckpoint_ << ", Current checkpoint: " << checkpoint;
return INVALID_CHECKPOINT;
}
if (checkpoint.numBlocks > lastCheckpoint_->numBlocks) {
Expand Down
50 changes: 37 additions & 13 deletions util/TransferLogManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@ const int TransferLogManager::LOG_VERSION = 2;
int64_t LogEncoderDecoder::timestampInMicroseconds() const {
auto timestamp = Clock::now();
return std::chrono::duration_cast<std::chrono::microseconds>(
timestamp.time_since_epoch()).count();
timestamp.time_since_epoch())
.count();
}

int64_t LogEncoderDecoder::encodeLogHeader(char *dest,
Expand Down Expand Up @@ -239,12 +240,31 @@ ErrorCode TransferLogManager::openLog() {
WDT_CHECK(!rootDir_.empty()) << "Root directory not set";
WDT_CHECK(options_.enable_download_resumption);

int openFlags = O_CREAT | O_RDWR;
const std::string logPath = getFullPath(kWdtLogName);
fd_ = ::open(logPath.c_str(), openFlags, 0644);
fd_ = ::open(logPath.c_str(), O_RDWR);
if (fd_ < 0) {
PLOG(ERROR) << "Could not open wdt log " << logPath;
return BYTE_SOURCE_READ_ERROR;
if (errno != ENOENT) {
PLOG(ERROR) << "Could not open wdt log " << logPath;
return TRANSFER_LOG_ACQUIRE_ERROR;
} else {
// creation of the log path (which can still be a race)
LOG(INFO) << logPath << " doesn't exist... creating...";
fd_ = ::open(logPath.c_str(), O_CREAT | O_EXCL, 0644);
if (fd_ < 0) {
PLOG(WARNING) << "Could not create wdt log (maybe ok if race): "
<< logPath;
} else {
// On windows/cygwin for instance the flock will silently succeed yet
// not lock on a newly created file... workaround is to close and reopen
::close(fd_);
}
fd_ = ::open(logPath.c_str(), O_RDWR);
if (fd_ < 0) {
PLOG(ERROR) << "Still couldn't open wdt log after create attempt: "
<< logPath;
return TRANSFER_LOG_ACQUIRE_ERROR;
}
}
}
// try to acquire file lock
if (::flock(fd_, LOCK_EX | LOCK_NB) != 0) {
Expand Down Expand Up @@ -380,7 +400,8 @@ bool TransferLogManager::verifySenderIp(const std::string &curSenderIp) {
<< curSenderIp;
} else if (senderIp_ != curSenderIp) {
LOG(ERROR) << "Current sender ip does not match ip in the "
"transfer log " << curSenderIp << " " << senderIp_
"transfer log "
<< curSenderIp << " " << senderIp_
<< ", ignoring transfer log";
verifySuccessful = false;
invalidateDirectory();
Expand Down Expand Up @@ -577,7 +598,8 @@ bool LogParser::writeFileInvalidationEntries(int fd,
int written = ::write(fd, buf, size);
if (written != size) {
PLOG(ERROR) << "Disk write error while writing invalidation entry to "
"transfer log " << written << " " << size;
"transfer log "
<< written << " " << size;
return false;
}
}
Expand Down Expand Up @@ -691,8 +713,8 @@ ErrorCode LogParser::processFileCreationEntry(char *buf, int size) {
}
if (options_.resume_using_dir_tree) {
LOG(ERROR) << "Can not have a file creation entry in directory based "
"resumption mode " << fileName << " " << seqId << " "
<< fileSize;
"resumption mode "
<< fileName << " " << seqId << " " << fileSize;
return INVALID_LOG;
}
if (fileInfoMap_.find(seqId) != fileInfoMap_.end() ||
Expand Down Expand Up @@ -747,7 +769,8 @@ ErrorCode LogParser::processFileResizeEntry(char *buf, int size) {
}
if (options_.resume_using_dir_tree) {
LOG(ERROR) << "Can not have a file resize entry in directory based "
"resumption mode " << seqId << " " << fileSize;
"resumption mode "
<< seqId << " " << fileSize;
return INVALID_LOG;
}
auto it = fileInfoMap_.find(seqId);
Expand Down Expand Up @@ -796,8 +819,8 @@ ErrorCode LogParser::processBlockWriteEntry(char *buf, int size) {
}
if (options_.resume_using_dir_tree) {
LOG(ERROR) << "Can not have a block write entry in directory based "
"resumption mode " << seqId << " " << offset << " "
<< blockSize;
"resumption mode "
<< seqId << " " << offset << " " << blockSize;
return INVALID_LOG;
}
if (invalidSeqIds_.find(seqId) != invalidSeqIds_.end()) {
Expand Down Expand Up @@ -842,7 +865,8 @@ ErrorCode LogParser::processFileInvalidationEntry(char *buf, int size) {
}
if (options_.resume_using_dir_tree) {
LOG(ERROR) << "Can not have a file invalidation entry in directory based "
"resumption mode " << seqId;
"resumption mode "
<< seqId;
return INVALID_LOG;
}
if (fileInfoMap_.find(seqId) == fileInfoMap_.end() &&
Expand Down
5 changes: 3 additions & 2 deletions util/WdtSocket.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,8 @@ int WdtSocket::readWithTimeout(char *buf, int nbyte, int timeoutMs,
if (supportUnencryptedPeer_ && readErrorCode_ == UNEXPECTED_CMD_ERROR) {
LOG(WARNING)
<< "Turning off encryption since the other side does not support "
"encryption " << port_;
"encryption "
<< port_;
readErrorCode_ = OK;
buf[0] = buf_[0];
numRead = 1;
Expand Down Expand Up @@ -517,7 +518,7 @@ int WdtSocket::getUnackedBytes() const {
}
WdtSocket::~WdtSocket() {
VLOG(1) << "~WdtSocket " << port_ << " " << fd_;
closeConnection();
closeNoCheck();
}
}
}

0 comments on commit 9bde9b6

Please sign in to comment.