diff --git a/be/src/runtime/snapshot_loader.cpp b/be/src/runtime/snapshot_loader.cpp index 784904c78a3fb1..faf9abe925a63e 100644 --- a/be/src/runtime/snapshot_loader.cpp +++ b/be/src/runtime/snapshot_loader.cpp @@ -57,7 +57,17 @@ #include "util/thrift_rpc_helper.h" namespace doris { -namespace { + +static std::string get_loaded_tag_path(const std::string& snapshot_path) { + return snapshot_path + "/LOADED"; +} + +static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) { + std::unique_ptr writer; + std::string file = get_loaded_tag_path(snapshot_path); + RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file, &writer)); + return writer->close(); +} Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path, std::string_view remote_path, std::string_view checksum) { @@ -84,8 +94,6 @@ bool _end_with(std::string_view str, std::string_view match) { str.compare(str.size() - match.size(), match.size(), match) == 0; } -} // namespace - SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id, const TNetworkAddress& broker_addr, const std::map& prop) @@ -753,6 +761,14 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta return Status::InternalError(ss.str()); } + std::string loaded_tag_path = get_loaded_tag_path(snapshot_path); + bool already_loaded = false; + RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded)); + if (already_loaded) { + LOG(INFO) << "snapshot path already moved: " << snapshot_path; + return Status::OK(); + } + // rename the rowset ids and tabletid info in rowset meta auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id, tablet->replica_id(), tablet->table_id(), @@ -822,6 +838,10 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta LOG(WARNING) << ss.str(); return Status::InternalError(ss.str()); } + + // mark the snapshot path as loaded + RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id)); + LOG(INFO) << "finished to reload header of tablet: " << tablet_id; return status; diff --git a/be/test/runtime/snapshot_loader_test.cpp b/be/test/runtime/snapshot_loader_test.cpp index 9e4de5f3f334a3..cb75760bb6db9e 100644 --- a/be/test/runtime/snapshot_loader_test.cpp +++ b/be/test/runtime/snapshot_loader_test.cpp @@ -17,18 +17,215 @@ #include "runtime/snapshot_loader.h" +#include +#include +#include +#include #include #include +#include +#include #include +#include +#include -#include "gtest/gtest_pred_impl.h" +#include "common/config.h" +#include "common/object_pool.h" +#include "exec/tablet_info.h" +#include "io/fs/local_file_system.h" +#include "olap/data_dir.h" +#include "olap/delta_writer.h" +#include "olap/iterators.h" +#include "olap/olap_define.h" +#include "olap/options.h" +#include "olap/rowset/beta_rowset.h" +#include "olap/rowset/segment_v2/segment.h" +#include "olap/schema.h" +#include "olap/segment_loader.h" +#include "olap/snapshot_manager.h" #include "olap/storage_engine.h" +#include "olap/tablet.h" +#include "olap/tablet_manager.h" +#include "olap/task/engine_publish_version_task.h" +#include "olap/txn_manager.h" +#include "runtime/define_primitive_type.h" +#include "runtime/descriptor_helper.h" +#include "runtime/descriptors.h" #include "runtime/exec_env.h" +#include "vec/columns/column.h" +#include "vec/core/block.h" +#include "vec/core/column_with_type_and_name.h" +#include "vec/runtime/vdatetime_value.h" namespace doris { -TEST(SnapshotLoaderTest, NormalCase) { +static const uint32_t MAX_PATH_LEN = 1024; +static StorageEngine* engine_ref = nullptr; +static std::string storage_root_path; + +static void set_up() { + char buffer[MAX_PATH_LEN]; + EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr); + storage_root_path = std::string(buffer) + "/snapshot_data_test"; + auto st = io::global_local_filesystem()->delete_directory(storage_root_path); + ASSERT_TRUE(st.ok()) << st; + st = io::global_local_filesystem()->create_directory(storage_root_path); + ASSERT_TRUE(st.ok()) << st; + std::vector paths; + paths.emplace_back(storage_root_path, -1); + + doris::EngineOptions options; + options.store_paths = paths; + options.backend_uid = UniqueId::gen_uid(); + auto engine = std::make_unique(options); + engine_ref = engine.get(); + Status s = engine->open(); + ASSERT_TRUE(s.ok()) << s; + ASSERT_TRUE(s.ok()) << s; + + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter()); + exec_env->set_storage_engine(std::move(engine)); +} + +static void tear_down() { + ExecEnv* exec_env = doris::ExecEnv::GetInstance(); + exec_env->set_memtable_memory_limiter(nullptr); + engine_ref = nullptr; + exec_env->set_storage_engine(nullptr); + + if (storage_root_path.empty()) { + return; + } + + // Status s = io::global_local_filesystem()->delete_directory(storage_root_path); + // EXPECT_TRUE(s.ok()) << "delete directory " << s; +} + +static TCreateTabletReq create_tablet(int64_t partition_id, int64_t tablet_id, + int32_t schema_hash) { + TColumnType col_type; + col_type.__set_type(TPrimitiveType::SMALLINT); + TColumn col1; + col1.__set_column_name("col1"); + col1.__set_column_type(col_type); + col1.__set_is_key(true); + std::vector cols; + cols.push_back(col1); + TTabletSchema tablet_schema; + tablet_schema.__set_short_key_column_count(1); + tablet_schema.__set_schema_hash(schema_hash); + tablet_schema.__set_keys_type(TKeysType::AGG_KEYS); + tablet_schema.__set_storage_type(TStorageType::COLUMN); + tablet_schema.__set_columns(cols); + TCreateTabletReq create_tablet_req; + create_tablet_req.__set_tablet_schema(tablet_schema); + create_tablet_req.__set_tablet_id(tablet_id); + create_tablet_req.__set_partition_id(partition_id); + create_tablet_req.__set_version(2); + return create_tablet_req; +} + +static TDescriptorTable create_descriptor_tablet() { + TDescriptorTableBuilder dtb; + TTupleDescriptorBuilder tuple_builder; + tuple_builder.add_slot( + TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("col1").column_pos(0).build()); + tuple_builder.build(&dtb); + return dtb.desc_tbl(); +} + +static void add_rowset(int64_t tablet_id, int32_t schema_hash, int64_t partition_id, int64_t txn_id, + int16_t value) { + TDescriptorTable tdesc_tbl = create_descriptor_tablet(); + ObjectPool obj_pool; + DescriptorTbl* desc_tbl = nullptr; + static_cast(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl)); + TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0); + auto param = std::make_shared(); + + PUniqueId load_id; + load_id.set_hi(0); + load_id.set_lo(0); + WriteRequest write_req; + write_req.tablet_id = tablet_id; + write_req.schema_hash = schema_hash; + write_req.txn_id = txn_id; + write_req.partition_id = partition_id; + write_req.load_id = load_id; + write_req.tuple_desc = tuple_desc; + write_req.slots = &(tuple_desc->slots()); + write_req.is_high_priority = false; + write_req.table_schema_param = param; + auto profile = std::make_unique("LoadChannels"); + auto delta_writer = + std::make_unique(*engine_ref, write_req, profile.get(), TUniqueId {}); + + vectorized::Block block; + for (const auto& slot_desc : tuple_desc->slots()) { + std::cout << "slot_desc: " << slot_desc->col_name() << std::endl; + block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(), + slot_desc->get_data_type_ptr(), + slot_desc->col_name())); + } + + std::cout << "total column " << block.mutate_columns().size() << std::endl; + auto columns = block.mutate_columns(); + int16_t c1 = value; + columns[0]->insert_data((const char*)&c1, sizeof(c1)); + Status res = delta_writer->write(&block, {0}); + EXPECT_TRUE(res.ok()); + + res = delta_writer->close(); + ASSERT_TRUE(res.ok()); + res = delta_writer->wait_flush(); + ASSERT_TRUE(res.ok()); + res = delta_writer->build_rowset(); + ASSERT_TRUE(res.ok()); + res = delta_writer->submit_calc_delete_bitmap_task(); + ASSERT_TRUE(res.ok()); + res = delta_writer->wait_calc_delete_bitmap(); + ASSERT_TRUE(res.ok()); + res = delta_writer->commit_txn(PSlaveTabletNodes()); + ASSERT_TRUE(res.ok()) << res; + + TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); + ASSERT_TRUE(tablet != nullptr); + + std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl; + Version version; + version.first = tablet->get_rowset_with_max_version()->end_version() + 1; + version.second = tablet->get_rowset_with_max_version()->end_version() + 1; + std::cout << "start to add rowset version:" << version.first << "-" << version.second + << std::endl; + std::map tablet_related_rs; + engine_ref->txn_manager()->get_txn_related_tablets(txn_id, partition_id, &tablet_related_rs); + ASSERT_EQ(1, tablet_related_rs.size()); + + std::cout << "start to publish txn" << std::endl; + RowsetSharedPtr rowset = tablet_related_rs.begin()->second; + + TabletPublishStatistics stats; + res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id, version, &stats); + ASSERT_TRUE(res.ok()) << res; + std::cout << "start to add inc rowset:" << rowset->rowset_id() + << ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first + << "-" << rowset->version().second << std::endl; + res = tablet->add_inc_rowset(rowset); + ASSERT_TRUE(res.ok()) << res; +} + +class SnapshotLoaderTest : public ::testing::Test { +public: + SnapshotLoaderTest() {} + ~SnapshotLoaderTest() {} + static void SetUpTestSuite() { set_up(); } + + static void TearDownTestSuite() { tear_down(); } +}; + +TEST_F(SnapshotLoaderTest, NormalCase) { StorageEngine engine({}); SnapshotLoader loader(engine, ExecEnv::GetInstance(), 1L, 2L); @@ -87,4 +284,60 @@ TEST(SnapshotLoaderTest, NormalCase) { EXPECT_EQ(10005, tablet_id); } +TEST_F(SnapshotLoaderTest, DirMoveTaskIsIdempotent) { + // 1. create a tablet + int64_t tablet_id = 111; + int32_t schema_hash = 222; + int64_t partition_id = 333; + TCreateTabletReq req = create_tablet(partition_id, tablet_id, schema_hash); + RuntimeProfile profile("CreateTablet"); + Status status = engine_ref->create_tablet(req, &profile); + EXPECT_TRUE(status.ok()); + TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); + EXPECT_TRUE(tablet != nullptr); + + // 2. add a rowset + add_rowset(tablet_id, schema_hash, partition_id, 100, 100); + auto version = tablet->max_version(); + std::cout << "version: " << version.first << ", " << version.second << std::endl; + + // 3. make a snapshot + string snapshot_path; + bool allow_incremental_clone = false; // not used + TSnapshotRequest snapshot_request; + snapshot_request.tablet_id = tablet_id; + snapshot_request.schema_hash = schema_hash; + snapshot_request.version = version.second; + status = engine_ref->snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path, + &allow_incremental_clone); + ASSERT_TRUE(status.ok()); + + // 4. load the snapshot to another tablet + snapshot_path = fmt::format("{}/{}/{}", snapshot_path, tablet_id, schema_hash); + SnapshotLoader loader1(*engine_ref, ExecEnv::GetInstance(), 1L, tablet_id); + status = loader1.move(snapshot_path, tablet, true); + ASSERT_TRUE(status.ok()) << status; + + // 5. Insert a rowset to the tablet + // reload tablet + tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); + EXPECT_TRUE(tablet != nullptr); + add_rowset(tablet_id, schema_hash, partition_id, 200, 200); + version = tablet->max_version(); + std::cout << "version: " << version.first << ", " << version.second << std::endl; + + // 6. load the snapshot to the tablet again, this request should be idempotent + SnapshotLoader loader2(*engine_ref, ExecEnv::GetInstance(), 2L, tablet_id); + status = loader2.move(snapshot_path, tablet, true); + ASSERT_TRUE(status.ok()) << status; + + // reload tablet + tablet = engine_ref->tablet_manager()->get_tablet(tablet_id); + EXPECT_TRUE(tablet != nullptr); + auto last_version = tablet->max_version(); + std::cout << "last version: " << last_version.first << ", " << last_version.second << std::endl; + ASSERT_EQ(version.first, last_version.first); + ASSERT_EQ(version.second, last_version.second); +} + } // namespace doris