Skip to content

Commit

Permalink
[fix](restore) Make the DirMoveTask idempotent. (#47313)
Browse files Browse the repository at this point in the history
Since the DirMoveTask is executed asynchronously, the FE might send the
task again to ensure its completion eventually. But the rowsets
committed during two DirMoveTasks (if any) will be dropped, which causes
the data loss.

This PR adds a LOADED tag file to indicate that the snapshot has been
loaded into a tablet and should not be reloaded again.
  • Loading branch information
w41ter authored and dataroaring committed Jan 31, 2025
1 parent 4ec7f89 commit e0e78fa
Show file tree
Hide file tree
Showing 2 changed files with 278 additions and 5 deletions.
26 changes: 23 additions & 3 deletions be/src/runtime/snapshot_loader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,17 @@
#include "util/thrift_rpc_helper.h"

namespace doris {
namespace {

static std::string get_loaded_tag_path(const std::string& snapshot_path) {
return snapshot_path + "/LOADED";
}

static Status write_loaded_tag(const std::string& snapshot_path, int64_t tablet_id) {
std::unique_ptr<io::FileWriter> writer;
std::string file = get_loaded_tag_path(snapshot_path);
RETURN_IF_ERROR(io::global_local_filesystem()->create_file(file, &writer));
return writer->close();
}

Status upload_with_checksum(io::RemoteFileSystem& fs, std::string_view local_path,
std::string_view remote_path, std::string_view checksum) {
Expand All @@ -84,8 +94,6 @@ bool _end_with(std::string_view str, std::string_view match) {
str.compare(str.size() - match.size(), match.size(), match) == 0;
}

} // namespace

SnapshotLoader::SnapshotLoader(StorageEngine& engine, ExecEnv* env, int64_t job_id, int64_t task_id,
const TNetworkAddress& broker_addr,
const std::map<std::string, std::string>& prop)
Expand Down Expand Up @@ -753,6 +761,14 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
return Status::InternalError(ss.str());
}

std::string loaded_tag_path = get_loaded_tag_path(snapshot_path);
bool already_loaded = false;
RETURN_IF_ERROR(io::global_local_filesystem()->exists(loaded_tag_path, &already_loaded));
if (already_loaded) {
LOG(INFO) << "snapshot path already moved: " << snapshot_path;
return Status::OK();
}

// rename the rowset ids and tabletid info in rowset meta
auto res = _engine.snapshot_mgr()->convert_rowset_ids(snapshot_path, tablet_id,
tablet->replica_id(), tablet->table_id(),
Expand Down Expand Up @@ -822,6 +838,10 @@ Status SnapshotLoader::move(const std::string& snapshot_path, TabletSharedPtr ta
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
}

// mark the snapshot path as loaded
RETURN_IF_ERROR(write_loaded_tag(snapshot_path, tablet_id));

LOG(INFO) << "finished to reload header of tablet: " << tablet_id;

return status;
Expand Down
257 changes: 255 additions & 2 deletions be/test/runtime/snapshot_loader_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,215 @@

#include "runtime/snapshot_loader.h"

#include <gen_cpp/AgentService_types.h>
#include <gen_cpp/Descriptors_types.h>
#include <gen_cpp/Types_types.h>
#include <gen_cpp/internal_service.pb.h>
#include <gtest/gtest-message.h>
#include <gtest/gtest-test-part.h>
#include <gtest/gtest_pred_impl.h>

#include <cstdint>
#include <filesystem>
#include <iostream>
#include <string>

#include "gtest/gtest_pred_impl.h"
#include "common/config.h"
#include "common/object_pool.h"
#include "exec/tablet_info.h"
#include "io/fs/local_file_system.h"
#include "olap/data_dir.h"
#include "olap/delta_writer.h"
#include "olap/iterators.h"
#include "olap/olap_define.h"
#include "olap/options.h"
#include "olap/rowset/beta_rowset.h"
#include "olap/rowset/segment_v2/segment.h"
#include "olap/schema.h"
#include "olap/segment_loader.h"
#include "olap/snapshot_manager.h"
#include "olap/storage_engine.h"
#include "olap/tablet.h"
#include "olap/tablet_manager.h"
#include "olap/task/engine_publish_version_task.h"
#include "olap/txn_manager.h"
#include "runtime/define_primitive_type.h"
#include "runtime/descriptor_helper.h"
#include "runtime/descriptors.h"
#include "runtime/exec_env.h"
#include "vec/columns/column.h"
#include "vec/core/block.h"
#include "vec/core/column_with_type_and_name.h"
#include "vec/runtime/vdatetime_value.h"

namespace doris {

TEST(SnapshotLoaderTest, NormalCase) {
static const uint32_t MAX_PATH_LEN = 1024;
static StorageEngine* engine_ref = nullptr;
static std::string storage_root_path;

static void set_up() {
char buffer[MAX_PATH_LEN];
EXPECT_NE(getcwd(buffer, MAX_PATH_LEN), nullptr);
storage_root_path = std::string(buffer) + "/snapshot_data_test";
auto st = io::global_local_filesystem()->delete_directory(storage_root_path);
ASSERT_TRUE(st.ok()) << st;
st = io::global_local_filesystem()->create_directory(storage_root_path);
ASSERT_TRUE(st.ok()) << st;
std::vector<StorePath> paths;
paths.emplace_back(storage_root_path, -1);

doris::EngineOptions options;
options.store_paths = paths;
options.backend_uid = UniqueId::gen_uid();
auto engine = std::make_unique<StorageEngine>(options);
engine_ref = engine.get();
Status s = engine->open();
ASSERT_TRUE(s.ok()) << s;
ASSERT_TRUE(s.ok()) << s;

ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(new MemTableMemoryLimiter());
exec_env->set_storage_engine(std::move(engine));
}

static void tear_down() {
ExecEnv* exec_env = doris::ExecEnv::GetInstance();
exec_env->set_memtable_memory_limiter(nullptr);
engine_ref = nullptr;
exec_env->set_storage_engine(nullptr);

if (storage_root_path.empty()) {
return;
}

// Status s = io::global_local_filesystem()->delete_directory(storage_root_path);
// EXPECT_TRUE(s.ok()) << "delete directory " << s;
}

static TCreateTabletReq create_tablet(int64_t partition_id, int64_t tablet_id,
int32_t schema_hash) {
TColumnType col_type;
col_type.__set_type(TPrimitiveType::SMALLINT);
TColumn col1;
col1.__set_column_name("col1");
col1.__set_column_type(col_type);
col1.__set_is_key(true);
std::vector<TColumn> cols;
cols.push_back(col1);
TTabletSchema tablet_schema;
tablet_schema.__set_short_key_column_count(1);
tablet_schema.__set_schema_hash(schema_hash);
tablet_schema.__set_keys_type(TKeysType::AGG_KEYS);
tablet_schema.__set_storage_type(TStorageType::COLUMN);
tablet_schema.__set_columns(cols);
TCreateTabletReq create_tablet_req;
create_tablet_req.__set_tablet_schema(tablet_schema);
create_tablet_req.__set_tablet_id(tablet_id);
create_tablet_req.__set_partition_id(partition_id);
create_tablet_req.__set_version(2);
return create_tablet_req;
}

static TDescriptorTable create_descriptor_tablet() {
TDescriptorTableBuilder dtb;
TTupleDescriptorBuilder tuple_builder;
tuple_builder.add_slot(
TSlotDescriptorBuilder().type(TYPE_SMALLINT).column_name("col1").column_pos(0).build());
tuple_builder.build(&dtb);
return dtb.desc_tbl();
}

static void add_rowset(int64_t tablet_id, int32_t schema_hash, int64_t partition_id, int64_t txn_id,
int16_t value) {
TDescriptorTable tdesc_tbl = create_descriptor_tablet();
ObjectPool obj_pool;
DescriptorTbl* desc_tbl = nullptr;
static_cast<void>(DescriptorTbl::create(&obj_pool, tdesc_tbl, &desc_tbl));
TupleDescriptor* tuple_desc = desc_tbl->get_tuple_descriptor(0);
auto param = std::make_shared<OlapTableSchemaParam>();

PUniqueId load_id;
load_id.set_hi(0);
load_id.set_lo(0);
WriteRequest write_req;
write_req.tablet_id = tablet_id;
write_req.schema_hash = schema_hash;
write_req.txn_id = txn_id;
write_req.partition_id = partition_id;
write_req.load_id = load_id;
write_req.tuple_desc = tuple_desc;
write_req.slots = &(tuple_desc->slots());
write_req.is_high_priority = false;
write_req.table_schema_param = param;
auto profile = std::make_unique<RuntimeProfile>("LoadChannels");
auto delta_writer =
std::make_unique<DeltaWriter>(*engine_ref, write_req, profile.get(), TUniqueId {});

vectorized::Block block;
for (const auto& slot_desc : tuple_desc->slots()) {
std::cout << "slot_desc: " << slot_desc->col_name() << std::endl;
block.insert(vectorized::ColumnWithTypeAndName(slot_desc->get_empty_mutable_column(),
slot_desc->get_data_type_ptr(),
slot_desc->col_name()));
}

std::cout << "total column " << block.mutate_columns().size() << std::endl;
auto columns = block.mutate_columns();
int16_t c1 = value;
columns[0]->insert_data((const char*)&c1, sizeof(c1));
Status res = delta_writer->write(&block, {0});
EXPECT_TRUE(res.ok());

res = delta_writer->close();
ASSERT_TRUE(res.ok());
res = delta_writer->wait_flush();
ASSERT_TRUE(res.ok());
res = delta_writer->build_rowset();
ASSERT_TRUE(res.ok());
res = delta_writer->submit_calc_delete_bitmap_task();
ASSERT_TRUE(res.ok());
res = delta_writer->wait_calc_delete_bitmap();
ASSERT_TRUE(res.ok());
res = delta_writer->commit_txn(PSlaveTabletNodes());
ASSERT_TRUE(res.ok()) << res;

TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
ASSERT_TRUE(tablet != nullptr);

std::cout << "before publish, tablet row nums:" << tablet->num_rows() << std::endl;
Version version;
version.first = tablet->get_rowset_with_max_version()->end_version() + 1;
version.second = tablet->get_rowset_with_max_version()->end_version() + 1;
std::cout << "start to add rowset version:" << version.first << "-" << version.second
<< std::endl;
std::map<TabletInfo, RowsetSharedPtr> tablet_related_rs;
engine_ref->txn_manager()->get_txn_related_tablets(txn_id, partition_id, &tablet_related_rs);
ASSERT_EQ(1, tablet_related_rs.size());

std::cout << "start to publish txn" << std::endl;
RowsetSharedPtr rowset = tablet_related_rs.begin()->second;

TabletPublishStatistics stats;
res = engine_ref->txn_manager()->publish_txn(partition_id, tablet, txn_id, version, &stats);
ASSERT_TRUE(res.ok()) << res;
std::cout << "start to add inc rowset:" << rowset->rowset_id()
<< ", num rows:" << rowset->num_rows() << ", version:" << rowset->version().first
<< "-" << rowset->version().second << std::endl;
res = tablet->add_inc_rowset(rowset);
ASSERT_TRUE(res.ok()) << res;
}

class SnapshotLoaderTest : public ::testing::Test {
public:
SnapshotLoaderTest() {}
~SnapshotLoaderTest() {}
static void SetUpTestSuite() { set_up(); }

static void TearDownTestSuite() { tear_down(); }
};

TEST_F(SnapshotLoaderTest, NormalCase) {
StorageEngine engine({});
SnapshotLoader loader(engine, ExecEnv::GetInstance(), 1L, 2L);

Expand Down Expand Up @@ -87,4 +284,60 @@ TEST(SnapshotLoaderTest, NormalCase) {
EXPECT_EQ(10005, tablet_id);
}

TEST_F(SnapshotLoaderTest, DirMoveTaskIsIdempotent) {
// 1. create a tablet
int64_t tablet_id = 111;
int32_t schema_hash = 222;
int64_t partition_id = 333;
TCreateTabletReq req = create_tablet(partition_id, tablet_id, schema_hash);
RuntimeProfile profile("CreateTablet");
Status status = engine_ref->create_tablet(req, &profile);
EXPECT_TRUE(status.ok());
TabletSharedPtr tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
EXPECT_TRUE(tablet != nullptr);

// 2. add a rowset
add_rowset(tablet_id, schema_hash, partition_id, 100, 100);
auto version = tablet->max_version();
std::cout << "version: " << version.first << ", " << version.second << std::endl;

// 3. make a snapshot
string snapshot_path;
bool allow_incremental_clone = false; // not used
TSnapshotRequest snapshot_request;
snapshot_request.tablet_id = tablet_id;
snapshot_request.schema_hash = schema_hash;
snapshot_request.version = version.second;
status = engine_ref->snapshot_mgr()->make_snapshot(snapshot_request, &snapshot_path,
&allow_incremental_clone);
ASSERT_TRUE(status.ok());

// 4. load the snapshot to another tablet
snapshot_path = fmt::format("{}/{}/{}", snapshot_path, tablet_id, schema_hash);
SnapshotLoader loader1(*engine_ref, ExecEnv::GetInstance(), 1L, tablet_id);
status = loader1.move(snapshot_path, tablet, true);
ASSERT_TRUE(status.ok()) << status;

// 5. Insert a rowset to the tablet
// reload tablet
tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
EXPECT_TRUE(tablet != nullptr);
add_rowset(tablet_id, schema_hash, partition_id, 200, 200);
version = tablet->max_version();
std::cout << "version: " << version.first << ", " << version.second << std::endl;

// 6. load the snapshot to the tablet again, this request should be idempotent
SnapshotLoader loader2(*engine_ref, ExecEnv::GetInstance(), 2L, tablet_id);
status = loader2.move(snapshot_path, tablet, true);
ASSERT_TRUE(status.ok()) << status;

// reload tablet
tablet = engine_ref->tablet_manager()->get_tablet(tablet_id);
EXPECT_TRUE(tablet != nullptr);
auto last_version = tablet->max_version();
std::cout << "last version: " << last_version.first << ", " << last_version.second << std::endl;
ASSERT_EQ(version.first, last_version.first);
ASSERT_EQ(version.second, last_version.second);
}

} // namespace doris

0 comments on commit e0e78fa

Please sign in to comment.