Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Write-ahead log for SSTable deletions #557

Open
wants to merge 19 commits into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Original file line number Diff line number Diff line change
Expand Up @@ -77,4 +77,9 @@ public void markForDeletion(CFMetaData cfMetaData, Set<Descriptor> descriptors)
{
writeAheadLogger.markForDeletion(cfMetaData, descriptors);
}

public boolean shouldRemoveUnusedSstables() {
// TODO(wdey): delegate
return true;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.palantir.cassandra.db;

import java.util.EnumMap;
import java.util.function.Supplier;

import org.apache.cassandra.db.Keyspace;
Expand Down Expand Up @@ -52,15 +53,25 @@
public class CompactionsInProgressFlusher {
private static final boolean COALESCE_FLUSHES = Boolean.getBoolean("palantir_cassandra.coalesce_cip_flushes");

public static final CompactionsInProgressFlusher INSTANCE = new CompactionsInProgressFlusher();

private final Supplier<ReplayPosition> flusher = () -> FBUtilities.waitOnFuture(
Keyspace.open(SystemKeyspace.NAME)
.getColumnFamilyStore(SystemKeyspace.COMPACTIONS_IN_PROGRESS)
.forceFlush("CompactionsInProgressFlusher"));
private final Supplier<ReplayPosition> coalescingFlusher = new CoalescingSupplier<ReplayPosition>(flusher);

private CompactionsInProgressFlusher() { }
public static final EnumMap<SystemKeyspace.CompactionsInProgressTable, CompactionsInProgressFlusher> INSTANCES = new EnumMap<>(SystemKeyspace.CompactionsInProgressTable.class);

static {
INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.DEFAULT, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.DEFAULT));
INSTANCES.put(SystemKeyspace.CompactionsInProgressTable.WAL, new CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable.WAL));
}

private final SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable;
private final Supplier<ReplayPosition> flusher;
private final Supplier<ReplayPosition> coalescingFlusher;

private CompactionsInProgressFlusher(SystemKeyspace.CompactionsInProgressTable compactionsInProgressTable) {
this.compactionsInProgressTable = compactionsInProgressTable;
flusher = () -> FBUtilities.waitOnFuture(
Keyspace.open(SystemKeyspace.NAME)
.getColumnFamilyStore(compactionsInProgressTable.toString())
.forceFlush("CompactionsInProgressFlusher"));
coalescingFlusher = new CoalescingSupplier<>(flusher);
}

public ReplayPosition forceBlockingFlush() {
if (COALESCE_FLUSHES) {
Expand Down
9 changes: 5 additions & 4 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public class ColumnFamilyStore implements ColumnFamilyStoreMBean
private static final Logger logger = LoggerFactory.getLogger(ColumnFamilyStore.class);

private static final boolean DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP = Boolean.getBoolean(
"palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup");
"palantir_cassandra.dry_run_non_compacting_unused_sstable_cleanup");

private static final ExecutorService flushExecutor = new JMXEnabledThreadPoolExecutor(DatabaseDescriptor.getFlushWriters(),
StageManager.KEEPALIVE,
Expand Down Expand Up @@ -755,15 +755,16 @@ public static void removeUnusedSstables(CFMetaData metadata, Map<Integer, UUID>
completedAncestors.addAll(ancestors);
}
}
cleanedUnfinishedCompactions.forEach(SystemKeyspace::finishCompaction);
cleanedUnfinishedCompactions.forEach(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT));

// remove old sstables from compactions that did complete
for (Map.Entry<Descriptor, Set<Component>> sstableFiles : directories.sstableLister().list().entrySet())
{
Descriptor desc = sstableFiles.getKey();
if (completedAncestors.contains(desc.generation))
{
if (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty())
if (ColumnFamilyStoreManager.instance.shouldSkipAncestorCleanupBasedOnAncestorMetadata()
|| (DRY_RUN_NON_COMPACTING_UNUSED_SSTABLE_CLEANUP && unfinishedCompactions.isEmpty()))
{
logger.warn("Would have deleted leftover compaction ancestor", UnsafeArg.of("desc", desc),
SafeArg.of("keyspace", desc.ksname), SafeArg.of("cf", desc.cfname),
Expand All @@ -776,7 +777,7 @@ public static void removeUnusedSstables(CFMetaData metadata, Map<Integer, UUID>
SafeArg.of("generation", desc.generation), ancestorsArg);
SSTable.delete(desc, sstableFiles.getValue());
Optional.ofNullable(unfinishedCompactions.get(desc.generation))
.ifPresent(SystemKeyspace::finishCompaction);
.ifPresent(uuid -> SystemKeyspace.finishCompaction(uuid, SystemKeyspace.CompactionsInProgressTable.DEFAULT));
}
}
}
Expand Down
57 changes: 45 additions & 12 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,24 @@ public final class SystemKeyspace
public static final String SIZE_ESTIMATES = "size_estimates";
public static final String AVAILABLE_RANGES = "available_ranges";

public enum CompactionsInProgressTable
{
DEFAULT("compactions_in_progress"),
WAL("post_wal_compactions_in_progress");

private final String name;

CompactionsInProgressTable(String name)
{
this.name = name;
}

@Override
public String toString() {
return name;
}
}

public static final CFMetaData Hints =
compile(HINTS,
"hints awaiting delivery",
Expand Down Expand Up @@ -212,8 +230,20 @@ private static int getCompactionsInProgresStcsMaxThreshold() {
}
return val;
}
private static int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold();
private static final CFMetaData CompactionsInProgress =
private static final int compactionsInProgressMaxCompactionThreshold = getCompactionsInProgresStcsMaxThreshold();
private static final CFMetaData DefaultCompactionsInProgress =
compile(COMPACTIONS_IN_PROGRESS,
"unfinished compactions",
"CREATE TABLE %s ("
+ "id uuid,"
+ "columnfamily_name text,"
+ "inputs set<int>,"
+ "keyspace_name text,"
+ "PRIMARY KEY ((id)))")
.maxCompactionThreshold(compactionsInProgressMaxCompactionThreshold)
.compactionStrategyClass(SizeTieredCompactionStrategy.class)
.compactionStrategyOptions(Collections.singletonMap("max_threshold", Integer.toString(compactionsInProgressMaxCompactionThreshold)));
private static final CFMetaData WalCompactionsInProgress =
compile(COMPACTIONS_IN_PROGRESS,
"unfinished compactions",
"CREATE TABLE %s ("
Expand Down Expand Up @@ -290,7 +320,8 @@ public static KSMetaData definition()
Peers,
PeerEvents,
RangeXfers,
CompactionsInProgress,
DefaultCompactionsInProgress,
WalCompactionsInProgress,
CompactionHistory,
SSTableActivity,
SizeEstimates,
Expand Down Expand Up @@ -370,8 +401,10 @@ public Integer apply(SSTableReader sstable)
}
});
String req = "INSERT INTO system.%s (id, keyspace_name, columnfamily_name, inputs) VALUES (?, ?, ?, ?)";
executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush();
CompactionsInProgressFlusher.INSTANCES.forEach((table, flusher) -> {
executeInternal(String.format(req, table.toString()), compactionId, cfs.keyspace.getName(), cfs.name, Sets.newHashSet(generations));
flusher.forceBlockingFlush();
});
return compactionId;
}

Expand All @@ -380,22 +413,22 @@ public Integer apply(SSTableReader sstable)
* to complete successfully for this to be called.
* @param taskId what was returned from {@code startCompaction}
*/
public static void finishCompaction(UUID taskId)
public static void finishCompaction(UUID taskId, CompactionsInProgressTable table)
{
assert taskId != null;

executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", COMPACTIONS_IN_PROGRESS), taskId);
CompactionsInProgressFlusher.INSTANCE.forceBlockingFlush();
executeInternal(String.format("DELETE FROM system.%s WHERE id = ?", table.toString()), taskId);
CompactionsInProgressFlusher.INSTANCES.get(table).forceBlockingFlush();
}

/**
* Returns a Map whose keys are KS.CF pairs and whose values are maps from sstable generation numbers to the
* task ID of the compaction they were participating in.
*/
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions()
public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompactions(CompactionsInProgressTable table)
{
String req = "SELECT * FROM system.%s";
UntypedResultSet resultSet = executeInternal(String.format(req, COMPACTIONS_IN_PROGRESS));
UntypedResultSet resultSet = executeInternal(String.format(req, table.toString()));

Map<Pair<String, String>, Map<Integer, UUID>> unfinishedCompactions = new HashMap<>();
for (UntypedResultSet.Row row : resultSet)
Expand All @@ -418,9 +451,9 @@ public static Map<Pair<String, String>, Map<Integer, UUID>> getUnfinishedCompact
return unfinishedCompactions;
}

public static void discardCompactionsInProgress()
public static void discardCompactionsInProgress(CompactionsInProgressTable table)
{
ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(COMPACTIONS_IN_PROGRESS);
ColumnFamilyStore compactionLog = Keyspace.open(NAME).getColumnFamilyStore(table.toString());
compactionLog.truncateBlocking(false);
}

Expand Down
Loading