Skip to content

Commit

Permalink
Support reuse and do not reuse connection for bolt. (#509)
Browse files Browse the repository at this point in the history
* support reuse and do not reuse the connection for bolt
  • Loading branch information
leizhiyuan authored Feb 20, 2019
1 parent adf9544 commit e161d3c
Show file tree
Hide file tree
Showing 9 changed files with 350 additions and 195 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alipay.sofa.rpc.transport.bolt;

import com.alipay.remoting.Connection;
import com.alipay.remoting.Url;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.transport.ClientTransportConfig;

/**
* @author <a href="mailto:[email protected]">zhiyuan.lzy</a>
*/
class AloneBoltClientConnectionManager extends BoltClientConnectionManager {

public AloneBoltClientConnectionManager(boolean addHook) {
super(addHook);
}

@Override
protected void checkLeak() {
//do not check
}

/**
* 通过配置获取长连接
*
* @param rpcClient bolt客户端
* @param transportConfig 传输层配置
* @param url 传输层地址
* @return 长连接
*/
public Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) {
if (rpcClient == null || transportConfig == null || url == null) {
return null;
}
Connection connection;
try {
connection = rpcClient.getConnection(url, url.getConnectTimeout());
} catch (InterruptedException e) {
throw new SofaRpcRuntimeException(e);
} catch (RemotingException e) {
throw new SofaRpcRuntimeException(e);
}
if (connection == null) {
return null;
}

return connection;
}

/**
* 关闭长连接
*
* @param rpcClient bolt客户端
* @param transportConfig 传输层配置
* @param url 传输层地址
*/
public void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) {
if (rpcClient == null || transportConfig == null || url == null) {
return;
}
//TODO do not close
}

@Override
public boolean isConnectionFine(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) {
Connection connection;
try {
connection = rpcClient.getConnection(url, url.getConnectTimeout());
} catch (RemotingException e) {
return false;
} catch (InterruptedException e) {
return false;
}

return connection != null && connection.isFine();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,47 +18,20 @@

import com.alipay.remoting.Connection;
import com.alipay.remoting.Url;
import com.alipay.remoting.exception.RemotingException;
import com.alipay.remoting.rpc.RpcClient;
import com.alipay.sofa.rpc.base.Destroyable;
import com.alipay.sofa.rpc.common.annotation.VisibleForTesting;
import com.alipay.sofa.rpc.common.utils.CommonUtils;
import com.alipay.sofa.rpc.common.utils.NetUtils;
import com.alipay.sofa.rpc.context.RpcRuntimeContext;
import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException;
import com.alipay.sofa.rpc.log.Logger;
import com.alipay.sofa.rpc.log.LoggerFactory;
import com.alipay.sofa.rpc.transport.ClientTransportConfig;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicInteger;

/**
* @author <a href="mailto:[email protected]">GengZhang</a>
* @author bystander
* @version $Id: BoltClientConnectionManager.java, v 0.1 2019年01月29日 11:58 bystander Exp $
*/
class BoltClientConnectionManager {

/**
* slf4j Logger for this class
*/
private final static Logger LOGGER = LoggerFactory
.getLogger(BoltClientConnectionManager.class);
public abstract class BoltClientConnectionManager {

/**
* 长连接复用时,共享长连接的连接池,一个服务端ip和端口同一协议只建立一个长连接,不管多少接口,共用长连接
*/
@VisibleForTesting
final ConcurrentMap<ClientTransportConfig, Connection> urlConnectionMap = new ConcurrentHashMap<ClientTransportConfig, Connection>();

/**
* 长连接复用时,共享长连接的计数器
*/
@VisibleForTesting
final ConcurrentMap<Connection, AtomicInteger> connectionRefCounter = new ConcurrentHashMap<Connection, AtomicInteger>();

@VisibleForTesting
protected BoltClientConnectionManager(boolean addHook) {
public BoltClientConnectionManager(boolean addHook) {
if (addHook) {
RpcRuntimeContext.registryDestroyHook(new Destroyable.DestroyHook() {
@Override
Expand All @@ -77,118 +50,32 @@ public void postDestroy() {
/**
* 检查是否有没回收
*/
protected void checkLeak() {
if (CommonUtils.isNotEmpty(urlConnectionMap)) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Bolt client transport maybe leak. {}", urlConnectionMap);
}
urlConnectionMap.clear();
}
if (CommonUtils.isNotEmpty(connectionRefCounter)) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Bolt client transport maybe leak. {}", connectionRefCounter);
}
connectionRefCounter.clear();
}
}
protected abstract void checkLeak();

/**
* 通过配置获取长连接
*
* @param rpcClient bolt客户端
* @param transportConfig 传输层配置
* @param url 传输层地址
* @return 长连接
* get connection
* @param rpcClient
* @param transportConfig
* @param url
* @return the connection or null
*/
public Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) {
if (rpcClient == null || transportConfig == null || url == null) {
return null;
}
Connection connection = urlConnectionMap.get(transportConfig);
if (connection != null && !connection.isFine()) {
closeConnection(rpcClient, transportConfig, url);
connection = null;
}
if (connection == null) {
try {
connection = rpcClient.getConnection(url, url.getConnectTimeout());
} catch (InterruptedException e) {
throw new SofaRpcRuntimeException(e);
} catch (RemotingException e) {
throw new SofaRpcRuntimeException(e);
}
if (connection == null) {
return null;
}
// 保存唯一长连接
Connection oldConnection = urlConnectionMap.putIfAbsent(transportConfig, connection);
if (oldConnection != null) {
if (LOGGER.isWarnEnabled()) {
LOGGER.warn("Multiple threads init ClientTransport with same key:" + url);
}
rpcClient.closeStandaloneConnection(connection); //如果同时有人插入,则使用第一个
connection = oldConnection;
} else {
public abstract Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url);

// 增加计数器
AtomicInteger counter = connectionRefCounter.get(connection);
if (counter == null) {
counter = new AtomicInteger(0);
AtomicInteger oldCounter = connectionRefCounter.putIfAbsent(connection, counter);
if (oldCounter != null) {
counter = oldCounter;
}
}
int currentCount = counter.incrementAndGet();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Bolt client transport {} of {}, current ref count is: {}", url.toString(),
NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()),
currentCount);
}
}
}
return connection;
}
/**
* close connection
* @param rpcClient
* @param transportConfig
* @param url
*/
public abstract void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url);

/**
* 关闭长连接
*
* @param rpcClient bolt客户端
* @param transportConfig 传输层配置
* @param url 传输层地址
* judge connection status
* @param rpcClient
* @param transportConfig
* @param url
* @return true /false
*/
public void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) {
if (rpcClient == null || transportConfig == null || url == null) {
return;
}
// 先删除
Connection connection = urlConnectionMap.remove(transportConfig);
if (connection == null) {
return;
}
// 再判断是否需要关闭
boolean needDestroy;
AtomicInteger integer = connectionRefCounter.get(connection);
if (integer == null) {
needDestroy = true;
} else {
// 当前连接引用数
int currentCount = integer.decrementAndGet();
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Client transport {} of {} , current ref count is: {}", url.toString(),
NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()),
currentCount);
}
if (currentCount <= 0) {
// 此长连接无任何引用,可以销毁
connectionRefCounter.remove(connection);
needDestroy = true;
} else {
needDestroy = false;
}
}
if (needDestroy) {
rpcClient.closeStandaloneConnection(connection);
}
}
}
public abstract boolean isConnectionFine(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url);

}
Loading

0 comments on commit e161d3c

Please sign in to comment.