Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce SystemKeyspace truncation record contention #559

Draft
wants to merge 1 commit into
base: palantir-cassandra-2.2.18
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 46 additions & 15 deletions src/java/org/apache/cassandra/db/SystemKeyspace.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,11 @@
import java.net.InetAddress;
import java.nio.ByteBuffer;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import javax.annotation.concurrent.GuardedBy;
import javax.management.openmbean.OpenDataException;
import javax.management.openmbean.TabularData;

Expand Down Expand Up @@ -298,7 +302,9 @@ public static KSMetaData definition()
return new KSMetaData(NAME, LocalStrategy.class, Collections.<String, String>emptyMap(), true, tables);
}

private static volatile Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
@GuardedBy("truncationLock")
private static Map<UUID, Pair<ReplayPosition, Long>> truncationRecords;
private static final ReadWriteLock truncationLock = new ReentrantReadWriteLock(/* fair= */ true);

public enum BootstrapState
{
Expand Down Expand Up @@ -446,21 +452,31 @@ public static TabularData getCompactionHistory() throws OpenDataException

public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
{
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
forceBlockingFlush(LOCAL, "Saving truncation record");
truncationLock.writeLock().lock();
try {
String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position));
truncationRecords = null;
forceBlockingFlush(LOCAL, "Saving truncation record");
} finally {
truncationLock.writeLock().unlock();
}
}

/**
* This method is used to remove information about truncation time for specified column family
*/
public static synchronized void removeTruncationRecord(UUID cfId)
{
String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), cfId);
truncationRecords = null;
forceBlockingFlush(LOCAL, "Removing truncation record");
truncationLock.writeLock().lock();
try {
String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'";
executeInternal(String.format(req, LOCAL, LOCAL), cfId);
truncationRecords = null;
forceBlockingFlush(LOCAL, "Removing truncation record");
} finally {
truncationLock.writeLock().unlock();
}
}

private static Map<UUID, ByteBuffer> truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position)
Expand Down Expand Up @@ -489,27 +505,42 @@ public static long getTruncatedAt(UUID cfId)
return record == null ? Long.MIN_VALUE : record.right;
}

private static synchronized Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
private static Pair<ReplayPosition, Long> getTruncationRecord(UUID cfId)
{
if (truncationRecords == null)
truncationLock.readLock().lock();
try {
Map<UUID, Pair<ReplayPosition, Long>> records = truncationRecords;
if (records != null)
return records.get(cfId);

// truncated records snapshot not cached, fall through to release read lock and acquire write lock
} finally {
truncationLock.readLock().unlock();
}

truncationLock.writeLock().lock();
try {
truncationRecords = readTruncationRecords();
return truncationRecords.get(cfId);
return truncationRecords.get(cfId);
} finally {
truncationLock.writeLock().unlock();
}
}

private static Map<UUID, Pair<ReplayPosition, Long>> readTruncationRecords()
{
UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL));

Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>();

if (!rows.isEmpty() && rows.one().has("truncated_at"))
{
Map<UUID, ByteBuffer> map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance);
Map<UUID, Pair<ReplayPosition, Long>> records = new HashMap<>(map.size());
for (Map.Entry<UUID, ByteBuffer> entry : map.entrySet())
records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue()));
return Collections.unmodifiableMap(records);
}

return records;
return Collections.emptyMap();
}

private static Pair<ReplayPosition, Long> truncationRecordFromBlob(ByteBuffer bytes)
Expand Down