diff --git a/dbms/src/Common/ProfileEvents.cpp b/dbms/src/Common/ProfileEvents.cpp index ae8fa384b0c..5eb6f4be368 100644 --- a/dbms/src/Common/ProfileEvents.cpp +++ b/dbms/src/Common/ProfileEvents.cpp @@ -121,6 +121,10 @@ \ M(S3WriteBytes) \ M(S3ReadBytes) \ + M(S3PageReaderReusedFile) \ + M(S3PageReaderNotReusedFile) \ + M(S3PageReaderNotReusedFileReadback) \ + M(S3PageReaderNotReusedFileChangeFile) \ M(S3CreateMultipartUpload) \ M(S3UploadPart) \ M(S3CompleteMultipartUpload) \ diff --git a/dbms/src/IO/BaseFile/MemoryRandomAccessFile.h b/dbms/src/IO/BaseFile/MemoryRandomAccessFile.h index 330d2faca47..c975c7629f6 100644 --- a/dbms/src/IO/BaseFile/MemoryRandomAccessFile.h +++ b/dbms/src/IO/BaseFile/MemoryRandomAccessFile.h @@ -58,6 +58,7 @@ class MemoryRandomAccessFile : public RandomAccessFile } std::string getFileName() const override { return filename; } + std::string getInitialFileName() const override { return filename; } int getFd() const override { return -1; } diff --git a/dbms/src/IO/BaseFile/PosixRandomAccessFile.h b/dbms/src/IO/BaseFile/PosixRandomAccessFile.h index 8ac33d94bdc..c2695c80357 100644 --- a/dbms/src/IO/BaseFile/PosixRandomAccessFile.h +++ b/dbms/src/IO/BaseFile/PosixRandomAccessFile.h @@ -52,6 +52,7 @@ class PosixRandomAccessFile : public RandomAccessFile ssize_t pread(char * buf, size_t size, off_t offset) const override; std::string getFileName() const override { return file_name; } + std::string getInitialFileName() const override { return file_name; } bool isClosed() const override { return fd == -1; } diff --git a/dbms/src/IO/BaseFile/RandomAccessFile.h b/dbms/src/IO/BaseFile/RandomAccessFile.h index f62e3565541..1c7a6397eb5 100644 --- a/dbms/src/IO/BaseFile/RandomAccessFile.h +++ b/dbms/src/IO/BaseFile/RandomAccessFile.h @@ -37,6 +37,10 @@ class RandomAccessFile virtual std::string getFileName() const = 0; + // This is a temporary hack interface for `S3RandomAccessFile` + // See the difference on `S3RandomAccessFile::getFileName` and 
`S3RandomAccessFile::getInitialFileName` + virtual std::string getInitialFileName() const = 0; + virtual int getFd() const = 0; virtual bool isClosed() const = 0; diff --git a/dbms/src/IO/Buffer/ReadBuffer.h b/dbms/src/IO/Buffer/ReadBuffer.h index b421e26d8c0..921d15eb90b 100644 --- a/dbms/src/IO/Buffer/ReadBuffer.h +++ b/dbms/src/IO/Buffer/ReadBuffer.h @@ -163,8 +163,8 @@ class ReadBuffer : public BufferBase /** Reads n bytes, if there are less - throws an exception. */ void readStrict(char * to, size_t n) { - if (n != read(to, n)) - throw Exception("Cannot read all data", ErrorCodes::CANNOT_READ_ALL_DATA); + if (size_t actual_n = read(to, n); actual_n != n) + throw Exception(ErrorCodes::CANNOT_READ_ALL_DATA, "Cannot read all data, n={} actual_n={}", n, actual_n); } /** A method that can be more efficiently implemented in successors, in the case of reading large enough blocks. @@ -184,7 +184,7 @@ class ReadBuffer : public BufferBase * Return `false` in case of the end, `true` otherwise. * Throw an exception if something is wrong. 
*/ - virtual bool nextImpl() { return false; }; + virtual bool nextImpl() { return false; } }; diff --git a/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.cpp b/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.cpp index 1df1fddcb2a..e905c22bff4 100644 --- a/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.cpp +++ b/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.cpp @@ -98,6 +98,11 @@ std::string ReadBufferFromRandomAccessFile::getFileName() const return file->getFileName(); } +std::string ReadBufferFromRandomAccessFile::getInitialFileName() const +{ + return file->getInitialFileName(); +} + int ReadBufferFromRandomAccessFile::getFD() const { return file->getFd(); diff --git a/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.h b/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.h index 2665a7f68fd..5be21c504ad 100644 --- a/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.h +++ b/dbms/src/IO/Buffer/ReadBufferFromRandomAccessFile.h @@ -37,6 +37,8 @@ class ReadBufferFromRandomAccessFile : public ReadBufferFromFileDescriptor std::string getFileName() const override; + std::string getInitialFileName() const; + int getFD() const override; private: @@ -45,6 +47,6 @@ class ReadBufferFromRandomAccessFile : public ReadBufferFromFileDescriptor private: RandomAccessFilePtr file; }; - -using ReadBufferFromRandomAccessFilePtr = std::unique_ptr; +using ReadBufferFromRandomAccessFilePtr = std::shared_ptr; +using ReadBufferFromRandomAccessFileUPtr = std::unique_ptr; } // namespace DB diff --git a/dbms/src/IO/Encryption/EncryptedRandomAccessFile.h b/dbms/src/IO/Encryption/EncryptedRandomAccessFile.h index fa1980ddf3c..576510ac49f 100644 --- a/dbms/src/IO/Encryption/EncryptedRandomAccessFile.h +++ b/dbms/src/IO/Encryption/EncryptedRandomAccessFile.h @@ -40,6 +40,8 @@ class EncryptedRandomAccessFile : public RandomAccessFile std::string getFileName() const override { return file->getFileName(); } + std::string getInitialFileName() const override { return file->getInitialFileName();
} + int getFd() const override { return file->getFd(); } bool isClosed() const override { return file->isClosed(); } diff --git a/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.cpp b/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.cpp index 5133cd1b61c..eb8537b4387 100644 --- a/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.cpp +++ b/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.cpp @@ -19,7 +19,7 @@ namespace DB { -ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildPtr( +ReadBufferFromRandomAccessFileUPtr ReadBufferFromRandomAccessFileBuilder::buildPtr( const FileProviderPtr & file_provider, const std::string & file_name_, const EncryptionPath & encryption_path_, @@ -33,6 +33,20 @@ ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildPt return std::make_unique(file, buf_size, existing_memory, alignment); } +ReadBufferFromRandomAccessFilePtr ReadBufferFromRandomAccessFileBuilder::buildSharedPtr( + const FileProviderPtr & file_provider, + const std::string & file_name_, + const EncryptionPath & encryption_path_, + size_t buf_size, + const ReadLimiterPtr & read_limiter, + int flags, + char * existing_memory, + size_t alignment) +{ + auto file = file_provider->newRandomAccessFile(file_name_, encryption_path_, read_limiter, flags); + return std::make_shared(file, buf_size, existing_memory, alignment); +} + ReadBufferFromRandomAccessFile ReadBufferFromRandomAccessFileBuilder::build( const FileProviderPtr & file_provider, const std::string & file_name_, diff --git a/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.h b/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.h index e2aadd12833..ec6f2240d9c 100644 --- a/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.h +++ b/dbms/src/IO/FileProvider/ReadBufferFromRandomAccessFileBuilder.h @@ -26,7 +26,17 @@ namespace DB class ReadBufferFromRandomAccessFileBuilder { public: 
- static ReadBufferFromRandomAccessFilePtr buildPtr( + static ReadBufferFromRandomAccessFileUPtr buildPtr( + const FileProviderPtr & file_provider, + const std::string & file_name_, + const EncryptionPath & encryption_path_, + size_t buf_size = DBMS_DEFAULT_BUFFER_SIZE, + const ReadLimiterPtr & read_limiter = nullptr, + int flags = -1, + char * existing_memory = nullptr, + size_t alignment = 0); + + static ReadBufferFromRandomAccessFilePtr buildSharedPtr( const FileProviderPtr & file_provider, const std::string & file_name_, const EncryptionPath & encryption_path_, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp index 9a2d3572fc7..3e2a71a1bb5 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_delta_merge_store_fast_add_peer.cpp @@ -244,6 +244,7 @@ class DeltaMergeStoreTestFastAddPeer // clear data store->clearData(); auto table_column_defines = DMTestEnv::getDefaultColumns(); + LOG_INFO(DB::Logger::get(), "reload to clear data"); store = reload(table_column_defines); store->deleteRange(*db_context, db_context->getSettingsRef(), RowKeyRange::newAll(false, 1)); store->flushCache(*db_context, RowKeyRange::newAll(false, 1), true); @@ -358,6 +359,11 @@ try dumpCheckpoint(write_store_id); + /// The test will then create a new UniPS based on the persisted files of the current UniPS. + /// In some cases, a "FullGC" could happen concurrently with the creation of the delta merge instance, + /// in the `reload` method. The panic will happen if DMStore tries to recover some segments, and fails to read them from UniPS.
+ + LOG_INFO(DB::Logger::get(), "clear data to prepare for FAP"); clearData(); verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0); @@ -374,6 +380,7 @@ try { auto table_column_defines = DMTestEnv::getDefaultColumns(); + LOG_INFO(DB::Logger::get(), "reload to apply FAP snapshot"); store = reload(table_column_defines); } @@ -431,6 +438,7 @@ try RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), num_rows_write / 2 + 2 * num_rows_write); + LOG_INFO(DB::Logger::get(), "reload to check consistency"); reload(); verifyRows( @@ -494,6 +502,7 @@ try UInt64 write_store_id = current_store_id + 1; dumpCheckpoint(write_store_id); + LOG_INFO(DB::Logger::get(), "clear data to prepare for FAP"); clearData(); verifyRows(RowKeyRange::newAll(store->isCommonHandle(), store->getRowKeyColumnSize()), 0); diff --git a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp index 2055fb335e2..e2fdbf31364 100644 --- a/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp +++ b/dbms/src/Storages/KVStore/MultiRaft/ApplySnapshot.cpp @@ -358,9 +358,16 @@ void KVStore::onSnapshot( template void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTContext & tmt) { + auto keyspace_id = new_region->getKeyspaceID(); + auto table_id = new_region->getMappedTableID(); try { - LOG_INFO(log, "Begin apply snapshot, new_region={}", new_region->toString(true)); + LOG_INFO( + log, + "Begin apply snapshot, new_region={} keyspace_id={} table_id={}", + new_region->toString(true), + keyspace_id, + table_id); Stopwatch watch; SCOPE_EXIT({ @@ -373,11 +380,21 @@ void KVStore::applyPreHandledSnapshot(const RegionPtrWrap & new_region, TMTConte FAIL_POINT_PAUSE(FailPoints::pause_until_apply_raft_snapshot); // `new_region` may change in the previous function, just log the region_id down - LOG_INFO(log, "Finish apply snapshot, cost={:.3f}s region_id={}", watch.elapsedSeconds(),
new_region->id()); + LOG_INFO( + log, + "Finish apply snapshot, cost={:.3f}s region_id={} keyspace_id={} table_id={}", + watch.elapsedSeconds(), + new_region->id(), + keyspace_id, + table_id); } catch (Exception & e) { - e.addMessage(fmt::format("(while applyPreHandledSnapshot region_id={})", new_region->id())); + e.addMessage(fmt::format( + "(while applyPreHandledSnapshot region_id={} keyspace_id={} table_id={})", + new_region->id(), + keyspace_id, + table_id)); e.rethrow(); } } diff --git a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp index 7a70e673fad..d6e7e073b53 100644 --- a/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp +++ b/dbms/src/Storages/KVStore/tests/gtest_kvstore.cpp @@ -1243,11 +1243,12 @@ try e.message(), fmt::format( "try to apply with older index, region_id={} applied_index={} new_index={}: (while " - "applyPreHandledSnapshot region_id={})", + "applyPreHandledSnapshot region_id={} keyspace_id=4294967295 table_id={})", region_id, 8, 6, - region_id)); + region_id, + table_id)); } } diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp index 44ca8e6933d..d478950954a 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPFilesWriter.cpp @@ -103,6 +103,18 @@ CPDataDumpStats CPFilesWriter::writeEditsAndApplyCheckpointInfo( write_down_stats.num_records, sequence, manifest_file_id); + + // TODO: Optimize the read performance by grouping the read of + // - the same S3DataFile in remote store (S3) + // - the same blob_file in local blob_store + // Especially grouping the reads on the same S3DataFile, because we can ONLY read + // sequentially through the S3 response. + // + // Now as a workaround, we utilize the assumption that for remote pages, "the page + // data with nearby page_ids is more likely to be stored in the same S3DataFile".
+ // By enlarging the ReadBuffer size and reusing the ReadBuffer in "DataSource" + // we make it more likely to hit the buffer. + for (auto & rec_edit : records) { StorageType id_storage_type = StorageType::Unknown; diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp index b8e47c92b6a..2b1ee68e3de 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.cpp @@ -24,7 +24,12 @@ Page CPWriteDataSourceBlobStore::read(const BlobStore if (page_id_and_entry.second.checkpoint_info.has_value() && page_id_and_entry.second.checkpoint_info.is_local_data_reclaimed) { - return remote_reader->read(page_id_and_entry); + // Read from S3. Try read from current cursor of the previous read buffer if possible, + // otherwise create a new one and seek from the beginning. + // Returns the read buffer we eventually read from, for later re-use.
+ auto [page, s3_file_buf] = remote_reader->readFromS3File(page_id_and_entry, current_s3file_buf, prefetch_size); + current_s3file_buf = s3_file_buf; + return page; } else { diff --git a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h index 1a822fde2d4..dec090bd3be 100644 --- a/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h +++ b/dbms/src/Storages/Page/V3/CheckpointFile/CPWriteDataSource.h @@ -17,6 +17,11 @@ #include #include +namespace DB::PS::universal::tests +{ +class UniPageStorageRemoteReadTest; +} + namespace DB::PS::V3 { @@ -46,25 +51,39 @@ class CPWriteDataSourceBlobStore : public CPWriteDataSource */ explicit CPWriteDataSourceBlobStore( BlobStore & blob_store_, - const FileProviderPtr & file_provider_) + const FileProviderPtr & file_provider_, + size_t prefetch_size_) : blob_store(blob_store_) , remote_reader(std::make_unique()) , file_provider(file_provider_) + , prefetch_size(prefetch_size_) {} static CPWriteDataSourcePtr create( BlobStore & blob_store_, - const FileProviderPtr & file_provider_) + const FileProviderPtr & file_provider_, + size_t prefetch_size = DBMS_DEFAULT_BUFFER_SIZE) { - return std::make_shared(blob_store_, file_provider_); + return std::make_shared(blob_store_, file_provider_, prefetch_size); } Page read(const BlobStore::PageIdAndEntry & page_id_and_entry) override; private: BlobStore & blob_store; + + /// There could be some remote page stored in `blob_store` which need to be fetched + /// from S3 through `remote_reader`. In order to reduce the cost and also improve the + /// read performance, we keep a read buffer with `prefetch_size` and try to reuse the + /// buffer in next `read`. 
+ S3PageReaderPtr remote_reader; FileProviderPtr file_provider; + ReadBufferFromRandomAccessFilePtr current_s3file_buf; + const size_t prefetch_size; + + // for testing + friend class DB::PS::universal::tests::UniPageStorageRemoteReadTest; }; /** diff --git a/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp b/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp index a1decd626fe..917ca9c8339 100644 --- a/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp +++ b/dbms/src/Storages/Page/V3/Universal/S3PageReader.cpp @@ -12,6 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. +#include +#include #include #include #include @@ -19,39 +21,97 @@ #include #include +namespace ProfileEvents +{ +extern const Event S3PageReaderReusedFile; +extern const Event S3PageReaderNotReusedFile; +extern const Event S3PageReaderNotReusedFileReadback; +extern const Event S3PageReaderNotReusedFileChangeFile; +} // namespace ProfileEvents + namespace DB::PS::V3 { Page S3PageReader::read(const UniversalPageIdAndEntry & page_id_and_entry) { + return std::get<0>(readFromS3File(page_id_and_entry, nullptr, DBMS_DEFAULT_BUFFER_SIZE)); +} + +std::tuple S3PageReader::readFromS3File( + const UniversalPageIdAndEntry & page_id_and_entry, + ReadBufferFromRandomAccessFilePtr file_buf, + size_t prefetch_size) +{ + assert(prefetch_size != 0); // 0-size read buffer can not read any data, should not happen + const auto & page_entry = page_id_and_entry.second; RUNTIME_CHECK(page_entry.checkpoint_info.has_value()); - auto location = page_entry.checkpoint_info.data_location; - const auto & remote_name = *location.data_file_id; - auto remote_name_view = S3::S3FilenameView::fromKey(remote_name); - RandomAccessFilePtr remote_file; + const auto & location = page_entry.checkpoint_info.data_location; + const auto & data_file_id = *location.data_file_id; + const auto remote_fname_view = S3::S3FilenameView::fromKey(data_file_id); + const auto 
remote_fname + = remote_fname_view.isLockFile() ? remote_fname_view.asDataFile().toFullKey() : data_file_id; + auto s3_client = S3::ClientFactory::instance().sharedTiFlashClient(); -#ifdef DBMS_PUBLIC_GTEST - if (remote_name_view.isLockFile()) + ReadBufferFromRandomAccessFilePtr read_buff; + if (file_buf == nullptr || file_buf->getPositionInFile() < 0 + || location.offset_in_file < static_cast(file_buf->getPositionInFile()) + // note that S3RandomAccessFile will prepend the bucket name as prefix, we should + // use "getInitialFileName" instead of "getFileName" + || remote_fname != file_buf->getInitialFileName()) { + if (file_buf != nullptr) + { + if (remote_fname != file_buf->getInitialFileName()) + ProfileEvents::increment(ProfileEvents::S3PageReaderNotReusedFileChangeFile, 1); + else if (location.offset_in_file < static_cast(file_buf->getPositionInFile())) + ProfileEvents::increment(ProfileEvents::S3PageReaderNotReusedFileReadback, 1); + } + S3::S3RandomAccessFilePtr s3_remote_file; + ProfileEvents::increment(ProfileEvents::S3PageReaderNotReusedFile, 1); + if (remote_fname_view.isLockFile()) + { + s3_remote_file = std::make_shared(s3_client, remote_fname); + } + else + { +#ifndef DBMS_PUBLIC_GTEST + RUNTIME_CHECK_MSG( + false, + "Can not read from an invalid remote location, location={}", + location.toDebugString()); +#else + // In unit test, we directly read from `location.data_file_id` which want to just focus on read write logic + s3_remote_file = std::make_shared(s3_client, remote_fname); #endif - remote_file = std::make_shared(s3_client, remote_name_view.asDataFile().toFullKey()); -#ifdef DBMS_PUBLIC_GTEST + } + read_buff = std::make_shared(s3_remote_file, prefetch_size); } else { - // Just used in unit test which want to just focus on read write logic - remote_file = std::make_shared(s3_client, *location.data_file_id); + ProfileEvents::increment(ProfileEvents::S3PageReaderReusedFile, 1); + // Reuse the previous read buffer on the same file + read_buff =
file_buf; } -#endif - ReadBufferFromRandomAccessFile buf(remote_file); - buf.seek(location.offset_in_file, SEEK_SET); - auto buf_size = location.size_in_file; + const auto buf_size = location.size_in_file; RUNTIME_CHECK(buf_size != 0, page_id_and_entry); char * data_buf = static_cast(alloc(buf_size)); MemHolder mem_holder = createMemHolder(data_buf, [&, buf_size](char * p) { free(p, buf_size); }); - // TODO: support checksum verification - buf.readStrict(data_buf, buf_size); + try + { + RUNTIME_CHECK(read_buff != nullptr); + read_buff->seek(location.offset_in_file, SEEK_SET); + // TODO: support checksum verification + read_buff->readStrict(data_buf, buf_size); + } + catch (DB::Exception & e) + { + e.addMessage(fmt::format( + "while reading from S3, page_id={} location={}", + page_id_and_entry.first, + location.toDebugString())); + e.rethrow(); + } Page page{UniversalPageIdFormat::getU64ID(page_id_and_entry.first)}; page.data = std::string_view(data_buf, buf_size); page.mem_holder = mem_holder; @@ -61,9 +121,10 @@ Page S3PageReader::read(const UniversalPageIdAndEntry & page_id_and_entry) const auto offset = page_entry.field_offsets[index].first; page.field_offsets.emplace(index, offset); } - return page; + return std::make_tuple(page, read_buff); } + UniversalPageMap S3PageReader::read(const UniversalPageIdAndEntries & page_id_and_entries) { UniversalPageMap page_map; diff --git a/dbms/src/Storages/Page/V3/Universal/S3PageReader.h b/dbms/src/Storages/Page/V3/Universal/S3PageReader.h index 26b53a899ff..aec5234f89b 100644 --- a/dbms/src/Storages/Page/V3/Universal/S3PageReader.h +++ b/dbms/src/Storages/Page/V3/Universal/S3PageReader.h @@ -26,6 +26,12 @@ namespace Aws::S3 class S3Client; } +namespace DB +{ +class ReadBufferFromRandomAccessFile; +using ReadBufferFromRandomAccessFilePtr = std::shared_ptr; +} // namespace DB + namespace DB::PS::V3 { using UniversalPageMap = std::map; @@ -41,6 +47,13 @@ class S3PageReader : private Allocator S3PageReader() = default; Page 
read(const UniversalPageIdAndEntry & page_id_and_entry); + // Given a read buffer over an S3 file, try to read from its current cursor if possible, + // otherwise create a new read buffer and seek from the beginning. + // Returns the page and the read buffer we eventually read from, for later reuse. + std::tuple readFromS3File( + const UniversalPageIdAndEntry & page_id_and_entry, + ReadBufferFromRandomAccessFilePtr file_buf, + size_t prefetch_size); UniversalPageMap read(const UniversalPageIdAndEntries & page_id_and_entries); diff --git a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp index f7f9bb53ac2..262a4fcbc87 100644 --- a/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp +++ b/dbms/src/Storages/Page/V3/Universal/UniversalPageStorage.cpp @@ -540,7 +540,8 @@ std::optional UniversalPageStorage::dumpIncrementalChec .data_file_id_pattern = options.data_file_id_pattern, .manifest_file_path = manifest_file_path, .manifest_file_id = manifest_file_id, - .data_source = PS::V3::CPWriteDataSourceBlobStore::create(*blob_store, file_provider), + .data_source + = PS::V3::CPWriteDataSourceBlobStore::create(*blob_store, file_provider, /*prefetch_size=*/5 * 1024 * 1024), .must_locked_files = options.must_locked_files, .sequence = sequence, .max_data_file_size = options.max_data_file_size, diff --git a/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp b/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp index 836f7888d8f..b6695c04923 100644 --- a/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp +++ b/dbms/src/Storages/Page/V3/Universal/tests/gtest_remote_read.cpp @@ -17,6 +17,7 @@ #include #include #include +#include #include #include #include @@ -32,6 +33,7 @@ #include #include #include +#include #include #include #include @@ -106,6 +108,14 @@ class UniPageStorageRemoteReadTest void TearDown() override { DB::tests::TiFlashTestEnv::disableS3Config(); } + static
ReadBufferFromRandomAccessFilePtr getReadBuffFromDataSource(const V3::CPWriteDataSourcePtr & source) + { + auto * raw_ds_ptr = dynamic_cast(source.get()); + if (!raw_ds_ptr) + return nullptr; + return raw_ds_ptr->current_s3file_buf; + } + protected: void deleteBucket() { ::DB::tests::TiFlashTestEnv::deleteBucket(*s3_client); } @@ -796,6 +806,7 @@ try } uploadFile(manifest_file_path); + // Read the manifest from remote store(S3) auto manifest_file = PosixRandomAccessFile::create(manifest_file_path); auto manifest_reader = PS::V3::CPManifestFileReader::create({ .plain_file = manifest_file, @@ -807,6 +818,8 @@ try auto r = edits_r->getRecords(); ASSERT_EQ(2, r.size()); + // Mock that some remote page and ref to remote page is + // ingest into page_storage. UniversalWriteBatch wb; wb.disableRemoteLock(); wb.putRemotePage( @@ -819,6 +832,7 @@ try page_storage->write(std::move(wb)); } + // Read the remote page "page_foo" { std::vector page_fields; std::vector read_indices = {0, 2}; @@ -836,6 +850,7 @@ try ASSERT_EQ("riage rocked", String(fields3_buf.begin(), fields3_buf.size())); } + // Read the ref page "page_foo2", which ref to the remote page "page_foo" { std::vector page_fields; std::vector read_indices = {0, 2}; @@ -855,6 +870,216 @@ try } CATCH +struct FilenameWithData +{ + std::string_view filename; + std::vector data; +}; + +/// The "CPWriteDataSourceBlobStore" should reuse the underlying +/// read buffer as possible +TEST_P(UniPageStorageRemoteReadTest, OptimizedRemoteRead) +try +{ + std::vector test_input{ + FilenameWithData{ + .filename = "raw_data0", + .data = { + "The flower carriage rocked", + "Nahida opened her eyes", + "Said she just dreamed a dream", + }, + }, + FilenameWithData{ + .filename = "raw_data1", + .data = { + "Dreamed of the day that she was born", + "Dreamed of the day that the sages found her", + }, + } + }; + + // Prepare a file on remote store with "data" + for (const auto & filename_with_data : test_input) + { + const auto filename = 
String(filename_with_data.filename); + const String full_path = fmt::format("{}/{}", getTemporaryPath(), filename); + PosixWritableFile wf(full_path, true, -1, 0600, nullptr); + for (const auto & d : filename_with_data.data) + { + wf.write(const_cast(d.data()), d.size()); + } + wf.fsync(); + wf.close(); + uploadFile(full_path); + } + + using namespace DB::PS::V3; + auto data_file0 = std::make_shared(test_input[0].filename); + auto data_file1 = std::make_shared(test_input[1].filename); + const PageEntriesV3 all_entries{ + // test_input[0] + PageEntryV3{ + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = data_file0, + .offset_in_file = 0, + .size_in_file = test_input[0].data[0].size(), + }, + true, + true), + }, + PageEntryV3{ + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = data_file0, + .offset_in_file = test_input[0].data[0].size(), + .size_in_file = test_input[0].data[1].size(), + }, + true, + true), + }, + PageEntryV3{ + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = data_file0, + .offset_in_file = test_input[0].data[0].size() + test_input[0].data[1].size(), + .size_in_file = test_input[0].data[2].size(), + }, + true, + true), + }, + // test_input[1] + PageEntryV3{ + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = data_file1, + .offset_in_file = 0, + .size_in_file = test_input[1].data[0].size(), + }, + true, + true), + }, + PageEntryV3{ + .checkpoint_info = OptionalCheckpointInfo( + CheckpointLocation{ + .data_file_id = data_file1, + .offset_in_file = test_input[1].data[0].size(), + .size_in_file = test_input[1].data[1].size(), + }, + true, + true), + }, + }; + auto gen_read_entries = [&all_entries](const std::vector indexes) { + UniversalPageIdAndEntries to_read; + to_read.reserve(indexes.size()); + for (const auto index : indexes) + to_read.emplace_back(std::make_pair(fmt::format("page_{}", index), all_entries[index])); + 
return to_read; + }; + + PageTypeAndConfig page_type_and_config{ + {PageType::Normal, PageTypeConfig{.heavy_gc_valid_rate = 0.5}}, + {PageType::RaftData, PageTypeConfig{.heavy_gc_valid_rate = 0.1}}, + }; + auto blob_store = PS::V3::BlobStore( + getCurrentTestName(), + file_provider, + delegator, + PS::V3::BlobConfig{}, + page_type_and_config); + + /// Begin the read testing + { + // Read all the data in order, should reuse the underlying read buffer + UniversalPageIdAndEntries to_read = gen_read_entries({0, 1, 2, 3, 4}); + + auto data_source = PS::V3::CPWriteDataSourceBlobStore::create( + blob_store, + std::shared_ptr(nullptr), + 5 * 1024 * 1024); + ASSERT_EQ(nullptr, getReadBuffFromDataSource(data_source)); + + Page page = data_source->read(to_read[0]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[0]); + auto prev_read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(nullptr, prev_read_buff_of_source); + + page = data_source->read(to_read[1]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[1]); + // should reuse the read buffer + auto read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_EQ(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[2]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[2]); + // should reuse the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_EQ(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[3]); + ASSERT_STRVIEW_EQ(page.data, test_input[1].data[0]); + // A new filename, should NOT reuse the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[4]); + ASSERT_STRVIEW_EQ(page.data, test_input[1].data[1]); + // should reuse 
the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_EQ(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + } + { + // Read page_0, page_2, page_4, page_3, page_1 + UniversalPageIdAndEntries to_read = gen_read_entries({0, 2, 4, 3, 1}); + + auto data_source = PS::V3::CPWriteDataSourceBlobStore::create( + blob_store, + std::shared_ptr(nullptr), + 5 * 1024 * 1024); + ASSERT_EQ(nullptr, getReadBuffFromDataSource(data_source)); + + Page page = data_source->read(to_read[0]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[0]); + auto prev_read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(nullptr, prev_read_buff_of_source); + + page = data_source->read(to_read[1]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[2]); + // should reuse the read buffer + auto read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_EQ(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[2]); + ASSERT_STRVIEW_EQ(page.data, test_input[1].data[1]); + // A new filename, should NOT reuse the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[3]); + ASSERT_STRVIEW_EQ(page.data, test_input[1].data[0]); + // Rewind back in the same file, should NOT reuse the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(read_buff_of_source, prev_read_buff_of_source); + prev_read_buff_of_source = read_buff_of_source; + + page = data_source->read(to_read[4]); + ASSERT_STRVIEW_EQ(page.data, test_input[0].data[1]); + // A new filename, should NOT reuse the read buffer + read_buff_of_source = getReadBuffFromDataSource(data_source); + ASSERT_NE(read_buff_of_source, prev_read_buff_of_source); + 
prev_read_buff_of_source = read_buff_of_source; + } +} +CATCH + TEST_P(UniPageStorageRemoteReadTest, WriteReadExternal) try { diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.cpp b/dbms/src/Storages/S3/S3RandomAccessFile.cpp index 86c65e88905..867892bd5d9 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.cpp +++ b/dbms/src/Storages/S3/S3RandomAccessFile.cpp @@ -39,6 +39,11 @@ extern const Event S3IOSeek; namespace DB::S3 { +String S3RandomAccessFile::summary() const +{ + return fmt::format("remote_fname={} cur_offset={} cur_retry={}", remote_fname, cur_offset, cur_retry); +} + S3RandomAccessFile::S3RandomAccessFile(std::shared_ptr client_ptr_, const String & remote_fname_) : client_ptr(std::move(client_ptr_)) , remote_fname(remote_fname_) @@ -54,6 +59,11 @@ std::string S3RandomAccessFile::getFileName() const return fmt::format("{}/{}", client_ptr->bucket(), remote_fname); } +std::string S3RandomAccessFile::getInitialFileName() const +{ + return remote_fname; +} + bool isRetryableError(int e) { return e == ECONNRESET || e == EAGAIN; @@ -172,7 +182,6 @@ off_t S3RandomAccessFile::seekImpl(off_t offset_, int whence) cur_offset = offset_; return cur_offset; } - String S3RandomAccessFile::readRangeOfObject() { return fmt::format("bytes={}-", cur_offset); diff --git a/dbms/src/Storages/S3/S3RandomAccessFile.h b/dbms/src/Storages/S3/S3RandomAccessFile.h index c7809411619..6f7bea23875 100644 --- a/dbms/src/Storages/S3/S3RandomAccessFile.h +++ b/dbms/src/Storages/S3/S3RandomAccessFile.h @@ -52,8 +52,12 @@ class S3RandomAccessFile final : public RandomAccessFile ssize_t read(char * buf, size_t size) override; + // Note that this will return "{S3Client.bucket_name}/{remote_fname}" std::string getFileName() const override; + // Return "remote_fname" + std::string getInitialFileName() const override; + ssize_t pread(char * /*buf*/, size_t /*size*/, off_t /*offset*/) const override { throw Exception("S3RandomAccessFile not support pread", ErrorCodes::NOT_IMPLEMENTED); @@ 
-77,6 +81,8 @@ class S3RandomAccessFile final : public RandomAccessFile return ext::make_scope_guard([]() { read_file_info.reset(); }); } + String summary() const; + private: bool initialize(); off_t seekImpl(off_t offset, int whence); @@ -102,4 +108,6 @@ class S3RandomAccessFile final : public RandomAccessFile static constexpr Int32 max_retry = 3; }; +using S3RandomAccessFilePtr = std::shared_ptr; + } // namespace DB::S3 diff --git a/dbms/src/TestUtils/TiFlashTestBasic.cpp b/dbms/src/TestUtils/TiFlashTestBasic.cpp index 66b7b4517bb..fa76fc610f3 100644 --- a/dbms/src/TestUtils/TiFlashTestBasic.cpp +++ b/dbms/src/TestUtils/TiFlashTestBasic.cpp @@ -41,4 +41,14 @@ ::testing::AssertionResult fieldCompare( return ::testing::internal::EqFailure(lhs_expr, rhs_expr, lhs.toString(), rhs.toString(), false); } +::testing::AssertionResult StringViewCompare( + const char * lhs_expr, + const char * rhs_expr, + std::string_view lhs, + std::string_view rhs) +{ + if (lhs == rhs) + return ::testing::AssertionSuccess(); + return ::testing::internal::EqFailure(lhs_expr, rhs_expr, String(lhs), String(rhs), false); +} } // namespace DB::tests diff --git a/dbms/src/TestUtils/TiFlashTestBasic.h b/dbms/src/TestUtils/TiFlashTestBasic.h index d3572d8ed4a..1acf12312a2 100644 --- a/dbms/src/TestUtils/TiFlashTestBasic.h +++ b/dbms/src/TestUtils/TiFlashTestBasic.h @@ -95,6 +95,14 @@ ::testing::AssertionResult fieldCompare( #define ASSERT_FIELD_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::fieldCompare, val1, val2) #define EXPECT_FIELD_EQ(val1, val2) EXPECT_PRED_FORMAT2(::DB::tests::fieldCompare, val1, val2) +::testing::AssertionResult StringViewCompare( + const char * lhs_expr, + const char * rhs_expr, + std::string_view lhs, + std::string_view rhs); + +#define ASSERT_STRVIEW_EQ(val1, val2) ASSERT_PRED_FORMAT2(::DB::tests::StringViewCompare, val1, val2) + // A simple helper for getting DataType from type name inline DataTypePtr typeFromString(const String
& str) { @@ -116,7 +124,7 @@ inline DataTypes typesFromString(const String & str) #define CHECK_TESTS_WITH_DATA_ENABLED \ if (!TiFlashTestEnv::isTestsWithDataEnabled()) \ { \ - const auto * test_info = ::testing::UnitTest::GetInstance()->current_test_info(); \ + const auto * test_info = ::testing::UnitTest::GetInstance() -> current_test_info(); \ LOG_INFO( \ &Poco::Logger::get("GTEST"), \ fmt::format("Test: {}.{} is disabled.", test_info->test_case_name(), test_info->name())); \