Skip to content

Commit

Permalink
Improve s3 random file by introducing cache (#9614)
Browse files Browse the repository at this point in the history
close #8673

Signed-off-by: Calvin Neo <[email protected]>

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
  • Loading branch information
CalvinNeo and ti-chi-bot[bot] authored Nov 19, 2024
1 parent d84d282 commit c089c9a
Show file tree
Hide file tree
Showing 24 changed files with 477 additions and 36 deletions.
4 changes: 4 additions & 0 deletions dbms/src/Common/ProfileEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,10 @@
\
M(S3WriteBytes) \
M(S3ReadBytes) \
M(S3PageReaderReusedFile) \
M(S3PageReaderNotReusedFile) \
M(S3PageReaderNotReusedFileReadback) \
M(S3PageReaderNotReusedFileChangeFile) \
M(S3CreateMultipartUpload) \
M(S3UploadPart) \
M(S3CompleteMultipartUpload) \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/IO/BaseFile/MemoryRandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ class MemoryRandomAccessFile : public RandomAccessFile
}

// Returns the stored name of this in-memory file.
std::string getFileName() const override { return filename; }
// Same as getFileName() here; see RandomAccessFile::getInitialFileName for why
// the two may differ for other implementations (e.g. S3-backed files).
std::string getInitialFileName() const override { return filename; }

int getFd() const override { return -1; }

Expand Down
1 change: 1 addition & 0 deletions dbms/src/IO/BaseFile/PosixRandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ class PosixRandomAccessFile : public RandomAccessFile
ssize_t pread(char * buf, size_t size, off_t offset) const override;

// Returns the path this file was opened with.
std::string getFileName() const override { return file_name; }
// Same as getFileName() here; see RandomAccessFile::getInitialFileName for why
// the two may differ for other implementations (e.g. S3-backed files).
std::string getInitialFileName() const override { return file_name; }

bool isClosed() const override { return fd == -1; }

Expand Down
4 changes: 4 additions & 0 deletions dbms/src/IO/BaseFile/RandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ class RandomAccessFile

virtual std::string getFileName() const = 0;

// This is a temporary hack interface for `S3RandomAccessFile`
// See the difference on `S3RandomAccessFile::getFileName` and `S3RandomAccessFile::getInitialFileName`
virtual std::string getInitialFileName() const = 0;

virtual int getFd() const = 0;

virtual bool isClosed() const = 0;
Expand Down
6 changes: 3 additions & 3 deletions dbms/src/IO/Buffer/ReadBuffer.h
Original file line number Diff line number Diff line change
Expand Up @@ -163,8 +163,8 @@ class ReadBuffer : public BufferBase
/** Reads n bytes, if there are less - throws an exception. */
void readStrict(char * to, size_t n)
{
if (n != read(to, n))
throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA);
if (size_t actual_n = read(to, n); actual_n != n)
throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, n={} actual_n={}", n, actual_n);
}

/** A method that can be more efficiently implemented in successors, in the case of reading large enough blocks.
Expand All @@ -184,7 +184,7 @@ class ReadBuffer : public BufferBase
* Return `false` in case of the end, `true` otherwise.
* Throw an exception if something is wrong.
*/
virtual bool nextImpl() { return false; };
virtual bool nextImpl() { return false; }
};


Expand Down
5 changes: 5 additions & 0 deletions dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ std::string ReadBufferFromRandomAccessFile::getFileName() const
return file->getFileName();
}

// Name the underlying file was opened with. For some implementations
// (see RandomAccessFile::getInitialFileName) this can differ from getFileName().
std::string ReadBufferFromRandomAccessFile::getInitialFileName() const
{
return file->getInitialFileName();
}

int ReadBufferFromRandomAccessFile::getFD() const
{
return file->getFd();
Expand Down
6 changes: 4 additions & 2 deletions dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ class ReadBufferFromRandomAccessFile : public ReadBufferFromFileDescriptor

std::string getFileName() const override;

std::string getInitialFileName() const;

int getFD() const override;

private:
Expand All @@ -45,6 +47,6 @@ class ReadBufferFromRandomAccessFile : public ReadBufferFromFileDescriptor
private:
RandomAccessFilePtr file;
};

using ReadBufferFromRandomAccessFilePtr = std::unique_ptr<ReadBufferFromRandomAccessFile>;
using ReadBufferFromRandomAccessFilePtr = std::shared_ptr<ReadBufferFromRandomAccessFile>;
using ReadBufferFromRandomAccessFileUPtr = std::unique_ptr<ReadBufferFromRandomAccessFile>;
} // namespace DB
2 changes: 2 additions & 0 deletions dbms/src/IO/Encryption/EncryptedRandomAccessFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ class EncryptedRandomAccessFile : public RandomAccessFile

std::string getFileName() const override { return file->getFileName(); }

// Forward to the wrapped file's *initial* name rather than its current name.
// Delegating to getFileName() here would break the contract of
// RandomAccessFile::getInitialFileName whenever the wrapped file distinguishes
// the two (the S3RandomAccessFile case this interface exists for); for files
// where the two names coincide this is behavior-identical.
std::string getInitialFileName() const override { return file->getInitialFileName(); }

int getFd() const override { return file->getFd(); }

bool isClosed() const override { return file->isClosed(); }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
namespace DB
{

ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildPtr(
ReadBufferFromRandomAccessFileUPtr ReadBufferFromRandomAccessFileBuilder::buildPtr(
const FileProviderPtr & file_provider,
const std::string & file_name_,
const EncryptionPath & encryption_path_,
Expand All @@ -33,6 +33,20 @@ ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildPt
return std::make_unique<ReadBufferFromRandomAccessFile>(file, buf_size, existing_memory, alignment);
}

// Open `file_name_` through the file provider and wrap it in a read buffer with
// shared ownership, so the buffer can be cached and re-used across readers
// (unlike buildPtr, which hands out unique ownership).
ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildSharedPtr(
    const FileProviderPtr & file_provider,
    const std::string & file_name_,
    const EncryptionPath & encryption_path_,
    size_t buf_size,
    const ReadLimiterPtr & read_limiter,
    int flags,
    char * existing_memory,
    size_t alignment)
{
    const auto underlying_file
        = file_provider->newRandomAccessFile(file_name_, encryption_path_, read_limiter, flags);
    return std::make_shared<ReadBufferFromRandomAccessFile>(underlying_file, buf_size, existing_memory, alignment);
}

ReadBufferFromRandomAccessFile ReadBufferFromRandomAccessFileBuilder::build(
const FileProviderPtr & file_provider,
const std::string & file_name_,
Expand Down
12 changes: 11 additions & 1 deletion dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,17 @@ namespace DB
class ReadBufferFromRandomAccessFileBuilder
{
public:
static ReadBufferFromRandomAccessFilePtr buildPtr(
static ReadBufferFromRandomAccessFileUPtr buildPtr(
const FileProviderPtr & file_provider,
const std::string & file_name_,
const EncryptionPath & encryption_path_,
size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE,
const ReadLimiterPtr & read_limiter = nullptr,
int flags = -1,
char * existing_memory = nullptr,
size_t alignment = 0);

static ReadBufferFromRandomAccessFilePtr buildSharedPtr(
const FileProviderPtr & file_provider,
const std::string & file_name_,
const EncryptionPath & encryption_path_,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,7 @@ class DeltaMergeStoreTestFastAddPeer
// clear data
store->clearData();
auto table_column_defines = DMTestEnv::getDefaultColumns();
LOG_INFO(DB::Logger::get(), "reload to clear data");
store = reload(table_column_defines);
store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::newAll(false, 1));
store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true);
Expand Down Expand Up @@ -358,6 +359,11 @@ try

dumpCheckpoint(write_store_id);

/// The test will then create a new UniPS based on the persist files of the current UniPS.
/// In some cases, a "FullGC" could happen concurrently with the creation of the delta merge instance,
/// in the `reload` method. A panic will happen if DMStore tries to recover some segments but fails to read them from UniPS.

LOG_INFO(DB::Logger::get(), "clear data to prepare for FAP");
clearData();

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);
Expand All @@ -374,6 +380,7 @@ try
{
auto table_column_defines = DMTestEnv::getDefaultColumns();

LOG_INFO(DB::Logger::get(), "reload to apple fap snapshot");
store = reload(table_column_defines);
}

Expand Down Expand Up @@ -431,6 +438,7 @@ try
RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()),
num_rows_write / 2 + 2 * num_rows_write);

LOG_INFO(DB::Logger::get(), "reload to check consistency");
reload();

verifyRows(
Expand Down Expand Up @@ -494,6 +502,7 @@ try
UInt64 write_store_id = current_store_id + 1;
dumpCheckpoint(write_store_id);

LOG_INFO(DB::Logger::get(), "clear data to prepare for FAP");
clearData();

verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0);
Expand Down
23 changes: 20 additions & 3 deletions dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -358,9 +358,16 @@ void KVStore::onSnapshot(
template <typename RegionPtrWrap>
void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTContext & tmt)
{
auto keyspace_id = new_region->getKeyspaceID();
auto table_id = new_region->getMappedTableID();
try
{
LOG_INFO(log, "Begin apply snapshot, new_region={}", new_region->toString(true));
LOG_INFO(
log,
"Begin apply snapshot, new_region={} keyspace_id={} table_id={}",
new_region->toString(true),
keyspace_id,
table_id);

Stopwatch watch;
SCOPE_EXIT({
Expand All @@ -373,11 +380,21 @@ void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTConte
FAIL_POINT_PAUSE(FailPoints::pause_until_apply_raft_snapshot);

// `new_region` may change in the previous function, just log the region_id down
LOG_INFO(log, "Finish apply snapshot, cost={:.3f}s region_id={}", watch.elapsedSeconds(), new_region->id());
LOG_INFO(
log,
"Finish apply snapshot, cost={:.3f}s region_id={} keyspace_id={} table_id={}",
watch.elapsedSeconds(),
new_region->id(),
keyspace_id,
table_id);
}
catch (Exception & e)
{
e.addMessage(fmt::format("(while applyPreHandledSnapshot region_id={})", new_region->id()));
e.addMessage(fmt::format(
"(while applyPreHandledSnapshot region_id={} keyspace_id={} table_id={})",
new_region->id(),
keyspace_id,
table_id));
e.rethrow();
}
}
Expand Down
5 changes: 3 additions & 2 deletions dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1243,11 +1243,12 @@ try
e.message(),
fmt::format(
"try to apply with older index, region_id={} applied_index={} new_index={}: (while "
"applyPreHandledSnapshot region_id={})",
"applyPreHandledSnapshot region_id={} keyspace_id=4294967295 table_id={})",
region_id,
8,
6,
region_id));
region_id,
table_id));
}
}

Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,18 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo(
write_down_stats.num_records,
sequence,
manifest_file_id);

// TODO: Optimize the read performance by grouping the read of
// - the same S3DataFile in remote store (S3)
// - the same blob_file in local blob_store
// Especially grouping the reading on the same S3DataFile, because we can ONLY read
// sequentially through the S3 response.
//
// Now as a workaround, we utilize the assumption that for remote pages, "page
// data with nearby page_ids is more likely to be stored in the same S3DataFile".
// By enlarging the ReadBuffer size and reusing the ReadBuffer in "DataSource"
// we make it more likely to hit the buffer.

for (auto & rec_edit : records)
{
StorageType id_storage_type = StorageType::Unknown;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,12 @@ Page CPWriteDataSourceBlobStore::read(const BlobStore<universal::BlobStoreTrait>
if (page_id_and_entry.second.checkpoint_info.has_value()
&& page_id_and_entry.second.checkpoint_info.is_local_data_reclaimed)
{
return remote_reader->read(page_id_and_entry);
// Read from S3. Try to read from the current cursor of the previous read buffer
// if possible; otherwise create a new one and seek from the beginning.
// Returns the read buffer we eventually read from, for later re-use.
auto [page, s3_file_buf] = remote_reader->readFromS3File(page_id_and_entry, current_s3file_buf, prefetch_size);
current_s3file_buf = s3_file_buf;
return page;
}
else
{
Expand Down
25 changes: 22 additions & 3 deletions dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,11 @@
#include <Storages/Page/V3/BlobStore.h>
#include <Storages/Page/V3/Universal/S3PageReader.h>

namespace DB::PS::universal::tests
{
class UniPageStorageRemoteReadTest;
}

namespace DB::PS::V3
{

Expand Down Expand Up @@ -46,25 +51,39 @@ class CPWriteDataSourceBlobStore : public CPWriteDataSource
*/
explicit CPWriteDataSourceBlobStore(
BlobStore<universal::BlobStoreTrait> & blob_store_,
const FileProviderPtr & file_provider_)
const FileProviderPtr & file_provider_,
size_t prefetch_size_)
: blob_store(blob_store_)
, remote_reader(std::make_unique<S3PageReader>())
, file_provider(file_provider_)
, prefetch_size(prefetch_size_)
{}

static CPWriteDataSourcePtr create(
BlobStore<universal::BlobStoreTrait> & blob_store_,
const FileProviderPtr & file_provider_)
const FileProviderPtr & file_provider_,
size_t prefetch_size = DBMS_DEFAULT_BUFFER_SIZE)
{
return std::make_shared<CPWriteDataSourceBlobStore>(blob_store_, file_provider_);
return std::make_shared<CPWriteDataSourceBlobStore>(blob_store_, file_provider_, prefetch_size);
}

Page read(const BlobStore<universal::BlobStoreTrait>::PageIdAndEntry & page_id_and_entry) override;

private:
BlobStore<universal::BlobStoreTrait> & blob_store;

/// There could be some remote page stored in `blob_store` which need to be fetched
/// from S3 through `remote_reader`. In order to reduce the cost and also improve the
/// read performance, we keep a read buffer with `prefetch_size` and try to reuse the
/// buffer in next `read`.

S3PageReaderPtr remote_reader;
FileProviderPtr file_provider;
ReadBufferFromRandomAccessFilePtr current_s3file_buf;
const size_t prefetch_size;

// for testing
friend class DB::PS::universal::tests::UniPageStorageRemoteReadTest;
};

/**
Expand Down
Loading

0 comments on commit c089c9a

Please sign in to comment.