Skip to content

Commit

Permalink
support get live learner from follower node and transfer learner from…
Browse files Browse the repository at this point in the history
… follower to other node
  • Loading branch information
linxin committed Nov 20, 2024
1 parent 7b0075b commit 7123f00
Show file tree
Hide file tree
Showing 3 changed files with 88 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -373,7 +374,7 @@ public Status removeLearners(final String groupId, final Configuration conf, fin

@Override
public Status learner2Follower(final String groupId, final Configuration conf, final PeerId learner) {
Status status = removeLearners(groupId, conf, Arrays.asList(learner));
Status status = removeLearners(groupId, conf, Collections.singletonList(learner));
if (status.isOk()) {
status = addPeer(groupId, conf, new PeerId(learner.getIp(), learner.getPort()));
}
Expand Down Expand Up @@ -529,7 +530,7 @@ public List<PeerId> getLearners(final String groupId, final Configuration conf)

/**
 * Returns the learners of the given group that are currently alive.
 *
 * NOTE(review): two return statements are visible here because this text
 * comes from a rendered diff without +/- markers. Presumably the first
 * (getPeers with both flags set to true) is the removed line and the second
 * (delegating to getLiveLearners) is its replacement -- confirm against the
 * committed file.
 *
 * @param groupId the raft group id
 * @param conf    the configuration forwarded to the lookup
 * @return the alive learners found by the lookup
 */
@Override
public List<PeerId> getAliveLearners(final String groupId, final Configuration conf) {
return getPeers(groupId, conf, true, true);
return getLiveLearners(groupId, conf);
}

@Override
Expand Down Expand Up @@ -613,6 +614,44 @@ private PeerId findTargetPeer(final PeerId self, final String groupId, final Con
return PeerId.emptyPeer();
}

/**
 * Collects the learners that the live peers of a group report as alive.
 *
 * <p>The live peers are resolved through
 * {@code getPeers(groupId, conf, false, true)} (no learners, only alive).
 * Each of them is then sent a {@code GetPeersRequest} with
 * {@code onlyAlive = true}, addressed to that peer itself, and the learner
 * lists of all responses are concatenated in query order. No de-duplication
 * is performed; presumably each peer only reports the learners attached to
 * it -- TODO confirm against the request processor.
 *
 * @param groupId the raft group id
 * @param conf    the configuration forwarded to {@code getPeers}
 * @return the learners reported by the queried peers, or an empty list when
 *         no live peer was found
 * @throws JRaftException if a peer answers with anything other than a
 *         {@code GetPeersResponse}, or if the call fails, times out or is
 *         interrupted (the thread's interrupt flag is restored in that case)
 */
private List<PeerId> getLiveLearners(final String groupId, final Configuration conf) {
    final List<PeerId> liveFollowers = getPeers(groupId, conf, false, true);
    if (liveFollowers.isEmpty()) {
        return Collections.emptyList();
    }

    // The timeout does not depend on the peer being queried, so resolve it once.
    final long timeoutMs = this.cliOptions.getTimeoutMs() <= 0 ? this.cliOptions.getRpcDefaultTimeout()
        : this.cliOptions.getTimeoutMs();

    final List<PeerId> learnersList = new ArrayList<>();
    for (final PeerId follower : liveFollowers) {
        final GetPeersRequest.Builder rb = GetPeersRequest.newBuilder() //
            .setGroupId(groupId) //
            .setLeaderId(follower.toString()) // send request to follower
            .setOnlyAlive(true); // get alive learner

        try {
            final Message result = this.cliClientService.getPeers(follower.getEndpoint(), rb.build(), null).get(
                timeoutMs, TimeUnit.MILLISECONDS);
            if (result instanceof GetPeersResponse) {
                final GetPeersResponse resp = (GetPeersResponse) result;
                final ProtocolStringList responsePeers = resp.getLearnersList();
                for (final String peerIdStr : responsePeers) {
                    final PeerId newPeer = new PeerId();
                    newPeer.parse(peerIdStr);
                    learnersList.add(newPeer);
                }
            } else {
                final ErrorResponse resp = (ErrorResponse) result;
                throw new JRaftException(resp.getErrorMsg());
            }
        } catch (final JRaftException e) {
            throw e;
        } catch (final InterruptedException e) {
            // Keep the interruption visible to callers further up the stack
            // instead of silently consuming it inside the wrapper exception.
            Thread.currentThread().interrupt();
            throw new JRaftException(e);
        } catch (final Exception e) {
            throw new JRaftException(e);
        }
    }
    return learnersList;
}

private List<PeerId> getPeers(final String groupId, final Configuration conf, final boolean returnLearners,
final boolean onlyGetAlive) {
Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id");
Expand Down
42 changes: 42 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java
Original file line number Diff line number Diff line change
Expand Up @@ -541,6 +541,48 @@ public NodeImpl(final String groupId, final PeerId serverId) {
this.wakingCandidate = null;
final int num = GLOBAL_NUM_NODES.incrementAndGet();
LOG.info("The number of active nodes increment to {}.", num);

// Watch replicator lifecycle so learners tied to a destroyed peer can be re-added.
addReplicatorStateListener(new Replicator.ReplicatorStateListener() {
    @Override
    public void onCreated(final PeerId peer) {
        // Nothing to do when a replicator is created.
    }

    @Override
    public void onError(final PeerId peer, final Status status) {
        // Replicator errors are not handled by this listener.
    }

    /**
     * Gathers every learner whose mapped value in the current configuration
     * equals the destroyed peer (presumably the peer the learner was
     * replicating from) and submits them again through addLearners.
     */
    @Override
    public void onDestroyed(final PeerId peer) {
        final List<PeerId> orphaned = new ArrayList<>();
        for (final Map.Entry<PeerId, PeerId> learnerToSource : getCurrentConf().getLearners().entrySet()) {
            if (!peer.equals(learnerToSource.getValue())) {
                continue;
            }
            orphaned.add(learnerToSource.getKey());
        }
        if (orphaned.isEmpty()) {
            return;
        }

        LOG.info("Transfer learners to another node because Node {} fails, learners: {}", peer, orphaned);
        addLearners(orphaned, status -> {
            if (!status.isOk()) {
                LOG.error(
                    "Failed to transfer learners to another node because Node {} fails, status: {}, learners: {}",
                    peer, status, orphaned);
                return;
            }
            LOG.info("Finish transfer learners to another node because Node {} fails, learners: {}",
                peer, orphaned);
        });
    }
});
}

private boolean initSnapshotStorage() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ protected String getGroupId(final RemoveLearnersRequest request) {
protected Message processRequest0(final CliRequestContext ctx, final RemoveLearnersRequest request,
final RpcRequestClosure done) {
final Map<PeerId, PeerId> oldLearners = ctx.node.listLearners();
final List<PeerId> removeingLearners = new ArrayList<>(request.getLearnersCount());
final List<PeerId> removingLearners = new ArrayList<>(request.getLearnersCount());

for (final String peerStr : request.getLearnersList()) {
final PeerId peer = new PeerId();
Expand All @@ -64,20 +64,20 @@ protected Message processRequest0(final CliRequestContext ctx, final RemoveLearn
.responseFactory() //
.newResponse(defaultResp(), RaftError.EINVAL, "Fail to parse peer id %s", peerStr);
}
removeingLearners.add(peer);
removingLearners.add(peer);
}

LOG.info("Receive RemoveLearnersRequest to {} from {}, removing {}.", ctx.node.getNodeId(),
done.getRpcCtx().getRemoteAddress(), removeingLearners);
ctx.node.removeLearners(removeingLearners, status -> {
done.getRpcCtx().getRemoteAddress(), removingLearners);
ctx.node.removeLearners(removingLearners, status -> {
if (!status.isOk()) {
done.run(status);
} else {
final LearnersOpResponse.Builder rb = LearnersOpResponse.newBuilder();

for (final PeerId peer : oldLearners.keySet()) {
rb.addOldLearners(peer.toString());
if (!removeingLearners.contains(peer)) {
if (!removingLearners.contains(peer)) {
rb.addNewLearners(peer.toString());
}
}
Expand Down

0 comments on commit 7123f00

Please sign in to comment.