STAR-1872: Implements multi-task compactions
This splits compactions that would produce more than one
output sstable into tasks that can execute in parallel.
Such tasks share a transaction and report combined progress
to a common observer. Because we cannot mark parts of an
sstable as no longer needed, the transaction is only applied
once all tasks have succeeded. This also means that early
open is not supported for such tasks.
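To make the shared-transaction rule concrete, here is a minimal, self-contained sketch of the idea (the class and method names are illustrative, not the ones introduced by this patch): each shard task writes its own output, and the shared transaction is only applied once every task has reported success.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the shared transaction of a multi-task compaction.
class SharedTransaction
{
    private final AtomicInteger remaining;

    SharedTransaction(int taskCount)
    {
        remaining = new AtomicInteger(taskCount);
    }

    // Each shard task calls this when it finishes writing its output sstable; the
    // inputs are only released once all tasks are done, matching the rule above.
    void onTaskSuccess()
    {
        if (remaining.decrementAndGet() == 0)
            System.out.println("all shard tasks finished - applying transaction");
    }
}

public class ParallelShardCompactionSketch
{
    public static void main(String[] args)
    {
        int shards = 4;
        SharedTransaction txn = new SharedTransaction(shards);
        ExecutorService pool = Executors.newFixedThreadPool(shards);

        List<CompletableFuture<Void>> tasks = new ArrayList<>();
        for (int shard = 0; shard < shards; shard++)
        {
            int s = shard;
            tasks.add(CompletableFuture.runAsync(() -> {
                System.out.println("compacting output shard " + s);  // one output sstable per task
                txn.onTaskSuccess();
            }, pool));
        }

        CompletableFuture.allOf(tasks.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
    }
}
```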

The new behaviour is off by default and is enabled by the
parallelize_output_shards option. The patch also adds a
reshard_major_compactions flag that combines non-overlapping
sets in major compactions and reshards them, which can now
be done as a parallelizable operation.
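For illustration only, the two options named above written as the string key/value pairs that compaction parameters are typically supplied as; how and where they are consumed is an assumption here, not something this diff shows.

```java
import java.util.HashMap;
import java.util.Map;

public class CompactionOptionsSketch
{
    public static void main(String[] args)
    {
        Map<String, String> options = new HashMap<>();
        options.put("parallelize_output_shards", "true");   // off by default
        options.put("reshard_major_compactions", "true");   // reshard non-overlapping sets in major compactions
        System.out.println(options);
    }
}
```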

Also:
- Changes SSTable expiration to be done in a separate
  getNextBackgroundCompactions round to improve the
  efficiency of expiration (a separate task can run quickly
  and remove the relevant sstables without waiting for
  a compaction to end); a sketch of the idea follows
  this list.

- Applies the small-partition-count correction in
  ShardManager.calculateCombinedDensity.
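A minimal sketch of the expiration-round idea from the first bullet, assuming a picker that is asked for background work: fully expired sstables are handed back as a cheap drop-only task before any normal compaction is proposed. All names here are hypothetical, not the classes touched by this patch.

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;

interface SSTableHandle { boolean isFullyExpired(long gcBeforeSeconds); }

interface BackgroundTask { void run(); }

class ExpirationFirstPicker
{
    Optional<BackgroundTask> nextTask(List<SSTableHandle> live, long gcBeforeSeconds)
    {
        List<SSTableHandle> expired = live.stream()
                                          .filter(s -> s.isFullyExpired(gcBeforeSeconds))
                                          .collect(Collectors.toList());
        if (!expired.isEmpty())
        {
            // Quick task: mark the expired sstables obsolete without rewriting any data,
            // so they disappear without waiting for a long-running compaction.
            return Optional.of(() -> System.out.println("dropping " + expired.size() + " expired sstables"));
        }
        return Optional.empty(); // fall back to regular compaction selection
    }
}
```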
blambov committed Oct 29, 2024
1 parent 9620e87 commit 9512132
Showing 64 changed files with 2,885 additions and 441 deletions.
1 change: 1 addition & 0 deletions src/java/org/apache/cassandra/db/ColumnFamilyStore.java
@@ -1444,6 +1444,7 @@ public Collection<SSTableReader> flushMemtable(ColumnFamilyStore cfs, Memtable m
}

Throwable accumulate = null;

for (SSTableMultiWriter writer : flushResults)
{
accumulate = writer.commit(accumulate);
@@ -29,11 +29,11 @@
import org.apache.cassandra.config.CassandraRelevantProperties;
import org.apache.cassandra.db.Directories;
import org.apache.cassandra.db.compaction.writers.CompactionAwareWriter;
import org.apache.cassandra.db.lifecycle.ILifecycleTransaction;
import org.apache.cassandra.io.FSDiskFullWriteError;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.utils.Throwables;
import org.apache.cassandra.utils.WrappedRunnable;
import org.apache.cassandra.db.lifecycle.LifecycleTransaction;

import static com.google.common.base.Throwables.propagate;

@@ -45,7 +45,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
CassandraRelevantProperties.COMPACTION_SKIP_REPAIR_STATE_CHECKING.getBoolean();

protected final CompactionRealm realm;
protected LifecycleTransaction transaction;
protected ILifecycleTransaction transaction;
protected boolean isUserDefined;
protected OperationType compactionType;
protected TableOperationObserver opObserver;
@@ -55,7 +55,7 @@ public abstract class AbstractCompactionTask extends WrappedRunnable
* @param realm
* @param transaction the transaction managing the status of the sstables we're replacing
*/
protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction transaction)
protected AbstractCompactionTask(CompactionRealm realm, ILifecycleTransaction transaction)
{
this.realm = realm;
this.transaction = transaction;
@@ -66,10 +66,13 @@ protected AbstractCompactionTask(CompactionRealm realm, LifecycleTransaction tra

try
{
// enforce contract that caller should mark sstables compacting
Set<SSTableReader> compacting = transaction.getCompacting();
for (SSTableReader sstable : transaction.originals())
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
if (!transaction.isOffline())
{
// enforce contract that caller should mark sstables compacting
var compacting = realm.getCompactingSSTables();
for (SSTableReader sstable : transaction.originals())
assert compacting.contains(sstable) : sstable.getFilename() + " is not correctly marked compacting";
}

validateSSTables(transaction.originals());
}
@@ -120,18 +123,18 @@ private void validateSSTables(Set<SSTableReader> sstables)
* Executes the task after setting a new observer, normally the observer is the
* compaction manager metrics.
*/
public int execute(TableOperationObserver observer)
public void execute(TableOperationObserver observer)
{
return setOpObserver(observer).execute();
setOpObserver(observer).execute();
}

/** Executes the task */
public int execute()
public void execute()
{
Throwable t = null;
try
{
return executeInternal();
executeInternal();
}
catch (FSDiskFullWriteError e)
{
@@ -151,6 +154,11 @@ public int execute()
}
}

public Throwable rejected(Throwable t)
{
return cleanup(t);
}

public Throwable cleanup(Throwable err)
{
final boolean isSuccess = err == null;
@@ -160,21 +168,16 @@ public Throwable cleanup(Throwable err)
return Throwables.perform(err, () -> transaction.close());
}

public abstract CompactionAwareWriter getCompactionAwareWriter(CompactionRealm realm, Directories directories, LifecycleTransaction txn, Set<SSTableReader> nonExpiredSSTables);

@VisibleForTesting
public LifecycleTransaction getTransaction()
{
return transaction;
}

@VisibleForTesting
public OperationType getCompactionType()
{
return compactionType;
}

protected abstract int executeInternal();
protected void executeInternal()
{
run();
}

// TODO Eventually these three setters should be passed in to the constructor.

@@ -205,13 +208,13 @@ public void addObserver(CompactionObserver compObserver)
}

@VisibleForTesting
public List<CompactionObserver> getCompObservers()
List<CompactionObserver> getCompObservers()
{
return compObservers;
}

@VisibleForTesting
public LifecycleTransaction transaction()
ILifecycleTransaction getTransaction()
{
return transaction;
}
@@ -345,9 +345,12 @@ CompletableFuture<?>[] startCompactionTasks(ColumnFamilyStore cfs, Collection<Ab
if (!compactionTasks.isEmpty())
{
logger.debug("Running compaction tasks: {}", compactionTasks);
return compactionTasks.stream()
.map(task -> startTask(cfs, task))
.toArray(CompletableFuture<?>[]::new);
CompletableFuture<Void>[] arr = new CompletableFuture[compactionTasks.size()];
int index = 0;
for (AbstractCompactionTask task : compactionTasks)
arr[index++] = startTask(cfs, task);

return arr;
}
else
{
@@ -382,6 +385,7 @@ private CompletableFuture<Void> startTask(ColumnFamilyStore cfs, AbstractCompact
{
ongoingCompactions.decrementAndGet();
logger.debug("Background compaction task for {} was rejected", cfs);
task.rejected(ex);
return CompletableFuture.completedFuture(null);
}
}
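For context, a simplified sketch of the rejection path wired up above: when the executor rejects a background task, rejected(...) lets the task release its resources (in the patch it delegates to cleanup, which closes the lifecycle transaction). The classes below are stand-ins, not the Cassandra ones.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;

abstract class RejectableTask implements Runnable
{
    // Mirrors AbstractCompactionTask.rejected(Throwable): delegate to cleanup so the
    // transaction is closed and the inputs stop being marked as compacting.
    Throwable rejected(Throwable t)
    {
        return cleanup(t);
    }

    Throwable cleanup(Throwable t)
    {
        // release transaction / sstable references here
        return t;
    }
}

class TaskStarter
{
    CompletableFuture<Void> start(Executor executor, RejectableTask task)
    {
        try
        {
            return CompletableFuture.runAsync(task, executor);
        }
        catch (RejectedExecutionException ex)
        {
            // Without this call a rejected task would leave its transaction open.
            task.rejected(ex);
            return CompletableFuture.completedFuture(null);
        }
    }
}
```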
@@ -19,6 +19,7 @@

import java.util.*;
import java.util.function.LongPredicate;
import java.util.function.UnaryOperator;

import javax.annotation.Nullable;

@@ -120,7 +121,7 @@ public Set<CompactionSSTable> getFullyExpiredSSTables()
{
if (overlapTracker == null)
return Collections.emptySet();
return getFullyExpiredSSTables(realm, compacting, overlapTracker.overlaps(), gcBefore, ignoreOverlaps());
return getFullyExpiredSSTables(realm, compacting, c -> overlapTracker.overlaps(), gcBefore, ignoreOverlaps());
}

/**
@@ -135,15 +136,15 @@ public Set<CompactionSSTable> getFullyExpiredSSTables()
*
* @param realm
* @param compacting we take the drop-candidates from this set, it is usually the sstables included in the compaction
* @param overlapping the sstables that overlap the ones in compacting.
* @param overlappingSupplier function used to get the sstables that overlap the ones in compacting.
* @param gcBefore
* @param ignoreOverlaps don't check if data shadows/overlaps any data in other sstables
* @return
*/
public static
Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
Iterable<? extends CompactionSSTable> compacting,
Iterable<? extends CompactionSSTable> overlapping,
UnaryOperator<Iterable<? extends CompactionSSTable>> overlappingSupplier,
int gcBefore,
boolean ignoreOverlaps)
{
@@ -158,6 +159,7 @@ Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
long minTimestamp;
if (!ignoreOverlaps)
{
var overlapping = overlappingSupplier.apply(compacting);
minTimestamp = Math.min(Math.min(minSurvivingTimestamp(overlapping, gcBefore),
minSurvivingTimestamp(compacting, gcBefore)),
minTimestamp(realm.getAllMemtables()));
@@ -215,10 +217,10 @@ private static long minSurvivingTimestamp(Iterable<? extends CompactionSSTable>
public static
Set<CompactionSSTable> getFullyExpiredSSTables(CompactionRealm realm,
Iterable<? extends CompactionSSTable> compacting,
Iterable<? extends CompactionSSTable> overlapping,
UnaryOperator<Iterable<? extends CompactionSSTable>> overlappingSupplier,
int gcBefore)
{
return getFullyExpiredSSTables(realm, compacting, overlapping, gcBefore, false);
return getFullyExpiredSSTables(realm, compacting, overlappingSupplier, gcBefore, false);
}

/**
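A short usage sketch for the supplier-based signature above, written as a caller inside the same class (realm, compacting, overlapTracker and gcBefore are assumed to be in scope, as in the real call sites): the overlap set is now computed lazily, only when the expiration check actually needs it.

```java
UnaryOperator<Iterable<? extends CompactionSSTable>> overlappingSupplier =
        c -> overlapTracker.overlaps();   // only evaluated when ignoreOverlaps is false

Set<CompactionSSTable> fullyExpired =
        getFullyExpiredSSTables(realm, compacting, overlappingSupplier, gcBefore);
```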
18 changes: 6 additions & 12 deletions src/java/org/apache/cassandra/db/compaction/CompactionCursor.java
@@ -32,6 +32,8 @@
import org.apache.cassandra.db.rows.BTreeRow;
import org.apache.cassandra.db.rows.Row;
import org.apache.cassandra.db.rows.Unfiltered;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.compaction.SortedStringTableCursor;
import org.apache.cassandra.io.sstable.compaction.IteratorFromCursor;
import org.apache.cassandra.io.sstable.compaction.PurgeCursor;
@@ -69,26 +71,23 @@ public class CompactionCursor implements SSTableCursorMerger.MergeListener, Auto
private final long[] mergedPartitionsHistogram;
private final long[] mergedRowsHistogram;

private final long totalCompressedSize;

@SuppressWarnings("resource")
public CompactionCursor(OperationType type, Collection<SSTableReader> readers, CompactionController controller, RateLimiter limiter, int nowInSec, UUID compactionId)
public CompactionCursor(OperationType type, Collection<SSTableReader> readers, Range<Token> tokenRange, CompactionController controller, RateLimiter limiter, int nowInSec, UUID compactionId)
{
this.controller = controller;
this.type = type;
this.compactionId = compactionId;
this.totalCompressedSize = readers.stream().mapToLong(SSTableReader::onDiskLength).sum();
this.mergedPartitionsHistogram = new long[readers.size()];
this.mergedRowsHistogram = new long[readers.size()];
this.rowBuilder = BTreeRow.sortedBuilder();
this.sstables = ImmutableSet.copyOf(readers);
this.cursor = makeMergedAndPurgedCursor(readers, controller, limiter, nowInSec);
this.cursor = makeMergedAndPurgedCursor(readers, tokenRange, controller, limiter, nowInSec);
this.totalBytes = cursor.bytesTotal();
this.currentBytes = 0;
this.currentProgressMillisSinceStartup = System.currentTimeMillis();
}

private SSTableCursor makeMergedAndPurgedCursor(Collection<SSTableReader> readers,
Range<Token> tokenRange,
CompactionController controller,
RateLimiter limiter,
int nowInSec)
@@ -97,7 +96,7 @@ private SSTableCursor makeMergedAndPurgedCursor(Collection<SSTableReader> reader
return SSTableCursor.empty();

SSTableCursor merged = new SSTableCursorMerger(readers.stream()
.map(r -> new SortedStringTableCursor(r, limiter))
.map(r -> new SortedStringTableCursor(r, tokenRange, limiter))
.collect(Collectors.toList()),
metadata(),
this);
@@ -247,11 +246,6 @@ long totalSourceRows()
return Arrays.stream(mergedRowsHistogram).reduce(0L, Long::sum);
}

public long getTotalCompressedSize()
{
return totalCompressedSize;
}

long[] mergedPartitionsHistogram()
{
return mergedPartitionsHistogram;
@@ -51,6 +51,8 @@
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.cassandra.concurrent.NamedThreadFactory;
import org.apache.cassandra.dht.Range;
import org.apache.cassandra.dht.Token;
import org.apache.cassandra.io.sstable.format.SSTableReader;
import org.apache.cassandra.schema.TableMetadata;
import org.apache.cassandra.utils.JVMStabilityInspector;
@@ -276,7 +278,7 @@ public void flush(Collection<SSTableReader> sstables)
}
}

public void compaction(long startTime, Collection<SSTableReader> input, long endTime, Collection<SSTableReader> output)
public void compaction(long startTime, Collection<SSTableReader> input, Range<Token> tokenRange, long endTime, Collection<SSTableReader> output)
{
if (enabled.get())
{
@@ -287,6 +289,8 @@ public void compaction(long startTime, Collection<SSTableReader> input, long end
node.put("end", String.valueOf(endTime));
node.set("input", sstableMap(input));
node.set("output", sstableMap(output));
if (tokenRange != null)
node.put("range", tokenRange.toString());
jsonWriter.write(node, this::getEventJsonNode, this);
}
}
@@ -96,10 +96,18 @@ public interface CompactionProgress extends TableOperation.Progress
*/
long uncompressedBytesWritten();

/**
* @return the start time of this operation in nanoTime.
*/
long startTimeNanos();

/**
* @return the duration so far in nanoseconds.
*/
long durationInNanos();
default long durationInNanos()
{
return System.nanoTime() - startTimeNanos();
}

/**
* @return total number of partitions read
@@ -128,7 +136,16 @@ public interface CompactionProgress extends TableOperation.Progress
/**
* @return the ratio of bytes before and after compaction, using the adjusted input and output disk sizes (uncompressed values).
*/
double sizeRatio();
default double sizeRatio()
{
long estInputSizeBytes = adjustedInputDiskSize();
if (estInputSizeBytes > 0)
return outputDiskSize() / (double) estInputSizeBytes;

// this is a valid case, when there are no sstables to actually compact
// the previous code would return a NaN that would be logged as zero
return 0;
}

default double readThroughput()
{
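To show what the new default methods buy an implementer, a simplified stand-in (not a real CompactionProgress implementation): only startTimeNanos() and the size accessors need to be provided, and durationInNanos() and sizeRatio() follow from the defaults shown above.

```java
class ProgressSketch
{
    private final long startNanos = System.nanoTime();
    private long inputBytes;    // plays the role of adjustedInputDiskSize()
    private long outputBytes;   // plays the role of outputDiskSize()

    long startTimeNanos()
    {
        return startNanos;
    }

    // Same derivation as the default durationInNanos() above.
    long durationInNanos()
    {
        return System.nanoTime() - startTimeNanos();
    }

    // Same guard as the default sizeRatio(): avoid returning NaN when there is
    // nothing to compact.
    double sizeRatio()
    {
        return inputBytes > 0 ? outputBytes / (double) inputBytes : 0;
    }
}
```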

1 comment on commit 9512132

@cassci-bot

Build rejected: 1 NEW test failure(s) in 15 builds.
Butler analysis done on ds-cassandra-pr-gate/STAR-1872 vs last 16 runs of ds-cassandra-build-nightly/main.

Status: test failed in the recent build; no failures on upstream
Test: QueryRowDeletionsTest.testRowDeletions[aa_CompositePartitionKeyDataModel{primaryKey=p1, p2}]
Branch story: 🔴🔵🔵🔵 · Upstream story: all recent runs passing (🔵)
butler comparison
