diff --git a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/AloneBoltClientConnectionManager.java b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/AloneBoltClientConnectionManager.java new file mode 100644 index 000000000..8709b0205 --- /dev/null +++ b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/AloneBoltClientConnectionManager.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.bolt; + +import com.alipay.remoting.Connection; +import com.alipay.remoting.Url; +import com.alipay.remoting.exception.RemotingException; +import com.alipay.remoting.rpc.RpcClient; +import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; +import com.alipay.sofa.rpc.transport.ClientTransportConfig; + +/** + * @author zhiyuan.lzy + */ +class AloneBoltClientConnectionManager extends BoltClientConnectionManager { + + public AloneBoltClientConnectionManager(boolean addHook) { + super(addHook); + } + + @Override + protected void checkLeak() { + //do not check + } + + /** + * 通过配置获取长连接 + * + * @param rpcClient bolt客户端 + * @param transportConfig 传输层配置 + * @param url 传输层地址 + * @return 长连接 + */ + public Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + if (rpcClient == null || transportConfig == null || url == null) { + return null; + } + Connection connection; + try { + connection = rpcClient.getConnection(url, url.getConnectTimeout()); + } catch (InterruptedException e) { + throw new SofaRpcRuntimeException(e); + } catch (RemotingException e) { + throw new SofaRpcRuntimeException(e); + } + if (connection == null) { + return null; + } + + return connection; + } + + /** + * 关闭长连接 + * + * @param rpcClient bolt客户端 + * @param transportConfig 传输层配置 + * @param url 传输层地址 + */ + public void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + if (rpcClient == null || transportConfig == null || url == null) { + return; + } + //TODO do not close + } + + @Override + public boolean isConnectionFine(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + Connection connection; + try { + connection = rpcClient.getConnection(url, url.getConnectTimeout()); + } catch (RemotingException e) { + return false; + } catch (InterruptedException e) { + return false; + } + + return connection != null && connection.isFine(); + } +} diff --git a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManager.java b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManager.java index 01a1e5a1c..4c1ab4089 100644 --- a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManager.java +++ b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManager.java @@ -18,47 +18,20 @@ import com.alipay.remoting.Connection; import com.alipay.remoting.Url; -import com.alipay.remoting.exception.RemotingException; import com.alipay.remoting.rpc.RpcClient; import com.alipay.sofa.rpc.base.Destroyable; import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; -import com.alipay.sofa.rpc.common.utils.CommonUtils; -import com.alipay.sofa.rpc.common.utils.NetUtils; import com.alipay.sofa.rpc.context.RpcRuntimeContext; -import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; -import com.alipay.sofa.rpc.log.Logger; -import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.transport.ClientTransportConfig; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentMap; -import java.util.concurrent.atomic.AtomicInteger; - /** - * @author GengZhang + * @author bystander + * @version $Id: BoltClientConnectionManager.java, v 0.1 2019年01月29日 11:58 bystander Exp $ */ -class BoltClientConnectionManager { - - /** - * slf4j Logger for this class - */ - private final static Logger LOGGER = LoggerFactory - .getLogger(BoltClientConnectionManager.class); +public abstract class BoltClientConnectionManager { - /** - * 长连接复用时,共享长连接的连接池,一个服务端ip和端口同一协议只建立一个长连接,不管多少接口,共用长连接 - */ @VisibleForTesting - final ConcurrentMap urlConnectionMap = new ConcurrentHashMap(); - - /** - * 长连接复用时,共享长连接的计数器 - */ - @VisibleForTesting - final ConcurrentMap connectionRefCounter = new ConcurrentHashMap(); - - @VisibleForTesting - protected BoltClientConnectionManager(boolean addHook) { + public BoltClientConnectionManager(boolean addHook) { if (addHook) { RpcRuntimeContext.registryDestroyHook(new Destroyable.DestroyHook() { @Override @@ -77,118 +50,32 @@ public void postDestroy() { /** * 检查是否有没回收 */ - protected void checkLeak() { - if (CommonUtils.isNotEmpty(urlConnectionMap)) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Bolt client transport maybe leak. {}", urlConnectionMap); - } - urlConnectionMap.clear(); - } - if (CommonUtils.isNotEmpty(connectionRefCounter)) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Bolt client transport maybe leak. {}", connectionRefCounter); - } - connectionRefCounter.clear(); - } - } + protected abstract void checkLeak(); /** - * 通过配置获取长连接 - * - * @param rpcClient bolt客户端 - * @param transportConfig 传输层配置 - * @param url 传输层地址 - * @return 长连接 + * get connection + * @param rpcClient + * @param transportConfig + * @param url + * @return the connection or null */ - public Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { - if (rpcClient == null || transportConfig == null || url == null) { - return null; - } - Connection connection = urlConnectionMap.get(transportConfig); - if (connection != null && !connection.isFine()) { - closeConnection(rpcClient, transportConfig, url); - connection = null; - } - if (connection == null) { - try { - connection = rpcClient.getConnection(url, url.getConnectTimeout()); - } catch (InterruptedException e) { - throw new SofaRpcRuntimeException(e); - } catch (RemotingException e) { - throw new SofaRpcRuntimeException(e); - } - if (connection == null) { - return null; - } - // 保存唯一长连接 - Connection oldConnection = urlConnectionMap.putIfAbsent(transportConfig, connection); - if (oldConnection != null) { - if (LOGGER.isWarnEnabled()) { - LOGGER.warn("Multiple threads init ClientTransport with same key:" + url); - } - rpcClient.closeStandaloneConnection(connection); //如果同时有人插入,则使用第一个 - connection = oldConnection; - } else { + public abstract Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url); - // 增加计数器 - AtomicInteger counter = connectionRefCounter.get(connection); - if (counter == null) { - counter = new AtomicInteger(0); - AtomicInteger oldCounter = connectionRefCounter.putIfAbsent(connection, counter); - if (oldCounter != null) { - counter = oldCounter; - } - } - int currentCount = counter.incrementAndGet(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Bolt client transport {} of {}, current ref count is: {}", url.toString(), - NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()), - currentCount); - } - } - } - return connection; - } + /** + * close connection + * @param rpcClient + * @param transportConfig + * @param url + */ + public abstract void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url); /** - * 关闭长连接 - * - * @param rpcClient bolt客户端 - * @param transportConfig 传输层配置 - * @param url 传输层地址 + * judge connection status + * @param rpcClient + * @param transportConfig + * @param url + * @return true /false */ - public void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { - if (rpcClient == null || transportConfig == null || url == null) { - return; - } - // 先删除 - Connection connection = urlConnectionMap.remove(transportConfig); - if (connection == null) { - return; - } - // 再判断是否需要关闭 - boolean needDestroy; - AtomicInteger integer = connectionRefCounter.get(connection); - if (integer == null) { - needDestroy = true; - } else { - // 当前连接引用数 - int currentCount = integer.decrementAndGet(); - if (LOGGER.isDebugEnabled()) { - LOGGER.debug("Client transport {} of {} , current ref count is: {}", url.toString(), - NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()), - currentCount); - } - if (currentCount <= 0) { - // 此长连接无任何引用,可以销毁 - connectionRefCounter.remove(connection); - needDestroy = true; - } else { - needDestroy = false; - } - } - if (needDestroy) { - rpcClient.closeStandaloneConnection(connection); - } - } -} + public abstract boolean isConnectionFine(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url); + +} \ No newline at end of file diff --git a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransport.java b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransport.java index 934d58b83..7b81cd4fb 100644 --- a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransport.java +++ b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransport.java @@ -32,9 +32,10 @@ import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.codec.bolt.SofaRpcSerializationRegister; import com.alipay.sofa.rpc.common.RemotingConstants; +import com.alipay.sofa.rpc.common.RpcConfigs; import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.RpcOptions; import com.alipay.sofa.rpc.common.utils.ClassLoaderUtils; -import com.alipay.sofa.rpc.common.utils.NetUtils; import com.alipay.sofa.rpc.context.RpcInternalContext; import com.alipay.sofa.rpc.core.exception.RpcErrorType; import com.alipay.sofa.rpc.core.exception.SofaRpcException; @@ -78,12 +79,18 @@ public class BoltClientTransport extends ClientTransport { */ protected static final RpcClient RPC_CLIENT = new RpcClient(); + protected static final boolean REUSE_CONNECTION = RpcConfigs.getOrDefaultValue( + RpcOptions.TRANSPORT_CONNECTION_REUSE, true); + /** * Connection manager for reuse connection * * @since 5.4.0 */ - protected static BoltClientConnectionManager connectionManager = new BoltClientConnectionManager(true); + protected static BoltClientConnectionManager connectionManager = REUSE_CONNECTION ? new ReuseBoltClientConnectionManager( + true) + : new AloneBoltClientConnectionManager( + true); static { RPC_CLIENT.init(); @@ -95,12 +102,6 @@ public class BoltClientTransport extends ClientTransport { */ protected final Url url; - /** - * Connection的实时状态
- * 因为一个url在bolt里对应多个connect的,但是我们禁用,只保留一个 - */ - protected volatile Connection connection; - /** * 正在发送的调用数量 */ @@ -146,32 +147,13 @@ protected Url convertProviderToUrl(ClientTransportConfig transportConfig, Provid @Override public void connect() { - if (connection != null) { - if (!connection.isFine()) { - connection.close(); - connection = null; - } - } - if (connection == null) { - synchronized (this) { - if (connection == null) { - connection = connectionManager.getConnection(RPC_CLIENT, transportConfig, url); - } - } - } + fetchConnection(); } @Override public void disconnect() { try { - if (connection != null) { - if (LOGGER.isInfoEnabled()) { - LOGGER.info("Try disconnect client transport now. The connection is {}.", - NetUtils.channelToString(localAddress(), remoteAddress())); - } - connectionManager.closeConnection(RPC_CLIENT, transportConfig, url); - connection = null; - } + connectionManager.closeConnection(RPC_CLIENT, transportConfig, url); } catch (Exception e) { throw new SofaRpcRuntimeException("", e); } @@ -184,7 +166,7 @@ public void destroy() { @Override public boolean isAvailable() { - return connection != null && connection.isFine(); + return connectionManager.isConnectionFine(RPC_CLIENT, transportConfig, url); } @Override @@ -435,15 +417,19 @@ public void handleRpcRequest(SofaRequest request) { @Override public InetSocketAddress remoteAddress() { + Connection connection = fetchConnection(); return connection == null ? null : connection.getRemoteAddress(); } @Override public InetSocketAddress localAddress() { - return connection == null ? null : connection.getLocalAddress(); + Connection connection = fetchConnection(); + return connection == null ? null : connection.getRemoteAddress(); } protected void checkConnection() throws SofaRpcException { + + Connection connection = fetchConnection(); if (connection == null) { throw new SofaRpcException(RpcErrorType.CLIENT_NETWORK, "connection is null"); } @@ -459,4 +445,8 @@ protected void putToContextIfNotNull(InvokeContext invokeContext, String oldKey, context.setAttachment(key, value); } } + + public Connection fetchConnection() { + return connectionManager.getConnection(RPC_CLIENT, transportConfig, url); + } } diff --git a/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManager.java b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManager.java new file mode 100644 index 000000000..5557b41d5 --- /dev/null +++ b/extension-impl/remoting-bolt/src/main/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManager.java @@ -0,0 +1,183 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.transport.bolt; + +import com.alipay.remoting.Connection; +import com.alipay.remoting.Url; +import com.alipay.remoting.rpc.RpcClient; +import com.alipay.sofa.rpc.common.annotation.VisibleForTesting; +import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.common.utils.NetUtils; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import com.alipay.sofa.rpc.transport.ClientTransportConfig; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * @author GengZhang + */ +class ReuseBoltClientConnectionManager extends BoltClientConnectionManager { + + /** + * slf4j Logger for this class + */ + private final static Logger LOGGER = LoggerFactory + .getLogger(ReuseBoltClientConnectionManager.class); + + /** + * 长连接复用时,共享长连接的连接池,一个服务端ip和端口同一协议只建立一个长连接,不管多少接口,共用长连接 + */ + @VisibleForTesting + final ConcurrentMap urlConnectionMap = new ConcurrentHashMap(); + + /** + * 长连接复用时,共享长连接的计数器 + */ + @VisibleForTesting + final ConcurrentMap connectionRefCounter = new ConcurrentHashMap(); + + public ReuseBoltClientConnectionManager(boolean addHook) { + super(addHook); + } + + /** + * 检查是否有没回收 + */ + protected void checkLeak() { + if (CommonUtils.isNotEmpty(urlConnectionMap)) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Bolt client transport maybe leak. {}", urlConnectionMap); + } + urlConnectionMap.clear(); + } + if (CommonUtils.isNotEmpty(connectionRefCounter)) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Bolt client transport maybe leak. {}", connectionRefCounter); + } + connectionRefCounter.clear(); + } + } + + /** + * 通过配置获取长连接 + * + * @param rpcClient bolt客户端 + * @param transportConfig 传输层配置 + * @param url 传输层地址 + * @return 长连接 + */ + @Override + public Connection getConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + if (rpcClient == null || transportConfig == null || url == null) { + return null; + } + Connection connection = urlConnectionMap.get(transportConfig); + if (connection != null && !connection.isFine()) { + closeConnection(rpcClient, transportConfig, url); + connection = null; + } + if (connection == null) { + try { + connection = rpcClient.getConnection(url, url.getConnectTimeout()); + } catch (Exception e) { + LOGGER.warn("get connection failed in url," + url); + } + if (connection == null) { + return null; + } + // 保存唯一长连接 + Connection oldConnection = urlConnectionMap.putIfAbsent(transportConfig, connection); + if (oldConnection != null) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Multiple threads init ClientTransport with same key:" + url); + } + rpcClient.closeStandaloneConnection(connection); //如果同时有人插入,则使用第一个 + connection = oldConnection; + } else { + + // 增加计数器 + AtomicInteger counter = connectionRefCounter.get(connection); + if (counter == null) { + counter = new AtomicInteger(0); + AtomicInteger oldCounter = connectionRefCounter.putIfAbsent(connection, counter); + if (oldCounter != null) { + counter = oldCounter; + } + } + int currentCount = counter.incrementAndGet(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Bolt client transport {} of {}, current ref count is: {}", url.toString(), + NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()), + currentCount); + } + } + } + return connection; + } + + /** + * 关闭长连接 + * + * @param rpcClient bolt客户端 + * @param transportConfig 传输层配置 + * @param url 传输层地址 + */ + @Override + public void closeConnection(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + if (rpcClient == null || transportConfig == null || url == null) { + return; + } + // 先删除 + Connection connection = urlConnectionMap.remove(transportConfig); + if (connection == null) { + return; + } + // 再判断是否需要关闭 + boolean needDestroy; + AtomicInteger integer = connectionRefCounter.get(connection); + if (integer == null) { + needDestroy = true; + } else { + // 当前连接引用数 + int currentCount = integer.decrementAndGet(); + if (LOGGER.isDebugEnabled()) { + LOGGER.debug("Client transport {} of {} , current ref count is: {}", url.toString(), + NetUtils.channelToString(connection.getLocalAddress(), connection.getRemoteAddress()), + currentCount); + } + if (currentCount <= 0) { + // 此长连接无任何引用,可以销毁 + connectionRefCounter.remove(connection); + needDestroy = true; + } else { + needDestroy = false; + } + } + if (needDestroy) { + rpcClient.closeStandaloneConnection(connection); + } + } + + @Override + public boolean isConnectionFine(RpcClient rpcClient, ClientTransportConfig transportConfig, Url url) { + Connection connection = this.getConnection(rpcClient, transportConfig, url); + return connection != null && connection.isFine(); + } +} diff --git a/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransportTest.java b/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransportTest.java index 3fa4dbad4..e7dc04e23 100644 --- a/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransportTest.java +++ b/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientTransportTest.java @@ -56,8 +56,14 @@ public void doReuseTest() { BoltClientTransport clientTransport = new BoltClientTransport(clientTransportConfig); clientTransport.disconnect(); - Assert.assertTrue(BoltClientTransport.connectionManager.urlConnectionMap.size() == 0); - Assert.assertTrue(BoltClientTransport.connectionManager.connectionRefCounter.size() == 0); + final BoltClientConnectionManager connectionManager = BoltClientTransport.connectionManager; + + if (connectionManager instanceof ReuseBoltClientConnectionManager) { + Assert.assertTrue(((ReuseBoltClientConnectionManager) connectionManager).urlConnectionMap.size() == 0); + Assert.assertTrue(((ReuseBoltClientConnectionManager) connectionManager).connectionRefCounter.size() == 0); + } else { + Assert.fail(); + } ClientTransportConfig config1 = new ClientTransportConfig(); config1.setProviderInfo(new ProviderInfo().setHost("127.0.0.1").setPort(12222)) @@ -74,7 +80,7 @@ public void doReuseTest() { .getClientTransport(config2); clientTransport2.connect(); Assert.assertFalse(clientTransport1 == clientTransport2); - Assert.assertTrue(clientTransport1.connection == clientTransport2.connection); + Assert.assertTrue(clientTransport1.fetchConnection() == clientTransport2.fetchConnection()); ClientTransportConfig config3 = new ClientTransportConfig(); config3.setProviderInfo(new ProviderInfo().setHost("127.0.0.1").setPort(12223)) @@ -83,7 +89,7 @@ public void doReuseTest() { .getClientTransport(config3); clientTransport3.connect(); Assert.assertFalse(clientTransport1 == clientTransport3); - Assert.assertFalse(clientTransport1.connection == clientTransport3.connection); + Assert.assertFalse(clientTransport1.fetchConnection() == clientTransport3.fetchConnection()); ClientTransportFactory.releaseTransport(null, 500); diff --git a/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManagerTest.java b/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManagerTest.java similarity index 95% rename from extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManagerTest.java rename to extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManagerTest.java index 4c620bf71..c5c62f75c 100644 --- a/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/BoltClientConnectionManagerTest.java +++ b/extension-impl/remoting-bolt/src/test/java/com/alipay/sofa/rpc/transport/bolt/ReuseBoltClientConnectionManagerTest.java @@ -39,11 +39,11 @@ /** * @author GengZhang */ -public class BoltClientConnectionManagerTest extends ActivelyDestroyTest { +public class ReuseBoltClientConnectionManagerTest extends ActivelyDestroyTest { - private BoltClientConnectionManager manager = new BoltClientConnectionManager(false); + private ReuseBoltClientConnectionManager manager = new ReuseBoltClientConnectionManager(false); - private RpcClient rpcClient = new RpcClient(); + private RpcClient rpcClient = new RpcClient(); @Before public void init() { @@ -65,12 +65,9 @@ public void testAll() throws Exception { Assert.assertNull(connection); // 连不上的端口 - try { - manager.getConnection(rpcClient, wrongConfig, buildUrl(wrongConfig)); - Assert.fail(); - } catch (Exception e) { - } + Connection result = manager.getConnection(rpcClient, wrongConfig, buildUrl(wrongConfig)); + Assert.assertNull(result); // ok final ClientTransportConfig config = buildConfig(12222); diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/AllConnectConnectionHolderTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/AllConnectConnectionHolderTest.java index 4a3f58acd..4521a549c 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/AllConnectConnectionHolderTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/AllConnectConnectionHolderTest.java @@ -24,11 +24,11 @@ import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcRunningState; import com.alipay.sofa.rpc.proxy.ProxyFactory; import com.alipay.sofa.rpc.test.ActivelyDestroyTest; import com.alipay.sofa.rpc.test.HelloService; import com.alipay.sofa.rpc.test.HelloServiceImpl; -import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; import org.junit.Test; @@ -46,6 +46,8 @@ public class AllConnectConnectionHolderTest extends ActivelyDestroyTest { @BeforeClass public static void startServer() throws Exception { + RpcRunningState.setUnitTestMode(true); + // 发布一个服务,每个请求要执行2秒 serverConfig1 = new ServerConfig() .setStopTimeout(0) @@ -75,12 +77,6 @@ public static void startServer() throws Exception { providerConfig2.export(); } - @AfterClass - public static void stopServer() { - serverConfig1.destroy(); - serverConfig2.destroy(); - } - @Test public void getAvailableClientTransport1() throws Exception { ConsumerConfig consumerConfig = new ConsumerConfig() diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/LazyConnectTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/LazyConnectTest.java index b0fdc7854..cd571291b 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/LazyConnectTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/LazyConnectTest.java @@ -20,6 +20,7 @@ import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.ServerConfig; +import com.alipay.sofa.rpc.context.RpcRunningState; import com.alipay.sofa.rpc.test.ActivelyDestroyTest; import com.alipay.sofa.rpc.test.HelloService; import com.alipay.sofa.rpc.test.HelloServiceImpl; @@ -42,6 +43,8 @@ public class LazyConnectTest extends ActivelyDestroyTest { @BeforeClass public static void startServer() { + + RpcRunningState.setUnitTestMode(true); // 只有2个线程 执行 serverConfig = new ServerConfig() .setStopTimeout(0) diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/ReconnectTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/ReconnectTest.java index fce047f40..9b1d16511 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/ReconnectTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/client/ReconnectTest.java @@ -16,7 +16,6 @@ */ package com.alipay.sofa.rpc.test.client; -import com.alipay.remoting.Connection; import com.alipay.sofa.rpc.client.ProviderHelper; import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.config.ConsumerConfig; @@ -32,7 +31,6 @@ import org.junit.Assert; import org.junit.Test; -import java.lang.reflect.Field; import java.util.Arrays; import java.util.concurrent.Callable; @@ -70,19 +68,20 @@ public void testReconnect() throws Exception { // Mock server down, and RPC will throw exception(no available provider) providerConfig.unExport(); ServerFactory.destroyAll(); + BoltClientTransport clientTransport = (BoltClientTransport) consumerConfig.getConsumerBootstrap().getCluster() .getConnectionHolder() .getAvailableClientTransport(ProviderHelper.toProviderInfo("bolt://127.0.0.1:22221")); - Field field = BoltClientTransport.class.getDeclaredField("connection"); - field.setAccessible(true); - Connection connection = (Connection) field.get(clientTransport); - connection.close(); + + clientTransport.disconnect(); + TestUtils.delayGet(new Callable() { @Override public Boolean call() throws Exception { return consumerConfig.getConsumerBootstrap().getCluster().getConnectionHolder().isAvailableEmpty(); } }, true, 100, 30); + try { helloService.sayHello("xxx", 11); Assert.fail();