Skip to content

Commit

Permalink
Couple of bug fixes in wdt in 9688293 and 8722340
Browse files Browse the repository at this point in the history
Summary:
Fixes couple of isses
1. When transfer request has empty directory, we shouldn't crash instead report an error at the time of init
2. Tests added for unaligned offset and slight refactor

Reviewed By: ldemailly

Differential Revision: D2826569

fb-gh-sync-id: 1543d3a1c75f7067f1ce96585cee6959c28ac784
  • Loading branch information
nikunjy authored and ldemailly committed Jan 22, 2016
1 parent a86b902 commit d01b8b4
Show file tree
Hide file tree
Showing 10 changed files with 126 additions and 11 deletions.
3 changes: 2 additions & 1 deletion ErrorCodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,8 @@ namespace wdt {
X(UNEXPECTED_CMD_ERROR) /** Unexpected cmd received */ \
X(ENCRYPTION_ERROR) /** Error related to encryption */ \
X(ALREADY_EXISTS) /** Create attempt for existing id */ \
X(GLOBAL_CHECKPOINT_ABORT) /** Abort due to global checkpoint */
X(GLOBAL_CHECKPOINT_ABORT) /** Abort due to global checkpoint */ \
X(INVALID_REQUEST) /** Request for creation of wdt object invalid */

enum ErrorCode {
#define X(A) A,
Expand Down
18 changes: 10 additions & 8 deletions Receiver.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,15 +46,7 @@ std::vector<Checkpoint> Receiver::getNewCheckpoints(int startIndex) {

Receiver::Receiver(const WdtTransferRequest &transferRequest) {
LOG(INFO) << "WDT Receiver " << Protocol::getFullVersion();
// TODO: move to init and validate input transfer request (like empty dir)
// and ports and pv - issue#95
transferRequest_ = transferRequest;
if (getTransferId().empty()) {
setTransferId(WdtBase::generateTransferId());
}
// TODO clean that up too - take from transferId_
setProtocolVersion(transferRequest.protocolVersion);
setDir(transferRequest.directory);
}

Receiver::Receiver(int port, int numSockets, const std::string &destDir)
Expand Down Expand Up @@ -117,8 +109,18 @@ void Receiver::endCurGlobalSession() {
}

const WdtTransferRequest &Receiver::init() {
if (validateTransferRequest() != OK) {
LOG(ERROR) << "Couldn't validate the transfer request "
<< transferRequest_.getLogSafeString();
return transferRequest_;
}
backlog_ = options_.backlog;
bufferSize_ = options_.buffer_size;
if (getTransferId().empty()) {
setTransferId(WdtBase::generateTransferId());
}
setProtocolVersion(transferRequest_.protocolVersion);
setDir(transferRequest_.directory);
if (bufferSize_ < Protocol::kMaxHeader) {
// round up to even k
bufferSize_ = 2 * 1024 * ((Protocol::kMaxHeader - 1) / (2 * 1024) + 1);
Expand Down
18 changes: 18 additions & 0 deletions Sender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,27 @@ Sender::Sender(const std::string &destHost, const std::string &srcDir,
folly::make_unique<TransferHistoryController>(*dirQueue_);
}

ErrorCode Sender::validateTransferRequest() {
ErrorCode code = WdtBase::validateTransferRequest();
// If the request is still valid check for other
// sender specific validations
if (code == OK && transferRequest_.hostName.empty()) {
LOG(ERROR) << "Transfer request validation failed for wdt sender "
<< transferRequest_.getLogSafeString();
code = INVALID_REQUEST;
}
transferRequest_.errorCode = code;
return code;
}

const WdtTransferRequest &Sender::init() {
VLOG(1) << "Sender Init() with encryption set = "
<< transferRequest_.encryptionData.isSet();
if (validateTransferRequest() != OK) {
LOG(ERROR) << "Couldn't validate the transfer request "
<< transferRequest_.getLogSafeString();
return transferRequest_;
}
// TODO cleanup / most not necessary / duplicate state
transferRequest_.protocolVersion = protocolVersion_;
transferRequest_.directory = srcDir_;
Expand Down
3 changes: 3 additions & 0 deletions Sender.h
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,9 @@ class Sender : public WdtBase {
friend class SenderThread;
friend class QueueAbortChecker;

/// Validate the transfer request
ErrorCode validateTransferRequest() override;

/// Get the sum of all the thread transfer stats
TransferStats getGlobalTransferStats() const;

Expand Down
6 changes: 6 additions & 0 deletions Wdt.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,12 @@ ErrorCode Wdt::wdtSend(const std::string &wdtNamespace,
wdtSetAbortSocketCreatorAndReporter(wdtNamespace, sender.get(), req,
abortChecker);

auto validatedReq = sender->init();
if (validatedReq.errorCode != OK) {
LOG(ERROR) << "Couldn't init sender with request for " << wdtNamespace
<< " " << secondKey;
return validatedReq.errorCode;
}
auto transferReport = sender->transfer();
ErrorCode ret = transferReport->getSummary().getErrorCode();
wdtController->releaseSender(wdtNamespace, secondKey);
Expand Down
18 changes: 18 additions & 0 deletions WdtBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,24 @@ WdtBase::TransferStatus WdtBase::getTransferStatus() {
return transferStatus_;
}

ErrorCode WdtBase::validateTransferRequest() {
ErrorCode code = transferRequest_.errorCode;
if (code != OK) {
LOG(ERROR) << "WDT object initiated with erroneous transfer request "
<< transferRequest_.getLogSafeString();
return code;
}
if (transferRequest_.directory.empty() ||
(transferRequest_.protocolVersion < 0) ||
transferRequest_.ports.empty()) {
LOG(ERROR) << "Transfer request validation failed for wdt object "
<< transferRequest_.getLogSafeString();
code = INVALID_REQUEST;
transferRequest_.errorCode = code;
}
return code;
}

void WdtBase::setTransferStatus(TransferStatus transferStatus) {
std::lock_guard<std::mutex> lock(mutex_);
transferStatus_ = transferStatus;
Expand Down
3 changes: 3 additions & 0 deletions WdtBase.h
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ class WdtBase {
THREADS_JOINED, // threads joined
};

/// Validate the transfer request
virtual ErrorCode validateTransferRequest();

/// @return current transfer status
TransferStatus getTransferStatus();

Expand Down
40 changes: 40 additions & 0 deletions test/FileReaderTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,9 @@ void testFileRead(int64_t fileSize, int64_t bufferSize, bool directReads) {
testReadSize(file.getSize(), byteSource);
}

// All the tests here have to run any operation on the FileByteSource
// in a thread because FileByteSource has thread local buffer

TEST(FileByteSource, ODIRECT_NONMULTIPLE) {
if (!canSupportODirect()) {
LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test";
Expand All @@ -135,6 +138,43 @@ TEST(FileByteSource, ODIRECT_NONMULTIPLE) {
t.join();
}

TEST(FileByteSource, ODIRECT_NONMULTIPLE_OFFSET) {
if (!canSupportODirect()) {
LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test";
return;
}
for (int numTests = 0; numTests < 10; ++numTests) {
std::thread t([=]() {
int64_t fileSize = kDiskBlockSize + 10;
int64_t bufferSize = fileSize * 2;
if (numTests % 2 == 0) {
int fraction = folly::Random::rand32() % (fileSize / 2);
bufferSize = fileSize / fraction;
}
int64_t offset = folly::Random::rand32() % fileSize;
RandomFile file(fileSize);
auto metaData = file.getMetaData();
metaData->directReads = true;
FileByteSource byteSource(metaData, metaData->size, offset, bufferSize);
ErrorCode code = byteSource.open();
EXPECT_EQ(code, OK);
testBufferSize(bufferSize, alignedBufferNeeded(), byteSource);
int64_t totalSizeRead = 0;
while (true) {
int64_t size;
char* data = byteSource.read(size);
if (size <= 0) {
break;
}
WDT_CHECK(data);
totalSizeRead += size;
}
EXPECT_EQ(totalSizeRead, fileSize - offset);
});
t.join();
}
}

TEST(FileByteSource, SMALL_MULTIPLE_ODIRECT) {
if (!canSupportODirect()) {
LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test";
Expand Down
1 change: 0 additions & 1 deletion test/WdtResourceControllerTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ class WdtResourceControllerTest : public WdtResourceController {
void AddObjectsWithLimitsTest();
void InvalidNamespaceTest();
void ReleaseStaleTest();
void RequestSerializationTest();

private:
string getTransferId(const string &wdtNamespace, int index) {
Expand Down
27 changes: 26 additions & 1 deletion test/WdtUrlTest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
* of patent rights can be found in the PATENTS file in the same directory.
*/
#include <wdt/Receiver.h>

#include <wdt/Sender.h>
#include <folly/Random.h>
#include <gflags/gflags.h>
#include <glog/logging.h>
Expand Down Expand Up @@ -232,6 +232,31 @@ TEST(RequestSerializationTest, UrlTests) {
EXPECT_EQ(transferRequest.errorCode, URI_PARSE_ERROR);
EXPECT_EQ(transferRequest.genWdtUrlWithSecret(), "URI_PARSE_ERROR");
}
{
string url = "wdt://localhost:22355?num_ports=3";
WdtTransferRequest transferRequest(url);
Receiver receiver(transferRequest);
auto retTransferRequest = receiver.init();
EXPECT_EQ(retTransferRequest.errorCode, INVALID_REQUEST);
}
{
string url = "wdt://localhost:22355";
WdtTransferRequest transferRequest(url);
Sender sender(transferRequest);
auto retTransferRequest = sender.init();
EXPECT_EQ(retTransferRequest.errorCode, INVALID_REQUEST);
}
{
string url = "wdt://localhost:22355?num_ports=3";
WdtTransferRequest transferRequest(url);
EXPECT_EQ(transferRequest.errorCode, OK);
transferRequest.directory = "blah";
// Artificial error
transferRequest.errorCode = ERROR;
Receiver receiver(transferRequest);
auto retTransferRequest = receiver.init();
EXPECT_EQ(retTransferRequest.errorCode, ERROR);
}
}

TEST(RequestSerializationTest, TransferIdGenerationTest) {
Expand Down

0 comments on commit d01b8b4

Please sign in to comment.