diff --git a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java index 291aa37f01..d6da5c7be7 100644 --- a/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java +++ b/src/java/com/palantir/cassandra/db/ColumnFamilyStoreManager.java @@ -77,4 +77,9 @@ public void markForDeletion(CFMetaData cfMetaData, Set descriptors) { writeAheadLogger.markForDeletion(cfMetaData, descriptors); } + + public boolean shouldRemoveUnusedSstables() { + // TODO(wdey): delegate + return true; + } } diff --git a/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java b/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java index 4f1881e86b..29110bc200 100644 --- a/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java +++ b/src/java/com/palantir/cassandra/db/CompactionsInProgressFlusher.java @@ -16,6 +16,7 @@ package com.palantir.cassandra.db; +import java.util.EnumMap; import java.util.function.Supplier; import org.apache.cassandra.db.Keyspace; @@ -52,15 +53,25 @@ public class CompactionsInProgressFlusher { private static final boolean COALESCE_FLUSHES = Boolean.getBoolean("palantir_cassandra.coalesce_cip_flushes"); - public static final CompactionsInProgressFlusher INSTANCE = new CompactionsInProgressFlusher(); - - private final Supplier flusher = () -> FBUtilities.waitOnFuture( - Keyspace.open(SystemKeyspace.NAME) - .getColumnFamilyStore(SystemKeyspace.COMPACTIONS_IN_PROGRESS) - .forceFlush("CompactionsInProgressFlusher")); - private final Supplier coalescingFlusher = new CoalescingSupplier(flusher); - - private CompactionsInProgressFlusher() { } + public static final EnumMap INSTANCES = new EnumMap<>(SystemKeyspace.CompactionsInProgressTable.class); + + static { + INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.DEFAULT, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.DEFAULT)); + INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.WAL, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.WAL)); + } + + private final SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable; + private final Supplier flusher; + private final Supplier coalescingFlusher; + + private CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable) { + this.compactionsInProgressTable = compactionsInProgressTable; + flusher = () -> FBUtilities.waitOnFuture( + Keyspace.open(SystemKeyspace.NAME) + .getColumnFamilyStore(compactionsInProgressTable.toString()) + .forceFlush("CompactionsInProgressFlusher")); + coalescingFlusher = new CoalescingSupplier<>(flusher); + } public ReplayPosition forceBlockingFlush() { if (COALESCE_FLUSHES) { diff --git a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java index 60f2d25dd5..fbca406966 100644 --- a/src/java/org/apache/cassandra/db/ColumnFamilyStore.java +++ b/src/java/org/apache/cassandra/db/ColumnFamilyStore.java @@ -108,7 +108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class); private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean( - "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); + "palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup"); private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(), StageManager.KEEPALIVE, @@ -755,7 +755,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map completedAncestors.addAll(ancestors); } } - cleanedUnfinishedCompactions.forEach(SystemKeyspace::finishCompaction); + cleanedUnfinishedCompactions.forEach(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT)); // remove old sstables from compactions that did complete for (Map.Entry> sstableFiles : directories.sstableLister().list().entrySet()) @@ -763,7 +763,8 @@ public static void removeUnusedSstables(CFMetaData metadata, Map Descriptor desc = sstableFiles.getKey(); if (completedAncestors.contains(desc.generation)) { - if (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty()) + if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata() + || (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())) { logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc), SafeArg.of("keyspace", desc.ksname), SafeArg.of("cf", desc.cfname), @@ -776,7 +777,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map SafeArg.of("generation", desc.generation), ancestorsArg); SSTable.delete(desc, sstableFiles.getValue()); Optional.ofNullable(unfinishedCompactions.get(desc.generation)) - .ifPresent(SystemKeyspace::finishCompaction); + .ifPresent(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT)); } } } diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index de2e89d94b..7ccd45c70e 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -95,6 +95,24 @@ public final class SystemKeyspace public static final String SIZE_ESTIMATES = "size_estimates"; public static final String AVAILABLE_RANGES = "available_ranges"; + public enum CompactionsInProgressTable + { + DEFAULT("compactions_in_progress"), + WAL("post_wal_compactions_in_progress"); + + private final String name; + + CompactionsInProgressTable(String name) + { + this.name = name; + } + + @Override + public String toString() { + return name; + } + } + public static final CFMetaData Hints = compile(HINTS, "hints awaiting delivery", @@ -212,8 +230,20 @@ private static int getCompactionsInProgresStcsMaxThreshold() { } return val; } - private static int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold(); - private static final CFMetaData CompactionsInProgress = + private static final int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold(); + private static final CFMetaData DefaultCompactionsInProgress = + compile(COMPACTIONS_IN_PROGRESS, + "unfinished compactions", + "CREATE TABLE %s (" + + "id uuid," + + "columnfamily_name text," + + "inputs set," + + "keyspace_name text," + + "PRIMARY KEY ((id)))") + .maxCompactionThreshold(compactionsInProgressMaxCompactionThreshold) + .compactionStrategyClass(SizeTieredCompactionStrategy.class) + .compactionStrategyOptions(Collections.singletonMap("max_threshold", Integer.toString(compactionsInProgressMaxCompactionThreshold))); + private static final CFMetaData WalCompactionsInProgress = compile(COMPACTIONS_IN_PROGRESS, "unfinished compactions", "CREATE TABLE %s (" @@ -290,7 +320,8 @@ public static KSMetaData definition() Peers, PeerEvents, RangeXfers, - CompactionsInProgress, + DefaultCompactionsInProgress, + WalCompactionsInProgress, CompactionHistory, SSTableActivity, SizeEstimates, @@ -370,8 +401,10 @@ public Integer apply(SSTableReader sstable) } }); String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)"; - executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); - CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush(); + CompactionsInProgressFlusher.INSTANCES.forEach((table, flusher) -> { + executeInternal(String.format(req, table.toString()), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations)); + flusher.forceBlockingFlush(); + }); return compactionId; } @@ -380,22 +413,22 @@ public Integer apply(SSTableReader sstable) * to complete successfully for this to be called. * @param taskId what was returned from {@code startCompaction} */ - public static void finishCompaction(UUID taskId) + public static void finishCompaction(UUID taskId, CompactionsInProgressTable table) { assert taskId != null; - executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId); - CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush(); + executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", table.toString()), taskId); + CompactionsInProgressFlusher.INSTANCES.get(table).forceBlockingFlush(); } /** * Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the * task ID of the compaction they were participating in. */ - public static Map, Map> getUnfinishedCompactions() + public static Map, Map> getUnfinishedCompactions(CompactionsInProgressTable table) { String req = "SELECT * FROM system.%s"; - UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS)); + UntypedResultSet resultSet = executeInternal(String.format(req, table.toString())); Map, Map> unfinishedCompactions = new HashMap<>(); for (UntypedResultSet.Row row : resultSet) @@ -418,9 +451,9 @@ public static Map, Map> getUnfinishedCompact return unfinishedCompactions; } - public static void discardCompactionsInProgress() + public static void discardCompactionsInProgress(CompactionsInProgressTable table) { - ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS); + ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(table.toString()); compactionLog.truncateBlocking(false); } diff --git a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java index 89ea4d36ee..742b707dd9 100644 --- a/src/java/org/apache/cassandra/db/compaction/CompactionTask.java +++ b/src/java/org/apache/cassandra/db/compaction/CompactionTask.java @@ -100,6 +100,7 @@ public boolean reduceScopeForLimitedSpace(long expectedSize) * which are properly serialized. * Caller is in charge of marking/unmarking the sstables as compacting. */ + @SuppressWarnings("resource") // It is dangerous to close refs for a failed transaction protected void runMayThrow() throws Exception { // The collection of sstables passed may be empty (but not null); even if @@ -156,116 +157,124 @@ public boolean apply(SSTableReader sstable) long totalKeysWritten = 0; long estimatedKeys = 0; - try (CompactionController controller = getCompactionController(transaction.originals())) + CompactionController controller = getCompactionController(transaction.originals()); + Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); + + SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); + + + // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references + // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. + // See CASSANDRA-8019 and CASSANDRA-8399 + boolean abortFailed = false; + UUID taskId = null; + String taskIdLoggerMsg; + List newSStables; + AbstractCompactionIterable ci = null; + Refs refs = Refs.ref(actuallyCompact); + boolean readyToFinish = false; + try (AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) { - Set actuallyCompact = Sets.difference(transaction.originals(), controller.getFullyExpiredSSTables()); - - SSTableFormat.Type sstableFormat = getFormatType(transaction.originals()); - - - // SSTableScanners need to be closed before markCompactedSSTablesReplaced call as scanners contain references - // to both ifile and dfile and SSTR will throw deletion errors on Windows if it tries to delete before scanner is closed. - // See CASSANDRA-8019 and CASSANDRA-8399 - boolean abortFailed = false; - UUID taskId = null; - String taskIdLoggerMsg; - List newSStables; - AbstractCompactionIterable ci = null; - try (Refs refs = Refs.ref(actuallyCompact); - AbstractCompactionStrategy.ScannerList scanners = strategy.getScanners(actuallyCompact)) + taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); + taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); + logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); + ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); + try (CloseableIterator iter = ci.iterator()) { - taskId = offline ? null : SystemKeyspace.startCompaction(cfs, transaction.originals()); - taskIdLoggerMsg = taskId == null ? UUIDGen.getTimeUUID().toString() : taskId.toString(); - logger.debug("Compacting ({}) {}", taskIdLoggerMsg, ssTableLoggerMsg); - ci = new CompactionIterable(compactionType, scanners.scanners, controller, sstableFormat, taskId); - try (CloseableIterator iter = ci.iterator()) - { - long lastCheckObsoletion = start; + long lastCheckObsoletion = start; - if (!controller.cfs.getCompactionStrategy().isActive) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (!controller.cfs.getCompactionStrategy().isActive) + throw new CompactionInterruptedException(ci.getCompactionInfo()); - if (collector != null) - collector.beginCompaction(ci); + if (collector != null) + collector.beginCompaction(ci); - boolean readyToFinish = false; - try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + try (CompactionAwareWriter writer = getCompactionAwareWriter(cfs, transaction, actuallyCompact)) + { + estimatedKeys = writer.estimatedKeys(); + while (iter.hasNext()) { - estimatedKeys = writer.estimatedKeys(); - while (iter.hasNext()) + if (ci.isStopRequested()) + throw new CompactionInterruptedException(ci.getCompactionInfo()); + + try (AbstractCompactedRow row = iter.next()) { - if (ci.isStopRequested()) - throw new CompactionInterruptedException(ci.getCompactionInfo()); + if (writer.append(row)) + totalKeysWritten++; - try (AbstractCompactedRow row = iter.next()) + if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) { - if (writer.append(row)) - totalKeysWritten++; - - if (System.nanoTime() - lastCheckObsoletion > TimeUnit.MINUTES.toNanos(1L)) - { - controller.maybeRefreshOverlaps(); - lastCheckObsoletion = System.nanoTime(); - } + controller.maybeRefreshOverlaps(); + lastCheckObsoletion = System.nanoTime(); } } + } - readyToFinish = true; - newSStables = writer.finish(); - } catch (Exception e) { - CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); - if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) - { - abortFailed = true; - logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + - "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + - "compaction-product sstables are marked final while others remain tmp", - cfs.keyspace.getName(), cfs.name, e); - } - throw exception; + readyToFinish = true; + newSStables = writer.finish(); + } catch (Exception e) { + CompactionException exception = new CompactionException(taskIdLoggerMsg, ssTableLoggerMsg.toString(), e); + if (readyToFinish && e.getSuppressed() != null && e.getSuppressed().length != 0) + { + abortFailed = true; + logger.warn("CompactionAwareWriter failed to close correctly for {}/{}. This compaction won't be removed from " + + "system.compactions_in_progress to ensure sstable cleanup on startup proceeds correctly in case some " + + "compaction-product sstables are marked final while others remain tmp", + cfs.keyspace.getName(), cfs.name, e); } + throw exception; } } - finally - { - Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); - if (taskId != null && (!abortFailed)) - SystemKeyspace.finishCompaction(taskId); - - if (collector != null && ci != null) - collector.finishCompaction(ci); + } + finally + { + if (!readyToFinish) { + // TODO(wdey): refactor all of the trys + try (Refs closedRefs = refs; CompactionController closedController = controller) {} } + Directories.removeExpectedSpaceUsedByCompaction(expectedWriteSize, CONSIDER_CONCURRENT_COMPACTIONS); + if (taskId != null && (!abortFailed)) + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.DEFAULT); - ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); + if (collector != null && ci != null) + collector.finishCompaction(ci); + } - // log a bunch of statistics about the result and save to system table compaction_history - long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); - long startsize = SSTableReader.getTotalBytes(transaction.originals()); - long endsize = SSTableReader.getTotalBytes(newSStables); - double ratio = (double) endsize / (double) startsize; + ColumnFamilyStoreManager.instance.markForDeletion(cfs.metadata, transaction.logged.obsoleteDescriptors()); + refs.close(); + controller.close(); + if (taskId != null) + { + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.WAL); + } - StringBuilder newSSTableNames = new StringBuilder(); - for (SSTableReader reader : newSStables) - newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + // log a bunch of statistics about the result and save to system table compaction_history + long dTime = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start); + long startsize = SSTableReader.getTotalBytes(transaction.originals()); + long endsize = SSTableReader.getTotalBytes(newSStables); + double ratio = (double) endsize / (double) startsize; - if (offline) - { - Refs.release(Refs.selfRefs(newSStables)); - } - else - { - double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; - long totalSourceRows = 0; - String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); - logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", - taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); - logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); - logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); - - // update the metrics - cfs.metric.compactionBytesWritten.inc(endsize); - cfs.metric.compactionsCompleted.inc(); - } + StringBuilder newSSTableNames = new StringBuilder(); + for (SSTableReader reader : newSStables) + newSSTableNames.append(reader.descriptor.baseFilename()).append(","); + + if (offline) + { + Refs.release(Refs.selfRefs(newSStables)); + } + else + { + double mbps = dTime > 0 ? (double) endsize / (1024 * 1024) / ((double) dTime / 1000) : 0; + long totalSourceRows = 0; + String mergeSummary = updateCompactionHistory(cfs.keyspace.getName(), cfs.getColumnFamilyName(), ci, startsize, endsize); + logger.debug(String.format("Compacted (%s) %d sstables to [%s] to level=%d. %,d bytes to %,d (~%d%% of original) in %,dms = %fMB/s. %,d total partitions merged to %,d. Partition merge counts were {%s}", + taskIdLoggerMsg, transaction.originals().size(), newSSTableNames.toString(), getLevel(), startsize, endsize, (int) (ratio * 100), dTime, mbps, totalSourceRows, totalKeysWritten, mergeSummary)); + logger.trace(String.format("CF Total Bytes Compacted: %,d", CompactionTask.addToTotalBytesCompacted(endsize))); + logger.trace("Actual #keys: {}, Estimated #keys:{}, Err%: {}", totalKeysWritten, estimatedKeys, ((double) (totalKeysWritten - estimatedKeys) / totalKeysWritten)); + + // update the metrics + cfs.metric.compactionBytesWritten.inc(endsize); + cfs.metric.compactionsCompleted.inc(); } } diff --git a/src/java/org/apache/cassandra/service/CassandraDaemon.java b/src/java/org/apache/cassandra/service/CassandraDaemon.java index 16ead7921e..5825204bef 100644 --- a/src/java/org/apache/cassandra/service/CassandraDaemon.java +++ b/src/java/org/apache/cassandra/service/CassandraDaemon.java @@ -61,9 +61,9 @@ import com.palantir.cassandra.concurrent.LocalReadRunnableTimeoutWatcher; import com.palantir.cassandra.db.BootstrappingSafetyException; +import com.palantir.cassandra.db.ColumnFamilyStoreManager; import com.palantir.cassandra.settings.DisableClientInterfaceSetting; import com.palantir.logsafe.Preconditions; -import com.palantir.logsafe.Safe; import com.palantir.logsafe.SafeArg; import org.apache.cassandra.config.ColumnDefinition; import org.apache.cassandra.config.Config; @@ -281,7 +281,7 @@ private void completeSetupMayThrowSstableException() { } } - Map, Map> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> unfinishedCompactions = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); for (String keyspaceName : Schema.instance.getKeyspaces()) { // Skip system as we'll already clean it after the other tables @@ -290,11 +290,13 @@ private void completeSetupMayThrowSstableException() { for (CFMetaData cfm : Schema.instance.getKeyspaceMetaData(keyspaceName).values()) { - ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); + if (ColumnFamilyStoreManager.instance.shouldRemoveUnusedSstablesBasedOnAncestorMetadata()) { + ColumnFamilyStore.removeUnusedSstables(cfm, unfinishedCompactions.getOrDefault(cfm.ksAndCFName, ImmutableMap.of())); + } ColumnFamilyStore.scrubDataDirectories(cfm); } } - SystemKeyspace.discardCompactionsInProgress(); + SystemKeyspace.discardCompactionsInProgress(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Keyspace.setInitialized(); diff --git a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java index a4f25844ce..bffaaf8342 100644 --- a/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java +++ b/test/unit/org/apache/cassandra/db/ColumnFamilyStoreTest.java @@ -118,6 +118,7 @@ public class ColumnFamilyStoreTest public static final String CF_STANDARD5 = "Standard5"; public static final String CF_STANDARD6 = "Standard6"; public static final String CF_STANDARD7 = "Standard7"; + public static final String CF_STANDARD8 = "Standard8"; public static final String CF_STANDARDINT = "StandardInteger1"; public static final String CF_SUPER1 = "Super1"; public static final String CF_SUPER6 = "Super6"; @@ -148,6 +149,7 @@ public static void defineSchema() throws ConfigurationException SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD5), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD6), SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD7), + SchemaLoader.standardCFMD(KEYSPACE1, CF_STANDARD8), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX1, true), SchemaLoader.indexCFMD(KEYSPACE1, CF_INDEX2, false), SchemaLoader.superCFMD(KEYSPACE1, CF_SUPER1, LongType.instance), @@ -1883,8 +1885,8 @@ protected SSTableWriter getWriter() SSTableReader sstable2 = SSTableReader.open(sstable1.descriptor); UUID compactionTaskID = SystemKeyspace.startCompaction( - Keyspace.open(ks).getColumnFamilyStore(cf), - Collections.singleton(sstable2)); + Keyspace.open(ks).getColumnFamilyStore(cf), + Collections.singleton(sstable2)); Map unfinishedCompaction = new HashMap<>(); unfinishedCompaction.put(sstable1.descriptor.generation, compactionTaskID); @@ -1895,7 +1897,7 @@ protected SSTableWriter getWriter() assertEquals(1, sstables.size()); assertTrue(sstables.containsKey(sstable1.descriptor)); - Map, Map> unfinished = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> unfinished = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); assertTrue(unfinished.isEmpty()); sstable1.selfRef().release(); sstable2.selfRef().release(); @@ -1984,7 +1986,8 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); }; - try { + try + { ColumnFamilyStoreManager.instance.registerValidator(validator); ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); } @@ -2002,6 +2005,65 @@ public void testRemoveUnusedSstablesOnlyRemovesFiltered() throws IOException assertEquals(expected, sstables.keySet()); } + @Test + public void testShouldSkipAncestorCleanupSkipsAncestorCleanup() throws IOException + { + final String ks = KEYSPACE1; + final String cf = CF_STANDARD8; + + final CFMetaData cfmeta = Schema.instance.getCFMetaData(ks, cf); + Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf).disableAutoCompaction(); + Directories dir = new Directories(cfmeta); + + int gen1 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen2 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen3 = writeNextGenerationSstable(ImmutableSet.of(gen1, gen2), dir, cfmeta); + int gen4 = writeNextGenerationSstable(ImmutableSet.of(), dir, cfmeta); + int gen5 = writeNextGenerationSstable(ImmutableSet.of(gen4), dir, cfmeta); + + Map> sstables = dir.sstableLister().list(); + Descriptor sstable3Desc = sstables.keySet().iterator().next().withGeneration(gen3); + assertEquals(5, sstables.size()); + assertTrue(sstables.containsKey(sstable3Desc)); + + IColumnFamilyStoreValidator validator = new IColumnFamilyStoreValidator() + { + @Override + public Map> filterValidAncestors(CFMetaData _cfMetaData, Map> sstableToCompletedAncestors, Map _unfinishedCompactions) + { + Set allowedGenerations = ImmutableSet.of(gen1, gen2, gen4, gen5); + return sstableToCompletedAncestors.entrySet().stream() + .filter(entry -> allowedGenerations.contains(entry.getKey().generation)) + .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); + } + + @Override + public boolean shouldSkipAncestorCleanupBasedOnAncestorMetadata() + { + return true; + } + }; + + try + { + ColumnFamilyStoreManager.instance.registerValidator(validator); + ColumnFamilyStore.removeUnusedSstables(cfmeta, ImmutableMap.of()); + } + finally + { + ColumnFamilyStoreManager.instance.unregisterValidator(); + } + + sstables = dir.sstableLister().list(); + ImmutableSet expected = ImmutableSet.of( + sstable3Desc.withGeneration(gen1), + sstable3Desc.withGeneration(gen2), + sstable3Desc.withGeneration(gen3), + sstable3Desc.withGeneration(gen4), + sstable3Desc.withGeneration(gen5)); + assertEquals(expected, sstables.keySet()); + } + @Test public void testLoadNewSSTablesAvoidsOverwrites() throws Throwable { diff --git a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java index 2b4b2032a8..7871c78c6d 100644 --- a/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java +++ b/test/unit/org/apache/cassandra/db/compaction/CompactionsTest.java @@ -474,7 +474,7 @@ public void testRangeTombstones() @Test public void testCompactionLog() throws Exception { - SystemKeyspace.discardCompactionsInProgress(); + SystemKeyspace.discardCompactionsInProgress(SystemKeyspace.CompactionsInProgressTable.DEFAULT); String cf = "Standard4"; ColumnFamilyStore cfs = Keyspace.open(KEYSPACE1).getColumnFamilyStore(cf); @@ -491,12 +491,12 @@ public Integer apply(SSTableReader sstable) } })); UUID taskId = SystemKeyspace.startCompaction(cfs, sstables); - Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Set unfinishedCompactions = compactionLogs.get(Pair.create(KEYSPACE1, cf)).keySet(); assertTrue(unfinishedCompactions.containsAll(generations)); - SystemKeyspace.finishCompaction(taskId); - compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + SystemKeyspace.finishCompaction(taskId, SystemKeyspace.CompactionsInProgressTable.DEFAULT); + compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); assertFalse(compactionLogs.containsKey(Pair.create(KEYSPACE1, cf))); } @@ -590,7 +590,7 @@ public void incompletedCompactionAbortNotRemovedFromCompactionsInProgress() thro .stream().map(desc -> desc.generation).collect(Collectors.toSet()); assertEquals(nonTmp, actualNonTmp); - Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(); + Map, Map> compactionLogs = SystemKeyspace.getUnfinishedCompactions(SystemKeyspace.CompactionsInProgressTable.DEFAULT); Pair pair = Pair.create(KEYSPACE1, cfName); assertTrue(compactionLogs.containsKey(pair));