Skip to content

Commit

Permalink
Merge branch 'EI62027952_20210723' of https://code.alipay.com/alipay-…
Browse files Browse the repository at this point in the history
…com/bolt into release/1.5.9
  • Loading branch information
dbl-x committed Aug 23, 2021
2 parents 8114247 + b564ea6 commit 9520e04
Show file tree
Hide file tree
Showing 10 changed files with 264 additions and 22 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.alipay.sofa</groupId>
<artifactId>bolt</artifactId>
<version>1.5.8</version>
<version>1.5.9</version>
<packaging>jar</packaging>

<name>${project.groupId}:${project.artifactId}</name>
Expand Down
12 changes: 12 additions & 0 deletions src/main/java/com/alipay/remoting/BaseRemoting.java
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,10 @@ protected RemotingCommand invokeSync(final Connection conn, final RemotingComman
final InvokeFuture future = createInvokeFuture(request, request.getInvokeContext());
conn.addInvokeFuture(future);
final int requestId = request.getId();
InvokeContext invokeContext = request.getInvokeContext();
if (null != invokeContext) {
invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_BEFORE_SEND, System.nanoTime());
}
try {
conn.getChannel().writeAndFlush(request).addListener(new ChannelFutureListener() {

Expand All @@ -75,13 +79,21 @@ public void operationComplete(ChannelFuture f) throws Exception {
}

});

if (null != invokeContext) {
invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_AFTER_SEND, System.nanoTime());
}
} catch (Exception e) {
conn.removeInvokeFuture(requestId);
future.putResponse(commandFactory.createSendFailedResponse(conn.getRemoteAddress(), e));
logger.error("Exception caught when sending invocation, id={}", requestId, e);
}
RemotingCommand response = future.waitResponse(timeoutMillis);

if (null != invokeContext) {
invokeContext.put(InvokeContext.BOLT_PROCESS_CLIENT_RECEIVED, System.nanoTime());
}

if (response == null) {
conn.removeInvokeFuture(requestId);
response = this.commandFactory.createTimeoutResponse(conn.getRemoteAddress());
Expand Down
47 changes: 33 additions & 14 deletions src/main/java/com/alipay/remoting/InvokeContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -26,28 +26,47 @@
*/
public class InvokeContext {
// ~~~ invoke context keys of client side
public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
public final static String CLIENT_LOCAL_IP = "bolt.client.local.ip";
public final static String CLIENT_LOCAL_PORT = "bolt.client.local.port";
public final static String CLIENT_REMOTE_IP = "bolt.client.remote.ip";
public final static String CLIENT_REMOTE_PORT = "bolt.client.remote.port";
/** time consumed during connection creating, this is a timespan */
public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
public final static String CLIENT_CONN_CREATETIME = "bolt.client.conn.createtime";
public final static String CLIENT_CONN_CREATE_START_IN_NANO = "bolt.client.conn.create.start.nano";
public final static String CLIENT_CONN_CREATE_END_IN_NANO = "bolt.client.conn.create.end.nano";

// ~~~ invoke context keys of server side
public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";
public final static String SERVER_LOCAL_IP = "bolt.server.local.ip";
public final static String SERVER_LOCAL_PORT = "bolt.server.local.port";
public final static String SERVER_REMOTE_IP = "bolt.server.remote.ip";
public final static String SERVER_REMOTE_PORT = "bolt.server.remote.port";

// ~~~ invoke context keys of bolt client and server side
public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
public final static String BOLT_INVOKE_REQUEST_ID = "bolt.invoke.request.id";
/** time consumed start from the time when request arrive, to the time when request be processed, this is a timespan */
public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";
public final static String BOLT_PROCESS_WAIT_TIME = "bolt.invoke.wait.time";
/** time request arrived in nano seconds , collected by System.nanoTime() */
public final static String BOLT_PROCESS_ARRIVE_HEADER_IN_NANO = "bolt.invoke.request.arrive.header.in.nano";
public final static String BOLT_PROCESS_ARRIVE_BODY_IN_NANO = "bolt.invoke.request.arrive.body.in.nano";

/** time before send request to user thread in nano seconds , collected by System.nanoTime() */
public final static String BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO = "bolt.invoke.before.dispatch.in.nano";

/** time before send request to user thread in nano seconds , collected by System.nanoTime() */
public final static String BOLT_PROCESS_START_PROCESS_IN_NANO = "bolt.invoke.start.process.in.nano";

public final static String BOLT_CUSTOM_SERIALIZER = "bolt.invoke.custom.serializer";
public final static String BOLT_CRC_SWITCH = "bolt.invoke.crc.switch";

/** time before send request to net in nano seconds , collected by System.nanoTime() **/
public final static String BOLT_PROCESS_CLIENT_BEFORE_SEND = "bolt.invoke.client.before.send";
/** time after send request to net in nano seconds , collected by System.nanoTime() **/
public final static String BOLT_PROCESS_CLIENT_AFTER_SEND = "bolt.invoke.client.after.send";
/** time after receive response from server in nano seconds , collected by System.nanoTime() **/
public final static String BOLT_PROCESS_CLIENT_RECEIVED = "bolt.invoke.client.received";

// ~~~ constants
public final static int INITIAL_SIZE = 8;
public final static int INITIAL_SIZE = 8;

/** context */
private ConcurrentHashMap<String, Object> context;
Expand Down
8 changes: 7 additions & 1 deletion src/main/java/com/alipay/remoting/rpc/RpcClientRemoting.java
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

/**
* Rpc client remoting
*
*
* @author xiaomin.cxm
* @version $Id: RpcClientRemoting.java, v 0.1 Apr 14, 2016 11:58:56 AM xiaomin.cxm Exp $
*/
Expand Down Expand Up @@ -119,13 +119,19 @@ protected Connection getConnectionAndInitInvokeContext(Url url, InvokeContext in
throws RemotingException,
InterruptedException {
long start = System.currentTimeMillis();
long startInNano = System.nanoTime();

Connection conn;
try {
conn = this.connectionManager.getAndCreateIfAbsent(url);
} finally {
if (null != invokeContext) {
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATETIME,
(System.currentTimeMillis() - start));
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_START_IN_NANO,
startInNano);
invokeContext.putIfAbsent(InvokeContext.CLIENT_CONN_CREATE_END_IN_NANO,
System.nanoTime());
}
}
return conn;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
import java.net.InetSocketAddress;
import java.util.List;

import com.alipay.remoting.util.ThreadLocalArriveTimeHolder;
import io.netty.channel.Channel;
import org.slf4j.Logger;

import com.alipay.remoting.CommandCode;
Expand Down Expand Up @@ -96,6 +98,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) thro
byte[] clazz = null;
byte[] header = null;
byte[] content = null;
Channel channel = ctx.channel();
ThreadLocalArriveTimeHolder.arrive(channel, requestId);

if (in.readableBytes() >= classLen + headerLen + contentLen) {
if (classLen > 0) {
clazz = new byte[classLen];
Expand All @@ -114,10 +119,14 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) thro
return;
}
RequestCommand command;

long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear(
channel, requestId);

if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatCommand();
} else {
command = createRequestCommand(cmdCode);
command = createRequestCommand(cmdCode, headerArriveTimeInNano);
}
command.setType(type);
command.setVersion(ver2);
Expand Down Expand Up @@ -207,10 +216,12 @@ private ResponseCommand createResponseCommand(short cmdCode) {
return command;
}

private RpcRequestCommand createRequestCommand(short cmdCode) {
private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) {
RpcRequestCommand command = new RpcRequestCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
command.setArriveTime(System.currentTimeMillis());
command.setArriveHeaderTimeInNano(headerArriveTimeInNano);
command.setArriveBodyTimeInNano(System.nanoTime());
return command;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@
import java.util.List;

import com.alipay.remoting.log.BoltLoggerFactory;
import com.alipay.remoting.util.ThreadLocalArriveTimeHolder;
import io.netty.channel.Channel;
import org.slf4j.Logger;

import com.alipay.remoting.CommandCode;
Expand Down Expand Up @@ -103,6 +105,9 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) thro
byte[] header = null;
byte[] content = null;

Channel channel = ctx.channel();
ThreadLocalArriveTimeHolder.arrive(channel, requestId);

// decide the at-least bytes length for each version
int lengthAtLeastForV1 = classLen + headerLen + contentLen;
boolean crcSwitchOn = ProtocolSwitch.isOn(
Expand Down Expand Up @@ -135,11 +140,15 @@ public void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) thro
in.resetReaderIndex();
return;
}

long headerArriveTimeInNano = ThreadLocalArriveTimeHolder.getAndClear(
channel, requestId);

RequestCommand command;
if (cmdCode == CommandCode.HEARTBEAT_VALUE) {
command = new HeartbeatCommand();
} else {
command = createRequestCommand(cmdCode);
command = createRequestCommand(cmdCode, headerArriveTimeInNano);
}
command.setType(type);
command.setVersion(ver2);
Expand Down Expand Up @@ -261,10 +270,13 @@ private ResponseCommand createResponseCommand(short cmdCode) {
return command;
}

private RpcRequestCommand createRequestCommand(short cmdCode) {
private RpcRequestCommand createRequestCommand(short cmdCode, long headerArriveTimeInNano) {
RpcRequestCommand command = new RpcRequestCommand();
command.setCmdCode(RpcCommandCode.valueOf(cmdCode));
command.setArriveTime(System.currentTimeMillis());
command.setArriveHeaderTimeInNano(headerArriveTimeInNano);
command.setArriveBodyTimeInNano(System.nanoTime());

return command;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,14 +36,20 @@
*/
public class RpcRequestCommand extends RequestCommand {
/** For serialization */
private static final long serialVersionUID = -4602613826188210946L;
private static final long serialVersionUID = -4602613826188210946L;
private Object requestObject;
private String requestClass;

private CustomSerializer customSerializer;
private Object requestHeader;

private transient long arriveTime = -1;
private transient long arriveTime = -1;

private transient long arriveHeaderTimeInNano = -1;

private transient long arriveBodyTimeInNano = -1;

private transient long beforeEnterQueueTime = -1;

/**
* create request command without id
Expand Down Expand Up @@ -247,4 +253,29 @@ public long getArriveTime() {
public void setArriveTime(long arriveTime) {
this.arriveTime = arriveTime;
}

public long getArriveHeaderTimeInNano() {
return arriveHeaderTimeInNano;
}

public void setArriveHeaderTimeInNano(long arriveHeaderTimeInNano) {
this.arriveHeaderTimeInNano = arriveHeaderTimeInNano;
}

public long getArriveBodyTimeInNano() {
return arriveBodyTimeInNano;
}

public void setArriveBodyTimeInNano(long arriveBodyTimeInNano) {
this.arriveBodyTimeInNano = arriveBodyTimeInNano;
}

public long getBeforeEnterQueueTime() {
return beforeEnterQueueTime;
}

public void setBeforeEnterQueueTime(long beforeEnterQueueTime) {
this.beforeEnterQueueTime = beforeEnterQueueTime;
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void process(RemotingContext ctx, RpcRequestCommand cmd, ExecutorService
}

// use the final executor dispatch process task
cmd.setBeforeEnterQueueTime(System.nanoTime());
executor.execute(new ProcessTask(ctx, cmd));
}

Expand Down Expand Up @@ -299,6 +300,14 @@ private void preProcessRemotingContext(RemotingContext ctx, RpcRequestCommand cm
ctx.setRpcCommandType(cmd.getType());
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_WAIT_TIME,
currentTimestamp - cmd.getArriveTime());
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_HEADER_IN_NANO,
cmd.getArriveHeaderTimeInNano());
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_ARRIVE_BODY_IN_NANO,
cmd.getArriveBodyTimeInNano());
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_BEFORE_DISPATCH_IN_NANO,
cmd.getBeforeEnterQueueTime());
ctx.getInvokeContext().putIfAbsent(InvokeContext.BOLT_PROCESS_START_PROCESS_IN_NANO,
System.nanoTime());
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.remoting.util;

import io.netty.channel.Channel;
import io.netty.util.concurrent.FastThreadLocal;

import java.util.HashMap;
import java.util.Map;
import java.util.WeakHashMap;

/**
* @author zhaowang
* @version : ThreadLocalTimeHolder.java, v 0.1 2021年07月01日 3:05 下午 zhaowang
*/
public class ThreadLocalArriveTimeHolder {
private static FastThreadLocal<WeakHashMap<Channel, Map<Integer, Long>>> arriveTimeInNano = new FastThreadLocal<WeakHashMap<Channel, Map<Integer, Long>>>();

public static void arrive(Channel channel, Integer key) {
Map<Integer, Long> map = getArriveTimeMap(channel);
if (map.get(key) == null) {
map.put(key, System.nanoTime());
}
}

public static long getAndClear(Channel channel, Integer key) {
Map<Integer, Long> map = getArriveTimeMap(channel);
Long result = map.remove(key);
if (result == null) {
return -1;
}
return result;
}

private static Map<Integer, Long> getArriveTimeMap(Channel channel) {
WeakHashMap<Channel, Map<Integer, Long>> map = arriveTimeInNano.get();
if (map == null) {
arriveTimeInNano.set(new WeakHashMap<Channel, Map<Integer, Long>>(256));
map = arriveTimeInNano.get();
}
Map<Integer, Long> subMap = map.get(channel);
if (subMap == null) {
map.put(channel, new HashMap<Integer, Long>());
}
return map.get(channel);
}

}
Loading

0 comments on commit 9520e04

Please sign in to comment.