diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java index c07b3ce1ed6e0..59b04fec8f81c 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerImpl.java @@ -762,8 +762,8 @@ private synchronized void internalAsyncAddEntry(OpAddEntry addOperation) { } } else if (state == State.ClosedLedger) { // No ledger and no pending operations. Create a new ledger - log.info("[{}] Creating a new ledger", name); if (STATE_UPDATER.compareAndSet(this, State.ClosedLedger, State.CreatingLedger)) { + log.info("[{}] Creating a new ledger", name); this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); mbean.startDataLedgerCreateOp(); asyncCreateLedger(bookKeeper, config, digestType, this, Collections.emptyMap()); @@ -1588,8 +1588,8 @@ synchronized void ledgerClosed(final LedgerHandle lh) { } synchronized void createLedgerAfterClosed() { - if(isNeededCreateNewLedgerAfterCloseLedger()) { - log.info("[{}] Creating a new ledger", name); + if (isNeededCreateNewLedgerAfterCloseLedger()) { + log.info("[{}] Creating a new ledger after closed", name); STATE_UPDATER.set(this, State.CreatingLedger); this.lastLedgerCreationInitiationTimestamp = System.currentTimeMillis(); mbean.startDataLedgerCreateOp(); @@ -1612,8 +1612,8 @@ boolean isNeededCreateNewLedgerAfterCloseLedger() { @Override public void rollCurrentLedgerIfFull() { log.info("[{}] Start checking if current ledger is full", name); - if (currentLedgerEntries > 0 && currentLedgerIsFull()) { - STATE_UPDATER.set(this, State.ClosingLedger); + if (currentLedgerEntries > 0 && currentLedgerIsFull() + && STATE_UPDATER.compareAndSet(this, State.LedgerOpened, State.ClosingLedger)) { currentLedger.asyncClose(new AsyncCallback.CloseCallback() { @Override public void closeComplete(int rc, LedgerHandle lh, Object o) { diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java index 09f11f2c16b96..074a55db50091 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedCursorTest.java @@ -2238,6 +2238,9 @@ void testFindNewestMatchingAfterLedgerRollover() throws Exception { // roll a new ledger int numLedgersBefore = ledger.getLedgersInfo().size(); ledger.getConfig().setMaxEntriesPerLedger(1); + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened); ledger.rollCurrentLedgerIfFull(); Awaitility.await().atMost(20, TimeUnit.SECONDS) .until(() -> ledger.getLedgersInfo().size() > numLedgersBefore); diff --git a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java index d837651f1fac5..648d3e671d862 100644 --- a/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java +++ b/managed-ledger/src/test/java/org/apache/bookkeeper/mledger/impl/ManagedLedgerTest.java @@ -1936,6 +1936,9 @@ public void testDeletionAfterLedgerClosedAndRetention() throws Exception { c1.skipEntries(1, IndividualDeletedEntries.Exclude); c2.skipEntries(1, IndividualDeletedEntries.Exclude); // let current ledger close + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(ml, ManagedLedgerImpl.State.LedgerOpened); ml.rollCurrentLedgerIfFull(); // let retention expire Thread.sleep(1500); @@ -2205,6 +2208,9 @@ public void testGetPositionAfterN() throws Exception { managedCursor.markDelete(positionMarkDelete); //trigger ledger rollover and wait for the new ledger created + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened); managedLedger.rollCurrentLedgerIfFull(); Awaitility.await().untilAsserted(() -> assertEquals(managedLedger.getLedgersInfo().size(), 3)); assertEquals(5, managedLedger.getLedgersInfoAsList().get(0).getEntries()); @@ -3063,7 +3069,7 @@ public void testManagedLedgerRollOverIfFull() throws Exception { ledger.addEntry(new byte[1024 * 1024]); } - Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), msgNum / 2)); List entries = cursor.readEntries(msgNum); Assert.assertEquals(msgNum, entries.size()); @@ -3074,9 +3080,12 @@ public void testManagedLedgerRollOverIfFull() throws Exception { // all the messages have benn acknowledged // and all the ledgers have been removed except the last ledger - Thread.sleep(1000); - Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1); - Assert.assertEquals(ledger.getTotalSize(), 0); + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(ledger, ManagedLedgerImpl.State.LedgerOpened); + ledger.rollCurrentLedgerIfFull(); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getLedgersInfoAsList().size(), 1)); + Awaitility.await().untilAsserted(() -> Assert.assertEquals(ledger.getTotalSize(), 0)); } @Test @@ -3094,6 +3103,26 @@ public void testLedgerReachMaximumRolloverTime() throws Exception { .until(() -> firstLedgerId != ml.addEntry("test".getBytes()).getLedgerId()); } + @Test + public void testLedgerNotRolloverWithoutOpenState() throws Exception { + ManagedLedgerConfig config = new ManagedLedgerConfig(); + config.setMaxEntriesPerLedger(2); + + ManagedLedgerImpl ml = spy((ManagedLedgerImpl)factory.open("ledger-not-rollover-without-open-state", config)); + ml.addEntry("test1".getBytes()).getLedgerId(); + long ledgerId2 = ml.addEntry("test2".getBytes()).getLedgerId(); + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + // Set state to CreatingLedger to avoid rollover + stateUpdater.set(ml, ManagedLedgerImpl.State.CreatingLedger); + ml.rollCurrentLedgerIfFull(); + Field currentLedger = ManagedLedgerImpl.class.getDeclaredField("currentLedger"); + currentLedger.setAccessible(true); + LedgerHandle lh = (LedgerHandle) currentLedger.get(ml); + Awaitility.await() + .until(() -> ledgerId2 == lh.getId()); + } + @Test public void testExpiredLedgerDeletionAfterManagedLedgerRestart() throws Exception { ManagedLedgerConfig config = new ManagedLedgerConfig(); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java index 77ec229862e90..b05abf3be5218 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/CurrentLedgerRolloverIfFullTest.java @@ -18,6 +18,7 @@ */ package org.apache.pulsar.broker.service; +import java.lang.reflect.Field; import java.time.Duration; import java.util.concurrent.TimeUnit; import lombok.Cleanup; @@ -98,6 +99,9 @@ public void testCurrentLedgerRolloverIfFull() throws Exception { }); // trigger a ledger rollover + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened); managedLedger.rollCurrentLedgerIfFull(); // the last ledger will be closed and removed and we have one ledger for empty diff --git a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java index 7fa3c08300162..a06bf9e6deaae 100644 --- a/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java +++ b/pulsar-transaction/coordinator/src/test/java/org/apache/pulsar/transaction/coordinator/MLTransactionMetadataStoreTest.java @@ -164,8 +164,11 @@ public void testRecoverSequenceId(boolean isUseManagedLedgerProperties) throws E ManagedLedgerImpl managedLedger = (ManagedLedgerImpl) field.get(mlTransactionLog); Position position = managedLedger.getLastConfirmedEntry(); if (isUseManagedLedgerProperties) { + Field stateUpdater = ManagedLedgerImpl.class.getDeclaredField("state"); + stateUpdater.setAccessible(true); + stateUpdater.set(managedLedger, ManagedLedgerImpl.State.LedgerOpened); + managedLedger.rollCurrentLedgerIfFull(); Awaitility.await().until(() -> { - managedLedger.rollCurrentLedgerIfFull(); return !managedLedger.ledgerExists(position.getLedgerId()); }); }