diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/CliServiceImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/CliServiceImpl.java index 0182b76d6..3311c6cc0 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/CliServiceImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/CliServiceImpl.java @@ -20,6 +20,7 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -373,7 +374,7 @@ public Status removeLearners(final String groupId, final Configuration conf, fin @Override public Status learner2Follower(final String groupId, final Configuration conf, final PeerId learner) { - Status status = removeLearners(groupId, conf, Arrays.asList(learner)); + Status status = removeLearners(groupId, conf, Collections.singletonList(learner)); if (status.isOk()) { status = addPeer(groupId, conf, new PeerId(learner.getIp(), learner.getPort())); } @@ -529,7 +530,7 @@ public List getLearners(final String groupId, final Configuration conf) @Override public List getAliveLearners(final String groupId, final Configuration conf) { - return getPeers(groupId, conf, true, true); + return getLiveLearners(groupId, conf); } @Override @@ -613,6 +614,44 @@ private PeerId findTargetPeer(final PeerId self, final String groupId, final Con return PeerId.emptyPeer(); } + private List getLiveLearners(final String groupId, final Configuration conf) { + List liveFollowers = getPeers(groupId, conf, false, true); + if (liveFollowers.isEmpty()) { + return Collections.emptyList(); + } + + final List learnersList = new ArrayList<>(); + for (PeerId follower : liveFollowers) { + final GetPeersRequest.Builder rb = GetPeersRequest.newBuilder() // + .setGroupId(groupId) // + .setLeaderId(follower.toString()) // send request to follower + .setOnlyAlive(true); // get alive learner + + try { + final Message result = this.cliClientService.getPeers(follower.getEndpoint(), rb.build(), null).get( + this.cliOptions.getTimeoutMs() <= 0 ? this.cliOptions.getRpcDefaultTimeout() + : this.cliOptions.getTimeoutMs(), TimeUnit.MILLISECONDS); + if (result instanceof GetPeersResponse) { + final GetPeersResponse resp = (GetPeersResponse) result; + final ProtocolStringList responsePeers = resp.getLearnersList(); + for (final String peerIdStr : responsePeers) { + final PeerId newPeer = new PeerId(); + newPeer.parse(peerIdStr); + learnersList.add(newPeer); + } + } else { + final ErrorResponse resp = (ErrorResponse) result; + throw new JRaftException(resp.getErrorMsg()); + } + } catch (final JRaftException e) { + throw e; + } catch (final Exception e) { + throw new JRaftException(e); + } + } + return learnersList; + } + private List getPeers(final String groupId, final Configuration conf, final boolean returnLearners, final boolean onlyGetAlive) { Requires.requireTrue(!StringUtils.isBlank(groupId), "Blank group id"); diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java index 76557ff83..b5364c78d 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/core/NodeImpl.java @@ -541,6 +541,48 @@ public NodeImpl(final String groupId, final PeerId serverId) { this.wakingCandidate = null; final int num = GLOBAL_NUM_NODES.incrementAndGet(); LOG.info("The number of active nodes increment to {}.", num); + + addReplicatorStateListener(new Replicator.ReplicatorStateListener() { + @Override + public void onCreated(PeerId peer) { + + } + + @Override + public void onError(PeerId peer, Status status) { + + } + + @Override + public void onDestroyed(PeerId peer) { + // if follower destroyed, transfer learner to other node + List learners = new ArrayList<>(); + Map learnerWithSource = getCurrentConf().getLearners(); + for (Map.Entry entry : learnerWithSource.entrySet()) { + if (peer.equals(entry.getValue())) { + learners.add(entry.getKey()); + } + } + if (learners.isEmpty()) { + return; + } + + LOG.info("Transfer learners to another node because Node {} fails, learners: {}", peer, learners); + addLearners(learners, new Closure() { + @Override + public void run(Status status) { + if (status.isOk()) { + LOG.info("Finish transfer learners to another node because Node {} fails, learners: {}", + peer, learners); + } else { + LOG.error( + "Failed to transfer learners to another node because Node {} fails, status: {}, learners: {}", + peer, status, learners); + } + } + }); + } + }); } private boolean initSnapshotStorage() { diff --git a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java index 2ca1205f7..8650911bc 100644 --- a/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java +++ b/jraft-core/src/main/java/com/alipay/sofa/jraft/rpc/impl/cli/RemoveLearnersRequestProcessor.java @@ -55,7 +55,7 @@ protected String getGroupId(final RemoveLearnersRequest request) { protected Message processRequest0(final CliRequestContext ctx, final RemoveLearnersRequest request, final RpcRequestClosure done) { final Map oldLearners = ctx.node.listLearners(); - final List removeingLearners = new ArrayList<>(request.getLearnersCount()); + final List removingLearners = new ArrayList<>(request.getLearnersCount()); for (final String peerStr : request.getLearnersList()) { final PeerId peer = new PeerId(); @@ -64,12 +64,12 @@ protected Message processRequest0(final CliRequestContext ctx, final RemoveLearn .responseFactory() // .newResponse(defaultResp(), RaftError.EINVAL, "Fail to parse peer id %s", peerStr); } - removeingLearners.add(peer); + removingLearners.add(peer); } LOG.info("Receive RemoveLearnersRequest to {} from {}, removing {}.", ctx.node.getNodeId(), - done.getRpcCtx().getRemoteAddress(), removeingLearners); - ctx.node.removeLearners(removeingLearners, status -> { + done.getRpcCtx().getRemoteAddress(), removingLearners); + ctx.node.removeLearners(removingLearners, status -> { if (!status.isOk()) { done.run(status); } else { @@ -77,7 +77,7 @@ protected Message processRequest0(final CliRequestContext ctx, final RemoveLearn for (final PeerId peer : oldLearners.keySet()) { rb.addOldLearners(peer.toString()); - if (!removeingLearners.contains(peer)) { + if (!removingLearners.contains(peer)) { rb.addNewLearners(peer.toString()); } }