diff --git a/src/java/org/apache/cassandra/db/SystemKeyspace.java b/src/java/org/apache/cassandra/db/SystemKeyspace.java index de2e89d94b..0600f962e6 100644 --- a/src/java/org/apache/cassandra/db/SystemKeyspace.java +++ b/src/java/org/apache/cassandra/db/SystemKeyspace.java @@ -21,7 +21,11 @@ import java.net.InetAddress; import java.nio.ByteBuffer; import java.util.*; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.concurrent.GuardedBy; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; @@ -298,7 +302,9 @@ public static KSMetaData definition() return new KSMetaData(NAME, LocalStrategy.class, Collections.emptyMap(), true, tables); } - private static volatile Map> truncationRecords; + @GuardedBy("truncationLock") + private static Map> truncationRecords; + private static final ReadWriteLock truncationLock = new ReentrantReadWriteLock(/* fair= */ true); public enum BootstrapState { @@ -446,10 +452,15 @@ public static TabularData getCompactionHistory() throws OpenDataException public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) { - String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); - truncationRecords = null; - forceBlockingFlush(LOCAL, "Saving truncation record"); + truncationLock.writeLock().lock(); + try { + String req = "UPDATE system.%s SET truncated_at = truncated_at + ? WHERE key = '%s'"; + executeInternal(String.format(req, LOCAL, LOCAL), truncationAsMapEntry(cfs, truncatedAt, position)); + truncationRecords = null; + forceBlockingFlush(LOCAL, "Saving truncation record"); + } finally { + truncationLock.writeLock().unlock(); + } } /** @@ -457,10 +468,15 @@ public static synchronized void saveTruncationRecord(ColumnFamilyStore cfs, long */ public static synchronized void removeTruncationRecord(UUID cfId) { - String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; - executeInternal(String.format(req, LOCAL, LOCAL), cfId); - truncationRecords = null; - forceBlockingFlush(LOCAL, "Removing truncation record"); + truncationLock.writeLock().lock(); + try { + String req = "DELETE truncated_at[?] from system.%s WHERE key = '%s'"; + executeInternal(String.format(req, LOCAL, LOCAL), cfId); + truncationRecords = null; + forceBlockingFlush(LOCAL, "Removing truncation record"); + } finally { + truncationLock.writeLock().unlock(); + } } private static Map truncationAsMapEntry(ColumnFamilyStore cfs, long truncatedAt, ReplayPosition position) @@ -489,27 +505,42 @@ public static long getTruncatedAt(UUID cfId) return record == null ? Long.MIN_VALUE : record.right; } - private static synchronized Pair getTruncationRecord(UUID cfId) + private static Pair getTruncationRecord(UUID cfId) { - if (truncationRecords == null) + truncationLock.readLock().lock(); + try { + Map> records = truncationRecords; + if (records != null) + return records.get(cfId); + + // truncated records snapshot not cached, fall through to release read lock and acquire write lock + } finally { + truncationLock.readLock().unlock(); + } + + truncationLock.writeLock().lock(); + try { truncationRecords = readTruncationRecords(); - return truncationRecords.get(cfId); + return truncationRecords.get(cfId); + } finally { + truncationLock.writeLock().unlock(); + } } private static Map> readTruncationRecords() { UntypedResultSet rows = executeInternal(String.format("SELECT truncated_at FROM system.%s WHERE key = '%s'", LOCAL, LOCAL)); - Map> records = new HashMap<>(); - if (!rows.isEmpty() && rows.one().has("truncated_at")) { Map map = rows.one().getMap("truncated_at", UUIDType.instance, BytesType.instance); + Map> records = new HashMap<>(map.size()); for (Map.Entry entry : map.entrySet()) records.put(entry.getKey(), truncationRecordFromBlob(entry.getValue())); + return Collections.unmodifiableMap(records); } - return records; + return Collections.emptyMap(); } private static Pair truncationRecordFromBlob(ByteBuffer bytes)