diff --git a/ErrorCodes.h b/ErrorCodes.h index 07b31e6f..5535683e 100644 --- a/ErrorCodes.h +++ b/ErrorCodes.h @@ -58,7 +58,8 @@ namespace wdt { X(UNEXPECTED_CMD_ERROR) /** Unexpected cmd received */ \ X(ENCRYPTION_ERROR) /** Error related to encryption */ \ X(ALREADY_EXISTS) /** Create attempt for existing id */ \ - X(GLOBAL_CHECKPOINT_ABORT) /** Abort due to global checkpoint */ + X(GLOBAL_CHECKPOINT_ABORT) /** Abort due to global checkpoint */ \ + X(INVALID_REQUEST) /** Request for creation of wdt object invalid */ enum ErrorCode { #define X(A) A, diff --git a/Receiver.cpp b/Receiver.cpp index 17826c01..80083a25 100644 --- a/Receiver.cpp +++ b/Receiver.cpp @@ -46,15 +46,7 @@ std::vector Receiver::getNewCheckpoints(int startIndex) { Receiver::Receiver(const WdtTransferRequest &transferRequest) { LOG(INFO) << "WDT Receiver " << Protocol::getFullVersion(); - // TODO: move to init and validate input transfer request (like empty dir) - // and ports and pv - issue#95 transferRequest_ = transferRequest; - if (getTransferId().empty()) { - setTransferId(WdtBase::generateTransferId()); - } - // TODO clean that up too - take from transferId_ - setProtocolVersion(transferRequest.protocolVersion); - setDir(transferRequest.directory); } Receiver::Receiver(int port, int numSockets, const std::string &destDir) @@ -117,8 +109,18 @@ void Receiver::endCurGlobalSession() { } const WdtTransferRequest &Receiver::init() { + if (validateTransferRequest() != OK) { + LOG(ERROR) << "Couldn't validate the transfer request " + << transferRequest_.getLogSafeString(); + return transferRequest_; + } backlog_ = options_.backlog; bufferSize_ = options_.buffer_size; + if (getTransferId().empty()) { + setTransferId(WdtBase::generateTransferId()); + } + setProtocolVersion(transferRequest_.protocolVersion); + setDir(transferRequest_.directory); if (bufferSize_ < Protocol::kMaxHeader) { // round up to even k bufferSize_ = 2 * 1024 * ((Protocol::kMaxHeader - 1) / (2 * 1024) + 1); diff --git a/Sender.cpp b/Sender.cpp index a4881246..7a0dff08 100644 --- a/Sender.cpp +++ b/Sender.cpp @@ -90,9 +90,27 @@ Sender::Sender(const std::string &destHost, const std::string &srcDir, folly::make_unique(*dirQueue_); } +ErrorCode Sender::validateTransferRequest() { + ErrorCode code = WdtBase::validateTransferRequest(); + // If the request is still valid check for other + // sender specific validations + if (code == OK && transferRequest_.hostName.empty()) { + LOG(ERROR) << "Transfer request validation failed for wdt sender " + << transferRequest_.getLogSafeString(); + code = INVALID_REQUEST; + } + transferRequest_.errorCode = code; + return code; +} + const WdtTransferRequest &Sender::init() { VLOG(1) << "Sender Init() with encryption set = " << transferRequest_.encryptionData.isSet(); + if (validateTransferRequest() != OK) { + LOG(ERROR) << "Couldn't validate the transfer request " + << transferRequest_.getLogSafeString(); + return transferRequest_; + } // TODO cleanup / most not necessary / duplicate state transferRequest_.protocolVersion = protocolVersion_; transferRequest_.directory = srcDir_; diff --git a/Sender.h b/Sender.h index 55988ecf..5b84ed14 100644 --- a/Sender.h +++ b/Sender.h @@ -144,6 +144,9 @@ class Sender : public WdtBase { friend class SenderThread; friend class QueueAbortChecker; + /// Validate the transfer request + ErrorCode validateTransferRequest() override; + /// Get the sum of all the thread transfer stats TransferStats getGlobalTransferStats() const; diff --git a/Wdt.cpp b/Wdt.cpp index 0500b5c8..19836e0e 100644 --- a/Wdt.cpp +++ b/Wdt.cpp @@ -91,6 +91,12 @@ ErrorCode Wdt::wdtSend(const std::string &wdtNamespace, wdtSetAbortSocketCreatorAndReporter(wdtNamespace, sender.get(), req, abortChecker); + auto validatedReq = sender->init(); + if (validatedReq.errorCode != OK) { + LOG(ERROR) << "Couldn't init sender with request for " << wdtNamespace + << " " << secondKey; + return validatedReq.errorCode; + } auto transferReport = sender->transfer(); ErrorCode ret = transferReport->getSummary().getErrorCode(); wdtController->releaseSender(wdtNamespace, secondKey); diff --git a/WdtBase.cpp b/WdtBase.cpp index dd2d06e2..b7efacd0 100644 --- a/WdtBase.cpp +++ b/WdtBase.cpp @@ -106,6 +106,24 @@ WdtBase::TransferStatus WdtBase::getTransferStatus() { return transferStatus_; } +ErrorCode WdtBase::validateTransferRequest() { + ErrorCode code = transferRequest_.errorCode; + if (code != OK) { + LOG(ERROR) << "WDT object initiated with erroneous transfer request " + << transferRequest_.getLogSafeString(); + return code; + } + if (transferRequest_.directory.empty() || + (transferRequest_.protocolVersion < 0) || + transferRequest_.ports.empty()) { + LOG(ERROR) << "Transfer request validation failed for wdt object " + << transferRequest_.getLogSafeString(); + code = INVALID_REQUEST; + transferRequest_.errorCode = code; + } + return code; +} + void WdtBase::setTransferStatus(TransferStatus transferStatus) { std::lock_guard lock(mutex_); transferStatus_ = transferStatus; diff --git a/WdtBase.h b/WdtBase.h index 03ddcc5f..dfdcc99d 100644 --- a/WdtBase.h +++ b/WdtBase.h @@ -131,6 +131,9 @@ class WdtBase { THREADS_JOINED, // threads joined }; + /// Validate the transfer request + virtual ErrorCode validateTransferRequest(); + /// @return current transfer status TransferStatus getTransferStatus(); diff --git a/test/FileReaderTest.cpp b/test/FileReaderTest.cpp index a3481c8a..e75ea4ea 100644 --- a/test/FileReaderTest.cpp +++ b/test/FileReaderTest.cpp @@ -124,6 +124,9 @@ void testFileRead(int64_t fileSize, int64_t bufferSize, bool directReads) { testReadSize(file.getSize(), byteSource); } +// All the tests here have to run any operation on the FileByteSource +// in a thread because FileByteSource has thread local buffer + TEST(FileByteSource, ODIRECT_NONMULTIPLE) { if (!canSupportODirect()) { LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test"; @@ -135,6 +138,43 @@ TEST(FileByteSource, ODIRECT_NONMULTIPLE) { t.join(); } +TEST(FileByteSource, ODIRECT_NONMULTIPLE_OFFSET) { + if (!canSupportODirect()) { + LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test"; + return; + } + for (int numTests = 0; numTests < 10; ++numTests) { + std::thread t([=]() { + int64_t fileSize = kDiskBlockSize + 10; + int64_t bufferSize = fileSize * 2; + if (numTests % 2 == 0) { + int fraction = folly::Random::rand32() % (fileSize / 2); + bufferSize = fileSize / fraction; + } + int64_t offset = folly::Random::rand32() % fileSize; + RandomFile file(fileSize); + auto metaData = file.getMetaData(); + metaData->directReads = true; + FileByteSource byteSource(metaData, metaData->size, offset, bufferSize); + ErrorCode code = byteSource.open(); + EXPECT_EQ(code, OK); + testBufferSize(bufferSize, alignedBufferNeeded(), byteSource); + int64_t totalSizeRead = 0; + while (true) { + int64_t size; + char* data = byteSource.read(size); + if (size <= 0) { + break; + } + WDT_CHECK(data); + totalSizeRead += size; + } + EXPECT_EQ(totalSizeRead, fileSize - offset); + }); + t.join(); + } +} + TEST(FileByteSource, SMALL_MULTIPLE_ODIRECT) { if (!canSupportODirect()) { LOG(WARNING) << "Wdt can't support O_DIRECT skipping this test"; diff --git a/test/WdtResourceControllerTest.cpp b/test/WdtResourceControllerTest.cpp index dcc1a351..aea668d1 100644 --- a/test/WdtResourceControllerTest.cpp +++ b/test/WdtResourceControllerTest.cpp @@ -35,7 +35,6 @@ class WdtResourceControllerTest : public WdtResourceController { void AddObjectsWithLimitsTest(); void InvalidNamespaceTest(); void ReleaseStaleTest(); - void RequestSerializationTest(); private: string getTransferId(const string &wdtNamespace, int index) { diff --git a/test/WdtUrlTest.cpp b/test/WdtUrlTest.cpp index 2f9b7185..a89670d0 100644 --- a/test/WdtUrlTest.cpp +++ b/test/WdtUrlTest.cpp @@ -7,7 +7,7 @@ * of patent rights can be found in the PATENTS file in the same directory. */ #include - +#include #include #include #include @@ -232,6 +232,31 @@ TEST(RequestSerializationTest, UrlTests) { EXPECT_EQ(transferRequest.errorCode, URI_PARSE_ERROR); EXPECT_EQ(transferRequest.genWdtUrlWithSecret(), "URI_PARSE_ERROR"); } + { + string url = "wdt://localhost:22355?num_ports=3"; + WdtTransferRequest transferRequest(url); + Receiver receiver(transferRequest); + auto retTransferRequest = receiver.init(); + EXPECT_EQ(retTransferRequest.errorCode, INVALID_REQUEST); + } + { + string url = "wdt://localhost:22355"; + WdtTransferRequest transferRequest(url); + Sender sender(transferRequest); + auto retTransferRequest = sender.init(); + EXPECT_EQ(retTransferRequest.errorCode, INVALID_REQUEST); + } + { + string url = "wdt://localhost:22355?num_ports=3"; + WdtTransferRequest transferRequest(url); + EXPECT_EQ(transferRequest.errorCode, OK); + transferRequest.directory = "blah"; + // Artificial error + transferRequest.errorCode = ERROR; + Receiver receiver(transferRequest); + auto retTransferRequest = receiver.init(); + EXPECT_EQ(retTransferRequest.errorCode, ERROR); + } } TEST(RequestSerializationTest, TransferIdGenerationTest) {