diff --git a/runtime/Config.h b/runtime/Config.h index 3ee6d87..28ee813 100644 --- a/runtime/Config.h +++ b/runtime/Config.h @@ -25,7 +25,7 @@ namespace NanoLogConfig { // Controls in what mode the compressed log file will be opened - static const int FILE_PARAMS = O_APPEND|O_RDWR|O_CREAT|O_NOATIME|O_DSYNC; + static const char FILE_PARAMS[] = "a+"; // Location of the initial log file static const char DEFAULT_LOG_FILE[] = "./compressedLog"; diff --git a/runtime/DoubleBuffer.h b/runtime/DoubleBuffer.h new file mode 100644 index 0000000..6d3bfbd --- /dev/null +++ b/runtime/DoubleBuffer.h @@ -0,0 +1,99 @@ + +#pragma once +#include +#include +#include +#include +#include + +#include + +#include "Config.h" +#include "Portability.h" + +class DoubleBuffer +{ + using MemPtrT = std::unique_ptr; + + static MemPtrT allocateMemory() + { + return MemPtrT{new char[NanoLogConfig::OUTPUT_BUFFER_SIZE]}; + }; + + static char *accessOnce(const MemPtrT &ptr) + { + NANOLOG_READ_WRITE_BARRIER; + return ptr.get(); + } + + MemPtrT freeBuffer; + MemPtrT compressingBuffer; + MemPtrT writeBuffer; + unsigned size; + int errorCode; + + std::condition_variable condition; + std::mutex mutex; + +public: + DoubleBuffer() + : freeBuffer(allocateMemory()), + compressingBuffer(allocateMemory()), + writeBuffer(nullptr), + size(0), + errorCode(0), + condition(), + mutex(){}; + + char *getCompressingBuffer() noexcept { return compressingBuffer.get(); } + + bool writeInProgress() const { return accessOnce(freeBuffer) == nullptr; } + + int swapBuffer(unsigned count) noexcept + { + while (accessOnce(writeBuffer) != nullptr) {} + + { + std::unique_lock lock(mutex); + size = count; + std::swap(writeBuffer, compressingBuffer); + if (freeBuffer == nullptr) { + condition.wait(lock, [this]() { return freeBuffer != nullptr; }); + } else { + condition.notify_one(); + } + std::swap(freeBuffer, compressingBuffer); + return errorCode; + } + } + + void writeToFile(FILE* file) noexcept + { + unsigned tmp_size = 0; + MemPtrT tmp_ptr = nullptr; + + { + std::unique_lock lock(mutex); + condition.wait(lock, [this]() { return writeBuffer != nullptr; }); + tmp_size = size; + std::swap(writeBuffer, tmp_ptr); + } + + int res = 0; + if (tmp_size != 0) { + size_t wsize = fwrite(tmp_ptr.get(), 1u, tmp_size, file); + res = (wsize != tmp_size) ? errno : 0; + } + + while (accessOnce(freeBuffer) != nullptr) {} + + { + std::unique_lock lock(mutex); + errorCode = res; + std::swap(freeBuffer, tmp_ptr); + if (compressingBuffer == nullptr) { + condition.notify_one(); + } + } + } +}; \ No newline at end of file diff --git a/runtime/FSyncPortable.h b/runtime/FSyncPortable.h new file mode 100644 index 0000000..4f2b464 --- /dev/null +++ b/runtime/FSyncPortable.h @@ -0,0 +1,27 @@ +#pragma once + +#ifndef _WIN32 + +#include + +#else + +#include +#include + +inline int fsync(int fd) { + HANDLE h = (HANDLE)_get_osfhandle(fd); + if (h == INVALID_HANDLE_VALUE) { + return -1; + } + if (!FlushFileBuffers(h)) { + return -1; + } + return 0; +} + +inline int fdatasync(int fd) { + return fsync(fd); +} + +#endif \ No newline at end of file diff --git a/runtime/Portability.h b/runtime/Portability.h index 129c051..59cd097 100644 --- a/runtime/Portability.h +++ b/runtime/Portability.h @@ -49,4 +49,12 @@ #define NANOLOG_PRINTF_FORMAT_ATTR(string_index, first_to_check) #endif +#if defined(_MSC_VER) +extern "C" void _ReadWriteBarrier(void); +#pragma intrinsic(_ReadWriteBarrier) +#define NANOLOG_READ_WRITE_BARRIER _ReadWriteBarrier() +#elif defined(__GNUC__) +#define NANOLOG_READ_WRITE_BARRIER __asm__ __volatile__("" ::: "memory") +#endif + #endif /* NANOLOG_PORTABILITY_H */ \ No newline at end of file diff --git a/runtime/RuntimeLogger.cc b/runtime/RuntimeLogger.cc index 56b7c0f..305532f 100644 --- a/runtime/RuntimeLogger.cc +++ b/runtime/RuntimeLogger.cc @@ -43,14 +43,13 @@ RuntimeLogger::RuntimeLogger() , compressionThread() , hasOutstandingOperation(false) , compressionThreadShouldExit(false) + , writerThreadShouldExit(false) , syncStatus(SYNC_COMPLETED) , condMutex() , workAdded() , hintSyncCompleted() - , outputFd(-1) - , aioCb() - , compressingBuffer(nullptr) - , outputDoubleBuffer(nullptr) + , outputFile(nullptr) + , buffer() , currentLogLevel(NOTICE) , cycleAtThreadStart(0) , cyclesAtLastAIOStart(0) @@ -61,7 +60,6 @@ RuntimeLogger::RuntimeLogger() , cyclesDiskIO_upperBound(0) , totalBytesRead(0) , totalBytesWritten(0) - , padBytesWritten(0) , logsProcessed(0) , numAioWritesCompleted(0) , coreId(-1) @@ -73,35 +71,19 @@ RuntimeLogger::RuntimeLogger() stagingBufferPeekDist[i] = 0; const char *filename = NanoLogConfig::DEFAULT_LOG_FILE; - outputFd = open(filename, NanoLogConfig::FILE_PARAMS, 0666); - if (outputFd < 0) { + outputFile = fopen(filename, NanoLogConfig::FILE_PARAMS); + if (outputFile == nullptr) { fprintf(stderr, "NanoLog could not open the default file location " "for the log file (\"%s\").\r\n Please check the permissions " "or use NanoLog::setLogFile(const char* filename) to " "specify a different log file.\r\n", filename); std::exit(-1); } - - memset(&aioCb, 0, sizeof(aioCb)); - - int err = posix_memalign(reinterpret_cast(&compressingBuffer), - 512, NanoLogConfig::OUTPUT_BUFFER_SIZE); - if (err) { - perror("The NanoLog system was not able to allocate enough memory " - "to support its operations. Quitting...\r\n"); - std::exit(-1); - } - - err = posix_memalign(reinterpret_cast(&outputDoubleBuffer), - 512, NanoLogConfig::OUTPUT_BUFFER_SIZE); - if (err) { - perror("The NanoLog system was not able to allocate enough memory " - "to support its operations. Quitting...\r\n"); - std::exit(-1); - } + setvbuf(outputFile, nullptr, _IONBF, 0); #ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this); + writerThread = std::thread(&RuntimeLogger::writerThreadMain, this); #endif } @@ -116,24 +98,21 @@ RuntimeLogger::~RuntimeLogger() { nanoLogSingleton.workAdded.notify_all(); } + // Stop the writer thread completely + writerThreadShouldExit = true; + buffer.swapBuffer(0); + writerThread.join(); + if (nanoLogSingleton.compressionThread.joinable()) nanoLogSingleton.compressionThread.join(); - // Free all the data structures - if (compressingBuffer) { - free(compressingBuffer); - compressingBuffer = nullptr; - } - - if (outputDoubleBuffer) { - free(outputDoubleBuffer); - outputDoubleBuffer = nullptr; - } + if (nanoLogSingleton.writerThread.joinable()) + nanoLogSingleton.writerThread.join(); - if (outputFd > 0) - close(outputFd); + if (outputFile != nullptr) + fclose(outputFile); - outputFd = 0; + outputFile = nullptr; } // Documentation in NanoLog.h @@ -143,7 +122,7 @@ RuntimeLogger::getStats() { char buffer[1024]; // Leaks abstraction, but basically flush so we get all the time uint64_t start = PerfUtils::Cycles::rdtsc(); - fdatasync(nanoLogSingleton.outputFd); + fdatasync(fileno(nanoLogSingleton.outputFile)); uint64_t stop = PerfUtils::Cycles::rdtsc(); nanoLogSingleton.cyclesDiskIO_upperBound += (stop - start); @@ -157,8 +136,6 @@ RuntimeLogger::getStats() { nanoLogSingleton.totalBytesWritten); double totalBytesReadDouble = static_cast( nanoLogSingleton.totalBytesRead); - double padBytesWrittenDouble = static_cast( - nanoLogSingleton.padBytesWritten); double numEventsProcessedDouble = static_cast( nanoLogSingleton.logsProcessed); @@ -218,14 +195,11 @@ RuntimeLogger::getStats() { compressTime * 1.0e9 / numEventsProcessedDouble); out << buffer; - snprintf(buffer, 1024, "The compression ratio was %0.2lf-%0.2lfx " - "(%lu bytes in, %lu bytes out, %lu pad bytes)\n", - 1.0 * totalBytesReadDouble / (totalBytesWrittenDouble - + padBytesWrittenDouble), + snprintf(buffer, 1024, "The compression ratio was %0.2lf " + "(%lu bytes in, %lu bytes out)\n", 1.0 * totalBytesReadDouble / totalBytesWrittenDouble, nanoLogSingleton.totalBytesRead, - nanoLogSingleton.totalBytesWritten, - nanoLogSingleton.padBytesWritten); + nanoLogSingleton.totalBytesWritten); out << buffer; return out.str(); @@ -314,41 +288,8 @@ RuntimeLogger::preallocate() { } /** -* Internal helper function to wait for AIO completion. -*/ -void -RuntimeLogger::waitForAIO() { - if (hasOutstandingOperation) { - if (aio_error(&aioCb) == EINPROGRESS) { - const struct aiocb *const aiocb_list[] = {&aioCb}; - int err = aio_suspend(aiocb_list, 1, NULL); - - if (err != 0) - perror("LogCompressor's Posix AIO suspend operation failed"); - } - - int err = aio_error(&aioCb); - ssize_t ret = aio_return(&aioCb); - - if (err != 0) { - fprintf(stderr, "LogCompressor's POSIX AIO failed with %d: %s\r\n", - err, strerror(err)); - } else if (ret < 0) { - perror("LogCompressor's Posix AIO Write operation failed"); - } - ++numAioWritesCompleted; - hasOutstandingOperation = false; - - if (syncStatus == WAITING_ON_AIO) { - syncStatus = SYNC_COMPLETED; - hintSyncCompleted.notify_one(); - } - } -} - -/** -* Main compression thread that handles scanning through the StagingBuffers, -* compressing log entries, and outputting a compressed log file. +* Main compression thread that handles scanning through the StagingBuffers and +* compressing log entries. */ void RuntimeLogger::compressionThreadMain() { @@ -362,7 +303,7 @@ RuntimeLogger::compressionThreadMain() { cycleAtThreadStart = cyclesAwakeStart; // Manages the state associated with compressing log messages - Log::Encoder encoder(compressingBuffer, NanoLogConfig::OUTPUT_BUFFER_SIZE); + Log::Encoder encoder(buffer.getCompressingBuffer(), NanoLogConfig::OUTPUT_BUFFER_SIZE); // Indicates whether a compression operation failed or not due // to insufficient space in the outputBuffer @@ -535,51 +476,35 @@ RuntimeLogger::compressionThreadMain() { } if (hasOutstandingOperation) { - if (aio_error(&aioCb) == EINPROGRESS) { - const struct aiocb *const aiocb_list[] = {&aioCb}; - if (outputBufferFull) { - // If the output buffer is full and we're not done, - // wait for completion - cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; - int err = aio_suspend(aiocb_list, 1, NULL); + if (buffer.writeInProgress() && !outputBufferFull) { + // If there's no new data, go to sleep. + if (bytesConsumedThisIteration == 0 && + NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0) + { + std::unique_lock lock(condMutex); + cyclesActive += PerfUtils::Cycles::rdtsc() - + cyclesAwakeStart; + workAdded.wait_for(lock, std::chrono::microseconds( + NanoLogConfig::POLL_INTERVAL_DURING_IO_US)); cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); - if (err != 0) - perror("LogCompressor's Posix AIO " - "suspend operation failed"); - } else { - // If there's no new data, go to sleep. - if (bytesConsumedThisIteration == 0 && - NanoLogConfig::POLL_INTERVAL_DURING_IO_US > 0) - { - std::unique_lock lock(condMutex); - cyclesActive += PerfUtils::Cycles::rdtsc() - - cyclesAwakeStart; - workAdded.wait_for(lock, std::chrono::microseconds( - NanoLogConfig::POLL_INTERVAL_DURING_IO_US)); - cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); - } - - if (aio_error(&aioCb) == EINPROGRESS) - continue; } - } - - // Finishing up the IO - int err = aio_error(&aioCb); - ssize_t ret = aio_return(&aioCb); - if (err != 0) { - fprintf(stderr, "LogCompressor's POSIX AIO failed" - " with %d: %s\r\n", err, strerror(err)); - } else if (ret < 0) { - perror("LogCompressor's Posix AIO Write failed"); + if (buffer.writeInProgress()) + continue; } - ++numAioWritesCompleted; - hasOutstandingOperation = false; - cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); // We've completed an AIO, check if we need to notify if (syncStatus == WAITING_ON_AIO) { + cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; + int err = buffer.swapBuffer(0); + cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); + if (err != 0) + fprintf(stderr, "LogCompressor's write failed with %d: %s\n", + err, strerror(err)); + + ++numAioWritesCompleted; + hasOutstandingOperation = false; + cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); std::unique_lock lock(nanoLogSingleton.condMutex); if (syncStatus == WAITING_ON_AIO) { syncStatus = SYNC_COMPLETED; @@ -595,32 +520,26 @@ RuntimeLogger::compressionThreadMain() { if (bytesToWrite == 0) continue; - // Pad the output if necessary - if (NanoLogConfig::FILE_PARAMS & O_DIRECT) { - ssize_t bytesOver = bytesToWrite % 512; - - if (bytesOver != 0) { - memset(compressingBuffer, 0, 512 - bytesOver); - bytesToWrite = bytesToWrite + 512 - bytesOver; - padBytesWritten += (512 - bytesOver); - } - } - - aioCb.aio_fildes = outputFd; - aioCb.aio_buf = compressingBuffer; - aioCb.aio_nbytes = bytesToWrite; totalBytesWritten += bytesToWrite; - cyclesAtLastAIOStart = PerfUtils::Cycles::rdtsc(); - if (aio_write(&aioCb) == -1) - fprintf(stderr, "Error at aio_write(): %s\n", strerror(errno)); + uint64_t tmp = PerfUtils::Cycles::rdtsc(); + int err = buffer.swapBuffer(bytesToWrite); + + if (hasOutstandingOperation) { + cyclesActive += tmp - cyclesAwakeStart; + cyclesAwakeStart = PerfUtils::Cycles::rdtsc(); + if (err != 0) + fprintf(stderr, "LogCompressor's write failed with %d: %s\n", err, + strerror(err)); - hasOutstandingOperation = true; + ++numAioWritesCompleted; + cyclesDiskIO_upperBound += (start - cyclesAtLastAIOStart); + } - // Swap buffers - encoder.swapBuffer(outputDoubleBuffer, + cyclesAtLastAIOStart = tmp; + hasOutstandingOperation = true; + encoder.swapBuffer(buffer.getCompressingBuffer(), NanoLogConfig::OUTPUT_BUFFER_SIZE); - std::swap(outputDoubleBuffer, compressingBuffer); outputBufferFull = false; } @@ -628,25 +547,35 @@ RuntimeLogger::compressionThreadMain() { cyclesActive += PerfUtils::Cycles::rdtsc() - cyclesAwakeStart; } -// Documentation in NanoLog.h +/** +* Main writer thread that outputting a compressed log file. +*/ void -RuntimeLogger::setLogFile_internal(const char *filename) { - // Check if it exists and is readable/writeable - if (access(filename, F_OK) == 0 && access(filename, R_OK | W_OK) != 0) { - std::string err = "Unable to read/write from new log file: "; - err.append(filename); - throw std::ios_base::failure(err); +RuntimeLogger::writerThreadMain() { + while (!writerThreadShouldExit) { + buffer.writeToFile(outputFile); } +} +// Documentation in NanoLog.h +void +RuntimeLogger::setLogFile_internal(const char *filename) { // Try to open the file - int newFd = open(filename, NanoLogConfig::FILE_PARAMS, 0666); - if (newFd < 0) { - std::string err = "Unable to open file new log file: '"; - err.append(filename); - err.append("': "); - err.append(strerror(errno)); + FILE* newFile = fopen(filename, NanoLogConfig::FILE_PARAMS); + if (newFile == nullptr) { + std::string err; + if (errno == EACCES) { + err = "Unable to read/write from new log file: "; + err.append(filename); + } else { + err = "Unable to open file new log file: '"; + err.append(filename); + err.append("': "); + err.append(strerror(errno)); + } throw std::ios_base::failure(err); } + setvbuf(outputFile, nullptr, _IONBF, 0); // Everything seems okay, stop the background thread and change files sync(); @@ -658,18 +587,28 @@ RuntimeLogger::setLogFile_internal(const char *filename) { workAdded.notify_all(); } + // Stop the writer thread completely + writerThreadShouldExit = true; + buffer.swapBuffer(0); + writerThread.join(); + if (compressionThread.joinable()) compressionThread.join(); - if (outputFd > 0) - close(outputFd); - outputFd = newFd; + if (writerThread.joinable()) + writerThread.join(); + + if (outputFile != nullptr) + fclose(outputFile); + outputFile = newFile; // Relaunch thread nextInvocationIndexToBePersisted = 0; // Reset the dictionary compressionThreadShouldExit = false; + writerThreadShouldExit = false; #ifndef BENCHMARK_DISCARD_ENTRIES_AT_STAGINGBUFFER compressionThread = std::thread(&RuntimeLogger::compressionThreadMain, this); + writerThread = std::thread(&RuntimeLogger::writerThreadMain, this); #endif } diff --git a/runtime/RuntimeLogger.h b/runtime/RuntimeLogger.h index 03ba2b9..c44ff0e 100644 --- a/runtime/RuntimeLogger.h +++ b/runtime/RuntimeLogger.h @@ -16,8 +16,8 @@ #ifndef RUNTIME_NANOLOG_H #define RUNTIME_NANOLOG_H -#include #include +#include #include #include @@ -27,6 +27,7 @@ #include "Config.h" #include "Common.h" +#include "DoubleBuffer.h" #include "Fence.h" #include "Log.h" #include "NanoLog.h" @@ -159,9 +160,9 @@ using namespace NanoLog; void compressionThreadMain(); - void setLogFile_internal(const char *filename); + void writerThreadMain(); - void waitForAIO(); + void setLogFile_internal(const char *filename); /** * Allocates thread-local structures if they weren't already allocated. @@ -194,10 +195,13 @@ using namespace NanoLog; // Protects reads and writes to threadBuffers std::mutex bufferMutex; - // Background thread that polls the various staging buffers, compresses - // the staged log messages, and outputs it to a file. + // Background thread that polls the various staging buffers and compresses + // the staged log messages. std::thread compressionThread; + // Background thread that writes log to file + std::thread writerThread; + // Indicates there's an operation in aioCb that should be waited on bool hasOutstandingOperation; @@ -205,6 +209,10 @@ using namespace NanoLog; // typically only set in testing or when the application is exiting. bool compressionThreadShouldExit; + // Flag signaling the writerThread to stop running. This is + // typically only set in testing or when the application is exiting. + bool writerThreadShouldExit; + // Marks the progress of flushing all log messages to disk after a user // invokes the sync() API. To complete the operation, the background // thread has to make two passes through the staging buffers and wait @@ -228,21 +236,10 @@ using namespace NanoLog; // File handle for the output file; should only be opened once at the // construction of the LogCompressor - int outputFd; + FILE* outputFile; - // POSIX AIO structure used to communicate async IO requests - struct aiocb aioCb; - - // Used to stage the compressed log messages before passing it on to the - // POSIX AIO library. - - // Dynamically allocated buffer to stage compressed log message before - // handing it over to the POSIX AIO library for output. - char *compressingBuffer; - - // Dynamically allocated double buffer that is swapped with the - // compressingBuffer when the latter is passed to the POSIX AIO library. - char *outputDoubleBuffer; + // Buffer to stage compressed log message + DoubleBuffer buffer; // Minimum log level that RuntimeLogger will accept. Anything lower will // be dropped. @@ -280,10 +277,6 @@ using namespace NanoLog; // Metric: Number of bytes written to the output file (includes padding) uint64_t totalBytesWritten; - // Metric: Number of pad bytes written to round the file to the nearest - // 512B - uint64_t padBytesWritten; - // Metric: Number of log statements compressed and outputted. uint64_t logsProcessed;