Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implementing Flexible Raft with NWR #1040

Open
wants to merge 17 commits into
base: master
Choose a base branch
from
12 changes: 12 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/CliService.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,18 @@
*/
public interface CliService extends Lifecycle<CliOptions> {

/**
* Reset the size of the read or write factor of a raft group in flexible mode
*
* @param groupId the raft group id
* @param conf current configuration
* @param readFactor read factor in flexible raft mode
* @param writeFactor write factor in flexible raft mode
* @return operation status
*/
Status resetFactor(final String groupId, final Configuration conf, final Integer readFactor,
1294566108 marked this conversation as resolved.
Show resolved Hide resolved
final Integer writeFactor);

/**
* Add a new peer into the replicating group which consists of |conf|.
* return OK status when success.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.core.NodeImpl;
import com.alipay.sofa.jraft.entity.BallotFactory;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BootstrapOptions;
import com.alipay.sofa.jraft.util.Endpoint;
Expand Down Expand Up @@ -112,6 +113,9 @@ public static Configuration getConfiguration(final String s) {
return conf;
}
if (conf.parse(s)) {
conf.setEnableFlexible(false);
Quorum quorum = BallotFactory.buildMajorityQuorum(conf.size());
conf.setQuorum(quorum);
return conf;
}
throw new IllegalArgumentException("Invalid conf str:" + s);
Expand Down
12 changes: 12 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,18 @@ public interface Node extends Lifecycle<NodeOptions>, Describer {
*/
void changePeers(final Configuration newPeers, final Closure done);

/**
* This method can be called to reset the size of the read or write factor
*
* It should be noted that we cannot change the factory size while changing
* the number of cluster nodes.
*
* @param readFactor read factor for flexible raft
* @param writeFactor write factor for flexible raft
* @since 1.3.14
*/
void resetFactor(final Integer readFactor, final Integer writeFactor, final Closure done);

/**
* Reset the configuration of this node individually, without any replication
* to other peers before this node becomes the leader. This function is
Expand Down
67 changes: 67 additions & 0 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/Quorum.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.jraft;

/**
* @author Akai
1294566108 marked this conversation as resolved.
Show resolved Hide resolved
*/
public class Quorum {
private int w;

private int r;

public Quorum(int w, int r) {
this.w = w;
this.r = r;
}

public int getW() {
return w;
}

public void setW(int w) {
this.w = w;
}

public int getR() {
return r;
}

public void setR(int r) {
this.r = r;
}

@Override
public String toString() {
return "Quorum{w=" + w + ", r=" + r + '}';
}

@Override
public int hashCode() {
return super.hashCode();
1294566108 marked this conversation as resolved.
Show resolved Hide resolved
}

@Override
public boolean equals(Object obj) {
if (obj instanceof Quorum) {
Quorum quorum = (Quorum) obj;
return quorum.getR() == this.getR() && quorum.getW() == this.getW();
}
return false;
}

}
108 changes: 94 additions & 14 deletions jraft-core/src/main/java/com/alipay/sofa/jraft/conf/Configuration.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,32 +22,43 @@
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Objects;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.Quorum;
import com.alipay.sofa.jraft.util.Copiable;
import com.alipay.sofa.jraft.util.Requires;

/**
* A configuration with a set of peers.
* @author boyan ([email protected])
* @author Akai
*
* 2018-Mar-15 11:00:26 AM
*/
public class Configuration implements Iterable<PeerId>, Copiable<Configuration> {

private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);
private static final Logger LOG = LoggerFactory.getLogger(Configuration.class);

private static final String LEARNER_POSTFIX = "/learner";
private static final String LEARNER_POSTFIX = "/learner";

private List<PeerId> peers = new ArrayList<>();
private Quorum quorum;

private Integer readFactor;

private Integer writeFactor;

private Boolean isEnableFlexible = false;
1294566108 marked this conversation as resolved.
Show resolved Hide resolved
1294566108 marked this conversation as resolved.
Show resolved Hide resolved

private List<PeerId> peers = new ArrayList<>();

// use LinkedHashSet to keep insertion order.
private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();
private LinkedHashSet<PeerId> learners = new LinkedHashSet<>();

public Configuration() {
super();
Expand All @@ -68,16 +79,34 @@ public Configuration(final Iterable<PeerId> conf) {
* @param conf configuration
*/
public Configuration(final Configuration conf) {
this(conf.getPeers(), conf.getLearners());
this(conf.getPeers(), conf.getLearners(), conf.getQuorum(), conf.getReadFactor(), conf.getWriteFactor(), conf
.isEnableFlexible());
}

/**
* Construct a Configuration instance with peers and learners.
*
* @param conf peers configuration
* @param learners learners
* @since 1.3.0
* @param conf peers configuration
* @param learners learners
* @param quorum quorum
* @param readFactor read factor
* @param writeFactor write factor
* @param isEnableFlexible enable flexible mode or not
* @since 1.3.14
*/
public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners, final Quorum quorum,
final Integer readFactor, final Integer writeFactor, final Boolean isEnableFlexible) {
Requires.requireNonNull(conf, "conf");
for (final PeerId peer : conf) {
this.peers.add(peer.copy());
}
addLearners(learners);
this.quorum = quorum;
this.readFactor = readFactor;
this.writeFactor = writeFactor;
this.isEnableFlexible = isEnableFlexible;
}

public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learners) {
Requires.requireNonNull(conf, "conf");
for (final PeerId peer : conf) {
Expand All @@ -86,6 +115,38 @@ public Configuration(final Iterable<PeerId> conf, final Iterable<PeerId> learner
addLearners(learners);
}

public Integer getReadFactor() {
return readFactor;
}

public void setReadFactor(Integer readFactor) {
this.readFactor = readFactor;
}

public Integer getWriteFactor() {
return writeFactor;
}

public void setWriteFactor(Integer writeFactor) {
this.writeFactor = writeFactor;
}

public Quorum getQuorum() {
return this.quorum;
}

public void setQuorum(Quorum quorum) {
this.quorum = quorum;
}

public Boolean isEnableFlexible() {
return isEnableFlexible;
}

public void setEnableFlexible(Boolean enableFlexible) {
isEnableFlexible = enableFlexible;
}

public void setLearners(final LinkedHashSet<PeerId> learners) {
this.learners = learners;
}
Expand Down Expand Up @@ -148,7 +209,7 @@ public List<PeerId> listLearners() {

@Override
public Configuration copy() {
return new Configuration(this.peers, this.learners);
return new Configuration(this);
}

/**
Expand Down Expand Up @@ -251,12 +312,32 @@ public boolean equals(final Object obj) {
if (this.peers == null) {
return other.peers == null;
} else {
return this.peers.equals(other.peers);
return this.peers.equals(other.peers) && Objects.equals(this.quorum, other.quorum)
&& Objects.equals(this.readFactor, other.readFactor)
&& Objects.equals(this.writeFactor, other.writeFactor)
&& Objects.equals(this.isEnableFlexible, other.isEnableFlexible);
}
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder(toBasicString());

if (Objects.nonNull(isEnableFlexible) && !isEmpty()) {
sb.append(",isEnableFlexible:").append(isEnableFlexible);
}

if (Objects.nonNull(readFactor) || Objects.nonNull(writeFactor)) {
sb.append(",readFactor:").append(readFactor).append(",writeFactor:").append(writeFactor);
}

if (Objects.nonNull(quorum)) {
sb.append(",quorum:").append(quorum);
}
return sb.toString();
}

public String toBasicString() {
final StringBuilder sb = new StringBuilder();
final List<PeerId> peers = listPeers();
int i = 0;
Expand All @@ -278,7 +359,6 @@ public String toString() {
}
i++;
}

return sb.toString();
}

Expand Down Expand Up @@ -311,9 +391,9 @@ public boolean parse(final String conf) {
}

/**
* Get the difference between |*this| and |rhs|
* |included| would be assigned to |*this| - |rhs|
* |excluded| would be assigned to |rhs| - |*this|
* Get the difference between |*this| and |rhs|
* |included| would be assigned to |*this| - |rhs|
* |excluded| would be assigned to |rhs| - |*this|
*/
public void diff(final Configuration rhs, final Configuration included, final Configuration excluded) {
included.peers = new ArrayList<>(this.peers);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import javax.annotation.concurrent.ThreadSafe;

import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -28,8 +30,6 @@
import com.alipay.sofa.jraft.Lifecycle;
import com.alipay.sofa.jraft.closure.ClosureQueue;
import com.alipay.sofa.jraft.conf.Configuration;
import com.alipay.sofa.jraft.entity.Ballot;
import com.alipay.sofa.jraft.entity.PeerId;
import com.alipay.sofa.jraft.option.BallotBoxOptions;
import com.alipay.sofa.jraft.util.Describer;
import com.alipay.sofa.jraft.util.OnlyForTest;
Expand Down Expand Up @@ -114,12 +114,13 @@ public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final
final long startAt = Math.max(this.pendingIndex, firstLogIndex);
Ballot.PosHint hint = new Ballot.PosHint();
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = bl.grant(peer, hint);
if (bl.isGranted()) {
final Ballot ballot = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
hint = ballot.grant(peer, hint);
if (ballot.isGranted()) {
lastCommittedIndex = logIndex;
}
}

if (lastCommittedIndex == 0) {
return true;
}
Expand Down Expand Up @@ -198,8 +199,8 @@ public boolean resetPendingIndex(final long newPendingIndex) {
* @return returns true on success
*/
public boolean appendPendingTask(final Configuration conf, final Configuration oldConf, final Closure done) {
final Ballot bl = new Ballot();
if (!bl.init(conf, oldConf)) {
final Ballot ballot = new Ballot();
if (!ballot.init(conf, oldConf)) {
LOG.error("Fail to init ballot.");
return false;
}
Expand All @@ -209,7 +210,7 @@ public boolean appendPendingTask(final Configuration conf, final Configuration o
LOG.error("Node {} fail to appendingTask, pendingIndex={}.", this.opts.getNodeId(), this.pendingIndex);
return false;
}
this.pendingMetaQueue.add(bl);
this.pendingMetaQueue.add(ballot);
this.closureQueue.appendPendingClosure(done);
return true;
} finally {
Expand Down
Loading
Loading