Skip to content

Commit

Permalink
Panic when not able to delete compaction waste (#573)
Browse files Browse the repository at this point in the history
  • Loading branch information
wi11dey authored Nov 8, 2024
1 parent 2b748bf commit cfbbb42
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 36 deletions.
25 changes: 18 additions & 7 deletions src/java/org/apache/cassandra/db/compaction/CompactionTask.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import java.util.concurrent.TimeUnit;
import java.util.function.BiFunction;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Predicate;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
Expand Down Expand Up @@ -166,7 +167,6 @@ public boolean apply(SSTableReader sstable)
// SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references
// to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed.
// See CASSANDRA-8019 and CASSANDRA-8399
boolean abortFailed = false;
UUID taskId = null;
String taskIdLoggerMsg;
List<SSTableReader> newSStables;
Expand Down Expand Up @@ -216,11 +216,9 @@ public boolean apply(SSTableReader sstable)
CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e);
if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0)
{
abortFailed = true;
logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " +
"system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " +
"compaction-product sstables are marked final while others remain tmp",
logger.error("CompactionAwareWriter failed to close correctly for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
panic();
}
throw exception;
}
Expand All @@ -229,14 +227,23 @@ public boolean apply(SSTableReader sstable)
finally
{
Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS);
if (taskId != null && (!abortFailed))
if (taskId != null)
SystemKeyspace.finishCompaction(taskId);

if (collector != null && ci != null)
collector.finishCompaction(ci);
}

ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
try
{
ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors());
}
catch (Exception e)
{
logger.error("Failed to write to the write-ahead log for {}/{}. Continuing to compact now can cause resurrection. Exiting",
cfs.keyspace.getName(), cfs.name, e);
panic();
}

// log a bunch of statistics about the result and save to system table compaction_history
long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
Expand Down Expand Up @@ -269,6 +276,10 @@ public boolean apply(SSTableReader sstable)
}
}

/**
 * Terminates the JVM with exit status 1.
 *
 * Invoked from this task after it has logged an error stating that continuing to compact
 * could cause data resurrection. With this default implementation the call does not return,
 * so any statement following a {@code panic()} call is reached only when a subclass
 * overrides this method (for example, a test double that records the call instead of exiting).
 */
protected void panic()
{
    System.exit(1);
}

@Override
public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, LifecycleTransaction transaction, Set<SSTableReader> nonExpiredSSTables)
{
Expand Down
39 changes: 10 additions & 29 deletions test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ public void incompletedCompactionAbortNotRemovedFromCompactionsInProgress() thro

Set<SSTableReader> compacting = Sets.newHashSet(s, s2);
LifecycleTransaction txn = cfs.getTracker().tryModify(compacting, OperationType.UNKNOWN);
CompactionTask compaction = spy(new FailedAbortCompactionTask(cfs, txn, 0, CompactionManager.NO_GC, 1024 * 1024, true));
FailedAbortCompactionTask compaction = spy(new FailedAbortCompactionTask(cfs, txn, 0, CompactionManager.NO_GC, 1024 * 1024, true));
try
{
compaction.runMayThrow();
Expand All @@ -582,34 +582,7 @@ public void incompletedCompactionAbortNotRemovedFromCompactionsInProgress() thro
assertTrue(e.getCause().getMessage().contains("Exception thrown while some sstables in finish"));
assertTrue(e.getCause().getSuppressed()[0].getMessage().contains("Failed to do anything for abort"));
}

Collection<SSTableReader> sstablesAfter = cfs.getSSTables();
assertEquals(50, sstablesAfter.size());
Set<Integer> nonTmp = ImmutableSet.of(1, 2, 3, 4, 5);
Set<Integer> actualNonTmp = new Directories(cfs.metadata).sstableLister().skipTemporary(true).list().keySet()
.stream().map(desc -> desc.generation).collect(Collectors.toSet());
assertEquals(nonTmp, actualNonTmp);

Map<Pair<String, String>, Map<Integer, UUID>> compactionLogs = SystemKeyspace.getUnfinishedCompactions();
Pair<String, String> pair = Pair.create(KEYSPACE1, cfName);
assertTrue(compactionLogs.containsKey(pair));

// Copy to a new CF in case in-memory tracking affects testing
File src = new Directories(cfs.metadata).getCFDirectories().get(0);
File dst = Arrays.stream(src.getParentFile().listFiles())
.filter(file -> file.getName().contains(CF_STANDARD6)).findFirst().orElseThrow(() -> new SafeIllegalStateException("No Standard6 CF found"));
FileUtils.copyDirectory(src, dst);
CFMetaData cf2Metadata = Schema.instance.getKSMetaData(keyspace.getName()).cfMetaData().get(CF_STANDARD6);
// removes incomplete compaction product and tmp files
ColumnFamilyStore.removeUnusedSstables(cf2Metadata, compactionLogs.getOrDefault(pair, ImmutableMap.of()));
ColumnFamilyStore.scrubDataDirectories(cf2Metadata);

Set<Integer> allGenerations = new HashSet<>();
for (Descriptor desc : new Directories(cf2Metadata).sstableLister().list().keySet())
allGenerations.add(desc.generation);
// When we don't retain the compaction log, the ancestors 2 and 3 are deleted and products 4 and 5 are retained, despite 40+ tmp files in the unfinished
// product not having been committed!
assertEquals(ImmutableSet.of(1, 2, 3), allGenerations);
assertTrue(compaction.panicked);
}

private static class FailedAbortCompactionWriter extends MaxSSTableSizeWriter
Expand All @@ -636,6 +609,8 @@ protected Throwable doAbort(Throwable _accumulate) {

private static class FailedAbortCompactionTask extends LeveledCompactionTask
{
private boolean panicked;

public FailedAbortCompactionTask(ColumnFamilyStore cfs, LifecycleTransaction txn, int level, int gcBefore, long maxSSTableBytes, boolean majorCompaction)
{
super(cfs, txn, level, gcBefore, maxSSTableBytes, majorCompaction);
Expand All @@ -646,6 +621,12 @@ public CompactionAwareWriter getCompactionAwareWriter(ColumnFamilyStore cfs, Lif
{
return new FailedAbortCompactionWriter(cfs, txn, nonExpiredSSTables, 1024 * 1024, 0, false, compactionType);
}

// Test hook: record that panic() was requested instead of running the superclass behaviour,
// so the test can assert on the 'panicked' flag afterwards.
@Override
protected void panic()
{
panicked = true;
}
}

private static Range<Token> rangeFor(int start, int end)
Expand Down

0 comments on commit cfbbb42

Please sign in to comment.