From 13cece160b8abc603fdae87710c85de8730f91d2 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Mon, 30 Dec 2024 17:14:03 +0300 Subject: [PATCH 01/10] IGNITE-24267 Heap lock manager behave incorrectly in case of overflow --- .../benchmark/AbstractMultiNodeBenchmark.java | 2 +- .../ignite/distributed/ItLockTableTest.java | 11 +- .../ignite/internal/tx/TxStateMeta.java | 3 +- .../internal/tx/impl/HeapLockManager.java | 109 ++++++++---------- .../internal/tx/HeapLockManagerTest.java | 71 ++++++++++++ 5 files changed, 127 insertions(+), 69 deletions(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java index 706ba639821..ac365c12dcf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java @@ -206,7 +206,7 @@ private void startCluster() throws Exception { + " rest.port: {},\n" + " raft.fsync = " + fsync() + ",\n" + " system.partitionsLogPath = \"" + logPath() + "\",\n" - + " failureHandler.handler: {\n" + + " failureHandler.handler: {\n" + " type: \"" + StopNodeOrHaltFailureHandlerConfigurationSchema.TYPE + "\",\n" + " tryStop: true,\n" + " timeoutMillis: 60000,\n" // 1 minute for graceful shutdown diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index 335db346817..a547dcbf545 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestampTracker; +import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.lowwatermark.LowWatermark; @@ -184,7 +185,7 @@ public void testCollision() { RecordView view = testTable.recordView(); int i = 0; - final int count = 1000; + final int count = 100; List txns = new ArrayList<>(); while (i++ < count) { Transaction tx = txTestCluster.igniteTransactions().begin(); @@ -200,7 +201,7 @@ public void testCollision() { total += slot.waitersCount(); } - return total == count && lockManager.available() == 0; + return total == CACHE_SIZE && lockManager.available() == 0; }, 10_000), "Some lockers are missing"); int empty = 0; @@ -212,19 +213,19 @@ public void testCollision() { int cnt = slot.waitersCount(); if (cnt == 0) { empty++; - } - if (cnt > 1) { + } else { coll += cnt; } } LOG.info("LockTable [emptySlots={} collisions={}]", empty, coll); + System.out.println(IgniteStringFormatter.format("LockTable [emptySlots={} collisions={}]", empty, coll)); assertTrue(coll > 0); List> finishFuts = new ArrayList<>(); for (Transaction txn : txns) { - finishFuts.add(txn.commitAsync()); + finishFuts.add(txn.rollbackAsync()); } for (CompletableFuture finishFut : finishFuts) { diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index 7956fce3c92..13a18982d59 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -29,6 +29,7 @@ import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.TxStateMetaMessage; +import org.apache.ignite.internal.util.FastTimestamps; import org.jetbrains.annotations.Nullable; /** @@ -108,7 +109,7 @@ public TxStateMeta( this.txCoordinatorId = txCoordinatorId; this.commitPartitionId = commitPartitionId; this.commitTimestamp = commitTimestamp; - this.initialVacuumObservationTimestamp = initialVacuumObservationTimestamp; + this.initialVacuumObservationTimestamp = TxState.isFinalState(txState) ? FastTimestamps.coarseCurrentTimeMillis() : null; this.cleanupCompletionTimestamp = cleanupCompletionTimestamp; } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 2911fb9e2aa..2574a18dbef 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -43,14 +43,11 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; -import java.util.concurrent.ForkJoinPool; import java.util.concurrent.TimeUnit; import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.SystemPropertyView; import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.lang.IgniteBiTuple; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.tostring.IgniteToStringExclude; import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; @@ -80,66 +77,47 @@ *

Additionally limits the lock map size. */ public class HeapLockManager extends AbstractEventProducer implements LockManager { - /** Throttled logger. */ - private static final IgniteLogger THROTTLED_LOG = Loggers.toThrottledLogger( - Loggers.forClass(HeapLockManager.class), - // TODO: IGNITE-24181 Get rid of Common thread pool. - ForkJoinPool.commonPool() - ); + /** Table size. TODO make it configurable IGNITE-20694 */ + private static final int SLOTS = 1_048_576; - /** - * Table size. - */ + /** Table size. */ public static final int DEFAULT_SLOTS = 1_048_576; private static final String LOCK_MAP_SIZE_PROPERTY_NAME = "lockMapSize"; private static final String RAW_SLOTS_MAX_SIZE_PROPERTY_NAME = "rawSlotsMaxSize"; - /** - * Striped lock concurrency. - */ + /** Striped lock concurrency. */ private static final int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + /** An unused state to avoid concurrent allocation. */ + private LockState removedLockState; + /** Lock map size. */ private final int lockMapSize; /** Raw slots size. */ private final int rawSlotsMaxSize; - /** - * Empty slots. - */ + /** Empty slots. */ private final ConcurrentLinkedQueue empty = new ConcurrentLinkedQueue<>(); - /** - * Mapped slots. - */ + /** Mapped slots. */ private ConcurrentHashMap locks; - /** - * Raw slots. - */ + /** Raw slots. */ private LockState[] slots; - /** - * The policy. - */ + /** The policy. */ private DeadlockPreventionPolicy deadlockPreventionPolicy; - /** - * Executor that is used to fail waiters after timeout. - */ + /** Executor that is used to fail waiters after timeout. */ private Executor delayedExecutor; - /** - * Enlisted transactions. - */ + /** Enlisted transactions. */ private final ConcurrentHashMap> txMap = new ConcurrentHashMap<>(1024); - /** - * Coarse locks. - */ + /** Coarse locks. */ private final ConcurrentHashMap coarseMap = new ConcurrentHashMap<>(); /** @@ -183,6 +161,7 @@ private static int intProperty(SystemLocalConfiguration systemProperties, String @Override public void start(DeadlockPreventionPolicy deadlockPreventionPolicy) { this.deadlockPreventionPolicy = deadlockPreventionPolicy; + this.removedLockState = new LockState(); this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0 ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS) @@ -211,7 +190,14 @@ public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lock } while (true) { - LockState state = lockState(lockKey); + LockState state = acquireLockState(lockKey); + + if (state == null) { + return failedFuture(new LockException( + ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to lock table overflow [txId=" + txId + ", limit=" + rawSlotsMaxSize + ']' + )); + } IgniteBiTuple, LockMode> futureTuple = state.tryAcquire(txId, lockMode); @@ -299,38 +285,32 @@ public Iterator locks(UUID txId) { } /** - * Returns the lock state for the key. + * Gets a lock state. Use this method where sure this lock state was already acquired. * - * @param key The key. + * @param key A lock key. + * @return A state matched with the key or unused state. */ private LockState lockState(LockKey key) { - int h = spread(key.hashCode()); - int index = h & (slots.length - 1); - - LockState[] res = new LockState[1]; + return locks.getOrDefault(key, removedLockState); + } - locks.compute(key, (k, v) -> { + /** + * Returns the lock state for the key. + * + * @param key The key. + * @return A state matched with the key. + */ + private @Nullable LockState acquireLockState(LockKey key) { + return locks.compute(key, (k, v) -> { if (v == null) { v = empty.poll(); - if (v == null) { - res[0] = slots[index]; - - THROTTLED_LOG.warn( - "Log manager runs out of slots. So the lock state starts to share, and conflicts may appear frequently." - ); - } else { - v.markedForRemove = false; + if (v != null) { v.key = k; - res[0] = v; } - } else { - res[0] = v; } return v; }); - - return res[0]; } /** {@inheritDoc} */ @@ -376,7 +356,6 @@ private LockState adjustLockState(LockState state, LockState v) { synchronized (v.waiters) { if (v.waiters.isEmpty()) { - v.markedForRemove = true; v.key = null; empty.add(v); return null; @@ -776,9 +755,6 @@ public class LockState implements Releasable { /** Waiters. */ private final TreeMap waiters; - /** Marked for removal flag. */ - private volatile boolean markedForRemove = false; - /** Lock key. */ private volatile LockKey key; @@ -789,6 +765,15 @@ public class LockState implements Releasable { this.waiters = new TreeMap<>(txComparator); } + /** + * Checks if lock state is used to hold a lock. + * + * @return True if the state is used, otherwise false. + */ + boolean isUsed() { + return key != null; + } + @Override public LockKey key() { return key; @@ -821,7 +806,7 @@ public boolean coarse() { WaiterImpl waiter = new WaiterImpl(txId, lockMode); synchronized (waiters) { - if (markedForRemove) { + if (!isUsed()) { return new IgniteBiTuple(null, lockMode); } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java index 3c6d3528be1..69c93476ef3 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java @@ -17,15 +17,21 @@ package org.apache.ignite.internal.tx; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed; +import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.apache.ignite.internal.tx.impl.HeapLockManager.DEFAULT_SLOTS; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.arrayWithSize; import static org.hamcrest.Matchers.is; +import static org.junit.jupiter.api.Assertions.assertTrue; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; import org.apache.ignite.internal.tx.impl.WaitDieDeadlockPreventionPolicy; +import org.apache.ignite.internal.tx.test.TestTransactionIds; import org.junit.jupiter.api.Test; /** @@ -44,6 +50,71 @@ protected LockKey lockKey() { return new LockKey(0, "test"); } + @Test + public void testLockTableOverflow() throws Exception { + int maxSlots = 16; + + HeapLockManager lockManager = new HeapLockManager(maxSlots, maxSlots); + lockManager.start(new WaitDieDeadlockPreventionPolicy()); + + UUID[] txs = new UUID[maxSlots]; + + for (int i = 0; i < maxSlots; i++) { + txs[i] = TestTransactionIds.newTransactionId(); + lockManager.acquire(txs[i], new LockKey(txs[i], txs[i]), LockMode.S).get(); + } + + UUID overflowTx = TestTransactionIds.newTransactionId(); + + CompletableFuture overflowLockFut = lockManager.acquire(overflowTx, new LockKey(overflowTx, overflowTx), LockMode.S); + + assertThat(overflowLockFut, willThrowWithCauseOrSuppressed( + LockException.class, + "Failed to acquire a lock due to lock table overflow" + )); + + for (int i = 0; i < maxSlots; i++) { + lockManager.releaseAll(txs[i]); + } + + overflowLockFut = lockManager.acquire(overflowTx, new LockKey(overflowTx, overflowTx), LockMode.S); + + assertThat(overflowLockFut, willCompleteSuccessfully()); + + lockManager.releaseAll(overflowTx); + + assertTrue(lockManager.isEmpty()); + } + + @Test + public void testLockTooManyKeysInTx() throws Exception { + int maxSlots = 16; + + HeapLockManager lockManager = new HeapLockManager(maxSlots, maxSlots); + lockManager.start(new WaitDieDeadlockPreventionPolicy()); + + UUID txId = TestTransactionIds.newTransactionId(); + + for (int i = 0; i < maxSlots; i++) { + lockManager.acquire(txId, new LockKey(i, i), LockMode.S).get(); + } + + int moreKeys = 2 * maxSlots; + + for (int i = maxSlots; i < moreKeys; i++) { + CompletableFuture overflowLockFut = lockManager.acquire(txId, new LockKey(i, i), LockMode.S); + + assertThat(overflowLockFut, willThrowWithCauseOrSuppressed( + LockException.class, + "Failed to acquire a lock due to lock table overflow" + )); + } + + lockManager.releaseAll(txId); + + assertTrue(lockManager.isEmpty()); + } + @Test public void testDefaultConfiguration() { assertThat(((HeapLockManager) lockManager).available(), is(DEFAULT_SLOTS)); From 103b22b68e4e63d526c0dd51271184379771b177 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Wed, 22 Jan 2025 18:44:47 +0300 Subject: [PATCH 02/10] Fixed code style. --- .../org/apache/ignite/internal/tx/impl/HeapLockManager.java | 3 --- 1 file changed, 3 deletions(-) diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 2574a18dbef..651d8a6e89c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -77,9 +77,6 @@ *

Additionally limits the lock map size. */ public class HeapLockManager extends AbstractEventProducer implements LockManager { - /** Table size. TODO make it configurable IGNITE-20694 */ - private static final int SLOTS = 1_048_576; - /** Table size. */ public static final int DEFAULT_SLOTS = 1_048_576; From 35d6877a4fa7505134e3df41b1458f5720fde2b2 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 00:51:03 +0300 Subject: [PATCH 03/10] Removed exceed changes. --- .../java/org/apache/ignite/distributed/ItLockTableTest.java | 2 -- .../main/java/org/apache/ignite/internal/tx/TxStateMeta.java | 3 +-- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index a547dcbf545..ceb95dd6575 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.configuration.testframework.InjectConfiguration; import org.apache.ignite.internal.hlc.ClockService; import org.apache.ignite.internal.hlc.HybridTimestampTracker; -import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.lowwatermark.LowWatermark; @@ -219,7 +218,6 @@ public void testCollision() { } LOG.info("LockTable [emptySlots={} collisions={}]", empty, coll); - System.out.println(IgniteStringFormatter.format("LockTable [emptySlots={} collisions={}]", empty, coll)); assertTrue(coll > 0); diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java index 13a18982d59..7956fce3c92 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/TxStateMeta.java @@ -29,7 +29,6 @@ import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.tx.message.TxMessagesFactory; import org.apache.ignite.internal.tx.message.TxStateMetaMessage; -import org.apache.ignite.internal.util.FastTimestamps; import org.jetbrains.annotations.Nullable; /** @@ -109,7 +108,7 @@ public TxStateMeta( this.txCoordinatorId = txCoordinatorId; this.commitPartitionId = commitPartitionId; this.commitTimestamp = commitTimestamp; - this.initialVacuumObservationTimestamp = TxState.isFinalState(txState) ? FastTimestamps.coarseCurrentTimeMillis() : null; + this.initialVacuumObservationTimestamp = initialVacuumObservationTimestamp; this.cleanupCompletionTimestamp = cleanupCompletionTimestamp; } From 6050ce5d84a17d352ef7e5a4324a1eaa855bbe42 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 16:13:47 +0300 Subject: [PATCH 04/10] UpsertKvBenchmark is configured. --- build.gradle | 2 +- .../benchmark/AbstractMultiNodeBenchmark.java | 2 +- .../internal/benchmark/UpsertKvBenchmark.java | 28 +- .../ignite/internal/app/IgniteImpl.java | 14 +- .../internal/tx/impl/HeapLockManagerV2.java | 1351 +++++++++++++++++ 5 files changed, 1390 insertions(+), 7 deletions(-) create mode 100644 modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java diff --git a/build.gradle b/build.gradle index 31ff0ba21dc..9069c6fcbe9 100644 --- a/build.gradle +++ b/build.gradle @@ -66,7 +66,7 @@ ext { "-Dio.netty.tryReflectionSetAccessible=true", "-XX:+HeapDumpOnOutOfMemoryError", "-ea", - "-Xmx1g" + "-Xmx16g" ] compilerArgs = [ diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java index ac365c12dcf..f9a07168cbd 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java @@ -200,7 +200,7 @@ private void startCluster() throws Exception { + " },\n" + " storage.profiles: {" + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, " - + " " + DEFAULT_STORAGE_PROFILE + ".size: 2073741824 " // Avoid page replacement. + + " " + DEFAULT_STORAGE_PROFILE + ".size: 21474836480 " // Avoid page replacement. + " },\n" + " clientConnector: { port:{} },\n" + " rest.port: {},\n" diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java index a0c5c9da364..0cea960a295 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java @@ -25,6 +25,7 @@ import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; +import org.jetbrains.annotations.Nullable; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -47,7 +48,7 @@ */ @State(Scope.Benchmark) @Fork(1) -@Threads(1) +@Threads(32) @Warmup(iterations = 10, time = 2) @Measurement(iterations = 20, time = 2) @BenchmarkMode(Mode.Throughput) @@ -63,20 +64,39 @@ public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark { @Param({"false"}) private boolean fsync; - @Param({"8"}) + @Param({"32"}) private int partitionCount; + @Param({"1048576", "131072", "1024"}) + private int lockSlots; + + @Param({"V1", "V2"}) + private String useLocks; + private static final AtomicInteger COUNTER = new AtomicInteger(); private static final ThreadLocal GEN = ThreadLocal.withInitial(() -> COUNTER.getAndIncrement() * 20_000_000); @Override public void nodeSetUp() throws Exception { - System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, "true"); - System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK, "true"); + System.setProperty("LOGIT_STORAGE_ENABLED", "true"); + System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, "false"); + System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK, "false"); + System.setProperty("IGNITE_USE_LOCKS", useLocks); super.nodeSetUp(); } + @Override + protected @Nullable String clusterConfiguration() { + return String.format( + "system.properties: {" + + "lockMapSize = \"%s\", " + + "rawSlotsMaxSize = \"%s\"" + + "}", + lockSlots, lockSlots + ); + } + /** * Initializes the tuple. */ diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index 338fd5dae42..d3b1e00a1db 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -148,6 +148,7 @@ import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.index.IndexNodeFinishedRwTransactionsChecker; import org.apache.ignite.internal.lang.IgniteInternalException; +import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -262,6 +263,7 @@ import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; +import org.apache.ignite.internal.tx.impl.HeapLockManagerV2; import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions; import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; @@ -971,7 +973,17 @@ public class IgniteImpl implements Ignite { var transactionInflights = new TransactionInflights(placementDriverMgr.placementDriver(), clockService); - LockManager lockMgr = new HeapLockManager(systemConfiguration); + String useLocks = IgniteSystemProperties.getString("IGNITE_USE_LOCKS", "V1"); + + LockManager lockMgr; + + switch (useLocks) { + case "V2": + lockMgr = new HeapLockManagerV2(systemConfiguration); + break; + default: + lockMgr = new HeapLockManager(systemConfiguration); + } // TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently). txManager = new TxManagerImpl( diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java new file mode 100644 index 00000000000..964d40cb965 --- /dev/null +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java @@ -0,0 +1,1351 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.tx.impl; + +import static java.util.Collections.emptyList; +import static java.util.concurrent.CompletableFuture.completedFuture; +import static java.util.concurrent.CompletableFuture.failedFuture; +import static org.apache.ignite.internal.tx.event.LockEvent.LOCK_CONFLICT; +import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; +import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Map.Entry; +import java.util.Set; +import java.util.TreeMap; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.ignite.internal.configuration.SystemLocalConfiguration; +import org.apache.ignite.internal.configuration.SystemPropertyView; +import org.apache.ignite.internal.event.AbstractEventProducer; +import org.apache.ignite.internal.lang.IgniteBiTuple; +import org.apache.ignite.internal.tostring.IgniteToStringExclude; +import org.apache.ignite.internal.tostring.S; +import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; +import org.apache.ignite.internal.tx.Lock; +import org.apache.ignite.internal.tx.LockException; +import org.apache.ignite.internal.tx.LockKey; +import org.apache.ignite.internal.tx.LockManager; +import org.apache.ignite.internal.tx.LockMode; +import org.apache.ignite.internal.tx.Waiter; +import org.apache.ignite.internal.tx.event.LockEvent; +import org.apache.ignite.internal.tx.event.LockEventParameters; +import org.apache.ignite.internal.util.CollectionUtils; +import org.apache.ignite.internal.util.IgniteStripedReadWriteLock; +import org.jetbrains.annotations.Nullable; +import org.jetbrains.annotations.TestOnly; + +/** + * A {@link LockManager} implementation which stores lock queues in the heap. + * + *

Lock waiters are placed in the queue, ordered according to comparator provided by {@link HeapLockManagerV2#deadlockPreventionPolicy}. + * When a new waiter is placed in the queue, it's validated against current lock owner: if there is an owner with a higher priority (as + * defined by comparator) lock request is denied. + * + *

Read lock can be upgraded to write lock (only available for the lowest read-locked entry of + * the queue). + * + *

Additionally limits the lock map size. + */ +public class HeapLockManagerV2 extends AbstractEventProducer implements LockManager { + /** Table size. */ + public static final int DEFAULT_SLOTS = 1_048_576; + + private static final String LOCK_MAP_SIZE_PROPERTY_NAME = "lockMapSize"; + + private static final String RAW_SLOTS_MAX_SIZE_PROPERTY_NAME = "rawSlotsMaxSize"; + + /** Striped lock concurrency. */ + private static final int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); + + private final AtomicInteger lockTableSize = new AtomicInteger(); + + /** An unused state to avoid concurrent allocation. */ + private LockState removedLockState; + + /** Lock map size. */ + private final int lockMapSize; + + /** Raw slots size. */ + private final int rawSlotsMaxSize; + + /** Mapped slots. */ + private ConcurrentHashMap locks; + + /** The policy. */ + private DeadlockPreventionPolicy deadlockPreventionPolicy; + + /** Executor that is used to fail waiters after timeout. */ + private Executor delayedExecutor; + + /** Enlisted transactions. */ + private final ConcurrentHashMap> txMap = new ConcurrentHashMap<>(1024); + + /** Coarse locks. */ + private final ConcurrentHashMap coarseMap = new ConcurrentHashMap<>(); + + /** + * Creates an instance of {@link HeapLockManagerV2} with a few slots eligible for tests which don't stress the lock manager too much. + * Such a small instance is started way faster than a full-blown production ready instance with a lot of slots. + */ + @TestOnly + public static HeapLockManagerV2 smallInstance() { + return new HeapLockManagerV2(1024, 1024); + } + + /** Constructor. */ + public HeapLockManagerV2(SystemLocalConfiguration systemProperties) { + this( + intProperty(systemProperties, RAW_SLOTS_MAX_SIZE_PROPERTY_NAME, DEFAULT_SLOTS), + intProperty(systemProperties, LOCK_MAP_SIZE_PROPERTY_NAME, DEFAULT_SLOTS) + ); + } + + /** + * Constructor. + * + * @param rawSlotsMaxSize Raw slots size. + * @param lockMapSize Lock map size. + */ + public HeapLockManagerV2(int rawSlotsMaxSize, int lockMapSize) { + if (lockMapSize > rawSlotsMaxSize) { + throw new IllegalArgumentException("maxSize=" + rawSlotsMaxSize + " < mapSize=" + lockMapSize); + } + + this.rawSlotsMaxSize = rawSlotsMaxSize; + this.lockMapSize = lockMapSize; + } + + private static int intProperty(SystemLocalConfiguration systemProperties, String name, int defaultValue) { + SystemPropertyView property = systemProperties.properties().value().get(name); + + return property == null ? defaultValue : Integer.parseInt(property.propertyValue()); + } + + @Override + public void start(DeadlockPreventionPolicy deadlockPreventionPolicy) { + this.deadlockPreventionPolicy = deadlockPreventionPolicy; + this.removedLockState = new LockState(); + + this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0 + ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS) + : null; + + locks = new ConcurrentHashMap<>(lockMapSize); + } + + @Override + public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lockMode) { + if (lockKey.contextId() == null) { // Treat this lock as a hierarchy(coarse) lock. + CoarseLockState state = coarseMap.computeIfAbsent(lockKey, key -> new CoarseLockState(lockKey)); + + return state.acquire(txId, lockMode); + } + + while (true) { + LockState state = acquireLockState(lockKey); + + if (state == null) { + return failedFuture(new LockException( + ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to lock table overflow [txId=" + txId + ", limit=" + rawSlotsMaxSize + ']' + )); + } + + IgniteBiTuple, LockMode> futureTuple = state.tryAcquire(txId, lockMode); + + if (futureTuple.get1() == null) { + continue; // State is marked for remove, need retry. + } + + LockMode newLockMode = futureTuple.get2(); + + return futureTuple.get1().thenApply(res -> new Lock(lockKey, newLockMode, txId)); + } + } + + @Override + @TestOnly + public void release(Lock lock) { + if (lock.lockKey().contextId() == null) { + CoarseLockState lockState2 = coarseMap.get(lock.lockKey()); + if (lockState2 != null) { + lockState2.release(lock); + } + return; + } + + LockState state = lockState(lock.lockKey()); + + if (state.tryRelease(lock.txId())) { + locks.compute(lock.lockKey(), (k, v) -> adjustLockState(state, v)); + } + } + + @Override + public void release(UUID txId, LockKey lockKey, LockMode lockMode) { + if (lockKey.contextId() == null) { + throw new IllegalArgumentException("Coarse locks don't support downgrading"); + } + + LockState state = lockState(lockKey); + + if (state.tryRelease(txId, lockMode)) { + locks.compute(lockKey, (k, v) -> adjustLockState(state, v)); + } + } + + @Override + public void releaseAll(UUID txId) { + ConcurrentLinkedQueue states = this.txMap.remove(txId); + + if (states != null) { + // Default size corresponds to average number of entities used by transaction. Estimate it to 5. + List delayed = new ArrayList<>(4); + for (Releasable state : states) { + if (state.coarse()) { + delayed.add(state); // Delay release. + continue; + } + + if (state.tryRelease(txId)) { + LockKey key = state.key(); // State may be already invalidated. + if (key != null) { + locks.compute(key, (k, v) -> adjustLockState((LockState) state, v)); + } + } + } + + // Unlock coarse locks after all. + for (Releasable state : delayed) { + state.tryRelease(txId); + } + } + } + + @Override + public Iterator locks() { + return txMap.entrySet().stream() + .flatMap(e -> collectLocksFromStates(e.getKey(), e.getValue()).stream()) + .iterator(); + } + + @Override + public Iterator locks(UUID txId) { + ConcurrentLinkedQueue lockStates = txMap.get(txId); + + return collectLocksFromStates(txId, lockStates).iterator(); + } + + /** + * Gets a lock state. Use this method where sure this lock state was already acquired. + * + * @param key A lock key. + * @return A state matched with the key or unused state. + */ + private LockState lockState(LockKey key) { + return locks.getOrDefault(key, removedLockState); + } + + /** + * Returns the lock state for the key. + * + * @param key The key. + * @return A state matched with the key. + */ + private @Nullable LockState acquireLockState(LockKey key) { + return locks.computeIfAbsent(key, (k) -> { + int acquiredLocks; + LockState v; + + do { + acquiredLocks = lockTableSize.get(); + + if (acquiredLocks < rawSlotsMaxSize) { + v = new LockState(); + v.key = k; + } else { + return null; + } + } while (!lockTableSize.compareAndSet(acquiredLocks, acquiredLocks + 1)); + + return v; + }); + } + + /** {@inheritDoc} */ + @Override + public Collection queue(LockKey key) { + return lockState(key).queue(); + } + + /** {@inheritDoc} */ + @Override + public Waiter waiter(LockKey key, UUID txId) { + return lockState(key).waiter(txId); + } + + /** {@inheritDoc} */ + @Override + public boolean isEmpty() { + if (lockTableSize.get() != 0) { + return false; + } + + for (CoarseLockState value : coarseMap.values()) { + if (!value.slockOwners.isEmpty()) { + return false; + } + + if (!value.ixlockOwners.isEmpty()) { + return false; + } + } + + return true; + } + + @Nullable + private LockState adjustLockState(LockState state, LockState v) { + // Mapping may already change. + if (v != state) { + return v; + } + + synchronized (v.waiters) { + if (v.waiters.isEmpty()) { + int locks = lockTableSize.decrementAndGet(); + + assert locks >= 0; + + return null; + } else { + return v; + } + } + } + + private void track(UUID txId, Releasable val) { + txMap.compute(txId, (k, v) -> { + if (v == null) { + v = new ConcurrentLinkedQueue<>(); + } + + v.add(val); + + return v; + }); + } + + private static List collectLocksFromStates(UUID txId, ConcurrentLinkedQueue lockStates) { + List result = new ArrayList<>(); + + if (lockStates != null) { + for (Releasable lockState : lockStates) { + Lock lock = lockState.lock(txId); + if (lock != null) { + result.add(lock); + } + } + } + + return result; + } + + /** + * Create lock exception with given parameters. + * + * @param locker Locker. + * @param holder Lock holder. + * @return Lock exception. + */ + private static LockException lockException(UUID locker, UUID holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); + } + + /** + * Create lock exception when lock holder is believed to be missing. + * + * @param locker Locker. + * @param holder Lock holder. + * @return Lock exception. + */ + private static LockException abandonedLockException(UUID locker, UUID holder) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire an abandoned lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); + } + + /** + * Create coarse lock exception. + * + * @param locker Locker. + * @param holder Lock holder. + * @param abandoned If locker is abandoned. + * @return Lock exception. + */ + private static LockException coarseLockException(UUID locker, UUID holder, boolean abandoned) { + return new LockException(ACQUIRE_LOCK_ERR, + "Failed to acquire the intention table lock due to a conflict [locker=" + locker + ", holder=" + holder + ", abandoned=" + + abandoned + ']'); + } + + /** + * Common interface for releasing transaction locks. + */ + interface Releasable { + /** + * Tries to release a lock. + * + * @param txId Tx id. + * @return {@code True} if lock state requires cleanup after release. + */ + boolean tryRelease(UUID txId); + + /** + * Gets associated lock key. + * + * @return Lock key. + */ + LockKey key(); + + /** + * Returns the lock which is requested by given tx. + * + * @param txId Tx id. + * @return The lock or null if no lock exist. + */ + @Nullable Lock lock(UUID txId); + + /** + * Returns lock type. + * + * @return The type. + */ + boolean coarse(); + } + + /** + * Coarse lock. + */ + public class CoarseLockState implements Releasable { + private final IgniteStripedReadWriteLock stripedLock = new IgniteStripedReadWriteLock(CONCURRENCY); + private final ConcurrentHashMap ixlockOwners = new ConcurrentHashMap<>(); + private final Map>> slockWaiters = new HashMap<>(); + private final ConcurrentHashMap slockOwners = new ConcurrentHashMap<>(); + private final LockKey lockKey; + private final Comparator txComparator; + + CoarseLockState(LockKey lockKey) { + this.lockKey = lockKey; + txComparator = + deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; + } + + @Override + public boolean tryRelease(UUID txId) { + Lock lock = lock(txId); + + release(lock); + + return false; + } + + @Override + public LockKey key() { + return lockKey; + } + + @Override + public Lock lock(UUID txId) { + Lock lock = ixlockOwners.get(txId); + + if (lock != null) { + return lock; + } + + int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); + + stripedLock.readLock(idx).lock(); + + try { + lock = slockOwners.get(txId); + + if (lock != null) { + return lock; + } + + IgniteBiTuple> tuple = slockWaiters.get(txId); + + if (tuple != null) { + return tuple.get1(); + } + } finally { + stripedLock.readLock(idx).unlock(); + } + + return null; + } + + @Override + public boolean coarse() { + return true; + } + + /** + * Acquires a lock. + * + * @param txId Tx id. + * @param lockMode Lock mode. + * @return The future. + */ + public CompletableFuture acquire(UUID txId, LockMode lockMode) { + switch (lockMode) { + case S: + stripedLock.writeLock().lock(); + + try { + // IX-locks can't be modified under the striped write lock. + if (!ixlockOwners.isEmpty()) { + if (ixlockOwners.containsKey(txId)) { + if (ixlockOwners.size() == 1) { + // Safe to upgrade. + track(txId, this); // Double track. + Lock lock = new Lock(lockKey, lockMode, txId); + slockOwners.putIfAbsent(txId, lock); + return completedFuture(lock); + } else { + // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. + for (Lock lock : ixlockOwners.values()) { + if (!lock.txId().equals(txId)) { + return notifyAndFail(txId, lock.txId()); + } + } + } + + assert false : "Should not reach here"; + } + + // Validate reordering with IX locks if prevention is enabled. + if (deadlockPreventionPolicy.usePriority()) { + for (Lock lock : ixlockOwners.values()) { + // Allow only high priority transactions to wait. + if (txComparator.compare(lock.txId(), txId) < 0) { + return notifyAndFail(txId, lock.txId()); + } + } + } + + track(txId, this); + + CompletableFuture fut = new CompletableFuture<>(); + IgniteBiTuple> prev = slockWaiters.putIfAbsent(txId, + new IgniteBiTuple<>(new Lock(lockKey, lockMode, txId), fut)); + return prev == null ? fut : prev.get2(); + } else { + Lock lock = new Lock(lockKey, lockMode, txId); + Lock prev = slockOwners.putIfAbsent(txId, lock); + + if (prev == null) { + track(txId, this); // Do not track on reenter. + } + + return completedFuture(lock); + } + } finally { + stripedLock.writeLock().unlock(); + } + + case IX: + int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); + + stripedLock.readLock(idx).lock(); + + try { + // S-locks can't be modified under the striped read lock. + if (!slockOwners.isEmpty()) { + if (slockOwners.containsKey(txId)) { + if (slockOwners.size() == 1) { + // Safe to upgrade. + track(txId, this); // Double track. + Lock lock = new Lock(lockKey, lockMode, txId); + ixlockOwners.putIfAbsent(txId, lock); + return completedFuture(lock); + } else { + // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. + for (Lock lock : slockOwners.values()) { + if (!lock.txId().equals(txId)) { + return notifyAndFail(txId, lock.txId()); + } + } + } + + assert false : "Should not reach here"; + } + + // IX locks never allowed to wait. + UUID holderTx = slockOwners.keySet().iterator().next(); + return notifyAndFail(txId, holderTx); + } else { + Lock lock = new Lock(lockKey, lockMode, txId); + Lock prev = ixlockOwners.putIfAbsent(txId, lock); // Avoid overwrite existing lock. + + if (prev == null) { + track(txId, this); // Do not track on reenter. + } + + return completedFuture(lock); + } + } finally { + stripedLock.readLock(idx).unlock(); + } + + default: + assert false : "Unsupported coarse lock mode: " + lockMode; + + return null; // Should not be here. + } + } + + private Set allLockHolderTxs() { + return CollectionUtils.union(ixlockOwners.keySet(), slockOwners.keySet()); + } + + /** + * Triggers event and fails. + * + * @param txId Tx id. + * @param conflictedHolderId Holder tx id. + * @return Failed future. + */ + CompletableFuture notifyAndFail(UUID txId, UUID conflictedHolderId) { + CompletableFuture res = fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, allLockHolderTxs())); + // TODO: https://issues.apache.org/jira/browse/IGNITE-21153 + return failedFuture(coarseLockException(txId, conflictedHolderId, res.isCompletedExceptionally())); + } + + /** + * Releases the lock. Should be called from {@link #releaseAll(UUID)}. + * + * @param lock The lock. + */ + public void release(@Nullable Lock lock) { + if (lock == null) { + return; + } + + switch (lock.lockMode()) { + case S: + IgniteBiTuple> waiter = null; + + stripedLock.writeLock().lock(); + + try { + Lock removed = slockOwners.remove(lock.txId()); + + if (removed == null) { + waiter = slockWaiters.remove(lock.txId()); + + if (waiter != null) { + removed = waiter.get1(); + } + } + + assert removed != null : "Attempt to release not requested lock: " + lock.txId(); + } finally { + stripedLock.writeLock().unlock(); + } + + if (waiter != null) { + waiter.get2().complete(waiter.get1()); + } + + break; + case IX: + int idx = Math.floorMod(spread(lock.txId().hashCode()), CONCURRENCY); + + Map>> wakeups; + + stripedLock.readLock(idx).lock(); + + try { + var removed = ixlockOwners.remove(lock.txId()); + + assert removed != null : "Attempt to release not acquired lock: " + lock.txId(); + + if (slockWaiters.isEmpty()) { + return; // Nothing to do. + } + + if (!ixlockOwners.isEmpty()) { + assert slockOwners.isEmpty() || slockOwners.containsKey(lock.txId()); + + return; // Nothing to do. + } + + // No race here because no new locks can be acquired after releaseAll due to 2-phase locking protocol. + + // Promote waiters to owners. + wakeups = new HashMap<>(slockWaiters); + + slockWaiters.clear(); + + for (IgniteBiTuple> value : wakeups.values()) { + slockOwners.put(value.getKey().txId(), value.getKey()); + } + } finally { + stripedLock.readLock(idx).unlock(); + } + + for (Entry>> entry : wakeups.entrySet()) { + entry.getValue().get2().complete(entry.getValue().get1()); + } + + break; + default: + assert false : "Unsupported coarse unlock mode: " + lock.lockMode(); + } + } + } + + /** + * Key lock. + */ + public class LockState implements Releasable { + /** Waiters. */ + private final TreeMap waiters; + + /** Lock key. */ + private volatile LockKey key; + + LockState() { + Comparator txComparator = + deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; + + this.waiters = new TreeMap<>(txComparator); + } + + /** + * Checks if lock state is used to hold a lock. + * + * @return True if the state is used, otherwise false. + */ + boolean isUsed() { + return key != null; + } + + @Override + public LockKey key() { + return key; + } + + @Override + public Lock lock(UUID txId) { + Waiter waiter = waiters.get(txId); + + if (waiter != null) { + return new Lock(key, waiter.lockMode(), txId); + } + + return null; + } + + @Override + public boolean coarse() { + return false; + } + + /** + * Attempts to acquire a lock for the specified {@code key} in specified lock mode. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + * @return The future or null if state is marked for removal and acquired lock mode. + */ + @Nullable IgniteBiTuple, LockMode> tryAcquire(UUID txId, LockMode lockMode) { + WaiterImpl waiter = new WaiterImpl(txId, lockMode); + + synchronized (waiters) { + if (!isUsed()) { + return new IgniteBiTuple(null, lockMode); + } + + // We always replace the previous waiter with the new one. If the previous waiter has lock intention then incomplete + // lock future is copied to the new waiter. This guarantees that, if the previous waiter was locked concurrently, then + // it doesn't have any lock intentions, and the future is not copied to the new waiter. Otherwise, if there is lock + // intention, this means that the lock future contained in previous waiter, is not going to be completed and can be + // copied safely. + WaiterImpl prev = waiters.put(txId, waiter); + + // Reenter + if (prev != null) { + if (prev.locked() && prev.lockMode().allowReenter(lockMode)) { + waiter.lock(); + + waiter.upgrade(prev); + + return new IgniteBiTuple(nullCompletedFuture(), prev.lockMode()); + } else { + waiter.upgrade(prev); + + assert prev.lockMode() == waiter.lockMode() : + "Lock modes are incorrect [prev=" + prev.lockMode() + ", new=" + waiter.lockMode() + ']'; + } + } + + if (!isWaiterReadyToNotify(waiter, false)) { + if (deadlockPreventionPolicy.waitTimeout() > 0) { + setWaiterTimeout(waiter); + } + + // Put to wait queue, track. + if (prev == null) { + track(waiter.txId, this); + } + + return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); + } + + if (!waiter.locked()) { + waiters.remove(waiter.txId()); + } else if (waiter.hasLockIntent()) { + waiter.refuseIntent(); // Restore old lock. + } else { + // Lock granted, track. + if (prev == null) { + track(waiter.txId, this); + } + } + } + + // Notify outside the monitor. + waiter.notifyLocked(); + + return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); + } + + /** + * Returns waiters count. + * + * @return waiters count. + */ + public int waitersCount() { + synchronized (waiters) { + return waiters.size(); + } + } + + /** + * Checks current waiter. It can change the internal state of the waiter. + * + * @param waiter Checked waiter. + * @return True if current waiter ready to notify, false otherwise. + */ + private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { + for (Entry entry : waiters.tailMap(waiter.txId(), false).entrySet()) { + WaiterImpl tmp = entry.getValue(); + LockMode mode = tmp.lockMode; + + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { + if (conflictFound(waiter.txId())) { + waiter.fail(abandonedLockException(waiter.txId, tmp.txId)); + + return true; + } else if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { + waiter.fail(lockException(waiter.txId, tmp.txId)); + + return true; + } + + return false; + } + } + + for (Entry entry : waiters.headMap(waiter.txId()).entrySet()) { + WaiterImpl tmp = entry.getValue(); + LockMode mode = tmp.lockMode; + + if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { + if (skipFail) { + return false; + } else if (conflictFound(waiter.txId())) { + waiter.fail(abandonedLockException(waiter.txId, tmp.txId)); + + return true; + } else if (deadlockPreventionPolicy.waitTimeout() == 0) { + waiter.fail(lockException(waiter.txId, tmp.txId)); + + return true; + } else { + return false; + } + } + } + + waiter.lock(); + + return true; + } + + /** + * Attempts to release a lock for the specified {@code key} in exclusive mode. + * + * @param txId Transaction id. + * @return {@code True} if the queue is empty. + */ + @Override + public boolean tryRelease(UUID txId) { + Collection toNotify; + + synchronized (waiters) { + toNotify = release(txId); + } + + // Notify outside the monitor. + for (WaiterImpl waiter : toNotify) { + waiter.notifyLocked(); + } + + return key != null && waitersCount() == 0; + } + + /** + * Releases a specific lock of the key, if a key is locked in multiple modes by the same locker. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + * @return If the value is true, no one waits of any lock of the key, false otherwise. + */ + boolean tryRelease(UUID txId, LockMode lockMode) { + List toNotify = emptyList(); + synchronized (waiters) { + WaiterImpl waiter = waiters.get(txId); + + if (waiter != null) { + assert LockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : + "The lock is not locked in specified mode [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; + + LockMode modeFromDowngrade = waiter.recalculateMode(lockMode); + + if (!waiter.locked() && !waiter.hasLockIntent()) { + toNotify = release(txId); + } else if (modeFromDowngrade != waiter.lockMode()) { + toNotify = unlockCompatibleWaiters(); + } + } + } + + // Notify outside the monitor. + for (WaiterImpl waiter : toNotify) { + waiter.notifyLocked(); + } + + return key != null && waitersCount() == 0; + } + + /** + * Releases all locks are held by a specific transaction. This method should be invoked synchronously. + * + * @param txId Transaction id. + * @return List of waiters to notify. + */ + private List release(UUID txId) { + waiters.remove(txId); + + if (waiters.isEmpty()) { + return emptyList(); + } + + return unlockCompatibleWaiters(); + } + + /** + * Unlock compatible waiters. + * + * @return List of waiters to notify. + */ + private List unlockCompatibleWaiters() { + if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { + return emptyList(); + } + + ArrayList toNotify = new ArrayList<>(); + Set toFail = new HashSet<>(); + + for (Entry entry : waiters.entrySet()) { + WaiterImpl tmp = entry.getValue(); + + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) { + assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']'; + + toNotify.add(tmp); + } + } + + if (deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() >= 0) { + for (Entry entry : waiters.entrySet()) { + WaiterImpl tmp = entry.getValue(); + + if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) { + assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']'; + + toNotify.add(tmp); + toFail.add(tmp.txId()); + } + } + + for (UUID failTx : toFail) { + var w = waiters.get(failTx); + + if (w.locked()) { + w.refuseIntent(); + } else { + waiters.remove(failTx); + } + } + } + + return toNotify; + } + + /** + * Makes the waiter fail after specified timeout (in milliseconds), if intended lock was not acquired within this timeout. + * + * @param waiter Waiter. + */ + private void setWaiterTimeout(WaiterImpl waiter) { + delayedExecutor.execute(() -> { + if (!waiter.fut.isDone()) { + waiter.fut.completeExceptionally(new LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to " + + "timeout [txId=" + waiter.txId() + ", waiter=" + waiter + + ", timeout=" + deadlockPreventionPolicy.waitTimeout() + ']')); + } + }); + } + + /** + * Returns a collection of timestamps that is associated with the specified {@code key}. + * + * @return The waiters queue. + */ + public Collection queue() { + synchronized (waiters) { + return new ArrayList<>(waiters.keySet()); + } + } + + /** + * Returns a waiter for the specified {@code key}. + * + * @param txId Transaction id. + * @return The waiter. + */ + public Waiter waiter(UUID txId) { + synchronized (waiters) { + return waiters.get(txId); + } + } + + /** + * Notifies about the lock conflict found between transactions. + * + * @param acquirerTx Transaction which tries to acquire the lock. + * @return True if the conflict connected with an abandoned transaction, false in the other case. + */ + private boolean conflictFound(UUID acquirerTx) { + CompletableFuture eventResult = fireEvent(LOCK_CONFLICT, new LockEventParameters(acquirerTx, allLockHolderTxs())); + // No async handling is expected. + // TODO: https://issues.apache.org/jira/browse/IGNITE-21153 + assert eventResult.isDone() : "Async lock conflict handling is not supported"; + + return eventResult.isCompletedExceptionally(); + } + + private Set allLockHolderTxs() { + return waiters.keySet(); + } + } + + /** + * A waiter implementation. + */ + private static class WaiterImpl implements Comparable, Waiter { + /** + * Holding locks by type. + */ + private final Map locks = new EnumMap<>(LockMode.class); + + /** + * Lock modes are marked as intended, but have not taken yet. This is NOT specific to intention lock modes, such as IS and IX. + */ + private final Set intendedLocks = EnumSet.noneOf(LockMode.class); + + /** Locked future. */ + @IgniteToStringExclude + private CompletableFuture fut; + + /** Waiter transaction id. */ + private final UUID txId; + + /** The lock mode to intend to hold. This is NOT specific to intention lock modes, such as IS and IX. */ + private LockMode intendedLockMode; + + /** The lock mode. */ + private LockMode lockMode; + + /** + * The filed has a value when the waiter couldn't lock a key. + */ + private LockException ex; + + /** + * The constructor. + * + * @param txId Transaction id. + * @param lockMode Lock mode. + */ + WaiterImpl(UUID txId, LockMode lockMode) { + this.fut = new CompletableFuture<>(); + this.txId = txId; + this.intendedLockMode = lockMode; + + locks.put(lockMode, 1); + intendedLocks.add(lockMode); + } + + /** + * Adds a lock mode. + * + * @param lockMode Lock mode. + * @param increment Value to increment amount. + */ + void addLock(LockMode lockMode, int increment) { + locks.merge(lockMode, increment, Integer::sum); + } + + /** + * Removes a lock mode. + * + * @param lockMode Lock mode. + * @return True if the lock is not locked in the passed mode, false otherwise. + */ + private boolean removeLock(LockMode lockMode) { + Integer counter = locks.get(lockMode); + + if (counter == null || counter < 2) { + locks.remove(lockMode); + + return true; + } else { + locks.put(lockMode, counter - 1); + + return false; + } + } + + /** + * Recalculates lock mode based of all locks which the waiter has taken. + * + * @param modeToRemove Mode without which, the recalculation will happen. + * @return Previous lock mode. + */ + LockMode recalculateMode(LockMode modeToRemove) { + if (!removeLock(modeToRemove)) { + return lockMode; + } + + return recalculate(); + } + + /** + * Recalculates lock supremums. + * + * @return Previous lock mode. + */ + private LockMode recalculate() { + LockMode newIntendedLockMode = null; + LockMode newLockMode = null; + + for (LockMode mode : locks.keySet()) { + assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + mode + "]"; + + if (intendedLocks.contains(mode)) { + newIntendedLockMode = newIntendedLockMode == null ? mode : LockMode.supremum(newIntendedLockMode, mode); + } else { + newLockMode = newLockMode == null ? mode : LockMode.supremum(newLockMode, mode); + } + } + + LockMode mode = lockMode; + + lockMode = newLockMode; + intendedLockMode = newLockMode != null && newIntendedLockMode != null ? LockMode.supremum(newLockMode, newIntendedLockMode) + : newIntendedLockMode; + + return mode; + } + + /** + * Merge all locks that were held by another waiter to the current one. + * + * @param other Other waiter. + */ + void upgrade(WaiterImpl other) { + intendedLocks.addAll(other.intendedLocks); + + other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), entry.getValue())); + + recalculate(); + + if (other.hasLockIntent()) { + fut = other.fut; + } + } + + /** + * Removes all locks that were intended to hold. + */ + void refuseIntent() { + for (LockMode mode : intendedLocks) { + locks.remove(mode); + } + + intendedLocks.clear(); + intendedLockMode = null; + } + + /** {@inheritDoc} */ + @Override + public int compareTo(WaiterImpl o) { + return txId.compareTo(o.txId); + } + + /** Notifies a future listeners. */ + private void notifyLocked() { + if (ex != null) { + fut.completeExceptionally(ex); + } else { + assert lockMode != null; + + // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-20985 + fut.complete(null); + } + } + + /** {@inheritDoc} */ + @Override + public boolean locked() { + return this.lockMode != null; + } + + /** + * Checks is the waiter has any intended to lock a key. + * + * @return True if the waiter has an intended lock, false otherwise. + */ + public boolean hasLockIntent() { + return this.intendedLockMode != null; + } + + /** {@inheritDoc} */ + @Override + public LockMode lockMode() { + return lockMode; + } + + /** {@inheritDoc} */ + @Override + public LockMode intendedLockMode() { + return intendedLockMode; + } + + /** Grant a lock. */ + private void lock() { + lockMode = intendedLockMode; + + intendedLockMode = null; + + intendedLocks.clear(); + } + + /** + * Fails the lock waiter. + * + * @param e Lock exception. + */ + private void fail(LockException e) { + ex = e; + } + + /** {@inheritDoc} */ + @Override + public UUID txId() { + return txId; + } + + /** {@inheritDoc} */ + @Override + public boolean equals(Object o) { + if (!(o instanceof WaiterImpl)) { + return false; + } + + return compareTo((WaiterImpl) o) == 0; + } + + /** {@inheritDoc} */ + @Override + public int hashCode() { + return txId.hashCode(); + } + + /** {@inheritDoc} */ + @Override + public String toString() { + return S.toString(WaiterImpl.class, this, "granted", fut.isDone() && !fut.isCompletedExceptionally()); + } + } + + private static int spread(int h) { + return (h ^ (h >>> 16)) & 0x7fffffff; + } + + @TestOnly + public LockState[] getSlots() { + return locks.values().toArray(new LockState[]{}); + } + + public int available() { + return rawSlotsMaxSize - lockTableSize.get(); + } +} From aea40b3ca3b928b1f4bb19b1891c2c5d31800110 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 16:51:09 +0300 Subject: [PATCH 05/10] Removed cache form the lock manager implementation. --- .../ignite/internal/app/IgniteImpl.java | 14 +- .../internal/tx/impl/HeapLockManager.java | 61 +- .../internal/tx/impl/HeapLockManagerV2.java | 1351 ----------------- 3 files changed, 28 insertions(+), 1398 deletions(-) delete mode 100644 modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index d3b1e00a1db..338fd5dae42 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -148,7 +148,6 @@ import org.apache.ignite.internal.index.IndexManager; import org.apache.ignite.internal.index.IndexNodeFinishedRwTransactionsChecker; import org.apache.ignite.internal.lang.IgniteInternalException; -import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; @@ -263,7 +262,6 @@ import org.apache.ignite.internal.tx.configuration.TransactionConfiguration; import org.apache.ignite.internal.tx.configuration.TransactionExtensionConfiguration; import org.apache.ignite.internal.tx.impl.HeapLockManager; -import org.apache.ignite.internal.tx.impl.HeapLockManagerV2; import org.apache.ignite.internal.tx.impl.IgniteTransactionsImpl; import org.apache.ignite.internal.tx.impl.PublicApiThreadingIgniteTransactions; import org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry; @@ -973,17 +971,7 @@ public class IgniteImpl implements Ignite { var transactionInflights = new TransactionInflights(placementDriverMgr.placementDriver(), clockService); - String useLocks = IgniteSystemProperties.getString("IGNITE_USE_LOCKS", "V1"); - - LockManager lockMgr; - - switch (useLocks) { - case "V2": - lockMgr = new HeapLockManagerV2(systemConfiguration); - break; - default: - lockMgr = new HeapLockManager(systemConfiguration); - } + LockManager lockMgr = new HeapLockManager(systemConfiguration); // TODO: IGNITE-19344 - use nodeId that is validated on join (and probably generated differently). txManager = new TxManagerImpl( diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 651d8a6e89c..8314feb057c 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -44,6 +44,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.SystemPropertyView; import org.apache.ignite.internal.event.AbstractEventProducer; @@ -87,6 +88,8 @@ public class HeapLockManager extends AbstractEventProducer empty = new ConcurrentLinkedQueue<>(); - /** Mapped slots. */ private ConcurrentHashMap locks; - /** Raw slots. */ - private LockState[] slots; - /** The policy. */ private DeadlockPreventionPolicy deadlockPreventionPolicy; @@ -165,17 +162,6 @@ public void start(DeadlockPreventionPolicy deadlockPreventionPolicy) { : null; locks = new ConcurrentHashMap<>(lockMapSize); - - LockState[] tmp = new LockState[rawSlotsMaxSize]; - for (int i = 0; i < tmp.length; i++) { - LockState lockState = new LockState(); - if (i < lockMapSize) { - empty.add(lockState); - } - tmp[i] = lockState; - } - - slots = tmp; // Atomic init. } @Override @@ -298,13 +284,20 @@ private LockState lockState(LockKey key) { * @return A state matched with the key. */ private @Nullable LockState acquireLockState(LockKey key) { - return locks.compute(key, (k, v) -> { - if (v == null) { - v = empty.poll(); - if (v != null) { + return locks.computeIfAbsent(key, (k) -> { + int acquiredLocks; + LockState v; + + do { + acquiredLocks = lockTableSize.get(); + + if (acquiredLocks < rawSlotsMaxSize) { + v = new LockState(); v.key = k; + } else { + return null; } - } + } while (!lockTableSize.compareAndSet(acquiredLocks, acquiredLocks + 1)); return v; }); @@ -325,10 +318,8 @@ public Waiter waiter(LockKey key, UUID txId) { /** {@inheritDoc} */ @Override public boolean isEmpty() { - for (LockState slot : slots) { - if (slot.waitersCount() != 0) { - return false; - } + if (lockTableSize.get() != 0) { + return false; } for (CoarseLockState value : coarseMap.values()) { @@ -353,8 +344,10 @@ private LockState adjustLockState(LockState state, LockState v) { synchronized (v.waiters) { if (v.waiters.isEmpty()) { - v.key = null; - empty.add(v); + int locks = lockTableSize.decrementAndGet(); + + assert locks >= 0; + return null; } else { return v; @@ -879,7 +872,7 @@ public int waitersCount() { * @return True if current waiter ready to notify, false otherwise. */ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { - for (Map.Entry entry : waiters.tailMap(waiter.txId(), false).entrySet()) { + for (Entry entry : waiters.tailMap(waiter.txId(), false).entrySet()) { WaiterImpl tmp = entry.getValue(); LockMode mode = tmp.lockMode; @@ -898,7 +891,7 @@ private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { } } - for (Map.Entry entry : waiters.headMap(waiter.txId()).entrySet()) { + for (Entry entry : waiters.headMap(waiter.txId()).entrySet()) { WaiterImpl tmp = entry.getValue(); LockMode mode = tmp.lockMode; @@ -1009,7 +1002,7 @@ private List unlockCompatibleWaiters() { ArrayList toNotify = new ArrayList<>(); Set toFail = new HashSet<>(); - for (Map.Entry entry : waiters.entrySet()) { + for (Entry entry : waiters.entrySet()) { WaiterImpl tmp = entry.getValue(); if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) { @@ -1020,7 +1013,7 @@ private List unlockCompatibleWaiters() { } if (deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() >= 0) { - for (Map.Entry entry : waiters.entrySet()) { + for (Entry entry : waiters.entrySet()) { WaiterImpl tmp = entry.getValue(); if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) { @@ -1349,10 +1342,10 @@ private static int spread(int h) { @TestOnly public LockState[] getSlots() { - return slots; + return locks.values().toArray(new LockState[]{}); } public int available() { - return empty.size(); + return rawSlotsMaxSize - lockTableSize.get(); } } diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java deleted file mode 100644 index 964d40cb965..00000000000 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManagerV2.java +++ /dev/null @@ -1,1351 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.tx.impl; - -import static java.util.Collections.emptyList; -import static java.util.concurrent.CompletableFuture.completedFuture; -import static java.util.concurrent.CompletableFuture.failedFuture; -import static org.apache.ignite.internal.tx.event.LockEvent.LOCK_CONFLICT; -import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; -import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR; -import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_TIMEOUT_ERR; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.Comparator; -import java.util.EnumMap; -import java.util.EnumSet; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.TreeMap; -import java.util.UUID; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentLinkedQueue; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; -import org.apache.ignite.internal.configuration.SystemLocalConfiguration; -import org.apache.ignite.internal.configuration.SystemPropertyView; -import org.apache.ignite.internal.event.AbstractEventProducer; -import org.apache.ignite.internal.lang.IgniteBiTuple; -import org.apache.ignite.internal.tostring.IgniteToStringExclude; -import org.apache.ignite.internal.tostring.S; -import org.apache.ignite.internal.tx.DeadlockPreventionPolicy; -import org.apache.ignite.internal.tx.Lock; -import org.apache.ignite.internal.tx.LockException; -import org.apache.ignite.internal.tx.LockKey; -import org.apache.ignite.internal.tx.LockManager; -import org.apache.ignite.internal.tx.LockMode; -import org.apache.ignite.internal.tx.Waiter; -import org.apache.ignite.internal.tx.event.LockEvent; -import org.apache.ignite.internal.tx.event.LockEventParameters; -import org.apache.ignite.internal.util.CollectionUtils; -import org.apache.ignite.internal.util.IgniteStripedReadWriteLock; -import org.jetbrains.annotations.Nullable; -import org.jetbrains.annotations.TestOnly; - -/** - * A {@link LockManager} implementation which stores lock queues in the heap. - * - *

Lock waiters are placed in the queue, ordered according to comparator provided by {@link HeapLockManagerV2#deadlockPreventionPolicy}. - * When a new waiter is placed in the queue, it's validated against current lock owner: if there is an owner with a higher priority (as - * defined by comparator) lock request is denied. - * - *

Read lock can be upgraded to write lock (only available for the lowest read-locked entry of - * the queue). - * - *

Additionally limits the lock map size. - */ -public class HeapLockManagerV2 extends AbstractEventProducer implements LockManager { - /** Table size. */ - public static final int DEFAULT_SLOTS = 1_048_576; - - private static final String LOCK_MAP_SIZE_PROPERTY_NAME = "lockMapSize"; - - private static final String RAW_SLOTS_MAX_SIZE_PROPERTY_NAME = "rawSlotsMaxSize"; - - /** Striped lock concurrency. */ - private static final int CONCURRENCY = Math.max(1, Runtime.getRuntime().availableProcessors() / 2); - - private final AtomicInteger lockTableSize = new AtomicInteger(); - - /** An unused state to avoid concurrent allocation. */ - private LockState removedLockState; - - /** Lock map size. */ - private final int lockMapSize; - - /** Raw slots size. */ - private final int rawSlotsMaxSize; - - /** Mapped slots. */ - private ConcurrentHashMap locks; - - /** The policy. */ - private DeadlockPreventionPolicy deadlockPreventionPolicy; - - /** Executor that is used to fail waiters after timeout. */ - private Executor delayedExecutor; - - /** Enlisted transactions. */ - private final ConcurrentHashMap> txMap = new ConcurrentHashMap<>(1024); - - /** Coarse locks. */ - private final ConcurrentHashMap coarseMap = new ConcurrentHashMap<>(); - - /** - * Creates an instance of {@link HeapLockManagerV2} with a few slots eligible for tests which don't stress the lock manager too much. - * Such a small instance is started way faster than a full-blown production ready instance with a lot of slots. - */ - @TestOnly - public static HeapLockManagerV2 smallInstance() { - return new HeapLockManagerV2(1024, 1024); - } - - /** Constructor. */ - public HeapLockManagerV2(SystemLocalConfiguration systemProperties) { - this( - intProperty(systemProperties, RAW_SLOTS_MAX_SIZE_PROPERTY_NAME, DEFAULT_SLOTS), - intProperty(systemProperties, LOCK_MAP_SIZE_PROPERTY_NAME, DEFAULT_SLOTS) - ); - } - - /** - * Constructor. - * - * @param rawSlotsMaxSize Raw slots size. - * @param lockMapSize Lock map size. - */ - public HeapLockManagerV2(int rawSlotsMaxSize, int lockMapSize) { - if (lockMapSize > rawSlotsMaxSize) { - throw new IllegalArgumentException("maxSize=" + rawSlotsMaxSize + " < mapSize=" + lockMapSize); - } - - this.rawSlotsMaxSize = rawSlotsMaxSize; - this.lockMapSize = lockMapSize; - } - - private static int intProperty(SystemLocalConfiguration systemProperties, String name, int defaultValue) { - SystemPropertyView property = systemProperties.properties().value().get(name); - - return property == null ? defaultValue : Integer.parseInt(property.propertyValue()); - } - - @Override - public void start(DeadlockPreventionPolicy deadlockPreventionPolicy) { - this.deadlockPreventionPolicy = deadlockPreventionPolicy; - this.removedLockState = new LockState(); - - this.delayedExecutor = deadlockPreventionPolicy.waitTimeout() > 0 - ? CompletableFuture.delayedExecutor(deadlockPreventionPolicy.waitTimeout(), TimeUnit.MILLISECONDS) - : null; - - locks = new ConcurrentHashMap<>(lockMapSize); - } - - @Override - public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lockMode) { - if (lockKey.contextId() == null) { // Treat this lock as a hierarchy(coarse) lock. - CoarseLockState state = coarseMap.computeIfAbsent(lockKey, key -> new CoarseLockState(lockKey)); - - return state.acquire(txId, lockMode); - } - - while (true) { - LockState state = acquireLockState(lockKey); - - if (state == null) { - return failedFuture(new LockException( - ACQUIRE_LOCK_ERR, - "Failed to acquire a lock due to lock table overflow [txId=" + txId + ", limit=" + rawSlotsMaxSize + ']' - )); - } - - IgniteBiTuple, LockMode> futureTuple = state.tryAcquire(txId, lockMode); - - if (futureTuple.get1() == null) { - continue; // State is marked for remove, need retry. - } - - LockMode newLockMode = futureTuple.get2(); - - return futureTuple.get1().thenApply(res -> new Lock(lockKey, newLockMode, txId)); - } - } - - @Override - @TestOnly - public void release(Lock lock) { - if (lock.lockKey().contextId() == null) { - CoarseLockState lockState2 = coarseMap.get(lock.lockKey()); - if (lockState2 != null) { - lockState2.release(lock); - } - return; - } - - LockState state = lockState(lock.lockKey()); - - if (state.tryRelease(lock.txId())) { - locks.compute(lock.lockKey(), (k, v) -> adjustLockState(state, v)); - } - } - - @Override - public void release(UUID txId, LockKey lockKey, LockMode lockMode) { - if (lockKey.contextId() == null) { - throw new IllegalArgumentException("Coarse locks don't support downgrading"); - } - - LockState state = lockState(lockKey); - - if (state.tryRelease(txId, lockMode)) { - locks.compute(lockKey, (k, v) -> adjustLockState(state, v)); - } - } - - @Override - public void releaseAll(UUID txId) { - ConcurrentLinkedQueue states = this.txMap.remove(txId); - - if (states != null) { - // Default size corresponds to average number of entities used by transaction. Estimate it to 5. - List delayed = new ArrayList<>(4); - for (Releasable state : states) { - if (state.coarse()) { - delayed.add(state); // Delay release. - continue; - } - - if (state.tryRelease(txId)) { - LockKey key = state.key(); // State may be already invalidated. - if (key != null) { - locks.compute(key, (k, v) -> adjustLockState((LockState) state, v)); - } - } - } - - // Unlock coarse locks after all. - for (Releasable state : delayed) { - state.tryRelease(txId); - } - } - } - - @Override - public Iterator locks() { - return txMap.entrySet().stream() - .flatMap(e -> collectLocksFromStates(e.getKey(), e.getValue()).stream()) - .iterator(); - } - - @Override - public Iterator locks(UUID txId) { - ConcurrentLinkedQueue lockStates = txMap.get(txId); - - return collectLocksFromStates(txId, lockStates).iterator(); - } - - /** - * Gets a lock state. Use this method where sure this lock state was already acquired. - * - * @param key A lock key. - * @return A state matched with the key or unused state. - */ - private LockState lockState(LockKey key) { - return locks.getOrDefault(key, removedLockState); - } - - /** - * Returns the lock state for the key. - * - * @param key The key. - * @return A state matched with the key. - */ - private @Nullable LockState acquireLockState(LockKey key) { - return locks.computeIfAbsent(key, (k) -> { - int acquiredLocks; - LockState v; - - do { - acquiredLocks = lockTableSize.get(); - - if (acquiredLocks < rawSlotsMaxSize) { - v = new LockState(); - v.key = k; - } else { - return null; - } - } while (!lockTableSize.compareAndSet(acquiredLocks, acquiredLocks + 1)); - - return v; - }); - } - - /** {@inheritDoc} */ - @Override - public Collection queue(LockKey key) { - return lockState(key).queue(); - } - - /** {@inheritDoc} */ - @Override - public Waiter waiter(LockKey key, UUID txId) { - return lockState(key).waiter(txId); - } - - /** {@inheritDoc} */ - @Override - public boolean isEmpty() { - if (lockTableSize.get() != 0) { - return false; - } - - for (CoarseLockState value : coarseMap.values()) { - if (!value.slockOwners.isEmpty()) { - return false; - } - - if (!value.ixlockOwners.isEmpty()) { - return false; - } - } - - return true; - } - - @Nullable - private LockState adjustLockState(LockState state, LockState v) { - // Mapping may already change. - if (v != state) { - return v; - } - - synchronized (v.waiters) { - if (v.waiters.isEmpty()) { - int locks = lockTableSize.decrementAndGet(); - - assert locks >= 0; - - return null; - } else { - return v; - } - } - } - - private void track(UUID txId, Releasable val) { - txMap.compute(txId, (k, v) -> { - if (v == null) { - v = new ConcurrentLinkedQueue<>(); - } - - v.add(val); - - return v; - }); - } - - private static List collectLocksFromStates(UUID txId, ConcurrentLinkedQueue lockStates) { - List result = new ArrayList<>(); - - if (lockStates != null) { - for (Releasable lockState : lockStates) { - Lock lock = lockState.lock(txId); - if (lock != null) { - result.add(lock); - } - } - } - - return result; - } - - /** - * Create lock exception with given parameters. - * - * @param locker Locker. - * @param holder Lock holder. - * @return Lock exception. - */ - private static LockException lockException(UUID locker, UUID holder) { - return new LockException(ACQUIRE_LOCK_ERR, - "Failed to acquire a lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); - } - - /** - * Create lock exception when lock holder is believed to be missing. - * - * @param locker Locker. - * @param holder Lock holder. - * @return Lock exception. - */ - private static LockException abandonedLockException(UUID locker, UUID holder) { - return new LockException(ACQUIRE_LOCK_ERR, - "Failed to acquire an abandoned lock due to a possible deadlock [locker=" + locker + ", holder=" + holder + ']'); - } - - /** - * Create coarse lock exception. - * - * @param locker Locker. - * @param holder Lock holder. - * @param abandoned If locker is abandoned. - * @return Lock exception. - */ - private static LockException coarseLockException(UUID locker, UUID holder, boolean abandoned) { - return new LockException(ACQUIRE_LOCK_ERR, - "Failed to acquire the intention table lock due to a conflict [locker=" + locker + ", holder=" + holder + ", abandoned=" - + abandoned + ']'); - } - - /** - * Common interface for releasing transaction locks. - */ - interface Releasable { - /** - * Tries to release a lock. - * - * @param txId Tx id. - * @return {@code True} if lock state requires cleanup after release. - */ - boolean tryRelease(UUID txId); - - /** - * Gets associated lock key. - * - * @return Lock key. - */ - LockKey key(); - - /** - * Returns the lock which is requested by given tx. - * - * @param txId Tx id. - * @return The lock or null if no lock exist. - */ - @Nullable Lock lock(UUID txId); - - /** - * Returns lock type. - * - * @return The type. - */ - boolean coarse(); - } - - /** - * Coarse lock. - */ - public class CoarseLockState implements Releasable { - private final IgniteStripedReadWriteLock stripedLock = new IgniteStripedReadWriteLock(CONCURRENCY); - private final ConcurrentHashMap ixlockOwners = new ConcurrentHashMap<>(); - private final Map>> slockWaiters = new HashMap<>(); - private final ConcurrentHashMap slockOwners = new ConcurrentHashMap<>(); - private final LockKey lockKey; - private final Comparator txComparator; - - CoarseLockState(LockKey lockKey) { - this.lockKey = lockKey; - txComparator = - deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; - } - - @Override - public boolean tryRelease(UUID txId) { - Lock lock = lock(txId); - - release(lock); - - return false; - } - - @Override - public LockKey key() { - return lockKey; - } - - @Override - public Lock lock(UUID txId) { - Lock lock = ixlockOwners.get(txId); - - if (lock != null) { - return lock; - } - - int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); - - stripedLock.readLock(idx).lock(); - - try { - lock = slockOwners.get(txId); - - if (lock != null) { - return lock; - } - - IgniteBiTuple> tuple = slockWaiters.get(txId); - - if (tuple != null) { - return tuple.get1(); - } - } finally { - stripedLock.readLock(idx).unlock(); - } - - return null; - } - - @Override - public boolean coarse() { - return true; - } - - /** - * Acquires a lock. - * - * @param txId Tx id. - * @param lockMode Lock mode. - * @return The future. - */ - public CompletableFuture acquire(UUID txId, LockMode lockMode) { - switch (lockMode) { - case S: - stripedLock.writeLock().lock(); - - try { - // IX-locks can't be modified under the striped write lock. - if (!ixlockOwners.isEmpty()) { - if (ixlockOwners.containsKey(txId)) { - if (ixlockOwners.size() == 1) { - // Safe to upgrade. - track(txId, this); // Double track. - Lock lock = new Lock(lockKey, lockMode, txId); - slockOwners.putIfAbsent(txId, lock); - return completedFuture(lock); - } else { - // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. - for (Lock lock : ixlockOwners.values()) { - if (!lock.txId().equals(txId)) { - return notifyAndFail(txId, lock.txId()); - } - } - } - - assert false : "Should not reach here"; - } - - // Validate reordering with IX locks if prevention is enabled. - if (deadlockPreventionPolicy.usePriority()) { - for (Lock lock : ixlockOwners.values()) { - // Allow only high priority transactions to wait. - if (txComparator.compare(lock.txId(), txId) < 0) { - return notifyAndFail(txId, lock.txId()); - } - } - } - - track(txId, this); - - CompletableFuture fut = new CompletableFuture<>(); - IgniteBiTuple> prev = slockWaiters.putIfAbsent(txId, - new IgniteBiTuple<>(new Lock(lockKey, lockMode, txId), fut)); - return prev == null ? fut : prev.get2(); - } else { - Lock lock = new Lock(lockKey, lockMode, txId); - Lock prev = slockOwners.putIfAbsent(txId, lock); - - if (prev == null) { - track(txId, this); // Do not track on reenter. - } - - return completedFuture(lock); - } - } finally { - stripedLock.writeLock().unlock(); - } - - case IX: - int idx = Math.floorMod(spread(txId.hashCode()), CONCURRENCY); - - stripedLock.readLock(idx).lock(); - - try { - // S-locks can't be modified under the striped read lock. - if (!slockOwners.isEmpty()) { - if (slockOwners.containsKey(txId)) { - if (slockOwners.size() == 1) { - // Safe to upgrade. - track(txId, this); // Double track. - Lock lock = new Lock(lockKey, lockMode, txId); - ixlockOwners.putIfAbsent(txId, lock); - return completedFuture(lock); - } else { - // Attempt to upgrade to SIX in the presence of concurrent transactions. Deny lock attempt. - for (Lock lock : slockOwners.values()) { - if (!lock.txId().equals(txId)) { - return notifyAndFail(txId, lock.txId()); - } - } - } - - assert false : "Should not reach here"; - } - - // IX locks never allowed to wait. - UUID holderTx = slockOwners.keySet().iterator().next(); - return notifyAndFail(txId, holderTx); - } else { - Lock lock = new Lock(lockKey, lockMode, txId); - Lock prev = ixlockOwners.putIfAbsent(txId, lock); // Avoid overwrite existing lock. - - if (prev == null) { - track(txId, this); // Do not track on reenter. - } - - return completedFuture(lock); - } - } finally { - stripedLock.readLock(idx).unlock(); - } - - default: - assert false : "Unsupported coarse lock mode: " + lockMode; - - return null; // Should not be here. - } - } - - private Set allLockHolderTxs() { - return CollectionUtils.union(ixlockOwners.keySet(), slockOwners.keySet()); - } - - /** - * Triggers event and fails. - * - * @param txId Tx id. - * @param conflictedHolderId Holder tx id. - * @return Failed future. - */ - CompletableFuture notifyAndFail(UUID txId, UUID conflictedHolderId) { - CompletableFuture res = fireEvent(LOCK_CONFLICT, new LockEventParameters(txId, allLockHolderTxs())); - // TODO: https://issues.apache.org/jira/browse/IGNITE-21153 - return failedFuture(coarseLockException(txId, conflictedHolderId, res.isCompletedExceptionally())); - } - - /** - * Releases the lock. Should be called from {@link #releaseAll(UUID)}. - * - * @param lock The lock. - */ - public void release(@Nullable Lock lock) { - if (lock == null) { - return; - } - - switch (lock.lockMode()) { - case S: - IgniteBiTuple> waiter = null; - - stripedLock.writeLock().lock(); - - try { - Lock removed = slockOwners.remove(lock.txId()); - - if (removed == null) { - waiter = slockWaiters.remove(lock.txId()); - - if (waiter != null) { - removed = waiter.get1(); - } - } - - assert removed != null : "Attempt to release not requested lock: " + lock.txId(); - } finally { - stripedLock.writeLock().unlock(); - } - - if (waiter != null) { - waiter.get2().complete(waiter.get1()); - } - - break; - case IX: - int idx = Math.floorMod(spread(lock.txId().hashCode()), CONCURRENCY); - - Map>> wakeups; - - stripedLock.readLock(idx).lock(); - - try { - var removed = ixlockOwners.remove(lock.txId()); - - assert removed != null : "Attempt to release not acquired lock: " + lock.txId(); - - if (slockWaiters.isEmpty()) { - return; // Nothing to do. - } - - if (!ixlockOwners.isEmpty()) { - assert slockOwners.isEmpty() || slockOwners.containsKey(lock.txId()); - - return; // Nothing to do. - } - - // No race here because no new locks can be acquired after releaseAll due to 2-phase locking protocol. - - // Promote waiters to owners. - wakeups = new HashMap<>(slockWaiters); - - slockWaiters.clear(); - - for (IgniteBiTuple> value : wakeups.values()) { - slockOwners.put(value.getKey().txId(), value.getKey()); - } - } finally { - stripedLock.readLock(idx).unlock(); - } - - for (Entry>> entry : wakeups.entrySet()) { - entry.getValue().get2().complete(entry.getValue().get1()); - } - - break; - default: - assert false : "Unsupported coarse unlock mode: " + lock.lockMode(); - } - } - } - - /** - * Key lock. - */ - public class LockState implements Releasable { - /** Waiters. */ - private final TreeMap waiters; - - /** Lock key. */ - private volatile LockKey key; - - LockState() { - Comparator txComparator = - deadlockPreventionPolicy.txIdComparator() != null ? deadlockPreventionPolicy.txIdComparator() : UUID::compareTo; - - this.waiters = new TreeMap<>(txComparator); - } - - /** - * Checks if lock state is used to hold a lock. - * - * @return True if the state is used, otherwise false. - */ - boolean isUsed() { - return key != null; - } - - @Override - public LockKey key() { - return key; - } - - @Override - public Lock lock(UUID txId) { - Waiter waiter = waiters.get(txId); - - if (waiter != null) { - return new Lock(key, waiter.lockMode(), txId); - } - - return null; - } - - @Override - public boolean coarse() { - return false; - } - - /** - * Attempts to acquire a lock for the specified {@code key} in specified lock mode. - * - * @param txId Transaction id. - * @param lockMode Lock mode. - * @return The future or null if state is marked for removal and acquired lock mode. - */ - @Nullable IgniteBiTuple, LockMode> tryAcquire(UUID txId, LockMode lockMode) { - WaiterImpl waiter = new WaiterImpl(txId, lockMode); - - synchronized (waiters) { - if (!isUsed()) { - return new IgniteBiTuple(null, lockMode); - } - - // We always replace the previous waiter with the new one. If the previous waiter has lock intention then incomplete - // lock future is copied to the new waiter. This guarantees that, if the previous waiter was locked concurrently, then - // it doesn't have any lock intentions, and the future is not copied to the new waiter. Otherwise, if there is lock - // intention, this means that the lock future contained in previous waiter, is not going to be completed and can be - // copied safely. - WaiterImpl prev = waiters.put(txId, waiter); - - // Reenter - if (prev != null) { - if (prev.locked() && prev.lockMode().allowReenter(lockMode)) { - waiter.lock(); - - waiter.upgrade(prev); - - return new IgniteBiTuple(nullCompletedFuture(), prev.lockMode()); - } else { - waiter.upgrade(prev); - - assert prev.lockMode() == waiter.lockMode() : - "Lock modes are incorrect [prev=" + prev.lockMode() + ", new=" + waiter.lockMode() + ']'; - } - } - - if (!isWaiterReadyToNotify(waiter, false)) { - if (deadlockPreventionPolicy.waitTimeout() > 0) { - setWaiterTimeout(waiter); - } - - // Put to wait queue, track. - if (prev == null) { - track(waiter.txId, this); - } - - return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); - } - - if (!waiter.locked()) { - waiters.remove(waiter.txId()); - } else if (waiter.hasLockIntent()) { - waiter.refuseIntent(); // Restore old lock. - } else { - // Lock granted, track. - if (prev == null) { - track(waiter.txId, this); - } - } - } - - // Notify outside the monitor. - waiter.notifyLocked(); - - return new IgniteBiTuple<>(waiter.fut, waiter.lockMode()); - } - - /** - * Returns waiters count. - * - * @return waiters count. - */ - public int waitersCount() { - synchronized (waiters) { - return waiters.size(); - } - } - - /** - * Checks current waiter. It can change the internal state of the waiter. - * - * @param waiter Checked waiter. - * @return True if current waiter ready to notify, false otherwise. - */ - private boolean isWaiterReadyToNotify(WaiterImpl waiter, boolean skipFail) { - for (Entry entry : waiters.tailMap(waiter.txId(), false).entrySet()) { - WaiterImpl tmp = entry.getValue(); - LockMode mode = tmp.lockMode; - - if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { - if (conflictFound(waiter.txId())) { - waiter.fail(abandonedLockException(waiter.txId, tmp.txId)); - - return true; - } else if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { - waiter.fail(lockException(waiter.txId, tmp.txId)); - - return true; - } - - return false; - } - } - - for (Entry entry : waiters.headMap(waiter.txId()).entrySet()) { - WaiterImpl tmp = entry.getValue(); - LockMode mode = tmp.lockMode; - - if (mode != null && !mode.isCompatible(waiter.intendedLockMode())) { - if (skipFail) { - return false; - } else if (conflictFound(waiter.txId())) { - waiter.fail(abandonedLockException(waiter.txId, tmp.txId)); - - return true; - } else if (deadlockPreventionPolicy.waitTimeout() == 0) { - waiter.fail(lockException(waiter.txId, tmp.txId)); - - return true; - } else { - return false; - } - } - } - - waiter.lock(); - - return true; - } - - /** - * Attempts to release a lock for the specified {@code key} in exclusive mode. - * - * @param txId Transaction id. - * @return {@code True} if the queue is empty. - */ - @Override - public boolean tryRelease(UUID txId) { - Collection toNotify; - - synchronized (waiters) { - toNotify = release(txId); - } - - // Notify outside the monitor. - for (WaiterImpl waiter : toNotify) { - waiter.notifyLocked(); - } - - return key != null && waitersCount() == 0; - } - - /** - * Releases a specific lock of the key, if a key is locked in multiple modes by the same locker. - * - * @param txId Transaction id. - * @param lockMode Lock mode. - * @return If the value is true, no one waits of any lock of the key, false otherwise. - */ - boolean tryRelease(UUID txId, LockMode lockMode) { - List toNotify = emptyList(); - synchronized (waiters) { - WaiterImpl waiter = waiters.get(txId); - - if (waiter != null) { - assert LockMode.supremum(lockMode, waiter.lockMode()) == waiter.lockMode() : - "The lock is not locked in specified mode [mode=" + lockMode + ", locked=" + waiter.lockMode() + ']'; - - LockMode modeFromDowngrade = waiter.recalculateMode(lockMode); - - if (!waiter.locked() && !waiter.hasLockIntent()) { - toNotify = release(txId); - } else if (modeFromDowngrade != waiter.lockMode()) { - toNotify = unlockCompatibleWaiters(); - } - } - } - - // Notify outside the monitor. - for (WaiterImpl waiter : toNotify) { - waiter.notifyLocked(); - } - - return key != null && waitersCount() == 0; - } - - /** - * Releases all locks are held by a specific transaction. This method should be invoked synchronously. - * - * @param txId Transaction id. - * @return List of waiters to notify. - */ - private List release(UUID txId) { - waiters.remove(txId); - - if (waiters.isEmpty()) { - return emptyList(); - } - - return unlockCompatibleWaiters(); - } - - /** - * Unlock compatible waiters. - * - * @return List of waiters to notify. - */ - private List unlockCompatibleWaiters() { - if (!deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() == 0) { - return emptyList(); - } - - ArrayList toNotify = new ArrayList<>(); - Set toFail = new HashSet<>(); - - for (Entry entry : waiters.entrySet()) { - WaiterImpl tmp = entry.getValue(); - - if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, true)) { - assert !tmp.hasLockIntent() : "This waiter in not locked for notification [waiter=" + tmp + ']'; - - toNotify.add(tmp); - } - } - - if (deadlockPreventionPolicy.usePriority() && deadlockPreventionPolicy.waitTimeout() >= 0) { - for (Entry entry : waiters.entrySet()) { - WaiterImpl tmp = entry.getValue(); - - if (tmp.hasLockIntent() && isWaiterReadyToNotify(tmp, false)) { - assert tmp.hasLockIntent() : "Only failed waiter can be notified here [waiter=" + tmp + ']'; - - toNotify.add(tmp); - toFail.add(tmp.txId()); - } - } - - for (UUID failTx : toFail) { - var w = waiters.get(failTx); - - if (w.locked()) { - w.refuseIntent(); - } else { - waiters.remove(failTx); - } - } - } - - return toNotify; - } - - /** - * Makes the waiter fail after specified timeout (in milliseconds), if intended lock was not acquired within this timeout. - * - * @param waiter Waiter. - */ - private void setWaiterTimeout(WaiterImpl waiter) { - delayedExecutor.execute(() -> { - if (!waiter.fut.isDone()) { - waiter.fut.completeExceptionally(new LockException(ACQUIRE_LOCK_TIMEOUT_ERR, "Failed to acquire a lock due to " - + "timeout [txId=" + waiter.txId() + ", waiter=" + waiter - + ", timeout=" + deadlockPreventionPolicy.waitTimeout() + ']')); - } - }); - } - - /** - * Returns a collection of timestamps that is associated with the specified {@code key}. - * - * @return The waiters queue. - */ - public Collection queue() { - synchronized (waiters) { - return new ArrayList<>(waiters.keySet()); - } - } - - /** - * Returns a waiter for the specified {@code key}. - * - * @param txId Transaction id. - * @return The waiter. - */ - public Waiter waiter(UUID txId) { - synchronized (waiters) { - return waiters.get(txId); - } - } - - /** - * Notifies about the lock conflict found between transactions. - * - * @param acquirerTx Transaction which tries to acquire the lock. - * @return True if the conflict connected with an abandoned transaction, false in the other case. - */ - private boolean conflictFound(UUID acquirerTx) { - CompletableFuture eventResult = fireEvent(LOCK_CONFLICT, new LockEventParameters(acquirerTx, allLockHolderTxs())); - // No async handling is expected. - // TODO: https://issues.apache.org/jira/browse/IGNITE-21153 - assert eventResult.isDone() : "Async lock conflict handling is not supported"; - - return eventResult.isCompletedExceptionally(); - } - - private Set allLockHolderTxs() { - return waiters.keySet(); - } - } - - /** - * A waiter implementation. - */ - private static class WaiterImpl implements Comparable, Waiter { - /** - * Holding locks by type. - */ - private final Map locks = new EnumMap<>(LockMode.class); - - /** - * Lock modes are marked as intended, but have not taken yet. This is NOT specific to intention lock modes, such as IS and IX. - */ - private final Set intendedLocks = EnumSet.noneOf(LockMode.class); - - /** Locked future. */ - @IgniteToStringExclude - private CompletableFuture fut; - - /** Waiter transaction id. */ - private final UUID txId; - - /** The lock mode to intend to hold. This is NOT specific to intention lock modes, such as IS and IX. */ - private LockMode intendedLockMode; - - /** The lock mode. */ - private LockMode lockMode; - - /** - * The filed has a value when the waiter couldn't lock a key. - */ - private LockException ex; - - /** - * The constructor. - * - * @param txId Transaction id. - * @param lockMode Lock mode. - */ - WaiterImpl(UUID txId, LockMode lockMode) { - this.fut = new CompletableFuture<>(); - this.txId = txId; - this.intendedLockMode = lockMode; - - locks.put(lockMode, 1); - intendedLocks.add(lockMode); - } - - /** - * Adds a lock mode. - * - * @param lockMode Lock mode. - * @param increment Value to increment amount. - */ - void addLock(LockMode lockMode, int increment) { - locks.merge(lockMode, increment, Integer::sum); - } - - /** - * Removes a lock mode. - * - * @param lockMode Lock mode. - * @return True if the lock is not locked in the passed mode, false otherwise. - */ - private boolean removeLock(LockMode lockMode) { - Integer counter = locks.get(lockMode); - - if (counter == null || counter < 2) { - locks.remove(lockMode); - - return true; - } else { - locks.put(lockMode, counter - 1); - - return false; - } - } - - /** - * Recalculates lock mode based of all locks which the waiter has taken. - * - * @param modeToRemove Mode without which, the recalculation will happen. - * @return Previous lock mode. - */ - LockMode recalculateMode(LockMode modeToRemove) { - if (!removeLock(modeToRemove)) { - return lockMode; - } - - return recalculate(); - } - - /** - * Recalculates lock supremums. - * - * @return Previous lock mode. - */ - private LockMode recalculate() { - LockMode newIntendedLockMode = null; - LockMode newLockMode = null; - - for (LockMode mode : locks.keySet()) { - assert locks.get(mode) > 0 : "Incorrect lock counter [txId=" + txId + ", mode=" + mode + "]"; - - if (intendedLocks.contains(mode)) { - newIntendedLockMode = newIntendedLockMode == null ? mode : LockMode.supremum(newIntendedLockMode, mode); - } else { - newLockMode = newLockMode == null ? mode : LockMode.supremum(newLockMode, mode); - } - } - - LockMode mode = lockMode; - - lockMode = newLockMode; - intendedLockMode = newLockMode != null && newIntendedLockMode != null ? LockMode.supremum(newLockMode, newIntendedLockMode) - : newIntendedLockMode; - - return mode; - } - - /** - * Merge all locks that were held by another waiter to the current one. - * - * @param other Other waiter. - */ - void upgrade(WaiterImpl other) { - intendedLocks.addAll(other.intendedLocks); - - other.locks.entrySet().forEach(entry -> addLock(entry.getKey(), entry.getValue())); - - recalculate(); - - if (other.hasLockIntent()) { - fut = other.fut; - } - } - - /** - * Removes all locks that were intended to hold. - */ - void refuseIntent() { - for (LockMode mode : intendedLocks) { - locks.remove(mode); - } - - intendedLocks.clear(); - intendedLockMode = null; - } - - /** {@inheritDoc} */ - @Override - public int compareTo(WaiterImpl o) { - return txId.compareTo(o.txId); - } - - /** Notifies a future listeners. */ - private void notifyLocked() { - if (ex != null) { - fut.completeExceptionally(ex); - } else { - assert lockMode != null; - - // TODO FIXME https://issues.apache.org/jira/browse/IGNITE-20985 - fut.complete(null); - } - } - - /** {@inheritDoc} */ - @Override - public boolean locked() { - return this.lockMode != null; - } - - /** - * Checks is the waiter has any intended to lock a key. - * - * @return True if the waiter has an intended lock, false otherwise. - */ - public boolean hasLockIntent() { - return this.intendedLockMode != null; - } - - /** {@inheritDoc} */ - @Override - public LockMode lockMode() { - return lockMode; - } - - /** {@inheritDoc} */ - @Override - public LockMode intendedLockMode() { - return intendedLockMode; - } - - /** Grant a lock. */ - private void lock() { - lockMode = intendedLockMode; - - intendedLockMode = null; - - intendedLocks.clear(); - } - - /** - * Fails the lock waiter. - * - * @param e Lock exception. - */ - private void fail(LockException e) { - ex = e; - } - - /** {@inheritDoc} */ - @Override - public UUID txId() { - return txId; - } - - /** {@inheritDoc} */ - @Override - public boolean equals(Object o) { - if (!(o instanceof WaiterImpl)) { - return false; - } - - return compareTo((WaiterImpl) o) == 0; - } - - /** {@inheritDoc} */ - @Override - public int hashCode() { - return txId.hashCode(); - } - - /** {@inheritDoc} */ - @Override - public String toString() { - return S.toString(WaiterImpl.class, this, "granted", fut.isDone() && !fut.isCompletedExceptionally()); - } - } - - private static int spread(int h) { - return (h ^ (h >>> 16)) & 0x7fffffff; - } - - @TestOnly - public LockState[] getSlots() { - return locks.values().toArray(new LockState[]{}); - } - - public int available() { - return rawSlotsMaxSize - lockTableSize.get(); - } -} From aad8940cda7475055867dbde657e83de2958cd83 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 16:55:54 +0300 Subject: [PATCH 06/10] Fixed tests configuration. --- build.gradle | 2 +- .../benchmark/AbstractMultiNodeBenchmark.java | 2 +- .../internal/benchmark/UpsertKvBenchmark.java | 28 +++---------------- 3 files changed, 6 insertions(+), 26 deletions(-) diff --git a/build.gradle b/build.gradle index 9069c6fcbe9..31ff0ba21dc 100644 --- a/build.gradle +++ b/build.gradle @@ -66,7 +66,7 @@ ext { "-Dio.netty.tryReflectionSetAccessible=true", "-XX:+HeapDumpOnOutOfMemoryError", "-ea", - "-Xmx16g" + "-Xmx1g" ] compilerArgs = [ diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java index f9a07168cbd..ac365c12dcf 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/AbstractMultiNodeBenchmark.java @@ -200,7 +200,7 @@ private void startCluster() throws Exception { + " },\n" + " storage.profiles: {" + " " + DEFAULT_STORAGE_PROFILE + ".engine: aipersist, " - + " " + DEFAULT_STORAGE_PROFILE + ".size: 21474836480 " // Avoid page replacement. + + " " + DEFAULT_STORAGE_PROFILE + ".size: 2073741824 " // Avoid page replacement. + " },\n" + " clientConnector: { port:{} },\n" + " rest.port: {},\n" diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java index 0cea960a295..a0c5c9da364 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/UpsertKvBenchmark.java @@ -25,7 +25,6 @@ import org.apache.ignite.internal.lang.IgniteSystemProperties; import org.apache.ignite.table.KeyValueView; import org.apache.ignite.table.Tuple; -import org.jetbrains.annotations.Nullable; import org.openjdk.jmh.annotations.Benchmark; import org.openjdk.jmh.annotations.BenchmarkMode; import org.openjdk.jmh.annotations.Fork; @@ -48,7 +47,7 @@ */ @State(Scope.Benchmark) @Fork(1) -@Threads(32) +@Threads(1) @Warmup(iterations = 10, time = 2) @Measurement(iterations = 20, time = 2) @BenchmarkMode(Mode.Throughput) @@ -64,39 +63,20 @@ public class UpsertKvBenchmark extends AbstractMultiNodeBenchmark { @Param({"false"}) private boolean fsync; - @Param({"32"}) + @Param({"8"}) private int partitionCount; - @Param({"1048576", "131072", "1024"}) - private int lockSlots; - - @Param({"V1", "V2"}) - private String useLocks; - private static final AtomicInteger COUNTER = new AtomicInteger(); private static final ThreadLocal GEN = ThreadLocal.withInitial(() -> COUNTER.getAndIncrement() * 20_000_000); @Override public void nodeSetUp() throws Exception { - System.setProperty("LOGIT_STORAGE_ENABLED", "true"); - System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, "false"); - System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK, "false"); - System.setProperty("IGNITE_USE_LOCKS", useLocks); + System.setProperty(IgniteSystemProperties.IGNITE_SKIP_REPLICATION_IN_BENCHMARK, "true"); + System.setProperty(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK, "true"); super.nodeSetUp(); } - @Override - protected @Nullable String clusterConfiguration() { - return String.format( - "system.properties: {" - + "lockMapSize = \"%s\", " - + "rawSlotsMaxSize = \"%s\"" - + "}", - lockSlots, lockSlots - ); - } - /** * Initializes the tuple. */ From 19561d1e1034add2052095222aedab0db7517d78 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 17:15:09 +0300 Subject: [PATCH 07/10] Fixed configuration. --- .../ignite/distributed/ItLockTableTest.java | 2 +- .../internal/tx/impl/HeapLockManager.java | 30 ++++++------------- .../internal/tx/HeapLockManagerTest.java | 6 ++-- 3 files changed, 13 insertions(+), 25 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index ceb95dd6575..c1c1abd92a4 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -102,7 +102,7 @@ public class ItLockTableTest extends IgniteAbstractTest { @InjectConfiguration protected static StorageUpdateConfiguration storageUpdateConfiguration; - @InjectConfiguration("mock.properties: { lockMapSize: \"" + CACHE_SIZE + "\", rawSlotsMaxSize: \"131072\" }") + @InjectConfiguration("mock.properties: { lockMapSize: \"" + CACHE_SIZE + " }") private static SystemLocalConfiguration systemLocalConfiguration; @InjectExecutorService diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 8314feb057c..06a3d471aa1 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -83,8 +83,6 @@ public class HeapLockManager extends AbstractEventProducer locks; @@ -120,29 +115,20 @@ public class HeapLockManager extends AbstractEventProducer rawSlotsMaxSize) { - throw new IllegalArgumentException("maxSize=" + rawSlotsMaxSize + " < mapSize=" + lockMapSize); - } - - this.rawSlotsMaxSize = rawSlotsMaxSize; + public HeapLockManager(int lockMapSize) { this.lockMapSize = lockMapSize; } @@ -178,7 +164,7 @@ public CompletableFuture acquire(UUID txId, LockKey lockKey, LockMode lock if (state == null) { return failedFuture(new LockException( ACQUIRE_LOCK_ERR, - "Failed to acquire a lock due to lock table overflow [txId=" + txId + ", limit=" + rawSlotsMaxSize + ']' + "Failed to acquire a lock due to lock table overflow [txId=" + txId + ", limit=" + lockMapSize + ']' )); } @@ -291,7 +277,7 @@ private LockState lockState(LockKey key) { do { acquiredLocks = lockTableSize.get(); - if (acquiredLocks < rawSlotsMaxSize) { + if (acquiredLocks < lockMapSize) { v = new LockState(); v.key = k; } else { @@ -344,9 +330,11 @@ private LockState adjustLockState(LockState state, LockState v) { synchronized (v.waiters) { if (v.waiters.isEmpty()) { + v.key = null; + int locks = lockTableSize.decrementAndGet(); - assert locks >= 0; + assert locks >= 0 : "Lock table released more keys than it acquired."; return null; } else { @@ -1346,6 +1334,6 @@ public LockState[] getSlots() { } public int available() { - return rawSlotsMaxSize - lockTableSize.get(); + return lockMapSize - lockTableSize.get(); } } diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java index 69c93476ef3..e7f9eb7df72 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java @@ -54,7 +54,7 @@ protected LockKey lockKey() { public void testLockTableOverflow() throws Exception { int maxSlots = 16; - HeapLockManager lockManager = new HeapLockManager(maxSlots, maxSlots); + HeapLockManager lockManager = new HeapLockManager(maxSlots); lockManager.start(new WaitDieDeadlockPreventionPolicy()); UUID[] txs = new UUID[maxSlots]; @@ -90,7 +90,7 @@ public void testLockTableOverflow() throws Exception { public void testLockTooManyKeysInTx() throws Exception { int maxSlots = 16; - HeapLockManager lockManager = new HeapLockManager(maxSlots, maxSlots); + HeapLockManager lockManager = new HeapLockManager(maxSlots); lockManager.start(new WaitDieDeadlockPreventionPolicy()); UUID txId = TestTransactionIds.newTransactionId(); @@ -123,7 +123,7 @@ public void testDefaultConfiguration() { @Test public void testNonDefaultConfiguration( - @InjectConfiguration("mock.properties: { lockMapSize: \"42\", rawSlotsMaxSize: \"69\" }") + @InjectConfiguration("mock.properties: { lockMapSize: \"42\" }") SystemLocalConfiguration systemLocalConfiguration ) { var lockManager = new HeapLockManager(systemLocalConfiguration); From d18711903e6c94acc13de7ef806c2b8ef4b1dd95 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 17:19:14 +0300 Subject: [PATCH 08/10] Fixed tests. --- .../java/org/apache/ignite/distributed/ItLockTableTest.java | 2 +- .../org/apache/ignite/internal/tx/HeapLockManagerTest.java | 4 ++-- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index c1c1abd92a4..9865acc8ea8 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -102,7 +102,7 @@ public class ItLockTableTest extends IgniteAbstractTest { @InjectConfiguration protected static StorageUpdateConfiguration storageUpdateConfiguration; - @InjectConfiguration("mock.properties: { lockMapSize: \"" + CACHE_SIZE + " }") + @InjectConfiguration("mock.properties: { lockMapSize: \"" + CACHE_SIZE + "\" }") private static SystemLocalConfiguration systemLocalConfiguration; @InjectExecutorService diff --git a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java index e7f9eb7df72..9c8193c51a6 100644 --- a/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java +++ b/modules/transactions/src/test/java/org/apache/ignite/internal/tx/HeapLockManagerTest.java @@ -118,7 +118,7 @@ public void testLockTooManyKeysInTx() throws Exception { @Test public void testDefaultConfiguration() { assertThat(((HeapLockManager) lockManager).available(), is(DEFAULT_SLOTS)); - assertThat(((HeapLockManager) lockManager).getSlots(), is(arrayWithSize(DEFAULT_SLOTS))); + assertThat(((HeapLockManager) lockManager).getSlots(), is(arrayWithSize(0))); } @Test @@ -131,6 +131,6 @@ public void testNonDefaultConfiguration( lockManager.start(DeadlockPreventionPolicy.NO_OP); assertThat(lockManager.available(), is(42)); - assertThat(lockManager.getSlots(), is(arrayWithSize(69))); + assertThat(lockManager.getSlots(), is(arrayWithSize(0))); } } From 31f7211181c3263ccbc2ad2e1675a4e4ac5b1f50 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Thu, 23 Jan 2025 17:32:02 +0300 Subject: [PATCH 09/10] Fixed compile. --- .../apache/ignite/internal/benchmark/LockManagerBenchmark.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java index 8b1068d604f..fd8e4b1ce91 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/benchmark/LockManagerBenchmark.java @@ -62,7 +62,7 @@ public class LockManagerBenchmark { */ @Setup public void setUp() { - lockManager = new HeapLockManager(DEFAULT_SLOTS, DEFAULT_SLOTS); + lockManager = new HeapLockManager(DEFAULT_SLOTS); lockManager.start(new WaitDieDeadlockPreventionPolicy()); generator = new TransactionIdGenerator(0); clock = new TestHybridClock(() -> 0L); From e193263061139d85c7d7e4c32f326d409e780a80 Mon Sep 17 00:00:00 2001 From: Vladislav Pyatkov Date: Fri, 24 Jan 2025 18:58:49 +0300 Subject: [PATCH 10/10] Fixed after review from A.Scherbakov. --- .../ignite/distributed/ItLockTableTest.java | 2 +- .../internal/tx/impl/HeapLockManager.java | 34 ++++++++----------- 2 files changed, 16 insertions(+), 20 deletions(-) diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java index 9865acc8ea8..559befc4be6 100644 --- a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java +++ b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItLockTableTest.java @@ -180,7 +180,7 @@ public void after() throws Exception { * Test that a lock table behaves correctly in case of lock cache overflow. */ @Test - public void testCollision() { + public void testTakeMoreLocksThanAfford() { RecordView view = testTable.recordView(); int i = 0; diff --git a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java index 06a3d471aa1..90b8c616f9d 100644 --- a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java +++ b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/HeapLockManager.java @@ -44,7 +44,7 @@ import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.LongAdder; import org.apache.ignite.internal.configuration.SystemLocalConfiguration; import org.apache.ignite.internal.configuration.SystemPropertyView; import org.apache.ignite.internal.event.AbstractEventProducer; @@ -86,7 +86,7 @@ public class HeapLockManager extends AbstractEventProducer { - int acquiredLocks; - LockState v; + int acquiredLocks = lockTableSize.intValue(); - do { - acquiredLocks = lockTableSize.get(); + if (acquiredLocks < lockMapSize) { + lockTableSize.increment(); - if (acquiredLocks < lockMapSize) { - v = new LockState(); - v.key = k; - } else { - return null; - } - } while (!lockTableSize.compareAndSet(acquiredLocks, acquiredLocks + 1)); + LockState v = new LockState(); + v.key = k; + + return v; + } else { + return null; + } - return v; }); } @@ -304,7 +302,7 @@ public Waiter waiter(LockKey key, UUID txId) { /** {@inheritDoc} */ @Override public boolean isEmpty() { - if (lockTableSize.get() != 0) { + if (lockTableSize.sum() != 0) { return false; } @@ -332,9 +330,7 @@ private LockState adjustLockState(LockState state, LockState v) { if (v.waiters.isEmpty()) { v.key = null; - int locks = lockTableSize.decrementAndGet(); - - assert locks >= 0 : "Lock table released more keys than it acquired."; + lockTableSize.decrement(); return null; } else { @@ -1334,6 +1330,6 @@ public LockState[] getSlots() { } public int available() { - return lockMapSize - lockTableSize.get(); + return Math.max(lockMapSize - lockTableSize.intValue(), 0); } }