Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add ping executor #1068

Merged
merged 1 commit into from
Jan 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,22 @@ public static RpcServer createRaftRpcServer(final Endpoint endpoint) {
*/
public static RpcServer createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,
final Executor cliExecutor) {
return createRaftRpcServer(endpoint, raftExecutor, cliExecutor, null);
}

/**
* Creates a raft RPC server with executors to handle requests.
*
* @param endpoint server address to bind
* @param raftExecutor executor to handle RAFT requests.
* @param cliExecutor executor to handle CLI service requests.
* @param pingExecutor executor to handle ping requests.
* @return a rpc server instance
*/
public static RpcServer createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,
final Executor cliExecutor, final Executor pingExecutor) {
final RpcServer rpcServer = RpcFactoryHelper.rpcFactory().createRpcServer(endpoint);
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor);
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor, pingExecutor);
return rpcServer;
}

Expand All @@ -94,6 +108,19 @@ public static void addRaftRequestProcessors(final RpcServer rpcServer) {
*/
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor) {
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor, null);
}

/**
* Adds RAFT and CLI service request processors.
*
* @param rpcServer rpc server instance
* @param raftExecutor executor to handle RAFT requests.
* @param cliExecutor executor to handle CLI service requests.
* @param pingExecutor executor to handle ping requests.
*/
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor, final Executor pingExecutor) {
// raft core processors
final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor(
raftExecutor);
Expand All @@ -102,7 +129,7 @@ public static void addRaftRequestProcessors(final RpcServer rpcServer, final Exe
rpcServer.registerProcessor(new GetFileRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new InstallSnapshotRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new RequestVoteRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new PingRequestProcessor());
rpcServer.registerProcessor(new PingRequestProcessor(pingExecutor));
rpcServer.registerProcessor(new TimeoutNowRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new ReadIndexRequestProcessor(raftExecutor));
// raft cli service
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import com.alipay.sofa.jraft.rpc.RpcRequests.PingRequest;
import com.alipay.sofa.jraft.util.RpcFactoryHelper;

import java.util.concurrent.Executor;

/**
* Ping request processor.
*
Expand All @@ -30,6 +32,12 @@
*/
public class PingRequestProcessor implements RpcProcessor<PingRequest> {

private final Executor executor;

public PingRequestProcessor(Executor executor) {
this.executor = executor;
}

@Override
public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
rpcCtx.sendResponse( //
Expand All @@ -42,4 +50,9 @@ public void handleRequest(final RpcContext rpcCtx, final PingRequest request) {
public String interest() {
return PingRequest.class.getName();
}

@Override
public Executor executor() {
return this.executor;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.registerProcessor(new PingRequestProcessor(null));
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PingRequestProcessorTest {

@Test
public void testHandlePing() throws Exception {
PingRequestProcessor processor = new PingRequestProcessor();
PingRequestProcessor processor = new PingRequestProcessor(null);
MockAsyncContext ctx = new MockAsyncContext();
processor.handleRequest(ctx, TestUtils.createPingRequest());
ErrorResponse response = (ErrorResponse) ctx.getResponseObject();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ public void simulation() throws InterruptedException {
ProtobufMsgFactory.load();

final RpcServer server = RpcFactoryHelper.rpcFactory().createRpcServer(new Endpoint("127.0.0.1", 19991));
server.registerProcessor(new PingRequestProcessor());
server.registerProcessor(new PingRequestProcessor(null));
server.init(null);

final Endpoint target = new Endpoint("my.test.host1.com", 19991);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public class PingRequestProcessorTest {

@Test
public void testHandlePing() throws Exception {
PingRequestProcessor processor = new PingRequestProcessor();
PingRequestProcessor processor = new PingRequestProcessor(null);
MockAsyncContext ctx = new MockAsyncContext();
processor.handleRequest(ctx, TestUtils.createPingRequest());
ErrorResponse response = (ErrorResponse) ctx.getResponseObject();
Expand Down
Loading