Skip to content

Commit

Permalink
fix: last committed index in BallotBox (#1109)
Browse files Browse the repository at this point in the history
* fix: readIndex may return wrong value when single node group resetarts, #1049

* fix: init lastCommittedIndex in ballot box with last snapshot index if exists, #1092

* chore: format code

* chore: style

* chore: update jraft-extension/java-log-storage-impl/src/main/java/com/alipay/sofa/jraft/storage/HybridLogStorage.java

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>

* chore: apply suggestion

Co-authored-by: Jeremyhi <[email protected]>

---------

Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
Co-authored-by: Jeremyhi <[email protected]>
  • Loading branch information
3 people authored Jun 17, 2024
1 parent f45e961 commit 8cdde76
Show file tree
Hide file tree
Showing 8 changed files with 98 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@ public boolean init(final BallotBoxOptions opts) {
this.opts = opts;
this.waiter = opts.getWaiter();
this.closureQueue = opts.getClosureQueue();
this.lastCommittedIndex = opts.getLastCommittedIndex();
return true;
}

Expand Down Expand Up @@ -165,6 +166,7 @@ public void clearPendingTasks() {
* committed until a log at the new term becomes committed, so
* |newPendingIndex| should be |last_log_index| + 1.
* @param newPendingIndex pending index of new leader
*
* @return returns true if reset success
*/
public boolean resetPendingIndex(final long newPendingIndex) {
Expand All @@ -180,6 +182,7 @@ public boolean resetPendingIndex(final long newPendingIndex) {
this.opts.getNodeId(), newPendingIndex, this.lastCommittedIndex);
return false;
}

this.pendingIndex = newPendingIndex;
this.closureQueue.resetFirstIndex(newPendingIndex);
return true;
Expand Down
55 changes: 46 additions & 9 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -1013,15 +1013,6 @@ protected int adjustTimeout(final int timeoutMs) {
LOG.error("Node {} initFSMCaller failed.", getNodeId());
return false;
}
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
ballotBoxOpts.setNodeId(getNodeId());
if (!this.ballotBox.init(ballotBoxOpts)) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}

if (!initSnapshotStorage()) {
LOG.error("Node {} initSnapshotStorage failed.", getNodeId());
Expand All @@ -1044,6 +1035,12 @@ protected int adjustTimeout(final int timeoutMs) {
this.targetPriority = getMaxPriorityOfNodes(this.conf.getConf().getPeers());
}

// It must be initialized after initializing conf and log storage.
if (!initBallotBox()) {
LOG.error("Node {} init ballotBox failed.", getNodeId());
return false;
}

if (!this.conf.isEmpty()) {
Requires.requireTrue(this.conf.isValid(), "Invalid conf: %s", this.conf);
} else {
Expand Down Expand Up @@ -1120,13 +1117,41 @@ protected int adjustTimeout(final int timeoutMs) {
return true;
}

private boolean initBallotBox() {
this.ballotBox = new BallotBox();
final BallotBoxOptions ballotBoxOpts = new BallotBoxOptions();
ballotBoxOpts.setWaiter(this.fsmCaller);
ballotBoxOpts.setClosureQueue(this.closureQueue);
ballotBoxOpts.setNodeId(getNodeId());
// Try to initialize the last committed index in BallotBox to be the last snapshot index.
long lastCommittedIndex = 0;
if (this.snapshotExecutor != null) {
lastCommittedIndex = this.snapshotExecutor.getLastSnapshotIndex();
}
if (this.getQuorum() == 1) {
// It is safe to initiate lastCommittedIndex as last log one because in case of single peer no one will discard
// log records on leader election.
// Fix https://github.com/sofastack/sofa-jraft/issues/1049
lastCommittedIndex = Math.max(lastCommittedIndex, this.logManager.getLastLogIndex());
}

ballotBoxOpts.setLastCommittedIndex(lastCommittedIndex);
LOG.info("Node {} init ballot box's lastCommittedIndex={}.", getNodeId(), lastCommittedIndex);
return this.ballotBox.init(ballotBoxOpts);
}

@OnlyForTest
void tryElectSelf() {
this.writeLock.lock();
// unlock in electSelf
electSelf();
}

@OnlyForTest
BallotBox getBallotBox() {
return this.ballotBox;
}

// should be in writeLock
private void electSelf() {
long oldTerm;
Expand Down Expand Up @@ -1360,6 +1385,18 @@ public void run(final Status status) {
}

private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
if (!this.logManager.hasAvailableCapacityToAppendEntries(1)) {
// It's overload, fail-fast
final List<Closure> dones = tasks.stream().map(ele -> ele.done).filter(Objects::nonNull)
.collect(Collectors.toList());
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : dones) {
done.run(new Status(RaftError.EBUSY, "Node %s log manager is busy.", this.getNodeId()));
}
});
return;
}

this.writeLock.lock();
try {
final int size = tasks.size();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,20 @@ public class BallotBoxOptions {
private FSMCaller waiter;
private ClosureQueue closureQueue;
private NodeId nodeId;
private long lastCommittedIndex;

public NodeId getNodeId() {
return nodeId;
}

public long getLastCommittedIndex() {
return lastCommittedIndex;
}

public void setLastCommittedIndex(long lastCommittedIndex) {
this.lastCommittedIndex = lastCommittedIndex;
}

public void setNodeId(NodeId nodeId) {
this.nodeId = nodeId;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ public interface SnapshotExecutor extends Lifecycle<SnapshotExecutorOptions>, De
*/
void doSnapshot(final Closure done);

/**
* Returns the last snapshot index.
* @return
*/
long getLastSnapshotIndex();

/**
* Start to snapshot StateMachine immediately with the latest log applied to state machine.
* You MUST call this method in {@link StateMachine} callback methods to trigger a snapshot synchronously, otherwise throws {@link IllegalStateException}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -112,10 +112,6 @@ public long getLastSnapshotTerm() {
return this.lastSnapshotTerm;
}

/**
* Only for test
*/
@OnlyForTest
public long getLastSnapshotIndex() {
return this.lastSnapshotIndex;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ public void setup() {
this.closureQueue = new ClosureQueueImpl(GROUP_ID);
opts.setClosureQueue(this.closureQueue);
opts.setWaiter(this.waiter);
opts.setLastCommittedIndex(0);
box = new BallotBox();
assertTrue(box.init(opts));
}
Expand All @@ -60,11 +61,25 @@ public void teardown() {
box.shutdown();
}

@Test
public void initWithLastCommittedIndex() {
BallotBoxOptions opts = new BallotBoxOptions();
this.closureQueue = new ClosureQueueImpl(GROUP_ID);
opts.setClosureQueue(this.closureQueue);
opts.setWaiter(this.waiter);
opts.setLastCommittedIndex(9);
box = new BallotBox();
assertTrue(box.init(opts));

assertEquals(box.getLastCommittedIndex(), 9);
}

@Test
public void testResetPendingIndex() {
assertEquals(0, closureQueue.getFirstIndex());
assertEquals(0, box.getPendingIndex());
assertTrue(box.resetPendingIndex(1));
assertEquals(0, box.getLastCommittedIndex());
assertEquals(1, closureQueue.getFirstIndex());
assertEquals(1, box.getPendingIndex());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,12 @@
*/
package com.alipay.sofa.jraft.storage;

import java.io.File;
import java.io.IOException;
import java.nio.file.Paths;
import java.util.List;

import org.apache.commons.io.FileUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -48,6 +50,11 @@ public class HybridLogStorage implements LogStorage {
private long thresholdIndex;

public HybridLogStorage(final String path, final StoreOptions storeOptions, final LogStorage oldStorage) {
try {
FileUtils.forceMkdir(new File(path));
} catch (IOException e) {
LOG.error("Failed to create directory at path: {}", path, e);
}
final String newLogStoragePath = Paths.get(path, storeOptions.getStoragePath()).toString();
this.newLogStorage = new LogitLogStorage(newLogStoragePath, storeOptions);
this.oldLogStorage = oldStorage;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

Expand Down Expand Up @@ -2067,6 +2068,8 @@ public void testRestoreSnasphot() throws Exception {
cluster.waitLeader();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());
assertTrue(cluster.start(leaderAddr));
final NodeImpl newLeader = (NodeImpl) cluster.getLeader();
assertEquals(newLeader.getBallotBox().getLastCommittedIndex(), newLeader.getLastCommittedIndex());
cluster.ensureSame();
assertEquals(0, cluster.getLeaderFsm().getLoadSnapshotTimes());

Expand Down Expand Up @@ -2886,6 +2889,8 @@ public void readCommittedUserLog() throws Exception {
final Node leader = cluster.getLeader();
assertNotNull(leader);
this.sendTestTaskAndWait(leader);
// Waits for applying to FSM
Thread.sleep(500);

// index == 1 is a CONFIGURATION log, so real_index will be 2 when returned.
UserLog userLog = leader.readCommittedUserLog(1);
Expand Down Expand Up @@ -3402,9 +3407,13 @@ public void testChangePeersChaosApplyTasks() throws Exception {
for (final ChangeArg arg : args) {
arg.stop = true;
}
for (final Future<?> future : futures) {
future.get();
}
for (final Future<?> future : futures) {
try {
future.get(20, TimeUnit.SECONDS);
} catch (TimeoutException e) {
// ignore
}
}

cluster.waitLeader();
final SynchronizedClosure done = new SynchronizedClosure();
Expand Down

0 comments on commit 8cdde76

Please sign in to comment.