Skip to content

Commit

Permalink
Taking a file lock during download resumption and error code clean-up
Browse files Browse the repository at this point in the history
Summary: 1) Taking a file lock during download resumption
2) error code clean-up

Reviewed By: nikunjy

Differential Revision: D2606367

fb-gh-sync-id: 3d07099ce36f247a6a159cb4dd899c922821d05a
  • Loading branch information
uddipta authored and ldemailly committed Nov 5, 2015
1 parent 7bedbac commit 468a6d0
Show file tree
Hide file tree
Showing 27 changed files with 459 additions and 389 deletions.
3 changes: 3 additions & 0 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -317,4 +317,7 @@ if (BUILD_TESTING)

add_test(NAME WdtLongRunningTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/test/wdt_long_running_test.py")

add_test(NAME TransferLogLockTest COMMAND
"${CMAKE_CURRENT_SOURCE_DIR}/test/transfer_log_lock_test.sh")
endif(BUILD_TESTING)
4 changes: 4 additions & 0 deletions ErrorCodes.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ std::string errorCodeToStr(ErrorCode code) {
return folly::to<std::string>(code);
}

/// Picks the more interesting of two error codes. "More interesting" here
/// simply means the code with the greater enumerator value; when both are
/// equal, that same value is returned.
ErrorCode getMoreInterestingError(ErrorCode err1, ErrorCode err2) {
  if (err1 < err2) {
    return err2;
  }
  return err1;
}

std::string strerrorStr(int errnum) {
std::string result;
char buf[1024], *res = buf;
Expand Down
8 changes: 7 additions & 1 deletion ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ namespace wdt {
transfer log */ \
X(INVALID_LOG) /** Transfer log invalid */ \
X(INVALID_CHECKPOINT) /** Received checkpoint is invalid */ \
X(NO_PROGRESS) /** Transfer has not progressed */
X(NO_PROGRESS) /** Transfer has not progressed */ \
X(TRANSFER_LOG_ACQUIRE_ERROR) /** Failed to acquire lock for transfer log */

enum ErrorCode {
#define X(A) A,
Expand All @@ -73,6 +74,11 @@ std::string const kErrorToStr[] = {
*/
std::string errorCodeToStr(ErrorCode code);

/**
* Returns the more interesting of the two errors (the one with the larger
* error code value)
*/
ErrorCode getMoreInterestingError(ErrorCode err1, ErrorCode err2);

/**
* Thread safe version of strerror(), easier than strerror_r
* (similar to folly::errnoStr() but without pulling in all the dependencies)
Expand Down
117 changes: 53 additions & 64 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ bool Receiver::hasNewTransferStarted() const {
}

void Receiver::endCurGlobalSession() {
setTransferStatus(FINISHED);
if (!hasNewTransferStarted_) {
LOG(WARNING) << "WDT transfer did not start, no need to end session";
return;
Expand Down Expand Up @@ -192,7 +193,8 @@ void Receiver::setRecoveryId(const std::string &recoveryId) {
}

Receiver::~Receiver() {
if (hasPendingTransfer()) {
TransferStatus status = getTransferStatus();
if (status == ONGOING) {
LOG(WARNING) << "There is an ongoing transfer and the destructor"
<< " is being called. Trying to finish the transfer";
abort(ABORTED_BY_APPLICATION);
Expand Down Expand Up @@ -224,24 +226,17 @@ int64_t Receiver::getTransferConfig() const {
return config;
}

bool Receiver::hasPendingTransfer() {
std::unique_lock<std::mutex> lock(mutex_);
return !transferFinished_;
}

void Receiver::markTransferFinished(bool isFinished) {
std::unique_lock<std::mutex> lock(mutex_);
transferFinished_ = isFinished;
if (isFinished) {
conditionRecvFinished_.notify_one();
}
}

std::unique_ptr<TransferReport> Receiver::finish() {
std::unique_lock<std::mutex> instanceLock(instanceManagementMutex_);
if (areThreadsJoined_) {
VLOG(1) << "Threads have already been joined. Returning the "
<< "transfer report";
TransferStatus status = getTransferStatus();
if (status == NOT_STARTED) {
LOG(WARNING) << "Even though transfer has not started, finish is called";
// getTransferReport will set the error code to ERROR
return getTransferReport();
}
if (status == THREADS_JOINED) {
LOG(WARNING) << "Threads have already been joined. Returning the "
<< "transfer report";
return getTransferReport();
}
const auto &options = WdtOptions::get();
Expand All @@ -253,18 +248,16 @@ std::unique_ptr<TransferReport> Receiver::finish() {
for (auto &receiverThread : receiverThreads_) {
receiverThread->finish();
}
// A very important step to mark the transfer finished
// No other transferAsync, or runForever can be called on this
// instance unless the current transfer has finished
markTransferFinished(true);

setTransferStatus(THREADS_JOINED);

if (isJoinable_) {
// Make sure to join the progress thread.
progressTrackerThread_.join();
}
std::unique_ptr<TransferReport> report = getTransferReport();
auto &summary = report->getSummary();
bool transferSuccess = (report->getSummary().getCombinedErrorCode() == OK);
bool transferSuccess = (report->getSummary().getErrorCode() == OK);
fixAndCloseTransferLog(transferSuccess);
auto totalSenderBytes = summary.getTotalSenderBytes();
if (progressReporter_ && totalSenderBytes >= 0) {
Expand All @@ -282,7 +275,6 @@ std::unique_ptr<TransferReport> Receiver::finish() {

LOG(WARNING) << "WDT receiver's transfer has been finished";
LOG(INFO) << *report;
areThreadsJoined_ = true;
return report;
}

Expand All @@ -291,48 +283,40 @@ std::unique_ptr<TransferReport> Receiver::getTransferReport() {
for (const auto &receiverThread : receiverThreads_) {
globalStats += receiverThread->getTransferStats();
}
globalStats.validate();
std::unique_ptr<TransferReport> report =
std::unique_ptr<TransferReport> transferReport =
folly::make_unique<TransferReport>(std::move(globalStats));
return report;
TransferStatus status = getTransferStatus();
ErrorCode errCode = transferReport->getSummary().getErrorCode();
if (status == NOT_STARTED && errCode == OK) {
LOG(INFO) << "Transfer not started, setting the error code to ERROR";
transferReport->setLocalErrorCode(ERROR);
}
return transferReport;
}

ErrorCode Receiver::transferAsync() {
const auto &options = WdtOptions::get();
if (hasPendingTransfer()) {
// finish is the only method that should be able to
// change the value of transferFinished_
LOG(ERROR) << "There is already a transfer running on this "
<< "instance of receiver";
return ERROR;
}
isJoinable_ = true;
int progressReportIntervalMillis = options.progress_report_interval_millis;
if (!progressReporter_ && progressReportIntervalMillis > 0) {
// if progress reporter has not been set, use the default one
progressReporter_ = folly::make_unique<ProgressReporter>();
}
start();
return OK;
return start();
}

ErrorCode Receiver::runForever() {
if (hasPendingTransfer()) {
// finish is the only method that should be able to
// change the value of transferFinished_
LOG(ERROR) << "There is already a transfer running on this "
<< "instance of receiver";
return ERROR;
}

const auto &options = WdtOptions::get();
WDT_CHECK(!options.enable_download_resumption)
<< "Transfer resumption not supported in long running mode";

// Enforce the full reporting to be false in the daemon mode.
// These statistics are expensive, and useless as they will never
// be received/reviewed in a forever running process.
start();
ErrorCode errCode = start();
if (errCode != OK) {
return errCode;
}
finish();
// This method should never finish
return ERROR;
Expand Down Expand Up @@ -363,8 +347,8 @@ void Receiver::progressTracker() {
while (true) {
{
std::unique_lock<std::mutex> lock(mutex_);
conditionRecvFinished_.wait_for(lock, waitingTime);
if (transferFinished_ || getCurAbortCode() != OK) {
conditionFinished_.wait_for(lock, waitingTime);
if (transferStatus_ == THREADS_JOINED) {
break;
}
}
Expand Down Expand Up @@ -395,15 +379,12 @@ void Receiver::progressTracker() {
}
}

void Receiver::start() {
ErrorCode Receiver::start() {
WDT_CHECK_EQ(getTransferStatus(), NOT_STARTED)
<< "There is already a transfer running on this instance of receiver";
startTime_ = Clock::now();
if (hasPendingTransfer()) {
LOG(WARNING) << "There is an existing transfer in progress on this object";
}
areThreadsJoined_ = false;
LOG(INFO) << "Starting (receiving) server on ports [ " << getPorts()
<< "] Target dir : " << destDir_;
markTransferFinished(false);
const auto &options = WdtOptions::get();
// TODO do the init stuff here
if (options.enable_download_resumption) {
Expand All @@ -413,7 +394,11 @@ void Receiver::start() {
WDT_CHECK(!options.shouldPreallocateFiles())
<< "Can not resume using directory tree if preallocation is enabled";
}
transferLogManager_.openLog();
ErrorCode errCode = transferLogManager_.openLog();
if (errCode != OK) {
LOG(ERROR) << "Failed to open transfer log " << errorCodeToStr(errCode);
return errCode;
}
ErrorCode code = transferLogManager_.parseAndMatch(
recoveryId_, getTransferConfig(), fileChunksInfo_);
if (code == OK && options.resume_using_dir_tree) {
Expand All @@ -426,22 +411,25 @@ void Receiver::start() {
} else {
LOG(INFO) << "Throttler set externally. Throttler : " << *throttler_;
}
setTransferStatus(ONGOING);
while (true) {
for (auto &receiverThread : receiverThreads_) {
receiverThread->startThread();
}
if (!isJoinable_) {
// If it is long running mode, finish the threads
// processing the current transfer and re spawn them again
// with the same sockets
for (auto &receiverThread : receiverThreads_) {
receiverThread->finish();
receiverThread->reset();
}
threadsController_->reset();
continue;
if (isJoinable_) {
break;
}
break;
// If it is long running mode, finish the threads
// processing the current transfer and re spawn them again
// with the same sockets
for (auto &receiverThread : receiverThreads_) {
receiverThread->finish();
receiverThread->reset();
}
threadsController_->reset();
// reset transfer status
setTransferStatus(NOT_STARTED);
continue;
}
if (isJoinable_) {
if (progressReporter_) {
Expand All @@ -450,6 +438,7 @@ void Receiver::start() {
std::thread trackerThread(&Receiver::progressTracker, this);
progressTrackerThread_ = std::move(trackerThread);
}
return OK;
}

void Receiver::addTransferLogHeader(bool isBlockMode, bool isSenderResuming) {
Expand Down
45 changes: 1 addition & 44 deletions Receiver.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@
#include <wdt/util/TransferLogManager.h>
#include <memory>
#include <string>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <chrono>

Expand Down Expand Up @@ -82,12 +80,6 @@ class Receiver : public WdtBase {
*/
virtual ~Receiver();

/**
* Take a lock on the instance mutex and return the value of
* whether the existing transfer has been finished
*/
bool hasPendingTransfer();

/**
* Use the method to get the list of ports receiver is listening on
*/
Expand All @@ -96,11 +88,6 @@ class Receiver : public WdtBase {
protected:
friend class ReceiverThread;

/**
* @param isFinished Mark transfer active/inactive
*/
void markTransferFinished(bool isFinished);

/**
* Traverses root directory and returns discovered file information
*
Expand All @@ -118,7 +105,7 @@ class Receiver : public WdtBase {
TransferLogManager &getTransferLogManager();

/// Responsible for basic setup and starting threads
void start();
ErrorCode start();

/**
* Periodically calculates current transfer report and send it to progress
Expand Down Expand Up @@ -165,13 +152,6 @@ class Receiver : public WdtBase {
/// The thread that is responsible for running the progress tracker
std::thread progressTrackerThread_;

/**
* Flags that represents if a transfer has finished. Threads on completion
* set this flag. This is always accurate even if you don't call finish()
* No transfer can be started as long as this flag is false.
*/
bool transferFinished_{true};

/// Flag based on which threads finish processing on receiving a done
bool isJoinable_{false};

Expand All @@ -187,14 +167,6 @@ class Receiver : public WdtBase {
*/
std::string recoveryId_;

/**
* Progress tracker thread is a thread which has to be joined when the
* transfer is finished. The root thread in finish() and the progress
* tracker coordinate with each other through the boolean and this
* condition variable.
*/
std::condition_variable conditionRecvFinished_;

/**
* The instance of the receiver threads are stored in this vector.
* This will not be destroyed until this object is destroyed, hence
Expand All @@ -215,24 +187,9 @@ class Receiver : public WdtBase {
/// already transferred file chunks
std::vector<FileChunksInfo> fileChunksInfo_;

/// Mutex to guard all the shared variables
mutable std::mutex mutex_;

/// Marks when a new transfer has started
std::atomic<bool> hasNewTransferStarted_{false};

/**
* Returns true if threads have been joined (done in finish())
* This is how destructor determines whether it should join threads
*/
bool areThreadsJoined_{false};

/**
* Mutex for the management of this instance, specifically to keep the
* instance sane for multi threaded public API calls
*/
std::mutex instanceManagementMutex_;

/// Buffer size used by this receiver
int64_t bufferSize_;

Expand Down
Loading

0 comments on commit 468a6d0

Please sign in to comment.