Skip to content

Commit

Permalink
triple pojo mode support stream
Browse files Browse the repository at this point in the history
  • Loading branch information
liujianjun.ljj committed May 13, 2024
1 parent df27427 commit c7435df
Show file tree
Hide file tree
Showing 30 changed files with 1,118 additions and 635 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ protected void decorateRequest(SofaRequest request) {

if (!consumerConfig.isGeneric()) {
// 找到调用类型, generic的时候类型在filter里进行判断
request.setInvokeType(consumerConfig.getMethodInvokeType(request));
request.setInvokeType(consumerConfig.getMethodInvokeType(request.getMethodName()));
}

RpcInvokeContext invokeCtx = RpcInvokeContext.peekContext();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -657,10 +657,6 @@ else if (RpcConstants.INVOKER_TYPE_FUTURE.equals(invokeType)) {
// 放入线程上下文
RpcInternalContext.getContext().setFuture(future);
response = buildEmptyResponse(request);
} else if (RpcConstants.INVOKER_TYPE_CLIENT_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_BI_STREAMING.equals(invokeType)
|| RpcConstants.INVOKER_TYPE_SERVER_STREAMING.equals(invokeType)) {
response = transport.syncSend(request, Integer.MAX_VALUE);
} else {
throw new SofaRpcException(RpcErrorType.CLIENT_UNDECLARED_ERROR, "Unknown invoke type:" + invokeType);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,11 +212,6 @@ public abstract class AbstractInterfaceConfig<T, S extends AbstractInterfaceConf
*/
protected transient volatile Map<String, Object> configValueCache = null;

/**
* 方法调用类型,(方法全名 - 调用类型)
*/
protected transient volatile Map<String, String> methodCallType = null;

/**
* 代理接口类,和T对应,主要针对泛化调用
*/
Expand Down Expand Up @@ -252,21 +247,6 @@ public S setProxyClass(Class proxyClass) {
return castThis();
}

/**
* Cache the call type of interface methods
*/
protected void loadMethodCallType(Class<?> interfaceClass){
Method[] methods = interfaceClass.getDeclaredMethods();
this.methodCallType = new ConcurrentHashMap<>();
for(Method method :methods) {
methodCallType.put(method.getName(),MethodConfig.mapStreamType(method,RpcConstants.INVOKER_TYPE_SYNC));
}
}

public String getMethodCallType(String methodName) {
return methodCallType.get(methodName);
}

/**
* Gets application.
*
Expand Down Expand Up @@ -1035,7 +1015,7 @@ public Object getMethodConfigValue(String methodName, String configKey) {
* @param key the key
* @return the string
*/
protected String buildmkey(String methodName, String key) {
private String buildmkey(String methodName, String key) {
return RpcConstants.HIDE_KEY_PREFIX + methodName + RpcConstants.HIDE_KEY_PREFIX + key;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@
import com.alipay.sofa.rpc.common.utils.ExceptionUtils;
import com.alipay.sofa.rpc.common.utils.StringUtils;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.core.request.SofaRequest;
import com.alipay.sofa.rpc.listener.ChannelListener;
import com.alipay.sofa.rpc.listener.ConsumerStateListener;
import com.alipay.sofa.rpc.listener.ProviderInfoListener;
Expand Down Expand Up @@ -936,62 +935,12 @@ public SofaResponseCallback getMethodOnreturn(String methodName) {
/**
* Gets the call type corresponding to the method name
*
* @param sofaRequest the request
* @param methodName the method name
* @return the call type
*/
public String getMethodInvokeType(SofaRequest sofaRequest) {
String methodName = sofaRequest.getMethodName();

String invokeType = (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE, null);

if (invokeType == null) {
invokeType = getAndCacheCallType(sofaRequest);
}

return invokeType;
}

/**
* Get and cache the call type of certain method
* @param request RPC request
* @return request call type
*/
public String getAndCacheCallType(SofaRequest request) {
Method method = request.getMethod();
String callType = MethodConfig
.mapStreamType(
method,
(String) getMethodConfigValue(request.getMethodName(), RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType())
);
//Method level config
updateAttribute(buildMethodConfigKey(request, RpcConstants.CONFIG_KEY_INVOKE_TYPE), callType, true);
return callType;
}

/**
* 通过请求的目标方法构建方法配置key。该key使用内部配置格式。(以'.' 开头)
* @param request RPC请求
* @return 方法配置名称,带方法参数列表
*/
public String buildMethodConfigKey(SofaRequest request, String propertyKey) {
return "." + getMethodSignature(request.getMethod()) + "." + propertyKey;
}

public static String getMethodSignature(Method method) {
Class<?>[] parameterTypes = method.getParameterTypes();
StringBuilder methodSignature = new StringBuilder();
methodSignature.append(method.getName()).append("(");

for (int i = 0; i < parameterTypes.length; i++) {
methodSignature.append(parameterTypes[i].getSimpleName());
if (i < parameterTypes.length - 1) {
methodSignature.append(", ");
}
}

methodSignature.append(")");
return methodSignature.toString();
public String getMethodInvokeType(String methodName) {
return (String) getMethodConfigValue(methodName, RpcConstants.CONFIG_KEY_INVOKE_TYPE,
getInvokeType());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,15 +16,9 @@
*/
package com.alipay.sofa.rpc.config;

import com.alipay.sofa.rpc.common.RpcConstants;
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.core.invoke.SofaResponseCallback;
import com.alipay.sofa.rpc.transport.StreamHandler;

import java.io.Serializable;
import java.lang.reflect.Method;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

Expand All @@ -49,8 +43,7 @@ public class MethodConfig implements Serializable {
protected Map<String, String> parameters;

/**
* The Timeout. 远程调用超时时间(毫秒)。
* 对于Stream调用,这个时间为整个调用的超时时长,而非stream内单个调用的时长。未在这个时长内完成(调用{@link StreamHandler#onFinish()})的Stream调用会认为超时并抛出异常。
* The Timeout. 远程调用超时时间(毫秒)
*/
protected Integer timeout;

Expand Down Expand Up @@ -333,38 +326,4 @@ public MethodConfig setParameter(String key, String value) {
public String getParameter(String key) {
return parameters == null ? null : parameters.get(key);
}

/**
* Gets the stream call type of certain method
* @param method the method
* @return call type,server/client/bidirectional stream or default value. If not mapped to any stream call type, use the default value
*/
public static String mapStreamType(Method method, String defaultValue){
Class<?>[] paramClasses = method.getParameterTypes();
Class<?> returnClass = method.getReturnType();

int paramLen = paramClasses.length;
String callType;

//BidirectionalStream & ClientStream
if(paramLen > 0 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && StreamHandler.class.isAssignableFrom(returnClass)){
if(paramLen > 1){
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE,"Bidirectional/Client stream method parameters can be only one StreamHandler.");
}
callType = RpcConstants.INVOKER_TYPE_BI_STREAMING;
}
//ServerStream
else if (paramLen > 1 && StreamHandler.class.isAssignableFrom(paramClasses[0]) && void.class == returnClass){
callType = RpcConstants.INVOKER_TYPE_SERVER_STREAMING;
}
else if (StreamHandler.class.isAssignableFrom(returnClass) || Arrays.stream(paramClasses).anyMatch(StreamHandler.class::isAssignableFrom)) {
throw new SofaRpcException(RpcErrorType.CLIENT_CALL_TYPE, "StreamHandler can only at the specified location of parameter. Please check related docs.");
}
//Other call types
else {
callType = defaultValue;
}

return callType;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ public T getRef() {
*/
public ProviderConfig<T> setRef(T ref) {
this.ref = ref;
loadMethodCallType(ref.getClass());
return this;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,21 +69,6 @@ public static SofaRequest buildSofaRequest(Class<?> clazz, Method method, Class[
return request;
}

/**
* 根据一个请求的属性复制一个不包含具体方法实参的请求。
* 复制以下属性:请求接口名、请求方法名、请求方法、方法参数类型
*
* @param sofaRequest 被复制的请求实例
*/
public static SofaRequest copyEmptyRequest(SofaRequest sofaRequest) {
SofaRequest request = new SofaRequest();
request.setInterfaceName(sofaRequest.getInterfaceName());
request.setMethodName(sofaRequest.getMethodName());
request.setMethod(sofaRequest.getMethod());
request.setMethodArgSigs(sofaRequest.getMethodArgSigs());
return request;
}

/**
* 构建rpc错误结果
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,30 +19,30 @@
/**
* StreamHandler, works just like gRPC StreamObserver.
*/
public interface StreamHandler<T> {
public interface SofaStreamObserver<T> {

/**
* Sends a message, or defines the behavior when a message is received.
* <p>This method should never be called after {@link StreamHandler#onFinish()} has been invoked.
* <p>This method should never be called after {@link SofaStreamObserver#onCompleted()} has been invoked.
*/
void onMessage(T message);
void onNext(T message);

/**
* Note: This method MUST be invoked after the transport is complete.
* Failure to do so may result in unexpected errors.
* <p>
* Signals that all messages have been sent/received normally, and closes this stream.
*/
void onFinish();
void onCompleted();

/**
* Signals an exception to terminate this stream, or defines the behavior when an error occurs.
* <p></p>
* Once this method is invoked by one side, it can't send more messages, and the corresponding method on the other side will be triggered.
* Depending on the protocol implementation, it's possible that the other side can still call {@link StreamHandler#onMessage(Object)} after this method has been invoked, although this is not recommended.
* Depending on the protocol implementation, it's possible that the other side can still call {@link SofaStreamObserver#onNext(Object)} after this method has been invoked, although this is not recommended.
* <p></p>
* As a best practice, it is advised not to send any more information once this method is called.
*
*/
void onException(Throwable throwable);
void onError(Throwable throwable);
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public SofaResponse invoke(FilterInvoker invoker, SofaRequest request) throws So

// 修正类型
ConsumerConfig consumerConfig = (ConsumerConfig) invoker.getConfig();
String invokeType = consumerConfig.getMethodInvokeType(request);
String invokeType = consumerConfig.getMethodInvokeType(methodName);
request.setInvokeType(invokeType);
request.addRequestProp(RemotingConstants.HEAD_INVOKE_TYPE, invokeType);
request.addRequestProp(REVISE_KEY, REVISE_VALUE);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,51 +21,51 @@
import com.alipay.sofa.rpc.core.exception.RpcErrorType;
import com.alipay.sofa.rpc.core.exception.SofaRpcException;
import com.alipay.sofa.rpc.transport.ByteArrayWrapperByteBuf;
import com.alipay.sofa.rpc.transport.StreamHandler;
import com.alipay.sofa.rpc.transport.SofaStreamObserver;
import io.grpc.stub.StreamObserver;

/**
* ClientStreamObserverAdapter.
*/
public class ClientStreamObserverAdapter implements StreamObserver<triple.Response> {

private final StreamHandler<Object> streamHandler;
private final SofaStreamObserver<Object> sofaStreamObserver;

private final Serializer serializer;
private final Serializer serializer;

private volatile Class<?> returnType;
private volatile Class<?> returnType;

public ClientStreamObserverAdapter(StreamHandler<Object> streamHandler, byte serializeType) {
this.streamHandler = streamHandler;
public ClientStreamObserverAdapter(SofaStreamObserver<Object> sofaStreamObserver, byte serializeType) {
this.sofaStreamObserver = sofaStreamObserver;
this.serializer = SerializerFactory.getSerializer(serializeType);
}

@Override
public void onNext(triple.Response response) {
byte[] responseDate = response.getData().toByteArray();
byte[] responseData = response.getData().toByteArray();
Object appResponse = null;
String returnTypeName = response.getType();
if (responseDate != null && responseDate.length > 0) {
if (responseData != null && responseData.length > 0) {
if (returnType == null && !returnTypeName.isEmpty()) {
try {
returnType = Class.forName(returnTypeName);
} catch (ClassNotFoundException e) {
throw new SofaRpcException(RpcErrorType.CLIENT_SERIALIZE, "Can not find return type :" + returnType);
}
}
appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseDate), returnType, null);
appResponse = serializer.decode(new ByteArrayWrapperByteBuf(responseData), returnType, null);
}

streamHandler.onMessage(appResponse);
sofaStreamObserver.onNext(appResponse);
}

@Override
public void onError(Throwable t) {
streamHandler.onException(t);
sofaStreamObserver.onError(t);
}

@Override
public void onCompleted() {
streamHandler.onFinish();
sofaStreamObserver.onCompleted();
}
}
Loading

0 comments on commit c7435df

Please sign in to comment.