Skip to content

Commit

Permalink
add broken part check(404) to cloud table checksums for part load
Browse files Browse the repository at this point in the history
  • Loading branch information
ozcelgozde committed Mar 19, 2024
1 parent ecd5458 commit 6c575ae
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 3 deletions.
87 changes: 85 additions & 2 deletions src/Storages/MergeTree/MergeTreeCloudData.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
*/

#include <Storages/MergeTree/MergeTreeCloudData.h>
#include <Storages/MergeTree/checkDataPart.h>
#include "Processors/QueryPipeline.h"

namespace DB
Expand Down Expand Up @@ -260,6 +261,10 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)

auto cnch_parallel_prefetching = getSettings()->cnch_parallel_prefetching ? getSettings()->cnch_parallel_prefetching : 16;

std::mutex mutex;
MutableDataPartsVector broken_parts_to_detach;
size_t suspicious_broken_parts = 0;

MutableDataPartsVector partial_parts;
// auto it = std::remove_if(parts.begin(), parts.end(), [](const auto & part) { return part->isPartial(); });
// std::copy(it, parts.end(), std::back_inserter(partial_parts));
Expand All @@ -270,7 +275,22 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
std::atomic<bool> has_non_adaptive_parts = false;
size_t pool_size = std::min(parts.size(), UInt64(cnch_parallel_prefetching));
runOverPartsInParallel(parts, pool_size, [&](auto & part) {
part->loadColumnsChecksumsIndexes(false, false);
bool broken = loadDataPart(part);
if (broken) {
/// Ignore broken parts that can appear as a result of hard server restart.
LOG_ERROR(
log,
"Detaching broken part {}{}. If it happened after update, it is likely because of backward incompability. You need to resolve this manually",
part->relative_path,
part->name
);
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;

return;
}

if (part->index_granularity_info.is_adaptive)
has_adaptive_parts.store(true, std::memory_order_relaxed);
else
Expand All @@ -279,7 +299,22 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)

pool_size = std::min(partial_parts.size(), UInt64(cnch_parallel_prefetching));
runOverPartsInParallel(partial_parts, pool_size, [&](auto & part) {
part->loadColumnsChecksumsIndexes(false, false);
bool broken = loadDataPart(part);
if (broken) {
/// Ignore broken parts that can appear as a result of hard server restart.
LOG_ERROR(
log,
"Detaching broken part {}{}. If it happened after update, it is likely because of backward incompability. You need to resolve this manually",
part->relative_path,
part->name
);
std::lock_guard loading_lock(mutex);
broken_parts_to_detach.push_back(part);
++suspicious_broken_parts;

return;
}

if (part->index_granularity_info.is_adaptive)
has_adaptive_parts.store(true, std::memory_order_relaxed);
else
Expand All @@ -292,6 +327,54 @@ void MergeTreeCloudData::loadDataPartsInParallel(MutableDataPartsVector & parts)
}

has_non_adaptive_index_granularity_parts = has_non_adaptive_parts;
if (suspicious_broken_parts > 0) {
LOG_WARNING(
log,
"Broken parts count on start is bigger than 0, count: {}",
suspicious_broken_parts
);
}

auto deactivate_part = [&](MutableDataPartPtr part) {
part->remove_time.store(part->modification_time, std::memory_order_relaxed);
modifyPartState(part, DataPartState::Outdated);
};


for (auto & part : broken_parts_to_detach) {
deactivate_part(part);
}

parts.erase(std::remove_if(parts.begin(), parts.end(),
[&broken_parts_to_detach](const auto& part) {
// Check if this part is in the broken_parts_to_detach vector
return std::find(broken_parts_to_detach.begin(), broken_parts_to_detach.end(), part) != broken_parts_to_detach.end();
}), parts.end());
}

/// Attempts to load the columns, checksums and indexes of `part`.
///
/// Returns false when loading succeeds and true when it throws, in which case
/// the exception is logged and the caller is expected to treat the part as broken.
/// A DB::Exception whose code is a "not enough memory" code is rethrown instead,
/// so that a transient memory shortage does not get a part flagged as broken.
bool MergeTreeCloudData::loadDataPart(MutableDataPartPtr part)
{
    try
    {
        part->loadColumnsChecksumsIndexes(false, false);
        return false;
    }
    catch (const Exception & e)
    {
        /// Running out of memory says nothing about the part itself, so let the
        /// error propagate. Other similar situations may still end up reported as
        /// broken; the original author notes there is a safety guard against
        /// deleting too many parts.
        if (isNotEnoughMemoryErrorCode(e.code()))
            throw;

        /// Must be logged here: the helper needs the exception to still be active.
        tryLogCurrentException(__PRETTY_FUNCTION__);
        return true;
    }
    catch (...)
    {
        /// Any non-DB exception is logged and reported as a broken part as well.
        tryLogCurrentException(__PRETTY_FUNCTION__);
        return true;
    }
}

void MergeTreeCloudData::runOverPartsInParallel(
Expand Down
1 change: 1 addition & 0 deletions src/Storages/MergeTree/MergeTreeCloudData.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class MergeTreeCloudData : public MergeTreeMetaBase
/// Load prepared parts, deactivate outdated parts and construct coverage link
/// [Preallocate Mode] if worker_topology_hash is not empty, need to check whether the given topology is matched with worker's topology
void loadDataParts(MutableDataPartsVector & parts, UInt64 worker_topology_hash = 0);
/// Load columns, checksums and indexes of a single part.
/// Returns true if loading threw (the exception is logged) and the part should be treated as broken,
/// false if the part was loaded successfully. Not-enough-memory errors are rethrown to the caller.
bool loadDataPart(MutableDataPartPtr part);

/// Remove Outdated parts of which timestamp is less than expired ts from container.
/// DO NOT check reference count of parts.
Expand Down
8 changes: 7 additions & 1 deletion src/Storages/MergeTree/MergeTreeDataPartCNCH.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -804,7 +804,13 @@ IMergeTreeDataPart::ChecksumsPtr MergeTreeDataPartCNCH::loadChecksumsFromRemote(
return checksums;

String data_rel_path = fs::path(getFullRelativePath()) / DATA_FILE;
auto data_footer = loadPartDataFooter();
MergeTreeDataPartChecksums::FileChecksums data_footer;
try {
data_footer = loadPartDataFooter();
}
catch (...) {
throw Exception(ErrorCodes::NO_FILE_IN_DATA_PART, "The checksums file in part {} under path {} does not exist", name, data_rel_path);
}
const auto & checksum_file = data_footer["checksums.txt"];

if (checksum_file.file_size == 0 /* && isDeleted() */)
Expand Down

0 comments on commit 6c575ae

Please sign in to comment.