From 528d0fb0e03390ec6e14931807f92ea2935f7689 Mon Sep 17 00:00:00 2001 From: Narziss Date: Mon, 22 Jul 2024 05:02:21 +0800 Subject: [PATCH 1/4] integrate Zookeeper and Nacos as configuration centers --- all/pom.xml | 12 ++ .../ApolloDynamicConfigManagerTest.java | 19 ++- config/config-nacos/pom.xml | 107 ++++++++++++ .../nacos/NacosDynamicConfigManager.java | 129 +++++++++++++++ ...ipay.sofa.rpc.dynamic.DynamicConfigManager | 2 + .../nacos/NacosDynamicConfigManagerTest.java | 58 +++++++ .../config-nacos/src/test/resources/log4j.xml | 16 ++ config/config-zk/pom.xml | 117 +++++++++++++ .../zk/ZookeeperDynamicConfigManager.java | 156 ++++++++++++++++++ ...ipay.sofa.rpc.dynamic.DynamicConfigManager | 1 + .../zk/ZookeeperDynamicConfigManagerTest.java | 58 +++++++ config/config-zk/src/test/resources/log4j.xml | 16 ++ config/pom.xml | 2 + .../sofa/rpc/common/config/RpcConfigKeys.java | 17 ++ 14 files changed, 703 insertions(+), 7 deletions(-) create mode 100644 config/config-nacos/pom.xml create mode 100644 config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java create mode 100644 config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager create mode 100644 config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java create mode 100644 config/config-nacos/src/test/resources/log4j.xml create mode 100644 config/config-zk/pom.xml create mode 100644 config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java create mode 100644 config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager create mode 100644 config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java create mode 100644 config/config-zk/src/test/resources/log4j.xml diff --git a/all/pom.xml b/all/pom.xml index 3da5e4bf6..7610e334b 100644 --- a/all/pom.xml +++ b/all/pom.xml @@ -310,6 +310,16 @@ sofa-rpc-config-apollo ${project.version} + + com.alipay.sofa + sofa-rpc-config-zk + ${project.version} + + + com.alipay.sofa + sofa-rpc-config-nacos + ${project.version} + com.alipay.sofa bolt @@ -565,6 +575,8 @@ com.alipay.sofa:sofa-rpc-tracer-opentracing-resteasy com.alipay.sofa:sofa-rpc-tracer-opentracing-triple com.alipay.sofa:sofa-rpc-config-apollo + com.alipay.sofa:sofa-rpc-config-zk + com.alipay.sofa:sofa-rpc-config-nacos com.alipay.sofa:sofa-rpc-doc-swagger diff --git a/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java b/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java index 5fbb4daf0..2b08e90d3 100644 --- a/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java +++ b/config/config-apollo/src/test/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManagerTest.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.dynamic.apollo; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; import com.alipay.sofa.rpc.dynamic.DynamicHelper; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; @@ -24,10 +26,11 @@ public class ApolloDynamicConfigManagerTest { - private final static Logger logger = LoggerFactory - .getLogger(ApolloDynamicConfigManagerTest.class); + private final static Logger logger = LoggerFactory + .getLogger(ApolloDynamicConfigManagerTest.class); - private ApolloDynamicConfigManager apolloDynamicConfigManager = new ApolloDynamicConfigManager("test"); + private DynamicConfigManager apolloDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager("test", + "apollo"); @Test public void getProviderServiceProperty() { @@ -37,17 +40,19 @@ public void getProviderServiceProperty() { @Test public void getConsumerServiceProperty() { + String result = apolloDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } @Test public void getProviderMethodProperty() { + String result = apolloDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } @Test public void getConsumerMethodProperty() { - } - - @Test - public void getServiceAuthRule() { + String result = apolloDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); } } \ No newline at end of file diff --git a/config/config-nacos/pom.xml b/config/config-nacos/pom.xml new file mode 100644 index 000000000..4e0d8740b --- /dev/null +++ b/config/config-nacos/pom.xml @@ -0,0 +1,107 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-config + ${revision} + + + sofa-rpc-config-nacos + + + + com.alipay.sofa + sofa-rpc-log-common-tools + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + + com.alibaba.nacos + nacos-client + + + org.slf4j + slf4j-log4j12 + test + + + junit + junit + test + + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.source} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + + **/*Test.java + + + once + + + + + diff --git a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java new file mode 100644 index 000000000..f5b9800ff --- /dev/null +++ b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.nacos; + +import com.alibaba.nacos.api.PropertyKeyConst; +import com.alipay.sofa.common.config.SofaConfigs; +import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.config.RpcConfigKeys; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.ext.Extension; +import com.alibaba.nacos.api.config.ConfigService; +import com.alibaba.nacos.api.exception.NacosException; +import com.alibaba.nacos.api.config.ConfigFactory; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; + +import java.util.Properties; + +/** + * @author Narziss + * @version NaocsDynamicConfigManager.java, v 0.1 2024年07月26日 09:37 Narziss + */ + +@Extension(value = "nacos", override = true) +public class NacosDynamicConfigManager extends DynamicConfigManager { + + private final static Logger LOGGER = LoggerFactory.getLogger(NacosDynamicConfigManager.class); + private static final String DEFAULT_NAMESPACE = "sofa-rpc"; + private static final String ADDRESS = SofaConfigs.getOrDefault(RpcConfigKeys.NACOS_ADDRESS); + private static final String DEFAULT_GROUP = "sofa-rpc"; + private static final long DEFAULT_TIMEOUT = 5000; + private ConfigService configService; + private Properties nacosConfig = new Properties(); + private final String appName; + + protected NacosDynamicConfigManager(String appName) { + super(appName); + if (StringUtils.isEmpty(appName)) { + this.appName = DEFAULT_GROUP; + } else { + this.appName = appName; + } + try { + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, ADDRESS); + nacosConfig.put(PropertyKeyConst.NAMESPACE, DEFAULT_NAMESPACE); + configService = ConfigFactory.createConfigService(nacosConfig); + + } catch (NacosException e) { + LOGGER.error("Failed to create ConfigService", e); + } + } + + @Override + public void initServiceConfiguration(String service) { + //TODO not now + + } + + @Override + public String getProviderServiceProperty(String service, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), + appName, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (NacosException e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerServiceProperty(String service, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), + appName, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (NacosException e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getProviderMethodProperty(String service, String method, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), + appName, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (NacosException e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerMethodProperty(String service, String method, String key) { + try { + String configValue = configService.getConfig( + DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), + appName, DEFAULT_TIMEOUT); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (NacosException e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public AuthRuleGroup getServiceAuthRule(String service) { + //TODO 暂不支持 + return null; + } +} \ No newline at end of file diff --git a/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager b/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager new file mode 100644 index 000000000..c055a88ae --- /dev/null +++ b/config/config-nacos/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager @@ -0,0 +1,2 @@ +nacos=com.alipay.sofa.rpc.dynamic.nacos.NacosDynamicConfigManager + diff --git a/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java b/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java new file mode 100644 index 000000000..f880cdf7b --- /dev/null +++ b/config/config-nacos/src/test/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.nacos; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.junit.Assert; +import org.junit.Test; + +public class NacosDynamicConfigManagerTest { + + private final static Logger logger = LoggerFactory + .getLogger(NacosDynamicConfigManagerTest.class); + + private DynamicConfigManager nacosDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager("test", + "nacos"); + + @Test + public void getProviderServiceProperty() { + String result = nacosDynamicConfigManager.getProviderServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerServiceProperty() { + String result = nacosDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getProviderMethodProperty() { + String result = nacosDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerMethodProperty() { + String result = nacosDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } +} \ No newline at end of file diff --git a/config/config-nacos/src/test/resources/log4j.xml b/config/config-nacos/src/test/resources/log4j.xml new file mode 100644 index 000000000..e95634f16 --- /dev/null +++ b/config/config-nacos/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/config/config-zk/pom.xml b/config/config-zk/pom.xml new file mode 100644 index 000000000..8701b83b0 --- /dev/null +++ b/config/config-zk/pom.xml @@ -0,0 +1,117 @@ + + + 4.0.0 + + + com.alipay.sofa + sofa-rpc-config + ${revision} + + + sofa-rpc-config-zk + + + + com.alipay.sofa + sofa-rpc-log-common-tools + + + com.alipay.sofa + sofa-rpc-log + + + com.alipay.sofa + sofa-rpc-api + + + + org.apache.curator + curator-framework + + + org.apache.curator + curator-x-discovery + + + org.apache.zookeeper + zookeeper + + + + org.slf4j + slf4j-log4j12 + test + + + junit + junit + test + + + + + + + src/main/java + + + src/main/resources + false + + **/** + + + + src/test/java + + + src/test/resources + false + + **/** + + + + + + + org.apache.maven.plugins + maven-compiler-plugin + + ${maven.compiler.source} + ${maven.compiler.source} + ${project.build.sourceEncoding} + + + + org.apache.maven.plugins + maven-install-plugin + + ${module.install.skip} + + + + org.apache.maven.plugins + maven-deploy-plugin + + ${module.deploy.skip} + + + + org.apache.maven.plugins + maven-surefire-plugin + + ${skipTests} + + + **/*Test.java + + + once + + + + + diff --git a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java new file mode 100644 index 000000000..363ddb534 --- /dev/null +++ b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.zk; + +import com.alipay.sofa.common.config.SofaConfigs; +import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.config.RpcConfigKeys; +import com.alipay.sofa.rpc.common.utils.StringUtils; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.recipes.cache.PathChildrenCache; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; +import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.retry.ExponentialBackoffRetry; + +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; + +import static com.alipay.sofa.rpc.common.utils.StringUtils.CONTEXT_SEP; + +/** + * @author Narziss + * @version ZookeeperDynamicConfigManager.java, v 0.1 2024年07月20日 09:23 Narziss + */ + +@Extension(value = "zookeeper", override = true) +public class ZookeeperDynamicConfigManager extends DynamicConfigManager { + + private final static Logger LOGGER = LoggerFactory + .getLogger(ZookeeperDynamicConfigManager.class); + private final CuratorFramework zkClient; + private static final String ADDRESS = SofaConfigs + .getOrDefault(RpcConfigKeys.ZK_ADDRESS); + private static final String DEFAULT_NAMESPACE = "sofa-rpc"; + private static final String CONFIG_NODE = "config"; + private static final String DEFAULT_APP = "sofa-rpc"; + private final String appName; + private final String rootPath; + private ConcurrentMap configMap = new ConcurrentHashMap<>(); + + protected ZookeeperDynamicConfigManager(String appName) { + super(appName); + if (StringUtils.isEmpty(appName)) { + this.appName = DEFAULT_APP; + } else { + this.appName = appName; + } + rootPath = CONTEXT_SEP + CONFIG_NODE + CONTEXT_SEP + appName; + zkClient = CuratorFrameworkFactory.builder() + .connectString(ADDRESS) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .namespace(DEFAULT_NAMESPACE) + .build(); + zkClient.start(); + + PathChildrenCache cache = new PathChildrenCache(zkClient, rootPath, true); + cache.getListenable().addListener(new PathChildrenCacheListener() { + @Override + public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { + switch (event.getType()) { + case CHILD_ADDED: + case CHILD_UPDATED: + String key = event.getData().getPath().substring(rootPath.length() + 1); + String value = new String(event.getData().getData()); + configMap.put(key, value); + LOGGER.info("Receive zookeeper event: " + "type=[" + event.getType() + "] key=[" + key + "] value=[" + value + "]"); + break; + case CHILD_REMOVED: + key = event.getData().getPath().substring(rootPath.length() + 1); + configMap.remove(key); + LOGGER.info("Receive zookeeper event: " + "type=[" + event.getType() + "] key=[" + key + "]"); + break; + default: + break; + } + } + }); + try { + cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); + } catch (Exception e) { + LOGGER.error("setupPathChildrenCache error", e); + } + } + + @Override + public void initServiceConfiguration(String service) { + //TODO not now + } + + @Override + public String getProviderServiceProperty(String service, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerServiceProperty(String service, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getProviderMethodProperty(String service, String method, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public String getConsumerMethodProperty(String service, String method, String key) { + try { + String configValue = configMap.get(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key)); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + + } + + @Override + public AuthRuleGroup getServiceAuthRule(String service) { + //TODO 暂不支持 + return null; + } +} \ No newline at end of file diff --git a/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager b/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager new file mode 100644 index 000000000..5f9a8243e --- /dev/null +++ b/config/config-zk/src/main/resources/META-INF/services/sofa-rpc/com.alipay.sofa.rpc.dynamic.DynamicConfigManager @@ -0,0 +1 @@ +zookeeper=com.alipay.sofa.rpc.dynamic.zk.ZookeeperDynamicConfigManager diff --git a/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java b/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java new file mode 100644 index 000000000..9ccc3bf9f --- /dev/null +++ b/config/config-zk/src/test/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManagerTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic.zk; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; +import org.junit.Assert; +import org.junit.Test; + +public class ZookeeperDynamicConfigManagerTest { + + private final static Logger logger = LoggerFactory + .getLogger(ZookeeperDynamicConfigManager.class); + + private DynamicConfigManager zookeeperDynamicConfigManager = DynamicConfigManagerFactory.getDynamicManager( + "test", "zookeeper"); + + @Test + public void getProviderServiceProperty() { + String result = zookeeperDynamicConfigManager.getProviderServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerServiceProperty() { + String result = zookeeperDynamicConfigManager.getConsumerServiceProperty("serviceName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getProviderMethodProperty() { + String result = zookeeperDynamicConfigManager.getProviderMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } + + @Test + public void getConsumerMethodProperty() { + String result = zookeeperDynamicConfigManager.getConsumerMethodProperty("serviceName", "methodName", "timeout"); + Assert.assertEquals(DynamicHelper.DEFAULT_DYNAMIC_VALUE, result); + } +} \ No newline at end of file diff --git a/config/config-zk/src/test/resources/log4j.xml b/config/config-zk/src/test/resources/log4j.xml new file mode 100644 index 000000000..e95634f16 --- /dev/null +++ b/config/config-zk/src/test/resources/log4j.xml @@ -0,0 +1,16 @@ + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/config/pom.xml b/config/pom.xml index 7d0caacb8..2e380e67e 100644 --- a/config/pom.xml +++ b/config/pom.xml @@ -15,6 +15,8 @@ config-apollo + config-zk + config-nacos diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java index 2b4934ce0..bc60ffca2 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java @@ -140,4 +140,21 @@ public class RpcConfigKeys { false, "specify biz thread pool implementation type", new String[] { "sofa_rpc_server_thread_pool_type" }); + + // config center + public static ConfigKey ZK_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.zookeeper.address", + "127.0.0.1:2181", + false, + "The address of Zookeeper configuration center.", + new String[] { "zookeeper_address" }); + + public static ConfigKey NACOS_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.nacos.address", + "127.0.0.1:8848", + false, + "The address of Nacos configuration center.", + new String[] { "nacos_address" }); } From 853ed3f4f4a48f4e6570d44bda423ef6e37528ec Mon Sep 17 00:00:00 2001 From: Narziss Date: Mon, 23 Sep 2024 08:02:53 +0800 Subject: [PATCH 2/4] support dynamic config at the interface level --- .../bootstrap/DefaultConsumerBootstrap.java | 73 ++++++++- .../apollo/ApolloDynamicConfigManager.java | 83 ++++++++++- .../nacos/NacosDynamicConfigManager.java | 140 +++++++++++++++--- .../zk/ZookeeperDynamicConfigManager.java | 123 ++++++++++++--- .../sofa/rpc/client/AbstractCluster.java | 3 +- .../sofa/rpc/client/lb/AutoLoadBalancer.java | 2 +- .../sofa/rpc/dynamic/ConfigChangeType.java | 39 +++++ .../sofa/rpc/dynamic/ConfigChangedEvent.java | 94 ++++++++++++ .../sofa/rpc/dynamic/DynamicConfigKeys.java | 36 ++++- .../rpc/dynamic/DynamicConfigManager.java | 22 +++ .../dynamic/DynamicConfigManagerFactory.java | 42 +++++- .../sofa/rpc/listener/ConfigListener.java | 11 ++ .../sofa/rpc/common/config/RpcConfigKeys.java | 17 --- .../sofa/rpc/common/utils/StringUtils.java | 5 + 14 files changed, 618 insertions(+), 72 deletions(-) create mode 100644 core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java create mode 100644 core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java index f6ba175a7..0c4bfc4af 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java @@ -28,6 +28,8 @@ import com.alipay.sofa.rpc.config.RegistryConfig; import com.alipay.sofa.rpc.context.RpcRuntimeContext; import com.alipay.sofa.rpc.core.exception.SofaRpcRuntimeException; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; +import com.alipay.sofa.rpc.dynamic.ConfigChangeType; import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; @@ -42,10 +44,7 @@ import com.alipay.sofa.rpc.registry.Registry; import com.alipay.sofa.rpc.registry.RegistryFactory; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -156,13 +155,26 @@ public T refer() { proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(), proxyInvoker); - //动态配置 + //启用请求级别动态配置 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager( consumerConfig.getAppName(), dynamicAlias); dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId()); } + + //启用接口级别动态配置 + final String dynamicUrl = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_URL); + if (StringUtils.isNotBlank(dynamicUrl)) { + final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl( + consumerConfig.getAppName(), dynamicUrl); + + //build listeners for dynamic config + consumerConfig.setConfigListener(buildConfigListener(this,dynamicManager)); + } + + + } catch (Exception e) { if (cluster != null) { cluster.destroy(); @@ -202,6 +214,17 @@ protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap) { return new ConsumerAttributeListener(); } + /** + * Build ConfigListener for consumer bootstrap with dynamic config. + * + * @param bootstrap ConsumerBootstrap + * @param dynamicManager DynamicConfigManager + * @return ConfigListener + */ + protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap, DynamicConfigManager dynamicManager) { + return new ConsumerAttributeListener(dynamicManager); + } + /** * Build ProviderInfoListener for consumer bootstrap. * @@ -438,8 +461,46 @@ public void updateAllProviders(List groups) { */ private class ConsumerAttributeListener implements ConfigListener { + private Map newValueMap = new HashMap<>(); + + ConsumerAttributeListener() { + + } + + ConsumerAttributeListener(DynamicConfigManager dynamicManager) { + this.initWith(consumerConfig.getInterfaceId(),dynamicManager); + } + + public void initWith(String key, DynamicConfigManager dynamicManager) { + dynamicManager.addListener(key, this); + // 初始化配置值 + String rawConfig = dynamicManager.getConfig(key); + if (!StringUtils.isEmpty(rawConfig)) { + process(new ConfigChangedEvent(key, "sofa-rpc",rawConfig)); + } + } + + @Override + public void process(ConfigChangedEvent event){ + if (event.getChangeType().equals(ConfigChangeType.DELETED)) { + newValueMap = null; + } else { + // ADDED or MODIFIED + String[] lines = event.getContent().split("\n"); + for (String line : lines) { + String[] keyValue = line.split("=", 2); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + newValueMap.put(key, value); + } + } + } + attrUpdated(newValueMap); + } + @Override - public void configChanged(Map newValue) { + public void configChanged(Map newValueMap) { } diff --git a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java index 35367d5ca..e8394b8f8 100644 --- a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java +++ b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java @@ -16,13 +16,24 @@ */ package com.alipay.sofa.rpc.dynamic.apollo; +import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; -import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; -import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; -import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.dynamic.*; import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ConfigListener; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; import com.ctrip.framework.apollo.Config; +import com.ctrip.framework.apollo.ConfigChangeListener; import com.ctrip.framework.apollo.ConfigService; +import com.ctrip.framework.apollo.enums.PropertyChangeType; +import com.ctrip.framework.apollo.model.ConfigChange; + +import java.util.Collections; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; /** * @author bystander @@ -34,11 +45,30 @@ @Extension(value = "apollo", override = true) public class ApolloDynamicConfigManager extends DynamicConfigManager { + Logger LOGGER = LoggerFactory.getLogger(ApolloDynamicConfigManager.class); + + private static final String APOLLO_APPID_KEY = "app.id"; + + private static final String APOLLO_ADDR_KEY = "apollo.meta"; + + private static final String APOLLO_PROTOCOL_PREFIX = "http://"; + private Config config; + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + protected ApolloDynamicConfigManager(String appName) { super(appName); - config = ConfigService.getAppConfig(); + System.setProperty(APOLLO_APPID_KEY, appName); + System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + SofaConfigs.getOrDefault(DynamicConfigKeys.APOLLO_ADDRESS)); + config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP); + } + + protected ApolloDynamicConfigManager(String appName,String address) { + super(appName); + System.setProperty(APOLLO_APPID_KEY, appName); + System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + address); + config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP); } @Override @@ -77,4 +107,49 @@ public AuthRuleGroup getServiceAuthRule(String service) { //TODO 暂不支持 return null; } + + @Override + public String getConfig(String key){ + return config.getProperty(key, DynamicHelper.DEFAULT_DYNAMIC_VALUE); + } + + @Override + public void addListener(String key, ConfigListener listener){ + ApolloListener apolloListener = watchListenerMap.computeIfAbsent(key, k -> new ApolloListener()); + apolloListener.addListener(listener); + config.addChangeListener(apolloListener, Collections.singleton(key)); + } + + public class ApolloListener implements ConfigChangeListener { + + private Set listeners = new CopyOnWriteArraySet<>(); + + ApolloListener() {} + + @Override + public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) { + for (String key : changeEvent.changedKeys()) { + ConfigChange change = changeEvent.getChange(key); + if ("".equals(change.getNewValue())) { + LOGGER.info("an empty rule is received for key: " + key); + return; + } + + ConfigChangedEvent event = + new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change)); + listeners.forEach(listener -> listener.process(event)); + } + } + + private ConfigChangeType getChangeType(ConfigChange change) { + if (change.getChangeType() == PropertyChangeType.DELETED) { + return ConfigChangeType.DELETED; + } + return ConfigChangeType.MODIFIED; + } + + void addListener(ConfigListener configListener) { + this.listeners.add(configListener); + } + } } \ No newline at end of file diff --git a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java index f5b9800ff..dd58d1bd6 100644 --- a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java +++ b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java @@ -17,21 +17,28 @@ package com.alipay.sofa.rpc.dynamic.nacos; import com.alibaba.nacos.api.PropertyKeyConst; +import com.alibaba.nacos.api.config.listener.AbstractSharedListener; import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; -import com.alipay.sofa.rpc.common.config.RpcConfigKeys; import com.alipay.sofa.rpc.common.utils.StringUtils; -import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; -import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; -import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.dynamic.*; import com.alipay.sofa.rpc.ext.Extension; import com.alibaba.nacos.api.config.ConfigService; import com.alibaba.nacos.api.exception.NacosException; import com.alibaba.nacos.api.config.ConfigFactory; +import com.alipay.sofa.rpc.listener.ConfigListener; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; +import java.util.Map; import java.util.Properties; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; +import java.util.concurrent.Executor; + +import static com.alipay.sofa.rpc.common.utils.StringUtils.KEY_SEPARATOR; /** * @author Narziss @@ -42,24 +49,38 @@ public class NacosDynamicConfigManager extends DynamicConfigManager { private final static Logger LOGGER = LoggerFactory.getLogger(NacosDynamicConfigManager.class); - private static final String DEFAULT_NAMESPACE = "sofa-rpc"; - private static final String ADDRESS = SofaConfigs.getOrDefault(RpcConfigKeys.NACOS_ADDRESS); - private static final String DEFAULT_GROUP = "sofa-rpc"; + private static final long DEFAULT_TIMEOUT = 5000; + + private final String address; + private ConfigService configService; + private Properties nacosConfig = new Properties(); - private final String appName; + + private final String group; + + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); protected NacosDynamicConfigManager(String appName) { super(appName); - if (StringUtils.isEmpty(appName)) { - this.appName = DEFAULT_GROUP; - } else { - this.appName = appName; + address=SofaConfigs.getOrDefault(DynamicConfigKeys.NACOS_ADDRESS); + group = DynamicConfigKeys.DEFAULT_GROUP; + try { + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, address); + configService = ConfigFactory.createConfigService(nacosConfig); + + } catch (NacosException e) { + LOGGER.error("Failed to create ConfigService", e); } + } + + protected NacosDynamicConfigManager(String appName, String address) { + super(appName); + this.address = address; + group = DynamicConfigKeys.DEFAULT_GROUP; try { - nacosConfig.put(PropertyKeyConst.SERVER_ADDR, ADDRESS); - nacosConfig.put(PropertyKeyConst.NAMESPACE, DEFAULT_NAMESPACE); + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, address); configService = ConfigFactory.createConfigService(nacosConfig); } catch (NacosException e) { @@ -78,7 +99,7 @@ public String getProviderServiceProperty(String service, String key) { try { String configValue = configService.getConfig( DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), - appName, DEFAULT_TIMEOUT); + group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { return DynamicHelper.DEFAULT_DYNAMIC_VALUE; @@ -90,7 +111,7 @@ public String getConsumerServiceProperty(String service, String key) { try { String configValue = configService.getConfig( DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), - appName, DEFAULT_TIMEOUT); + group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { return DynamicHelper.DEFAULT_DYNAMIC_VALUE; @@ -102,7 +123,7 @@ public String getProviderMethodProperty(String service, String method, String ke try { String configValue = configService.getConfig( DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), - appName, DEFAULT_TIMEOUT); + group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { return DynamicHelper.DEFAULT_DYNAMIC_VALUE; @@ -113,17 +134,98 @@ public String getProviderMethodProperty(String service, String method, String ke public String getConsumerMethodProperty(String service, String method, String key) { try { String configValue = configService.getConfig( - DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), - appName, DEFAULT_TIMEOUT); + buildDataId(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key)), + group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { return DynamicHelper.DEFAULT_DYNAMIC_VALUE; } } + public String buildDataId(String proKey) { + return getAppName() + KEY_SEPARATOR + proKey; + } + @Override public AuthRuleGroup getServiceAuthRule(String service) { //TODO 暂不支持 return null; } + + @Override + public String getConfig(String key){ + try { + return configService.getConfig(getAppName()+ KEY_SEPARATOR +key, group, DEFAULT_TIMEOUT); + } catch (NacosException e) { + LOGGER.error("Failed to getConfig for key:{}, group:{}", key, group, e); + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public void addListener(String key, ConfigListener listener) { + NacosConfigListener nacosConfigListener = watchListenerMap.computeIfAbsent( + key, k -> createTargetListener(key)); + nacosConfigListener.addListener(listener); + try { + configService.addListener(getAppName()+KEY_SEPARATOR +key, group, nacosConfigListener); + } catch (NacosException e) { + LOGGER.error("Failed to add listener for key:{}, group:{}", key, group, e); + } + } + + private NacosConfigListener createTargetListener(String key) { + NacosConfigListener configListener = new NacosConfigListener(); + configListener.fillContext(key, group); + return configListener; + } + + public class NacosConfigListener extends AbstractSharedListener { + + private Set listeners = new CopyOnWriteArraySet<>(); + /** + * cache data to store old value + */ + private Map cacheData = new ConcurrentHashMap<>(); + + @Override + public Executor getExecutor() { + return null; + } + + /** + * receive + * + * @param dataId data ID + * @param group group + * @param configInfo content + */ + @Override + public void innerReceive(String dataId, String group, String configInfo) { + String oldValue = cacheData.get(dataId); + ConfigChangedEvent event = + new ConfigChangedEvent(dataId, group, configInfo, getChangeType(configInfo, oldValue)); + if (configInfo == null) { + cacheData.remove(dataId); + } else { + cacheData.put(dataId, configInfo); + } + listeners.forEach(listener -> listener.process(event)); + } + + void addListener(ConfigListener configListener) { + + this.listeners.add(configListener); + } + + private ConfigChangeType getChangeType(String configInfo, String oldValue) { + if (StringUtils.isBlank(configInfo)) { + return ConfigChangeType.DELETED; + } + if (StringUtils.isBlank(oldValue)) { + return ConfigChangeType.ADDED; + } + return ConfigChangeType.MODIFIED; + } + } } \ No newline at end of file diff --git a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java index 363ddb534..47dadc444 100644 --- a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java +++ b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java @@ -18,25 +18,24 @@ import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; -import com.alipay.sofa.rpc.common.config.RpcConfigKeys; -import com.alipay.sofa.rpc.common.utils.StringUtils; -import com.alipay.sofa.rpc.dynamic.DynamicConfigKeyHelper; -import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; -import com.alipay.sofa.rpc.dynamic.DynamicHelper; +import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.dynamic.*; import com.alipay.sofa.rpc.ext.Extension; +import com.alipay.sofa.rpc.listener.ConfigListener; import com.alipay.sofa.rpc.log.Logger; import com.alipay.sofa.rpc.log.LoggerFactory; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.recipes.cache.PathChildrenCache; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheEvent; -import org.apache.curator.framework.recipes.cache.PathChildrenCacheListener; +import org.apache.curator.framework.recipes.cache.*; import org.apache.curator.retry.ExponentialBackoffRetry; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.CopyOnWriteArraySet; import static com.alipay.sofa.rpc.common.utils.StringUtils.CONTEXT_SEP; +import static com.alipay.sofa.rpc.common.utils.StringUtils.KEY_SEPARATOR; /** * @author Narziss @@ -48,28 +47,24 @@ public class ZookeeperDynamicConfigManager extends DynamicConfigManager { private final static Logger LOGGER = LoggerFactory .getLogger(ZookeeperDynamicConfigManager.class); + private final CuratorFramework zkClient; - private static final String ADDRESS = SofaConfigs - .getOrDefault(RpcConfigKeys.ZK_ADDRESS); - private static final String DEFAULT_NAMESPACE = "sofa-rpc"; - private static final String CONFIG_NODE = "config"; - private static final String DEFAULT_APP = "sofa-rpc"; - private final String appName; + + private final String address; + private final String rootPath; private ConcurrentMap configMap = new ConcurrentHashMap<>(); + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + protected ZookeeperDynamicConfigManager(String appName) { super(appName); - if (StringUtils.isEmpty(appName)) { - this.appName = DEFAULT_APP; - } else { - this.appName = appName; - } - rootPath = CONTEXT_SEP + CONFIG_NODE + CONTEXT_SEP + appName; + rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + DynamicConfigKeys.DEFAULT_GROUP + CONTEXT_SEP + getAppName(); + address = SofaConfigs.getOrDefault(DynamicConfigKeys.ZK_ADDRESS); zkClient = CuratorFrameworkFactory.builder() - .connectString(ADDRESS) + .connectString(address) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .namespace(DEFAULT_NAMESPACE) + .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) .build(); zkClient.start(); @@ -102,9 +97,22 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th } } + protected ZookeeperDynamicConfigManager(String appName,String address) { + super(appName); + rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + DynamicConfigKeys.DEFAULT_GROUP; + this.address = address; + zkClient = CuratorFrameworkFactory.builder() + .connectString(address) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) + .build(); + zkClient.start(); + } + @Override public void initServiceConfiguration(String service) { //TODO not now + } @Override @@ -153,4 +161,75 @@ public AuthRuleGroup getServiceAuthRule(String service) { //TODO 暂不支持 return null; } + + @Override + public String getConfig(String key){ + try { + byte[] bytes = zkClient.getData().forPath(rootPath+ CONTEXT_SEP +getAppName()+ KEY_SEPARATOR + key); + String configValue = new String(bytes, RpcConstants.DEFAULT_CHARSET); + return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } catch (Exception e) { + return DynamicHelper.DEFAULT_DYNAMIC_VALUE; + } + } + + @Override + public void addListener(String key, ConfigListener listener){ + String pathKey = rootPath+ CONTEXT_SEP +getAppName()+ KEY_SEPARATOR + key; + + ZookeeperConfigListener zookeeperConfigListener = watchListenerMap.computeIfAbsent( + key, k -> createTargetListener(pathKey)); + + zookeeperConfigListener.addListener(listener); + } + + private ZookeeperConfigListener createTargetListener(String pathKey) { + ZookeeperConfigListener configListener = new ZookeeperConfigListener(pathKey); + return configListener; + } + + public class ZookeeperConfigListener implements NodeCacheListener { + + private String pathKey; + private Set listeners = new CopyOnWriteArraySet<>(); + private NodeCache nodeCache; + + public ZookeeperConfigListener(String pathKey) { + this.pathKey = pathKey; + this.nodeCache = new NodeCache(zkClient, pathKey); + nodeCache.getListenable().addListener(this); + try { + nodeCache.start(); + } catch (Exception e) { + LOGGER.error("Failed to add listener for path:{}", pathKey, e); + } + } + + public void addListener(ConfigListener configListener) { + listeners.add(configListener); + } + + @Override + public void nodeChanged() throws Exception { + ChildData childData = nodeCache.getCurrentData(); + String content = null; + ConfigChangeType changeType; + if (childData == null) { + changeType = ConfigChangeType.DELETED; + + } else if (childData.getStat().getVersion() == 0) { + content = new String(childData.getData(), RpcConstants.DEFAULT_CHARSET); + changeType = ConfigChangeType.ADDED; + } else { + content = new String(childData.getData(), RpcConstants.DEFAULT_CHARSET); + changeType = ConfigChangeType.MODIFIED; + } + ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(pathKey, DynamicConfigKeys.DEFAULT_GROUP, (String) content, changeType); + listeners.forEach(listener -> listener.process(configChangeEvent)); + + } + } + + + } \ No newline at end of file diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java index 33fc4cc2a..ed38586d7 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/AbstractCluster.java @@ -686,7 +686,7 @@ private SofaResponse buildEmptyResponse(SofaRequest request) { * @return 调用超时 */ private int resolveTimeout(SofaRequest request, ConsumerConfig consumerConfig, ProviderInfo providerInfo) { - // 动态配置优先 + // 请求级别动态配置优先 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { String dynamicTimeout = null; @@ -704,6 +704,7 @@ private int resolveTimeout(SofaRequest request, ConsumerConfig consumerConfig, P return Integer.parseInt(dynamicTimeout); } } + // 先去调用级别配置 Integer timeout = request.getTimeout(); if (timeout == null || timeout <= 0) { diff --git a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java index 9bf01c75e..27d92a90d 100644 --- a/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java +++ b/core-impl/client/src/main/java/com/alipay/sofa/rpc/client/lb/AutoLoadBalancer.java @@ -53,7 +53,7 @@ public AutoLoadBalancer(ConsumerBootstrap consumerBootstrap) { @Override protected ProviderInfo doSelect(SofaRequest request, List providerInfos) { - // 动态配置优先 + // 请求级别动态配置优先 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { String dynamicLoadBalancer = null; diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java new file mode 100644 index 000000000..99f87d457 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangeType.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +/** + * @author Narziss + * @version ConfigChangeType.java, v 0.1 2024年09月15日 20:20 Narziss + */ + +public enum ConfigChangeType { + /** + * A config is created. + */ + ADDED, + + /** + * A config is updated. + */ + MODIFIED, + + /** + * A config is deleted. + */ + DELETED +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java new file mode 100644 index 000000000..2dc006610 --- /dev/null +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.dynamic; + +import java.util.EventObject; +import java.util.Objects; + +/** + * @author Narziss + * @version ConfigChangedEvent.java, v 0.1 2024年09月15日 20:12 Narziss + */ + +public class ConfigChangedEvent extends EventObject { + + private final String key; + + private final String group; + + private final String content; + + private final ConfigChangeType changeType; + + public ConfigChangedEvent(String key, String group, String content) { + this(key, group, content, ConfigChangeType.MODIFIED); + } + + public ConfigChangedEvent(String key, String group, String content, ConfigChangeType changeType) { + super(key + "," + group); + this.key = key; + this.group = group; + this.content = content; + this.changeType = changeType; + } + + public String getKey() { + return key; + } + + public String getGroup() { + return group; + } + + public String getContent() { + return content; + } + + public ConfigChangeType getChangeType() { + return changeType; + } + + @Override + public String toString() { + return "ConfigChangedEvent{" + "key='" + + key + '\'' + ", group='" + + group + '\'' + ", content='" + + content + '\'' + ", changeType=" + + changeType + "} " + + super.toString(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ConfigChangedEvent)) { + return false; + } + ConfigChangedEvent that = (ConfigChangedEvent) o; + return Objects.equals(getKey(), that.getKey()) + && Objects.equals(getGroup(), that.getGroup()) + && Objects.equals(getContent(), that.getContent()) + && getChangeType() == that.getChangeType(); + } + + @Override + public int hashCode() { + return Objects.hash(getKey(), getGroup(), getContent(), getChangeType()); + } +} diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java index d8c49dcec..d7929b689 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java @@ -16,10 +16,44 @@ */ package com.alipay.sofa.rpc.dynamic; +import com.alipay.sofa.common.config.ConfigKey; + /** * @author bystander * @version : DynamicConfigKeys.java, v 0.1 2019年04月17日 21:51 bystander Exp $ */ public class DynamicConfigKeys { - public static final String DYNAMIC_ALIAS = "dynamicAlias"; + public static final String DYNAMIC_ALIAS = "dynamicAlias"; + + public static final String DYNAMIC_URL = "dynamicUrl"; + + public static final String CONFIG_NODE = "config"; + + public static final String DEFAULT_NAMESPACE = "sofa-rpc"; + + public static final String DEFAULT_GROUP = "sofa-rpc"; + + public static ConfigKey ZK_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.zookeeper.address", + "127.0.0.1:2181", + false, + "The address of Zookeeper configuration center.", + new String[] { "zookeeper_address" }); + + public static ConfigKey NACOS_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.nacos.address", + "127.0.0.1:8848", + false, + "The address of Nacos configuration center.", + new String[] { "nacos_address" }); + public static ConfigKey APOLLO_ADDRESS = ConfigKey + .build( + "sofa.rpc.config.center.apollo.address", + "127.0.0.1:8080", + false, + "The address of Apollo configuration center.", + new String[] { "apollo_address" }); + } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java index f44818df8..72f879fcd 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java @@ -18,6 +18,7 @@ import com.alipay.sofa.rpc.auth.AuthRuleGroup; import com.alipay.sofa.rpc.ext.Extensible; +import com.alipay.sofa.rpc.listener.ConfigListener; /** * @@ -33,6 +34,10 @@ protected DynamicConfigManager(String appName) { this.appName = appName; } + protected String getAppName() { + return appName; + } + /** * Init service's governance related configuration. * Such as auth rules、lb rules @@ -86,4 +91,21 @@ protected DynamicConfigManager(String appName) { * @return auth rules */ public abstract AuthRuleGroup getServiceAuthRule(String service); + + /** + * Add config listener. + * + * @param key config key + * @param listener config listener + */ + public abstract void addListener(String key, ConfigListener listener); + + /** + * Get config value by key. + * + * @param key config key + * @return config value + */ + public abstract String getConfig(String key); + } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java index d3cee7245..dd6c1c978 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManagerFactory.java @@ -51,7 +51,7 @@ public class DynamicConfigManagerFactory { .getLogger(DynamicConfigManagerFactory.class); /** - * 得到动态配置管理 + * 得到请求级别动态配置管理 * * @param alias 别名 * @return DynamicManager 实现 @@ -86,4 +86,44 @@ public static DynamicConfigManager getDynamicManager(String appName, String alia } } + /** + * 得到接口级别动态配置管理 + * + * @param url 配置中心别名与地址 + * @return DynamicManager 实现 + */ + public static DynamicConfigManager getDynamicManagerWithUrl(String appName, String url) { + + String[] urlValues = url.split("://", 2); + + classLock.lock(); + try { + if (ALL_DYNAMICS.size() > 3) { + if (LOGGER.isWarnEnabled()) { + LOGGER.warn("Size of dynamic manager is greater than 3, Please check it!"); + } + } + DynamicConfigManager registry = ALL_DYNAMICS.get(urlValues[0]); + if (registry == null) { + ExtensionClass ext = ExtensionLoaderFactory.getExtensionLoader( + DynamicConfigManager.class) + .getExtensionClass(urlValues[0]); + if (ext == null) { + throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_LOAD_EXT, "DynamicConfigManager", + urlValues[0])); + } + registry = ext.getExtInstance(new Class[] { String.class, String.class }, new Object[] { appName, + urlValues[1] }); + ALL_DYNAMICS.put(urlValues[0], registry); + } + return registry; + } catch (Throwable e) { + throw new SofaRpcRuntimeException(LogCodes.getLog(LogCodes.ERROR_LOAD_EXT, "DynamicConfigManager", + urlValues[0]), + e); + } finally { + classLock.unlock(); + } + } + } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java b/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java index 7fb03ac2b..0c7685350 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/listener/ConfigListener.java @@ -16,6 +16,8 @@ */ package com.alipay.sofa.rpc.listener; +import com.alipay.sofa.rpc.dynamic.ConfigChangedEvent; + import java.util.Map; /** @@ -25,6 +27,15 @@ */ public interface ConfigListener { + /** + * 处理配置变更事件 + * + * @param event 配置变更事件 + */ + default void process(ConfigChangedEvent event){ + // do nothing + } + /** * 配置发生变化,例如 * diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java index bc60ffca2..2b4934ce0 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/config/RpcConfigKeys.java @@ -140,21 +140,4 @@ public class RpcConfigKeys { false, "specify biz thread pool implementation type", new String[] { "sofa_rpc_server_thread_pool_type" }); - - // config center - public static ConfigKey ZK_ADDRESS = ConfigKey - .build( - "sofa.rpc.config.center.zookeeper.address", - "127.0.0.1:2181", - false, - "The address of Zookeeper configuration center.", - new String[] { "zookeeper_address" }); - - public static ConfigKey NACOS_ADDRESS = ConfigKey - .build( - "sofa.rpc.config.center.nacos.address", - "127.0.0.1:8848", - false, - "The address of Nacos configuration center.", - new String[] { "nacos_address" }); } diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java index 77febb203..97bda388b 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java @@ -39,6 +39,11 @@ public class StringUtils { */ public static final String CONTEXT_SEP = "/"; + /** + * The config key separator String {@code ":"} + */ + public static final String KEY_SEPARATOR = ":"; + /** * The string {@code "*"}. * From 92351f2813b6f65afadaccf91c05cf1774b4a069 Mon Sep 17 00:00:00 2001 From: Narziss Date: Sun, 29 Sep 2024 09:01:00 +0800 Subject: [PATCH 3/4] Optimize interface-level dynamic config --- .../bootstrap/DefaultConsumerBootstrap.java | 70 +++++++-------- .../apollo/ApolloDynamicConfigManager.java | 73 +++++++++------- .../nacos/NacosDynamicConfigManager.java | 87 ++++++++++--------- .../zk/ZookeeperDynamicConfigManager.java | 73 ++++++++-------- .../rpc/config/AbstractInterfaceConfig.java | 29 ++++++- .../sofa/rpc/dynamic/ConfigChangedEvent.java | 46 +--------- .../sofa/rpc/dynamic/DynamicConfigKeys.java | 10 ++- .../rpc/dynamic/DynamicConfigManager.java | 37 ++++++-- .../sofa/rpc/common/utils/StringUtils.java | 5 -- test/test-integration/pom.xml | 20 ++++- .../test/config/ApolloDynamicConfigTest.java | 76 ++++++++++++++++ .../test/config/NacosDynamicConfigTest.java | 72 +++++++++++++++ .../config/ZookeeperDynamicConfigTest.java | 80 +++++++++++++++++ .../sofa/rpc/test/config/base/BaseZkTest.java | 62 +++++++++++++ 14 files changed, 526 insertions(+), 214 deletions(-) create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java create mode 100644 test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java index 0c4bfc4af..9c4923243 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java @@ -20,6 +20,7 @@ import com.alipay.sofa.rpc.client.Cluster; import com.alipay.sofa.rpc.client.ClusterFactory; import com.alipay.sofa.rpc.client.ProviderGroup; +import com.alipay.sofa.rpc.common.RpcConstants; import com.alipay.sofa.rpc.common.SofaConfigs; import com.alipay.sofa.rpc.common.SofaOptions; import com.alipay.sofa.rpc.common.utils.CommonUtils; @@ -53,6 +54,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static com.alipay.sofa.rpc.common.RpcConstants.REGISTRY_PROTOCOL_DOMAIN; +import static com.alipay.sofa.common.config.SofaConfigs.getOrDefault; /** * Default consumer bootstrap. @@ -145,7 +147,8 @@ public T refer() { // build cluster cluster = ClusterFactory.getCluster(this); // build listeners - consumerConfig.setConfigListener(buildConfigListener(this)); + ConfigListener configListener = buildConfigListener(this); + consumerConfig.setConfigListener(configListener); consumerConfig.setProviderInfoListener(buildProviderInfoListener(this)); // init cluster cluster.init(); @@ -155,26 +158,23 @@ public T refer() { proxyIns = (T) ProxyFactory.buildProxy(consumerConfig.getProxy(), consumerConfig.getProxyClass(), proxyInvoker); - //启用请求级别动态配置 + //请求级别动态配置参数 final String dynamicAlias = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_ALIAS); if (StringUtils.isNotBlank(dynamicAlias)) { final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManager( - consumerConfig.getAppName(), dynamicAlias); + consumerConfig.getAppName(), dynamicAlias); dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId()); } - //启用接口级别动态配置 - final String dynamicUrl = consumerConfig.getParameter(DynamicConfigKeys.DYNAMIC_URL); - if (StringUtils.isNotBlank(dynamicUrl)) { + //接口级别动态配置参数 + final String dynamicUrl = getOrDefault(DynamicConfigKeys.DYNAMIC_URL); + if ( StringUtils.isNotBlank(dynamicUrl)) { + //启用接口级别动态配置 final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl( consumerConfig.getAppName(), dynamicUrl); - - //build listeners for dynamic config - consumerConfig.setConfigListener(buildConfigListener(this,dynamicManager)); + dynamicManager.addListener(consumerConfig.getInterfaceId(), configListener); + dynamicManager.initServiceConfiguration(consumerConfig.getInterfaceId(), configListener); } - - - } catch (Exception e) { if (cluster != null) { cluster.destroy(); @@ -214,17 +214,6 @@ protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap) { return new ConsumerAttributeListener(); } - /** - * Build ConfigListener for consumer bootstrap with dynamic config. - * - * @param bootstrap ConsumerBootstrap - * @param dynamicManager DynamicConfigManager - * @return ConfigListener - */ - protected ConfigListener buildConfigListener(ConsumerBootstrap bootstrap, DynamicConfigManager dynamicManager) { - return new ConsumerAttributeListener(dynamicManager); - } - /** * Build ProviderInfoListener for consumer bootstrap. * @@ -463,28 +452,20 @@ private class ConsumerAttributeListener implements ConfigListener { private Map newValueMap = new HashMap<>(); - ConsumerAttributeListener() { + // 动态配置项 + private List dynamicConfigKeys = Arrays.asList(RpcConstants.CONFIG_KEY_TIMEOUT, + RpcConstants.CONFIG_KEY_RETRIES, RpcConstants.CONFIG_KEY_LOADBALANCER); - } - - ConsumerAttributeListener(DynamicConfigManager dynamicManager) { - this.initWith(consumerConfig.getInterfaceId(),dynamicManager); - } + ConsumerAttributeListener() { - public void initWith(String key, DynamicConfigManager dynamicManager) { - dynamicManager.addListener(key, this); - // 初始化配置值 - String rawConfig = dynamicManager.getConfig(key); - if (!StringUtils.isEmpty(rawConfig)) { - process(new ConfigChangedEvent(key, "sofa-rpc",rawConfig)); - } } @Override - public void process(ConfigChangedEvent event){ - if (event.getChangeType().equals(ConfigChangeType.DELETED)) { - newValueMap = null; - } else { + public void process(ConfigChangedEvent event) { + for (String key : newValueMap.keySet()) { + newValueMap.put(key, ""); + } + if (!event.getChangeType().equals(ConfigChangeType.DELETED)) { // ADDED or MODIFIED String[] lines = event.getContent().split("\n"); for (String line : lines) { @@ -492,7 +473,14 @@ public void process(ConfigChangedEvent event){ if (keyValue.length == 2) { String key = keyValue[0].trim(); String value = keyValue[1].trim(); - newValueMap.put(key, value); + for (String dynamicConfigKey : dynamicConfigKeys) { + if (key.equals(dynamicConfigKey) || key.endsWith("." + dynamicConfigKey)) { + newValueMap.put(key, value); + break; + } + } + } else { + LOGGER.warn("Malformed configuration line: {}", line); } } } diff --git a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java index e8394b8f8..f36a23d1c 100644 --- a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java +++ b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java @@ -18,11 +18,10 @@ import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.dynamic.*; import com.alipay.sofa.rpc.ext.Extension; import com.alipay.sofa.rpc.listener.ConfigListener; -import com.alipay.sofa.rpc.log.Logger; -import com.alipay.sofa.rpc.log.LoggerFactory; import com.ctrip.framework.apollo.Config; import com.ctrip.framework.apollo.ConfigChangeListener; import com.ctrip.framework.apollo.ConfigService; @@ -45,12 +44,12 @@ @Extension(value = "apollo", override = true) public class ApolloDynamicConfigManager extends DynamicConfigManager { - Logger LOGGER = LoggerFactory.getLogger(ApolloDynamicConfigManager.class); - private static final String APOLLO_APPID_KEY = "app.id"; private static final String APOLLO_ADDR_KEY = "apollo.meta"; + private static final String APOLLO_CLUSTER_KEY = "apollo.cluster"; + private static final String APOLLO_PROTOCOL_PREFIX = "http://"; private Config config; @@ -58,47 +57,70 @@ public class ApolloDynamicConfigManager extends DynamicConfigManager { private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); protected ApolloDynamicConfigManager(String appName) { - super(appName); - System.setProperty(APOLLO_APPID_KEY, appName); - System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + SofaConfigs.getOrDefault(DynamicConfigKeys.APOLLO_ADDRESS)); - config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP); + super(appName, SofaConfigs.getOrCustomDefault(DynamicConfigKeys.APOLLO_ADDRESS,"")); + if (StringUtils.isNotBlank(appName)) { + System.setProperty(APOLLO_APPID_KEY, appName); + } + if (StringUtils.isNotBlank(getAddress())) { + System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getAddress()); + } + config = ConfigService.getAppConfig(); } - protected ApolloDynamicConfigManager(String appName,String address) { - super(appName); + protected ApolloDynamicConfigManager(String appName, String remainUrl) { + super(appName, remainUrl); System.setProperty(APOLLO_APPID_KEY, appName); - System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + address); - config = ConfigService.getConfig(DynamicConfigKeys.DEFAULT_GROUP); + System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getAddress()); + String params[] = getParams(); + if (params!= null && params.length > 0){ + for (String param : params) { + String[] keyValue = param.split("="); + if (keyValue.length == 2) { + if ("cluster".equals(keyValue[0])) { + System.setProperty(APOLLO_CLUSTER_KEY, keyValue[1]); + } + } + } + } + config = ConfigService.getAppConfig(); } @Override public void initServiceConfiguration(String service) { - //TODO not now + // TODO 暂不支持 + } + + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + String rawConfig = config.getProperty(service, ""); + if (StringUtils.isNotBlank(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } } @Override public String getProviderServiceProperty(String service, String key) { return config.getProperty(DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getConsumerServiceProperty(String service, String key) { return config.getProperty(DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getProviderMethodProperty(String service, String method, String key) { return config.getProperty(DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @Override public String getConsumerMethodProperty(String service, String method, String key) { return config.getProperty(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), - DynamicHelper.DEFAULT_DYNAMIC_VALUE); + DynamicHelper.DEFAULT_DYNAMIC_VALUE); } @@ -109,12 +131,7 @@ public AuthRuleGroup getServiceAuthRule(String service) { } @Override - public String getConfig(String key){ - return config.getProperty(key, DynamicHelper.DEFAULT_DYNAMIC_VALUE); - } - - @Override - public void addListener(String key, ConfigListener listener){ + public void addListener(String key, ConfigListener listener) { ApolloListener apolloListener = watchListenerMap.computeIfAbsent(key, k -> new ApolloListener()); apolloListener.addListener(listener); config.addChangeListener(apolloListener, Collections.singleton(key)); @@ -124,19 +141,15 @@ public class ApolloListener implements ConfigChangeListener { private Set listeners = new CopyOnWriteArraySet<>(); - ApolloListener() {} + ApolloListener() { + } @Override public void onChange(com.ctrip.framework.apollo.model.ConfigChangeEvent changeEvent) { for (String key : changeEvent.changedKeys()) { ConfigChange change = changeEvent.getChange(key); - if ("".equals(change.getNewValue())) { - LOGGER.info("an empty rule is received for key: " + key); - return; - } - ConfigChangedEvent event = - new ConfigChangedEvent(key, change.getNamespace(), change.getNewValue(), getChangeType(change)); + new ConfigChangedEvent(key, change.getNewValue(), getChangeType(change)); listeners.forEach(listener -> listener.process(event)); } } diff --git a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java index dd58d1bd6..fc6e92ca8 100644 --- a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java +++ b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java @@ -38,8 +38,6 @@ import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.Executor; -import static com.alipay.sofa.rpc.common.utils.StringUtils.KEY_SEPARATOR; - /** * @author Narziss * @version NaocsDynamicConfigManager.java, v 0.1 2024年07月26日 09:37 Narziss @@ -48,39 +46,47 @@ @Extension(value = "nacos", override = true) public class NacosDynamicConfigManager extends DynamicConfigManager { - private final static Logger LOGGER = LoggerFactory.getLogger(NacosDynamicConfigManager.class); - - private static final long DEFAULT_TIMEOUT = 5000; + private final static Logger LOGGER = LoggerFactory.getLogger(NacosDynamicConfigManager.class); - private final String address; + private static final long DEFAULT_TIMEOUT = 5000; - private ConfigService configService; + private final String group; - private Properties nacosConfig = new Properties(); + private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); - private final String group; + private ConfigService configService; - private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + private Properties nacosConfig = new Properties(); protected NacosDynamicConfigManager(String appName) { - super(appName); - address=SofaConfigs.getOrDefault(DynamicConfigKeys.NACOS_ADDRESS); - group = DynamicConfigKeys.DEFAULT_GROUP; + super(appName, SofaConfigs.getOrDefault(DynamicConfigKeys.NACOS_ADDRESS)); + group = appName; + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, getAddress()); try { - nacosConfig.put(PropertyKeyConst.SERVER_ADDR, address); configService = ConfigFactory.createConfigService(nacosConfig); - } catch (NacosException e) { LOGGER.error("Failed to create ConfigService", e); } } - protected NacosDynamicConfigManager(String appName, String address) { - super(appName); - this.address = address; - group = DynamicConfigKeys.DEFAULT_GROUP; + protected NacosDynamicConfigManager(String appName, String remainUrl) { + super(appName, remainUrl); + group = appName; + nacosConfig.put(PropertyKeyConst.SERVER_ADDR, getAddress()); + String params[] = getParams(); + if (params != null && params.length > 0) { + for (String param : params) { + String[] keyValue = param.split("="); + if (keyValue.length == 2) { + if ("username".equals(keyValue[0])) { + nacosConfig.put(PropertyKeyConst.USERNAME, keyValue[1]); + } else if ("password".equals(keyValue[0])) { + nacosConfig.put(PropertyKeyConst.PASSWORD, keyValue[1]); + } + } + } + } try { - nacosConfig.put(PropertyKeyConst.SERVER_ADDR, address); configService = ConfigFactory.createConfigService(nacosConfig); } catch (NacosException e) { @@ -90,15 +96,26 @@ protected NacosDynamicConfigManager(String appName, String address) { @Override public void initServiceConfiguration(String service) { - //TODO not now + // TODO 暂不支持 + } + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + try { + String rawConfig = configService.getConfig(service, group, DEFAULT_TIMEOUT); + if (!StringUtils.isEmpty(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } + } catch (NacosException e) { + LOGGER.error("Failed to getConfig for key:{}, group:{}", service, group, e); + } } @Override public String getProviderServiceProperty(String service, String key) { try { String configValue = configService.getConfig( - DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), + DynamicConfigKeyHelper.buildProviderServiceProKey(service, key), group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { @@ -110,7 +127,7 @@ public String getProviderServiceProperty(String service, String key) { public String getConsumerServiceProperty(String service, String key) { try { String configValue = configService.getConfig( - DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), + DynamicConfigKeyHelper.buildConsumerServiceProKey(service, key), group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { @@ -122,7 +139,7 @@ public String getConsumerServiceProperty(String service, String key) { public String getProviderMethodProperty(String service, String method, String key) { try { String configValue = configService.getConfig( - DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), + DynamicConfigKeyHelper.buildProviderMethodProKey(service, method, key), group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { @@ -134,7 +151,7 @@ public String getProviderMethodProperty(String service, String method, String ke public String getConsumerMethodProperty(String service, String method, String key) { try { String configValue = configService.getConfig( - buildDataId(DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key)), + DynamicConfigKeyHelper.buildConsumerMethodProKey(service, method, key), group, DEFAULT_TIMEOUT); return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; } catch (NacosException e) { @@ -142,33 +159,19 @@ public String getConsumerMethodProperty(String service, String method, String ke } } - public String buildDataId(String proKey) { - return getAppName() + KEY_SEPARATOR + proKey; - } - @Override public AuthRuleGroup getServiceAuthRule(String service) { //TODO 暂不支持 return null; } - @Override - public String getConfig(String key){ - try { - return configService.getConfig(getAppName()+ KEY_SEPARATOR +key, group, DEFAULT_TIMEOUT); - } catch (NacosException e) { - LOGGER.error("Failed to getConfig for key:{}, group:{}", key, group, e); - return DynamicHelper.DEFAULT_DYNAMIC_VALUE; - } - } - @Override public void addListener(String key, ConfigListener listener) { NacosConfigListener nacosConfigListener = watchListenerMap.computeIfAbsent( key, k -> createTargetListener(key)); nacosConfigListener.addListener(listener); try { - configService.addListener(getAppName()+KEY_SEPARATOR +key, group, nacosConfigListener); + configService.addListener(key, group, nacosConfigListener); } catch (NacosException e) { LOGGER.error("Failed to add listener for key:{}, group:{}", key, group, e); } @@ -194,7 +197,7 @@ public Executor getExecutor() { } /** - * receive + * receive config change event * * @param dataId data ID * @param group group @@ -204,7 +207,7 @@ public Executor getExecutor() { public void innerReceive(String dataId, String group, String configInfo) { String oldValue = cacheData.get(dataId); ConfigChangedEvent event = - new ConfigChangedEvent(dataId, group, configInfo, getChangeType(configInfo, oldValue)); + new ConfigChangedEvent(dataId, configInfo, getChangeType(configInfo, oldValue)); if (configInfo == null) { cacheData.remove(dataId); } else { diff --git a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java index 47dadc444..773c8e7f3 100644 --- a/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java +++ b/config/config-zk/src/main/java/com/alipay/sofa/rpc/dynamic/zk/ZookeeperDynamicConfigManager.java @@ -19,6 +19,7 @@ import com.alipay.sofa.common.config.SofaConfigs; import com.alipay.sofa.rpc.auth.AuthRuleGroup; import com.alipay.sofa.rpc.common.RpcConstants; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.dynamic.*; import com.alipay.sofa.rpc.ext.Extension; import com.alipay.sofa.rpc.listener.ConfigListener; @@ -35,7 +36,6 @@ import java.util.concurrent.CopyOnWriteArraySet; import static com.alipay.sofa.rpc.common.utils.StringUtils.CONTEXT_SEP; -import static com.alipay.sofa.rpc.common.utils.StringUtils.KEY_SEPARATOR; /** * @author Narziss @@ -45,27 +45,23 @@ @Extension(value = "zookeeper", override = true) public class ZookeeperDynamicConfigManager extends DynamicConfigManager { - private final static Logger LOGGER = LoggerFactory - .getLogger(ZookeeperDynamicConfigManager.class); + private final static Logger LOGGER = LoggerFactory + .getLogger(ZookeeperDynamicConfigManager.class); - private final CuratorFramework zkClient; - - private final String address; - - private final String rootPath; - private ConcurrentMap configMap = new ConcurrentHashMap<>(); + private final CuratorFramework zkClient; + private final String rootPath; private final ConcurrentMap watchListenerMap = new ConcurrentHashMap<>(); + private ConcurrentMap configMap = new ConcurrentHashMap<>(); protected ZookeeperDynamicConfigManager(String appName) { - super(appName); - rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + DynamicConfigKeys.DEFAULT_GROUP + CONTEXT_SEP + getAppName(); - address = SofaConfigs.getOrDefault(DynamicConfigKeys.ZK_ADDRESS); + super(appName, SofaConfigs.getOrDefault(DynamicConfigKeys.ZK_ADDRESS)); + rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + appName; zkClient = CuratorFrameworkFactory.builder() - .connectString(address) - .retryPolicy(new ExponentialBackoffRetry(1000, 3)) - .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) - .build(); + .connectString(getAddress()) + .retryPolicy(new ExponentialBackoffRetry(1000, 3)) + .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) + .build(); zkClient.start(); PathChildrenCache cache = new PathChildrenCache(zkClient, rootPath, true); @@ -97,12 +93,11 @@ public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) th } } - protected ZookeeperDynamicConfigManager(String appName,String address) { - super(appName); - rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + DynamicConfigKeys.DEFAULT_GROUP; - this.address = address; + protected ZookeeperDynamicConfigManager(String appName, String remainUrl) { + super(appName, remainUrl); + rootPath = CONTEXT_SEP + DynamicConfigKeys.CONFIG_NODE + CONTEXT_SEP + appName; zkClient = CuratorFrameworkFactory.builder() - .connectString(address) + .connectString(getAddress()) .retryPolicy(new ExponentialBackoffRetry(1000, 3)) .namespace(DynamicConfigKeys.DEFAULT_NAMESPACE) .build(); @@ -111,7 +106,23 @@ protected ZookeeperDynamicConfigManager(String appName,String address) { @Override public void initServiceConfiguration(String service) { - //TODO not now + // TODO 暂不支持 + } + + @Override + public void initServiceConfiguration(String service, ConfigListener listener) { + try { + String path = rootPath + CONTEXT_SEP + service; + if (zkClient.checkExists().forPath(path) != null) { + byte[] bytes = zkClient.getData().forPath(rootPath + CONTEXT_SEP + service); + String rawConfig = new String(bytes, RpcConstants.DEFAULT_CHARSET); + if (!StringUtils.isEmpty(rawConfig)) { + listener.process(new ConfigChangedEvent(service, rawConfig)); + } + } + } catch (Exception e) { + LOGGER.error("Failed to init service configuration for service: " + service, e); + } } @@ -163,19 +174,8 @@ public AuthRuleGroup getServiceAuthRule(String service) { } @Override - public String getConfig(String key){ - try { - byte[] bytes = zkClient.getData().forPath(rootPath+ CONTEXT_SEP +getAppName()+ KEY_SEPARATOR + key); - String configValue = new String(bytes, RpcConstants.DEFAULT_CHARSET); - return configValue != null ? configValue : DynamicHelper.DEFAULT_DYNAMIC_VALUE; - } catch (Exception e) { - return DynamicHelper.DEFAULT_DYNAMIC_VALUE; - } - } - - @Override - public void addListener(String key, ConfigListener listener){ - String pathKey = rootPath+ CONTEXT_SEP +getAppName()+ KEY_SEPARATOR + key; + public void addListener(String key, ConfigListener listener) { + String pathKey = rootPath + CONTEXT_SEP + key; ZookeeperConfigListener zookeeperConfigListener = watchListenerMap.computeIfAbsent( key, k -> createTargetListener(pathKey)); @@ -224,12 +224,11 @@ public void nodeChanged() throws Exception { content = new String(childData.getData(), RpcConstants.DEFAULT_CHARSET); changeType = ConfigChangeType.MODIFIED; } - ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(pathKey, DynamicConfigKeys.DEFAULT_GROUP, (String) content, changeType); + ConfigChangedEvent configChangeEvent = new ConfigChangedEvent(pathKey, (String) content, changeType); listeners.forEach(listener -> listener.process(configChangeEvent)); } } - } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java index acc4a74e1..8966c6a7e 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/config/AbstractInterfaceConfig.java @@ -215,6 +215,11 @@ public abstract class AbstractInterfaceConfig configValueCache = null; + /** + * 动态配置的方法名称和方法参数配置的map + */ + protected transient volatile Map dynamicConfigValueCache = new ConcurrentHashMap<>(); + /** * 代理接口类,和T对应,主要针对泛化调用 */ @@ -924,6 +929,10 @@ public String queryAttribute(String property) { public boolean updateAttribute(String property, String newValueStr, boolean overwrite) { try { boolean changed = false; + if(StringUtils.isBlank(newValueStr)){ + // 改为默认值 + newValueStr = configValueCache.get(property) == null ? null : configValueCache.get(property).toString(); + } if (property.charAt(0) == RpcConstants.HIDE_KEY_PREFIX) { // 方法级配置 例如.echoStr.timeout String methodAndP = property.substring(1); @@ -940,6 +949,7 @@ public boolean updateAttribute(String property, String newValueStr, boolean over // 拿到旧的值 Object oldValue = null; Object newValue = CompatibleTypeUtils.convert(newValueStr, propertyClazz); + if (methodConfig == null) { methodConfig = new MethodConfig(); methodConfig.setName(methodName); @@ -960,6 +970,11 @@ public boolean updateAttribute(String property, String newValueStr, boolean over } if (changed && overwrite) { BeanUtils.setProperty(methodConfig, methodProperty, propertyClazz, newValue);// 覆盖属性 + if (newValue != null){ + dynamicConfigValueCache.put(property, newValue); + } else { + dynamicConfigValueCache.remove(property); + } if (LOGGER.isInfoEnabled()) { LOGGER.info("Property \"" + methodName + "." + methodProperty + "\" changed from {} to {}", oldValue, newValueStr); @@ -1016,11 +1031,17 @@ public Object getMethodConfigValue(String methodName, String configKey, Object d * @return 配置值 method config value */ public Object getMethodConfigValue(String methodName, String configKey) { - if (configValueCache == null) { - return null; - } String key = buildmkey(methodName, configKey); - return configValueCache.get(key); + Object value = null; + if (dynamicConfigValueCache != null) { + value = dynamicConfigValueCache.get(key); + } + if (value == null) { + if (configValueCache != null) { + value = configValueCache.get(key); + } + } + return value; } /** diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java index 2dc006610..77d2d06f6 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/ConfigChangedEvent.java @@ -17,7 +17,6 @@ package com.alipay.sofa.rpc.dynamic; import java.util.EventObject; -import java.util.Objects; /** * @author Narziss @@ -28,20 +27,17 @@ public class ConfigChangedEvent extends EventObject { private final String key; - private final String group; - private final String content; private final ConfigChangeType changeType; - public ConfigChangedEvent(String key, String group, String content) { - this(key, group, content, ConfigChangeType.MODIFIED); + public ConfigChangedEvent(String key, String content) { + this(key, content, ConfigChangeType.MODIFIED); } - public ConfigChangedEvent(String key, String group, String content, ConfigChangeType changeType) { - super(key + "," + group); + public ConfigChangedEvent(String key, String content, ConfigChangeType changeType) { + super(key); this.key = key; - this.group = group; this.content = content; this.changeType = changeType; } @@ -50,10 +46,6 @@ public String getKey() { return key; } - public String getGroup() { - return group; - } - public String getContent() { return content; } @@ -61,34 +53,4 @@ public String getContent() { public ConfigChangeType getChangeType() { return changeType; } - - @Override - public String toString() { - return "ConfigChangedEvent{" + "key='" - + key + '\'' + ", group='" - + group + '\'' + ", content='" - + content + '\'' + ", changeType=" - + changeType + "} " - + super.toString(); - } - - @Override - public boolean equals(Object o) { - if (this == o) { - return true; - } - if (!(o instanceof ConfigChangedEvent)) { - return false; - } - ConfigChangedEvent that = (ConfigChangedEvent) o; - return Objects.equals(getKey(), that.getKey()) - && Objects.equals(getGroup(), that.getGroup()) - && Objects.equals(getContent(), that.getContent()) - && getChangeType() == that.getChangeType(); - } - - @Override - public int hashCode() { - return Objects.hash(getKey(), getGroup(), getContent(), getChangeType()); - } } diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java index d7929b689..581c7c55e 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java @@ -25,13 +25,17 @@ public class DynamicConfigKeys { public static final String DYNAMIC_ALIAS = "dynamicAlias"; - public static final String DYNAMIC_URL = "dynamicUrl"; - public static final String CONFIG_NODE = "config"; public static final String DEFAULT_NAMESPACE = "sofa-rpc"; - public static final String DEFAULT_GROUP = "sofa-rpc"; + public static ConfigKey DYNAMIC_URL = ConfigKey + .build( + "dynamicUrl", + " ", + false, + "The url of the dynamic configuration.", + new String[] { "dynamicUrl" }); public static ConfigKey ZK_ADDRESS = ConfigKey .build( diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java index 72f879fcd..7fda9f36d 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java @@ -30,14 +30,32 @@ public abstract class DynamicConfigManager { private String appName; - protected DynamicConfigManager(String appName) { + private String address; + + private String params[]; + + protected DynamicConfigManager(String appName, String remainUrl) { this.appName = appName; + int queryIndex = remainUrl.indexOf("?"); + this.address = (queryIndex > -1) ? remainUrl.substring(0, queryIndex) : remainUrl; + if (queryIndex > -1 && queryIndex < remainUrl.length() - 1) { + String query = remainUrl.substring(queryIndex + 1); + this.params = query.split("&"); + } } protected String getAppName() { return appName; } + protected String getAddress() { + return address; + } + + protected String[] getParams() { + return params; + } + /** * Init service's governance related configuration. * Such as auth rules、lb rules @@ -46,6 +64,15 @@ protected String getAppName() { */ public abstract void initServiceConfiguration(String service); + /** + * Init service's governance related configuration. + * Such as auth rules、lb rules + * + * @param service target service + * @param listener config listener + */ + public abstract void initServiceConfiguration(String service, ConfigListener listener); + /** * Get provider service related property. * @@ -100,12 +127,4 @@ protected String getAppName() { */ public abstract void addListener(String key, ConfigListener listener); - /** - * Get config value by key. - * - * @param key config key - * @return config value - */ - public abstract String getConfig(String key); - } \ No newline at end of file diff --git a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java index 97bda388b..77febb203 100644 --- a/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java +++ b/core/common/src/main/java/com/alipay/sofa/rpc/common/utils/StringUtils.java @@ -39,11 +39,6 @@ public class StringUtils { */ public static final String CONTEXT_SEP = "/"; - /** - * The config key separator String {@code ":"} - */ - public static final String KEY_SEPARATOR = ":"; - /** * The string {@code "*"}. * diff --git a/test/test-integration/pom.xml b/test/test-integration/pom.xml index 5d78a8e48..84e669099 100644 --- a/test/test-integration/pom.xml +++ b/test/test-integration/pom.xml @@ -206,7 +206,7 @@ com.alipay.sofa - sofa-rpc-config-apollo + sofa-rpc-config-nacos ${project.parent.version} test @@ -273,6 +273,24 @@ junit test + + com.alipay.sofa + sofa-rpc-config-apollo + 5.13.1-SNAPSHOT + test + + + com.alipay.sofa + sofa-rpc-config-zk + 5.13.1-SNAPSHOT + test + + + org.apache.curator + curator-test + 4.3.0 + test + diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java new file mode 100644 index 000000000..f72a5c362 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.apollo.ApolloDynamicConfigManager; +import com.alipay.sofa.rpc.test.HelloService; +import com.ctrip.framework.apollo.enums.PropertyChangeType; +import com.ctrip.framework.apollo.model.ConfigChange; +import com.ctrip.framework.apollo.model.ConfigChangeEvent; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.HashMap; +import java.util.Map; + +/** + * @author Narziss + * @version ApolloDynamicConfigTest.java, v 0.1 2024年09月28日 10:46 Narziss + */ +public class ApolloDynamicConfigTest { + + @Test + public void testApolloDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "apollo://127.0.0.1:8080"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + // 获取接口对应的动态配置监听器 + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + Field field = ApolloDynamicConfigManager.class.getDeclaredField("watchListenerMap"); + field.setAccessible(true); + Map watchListenerMap = (Map) field + .get(dynamicConfigManager); + ApolloDynamicConfigManager.ApolloListener apolloConfigListener = watchListenerMap.get(consumerConfig + .getInterfaceId()); + + // 测试配置更新 + String configValue = "timeout=5000\n.sayHello.timeout=6000"; + ConfigChange configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), null, configValue, PropertyChangeType.ADDED); + Map changes= new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + ConfigChangeEvent event = new ConfigChangeEvent("application",changes); + apolloConfigListener.onChange(event); + Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java new file mode 100644 index 000000000..a95ee2a39 --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; + +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.nacos.NacosDynamicConfigManager; +import com.alipay.sofa.rpc.test.HelloService; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; +import java.util.Map; + +/** + * @author Narziss + * @version NacosDynamicConfigTest.java, v 0.1 2024年09月28日 12:11 Narziss + */ +public class NacosDynamicConfigTest { + + @Test + public void testNacosDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "nacos://127.0.0.1:8848"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + // 获取接口对应的动态配置监听器 + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + Field field = NacosDynamicConfigManager.class.getDeclaredField("watchListenerMap"); + field.setAccessible(true); + Map watchListenerMap = (Map) field + .get(dynamicConfigManager); + NacosDynamicConfigManager.NacosConfigListener nacosConfigListener = watchListenerMap.get(consumerConfig + .getInterfaceId()); + + // 测试配置更新 + String configValue = "timeout=5000"; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + configValue = "timeout=5000\n.sayHello.timeout=6000"; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java new file mode 100644 index 000000000..224b1489f --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config; + +import com.alipay.sofa.rpc.config.ApplicationConfig; +import com.alipay.sofa.rpc.config.ConsumerConfig; +import com.alipay.sofa.rpc.dynamic.DynamicConfigKeys; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; +import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; +import com.alipay.sofa.rpc.dynamic.zk.ZookeeperDynamicConfigManager; +import com.alipay.sofa.rpc.test.HelloService; +import com.alipay.sofa.rpc.test.config.base.BaseZkTest; +import org.apache.curator.framework.CuratorFramework; +import org.apache.zookeeper.CreateMode; +import org.junit.Assert; +import org.junit.Test; + +import java.lang.reflect.Field; + +/** + * @author Narziss + * @version ZookeeperDynamicConfigTest.java, v 0.1 2024年09月28日 14:33 Narziss + */ +public class ZookeeperDynamicConfigTest extends BaseZkTest { + + @Test + public void testZookeeperDynamicConfig() throws Exception { + System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "zookeeper://127.0.0.1:2181"); + ApplicationConfig clientApplication = new ApplicationConfig(); + clientApplication.setAppName("demo"); + + ConsumerConfig consumerConfig = new ConsumerConfig() + .setInterfaceId(HelloService.class.getName()) + .setProtocol("bolt") + .setDirectUrl("bolt://127.0.0.1:12200") + .setConnectTimeout(10 * 1000) + .setApplication(clientApplication); + + consumerConfig.refer(); + + DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + Field field = ZookeeperDynamicConfigManager.class.getDeclaredField("zkClient"); + field.setAccessible(true); + CuratorFramework zkClient = (CuratorFramework) field.get(dynamicConfigManager); + // 创建或更新配置节点 + if (zkClient.checkExists().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService") == null) { + zkClient.create() + .creatingParentsIfNeeded() + .withMode(CreateMode.PERSISTENT) + .forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService", "timeout=5000".getBytes()); + } else { + zkClient.setData().forPath("/sofa-rpc/config/demo/com.alipay.sofa.rpc.test.HelloService", + "timeout=5000".getBytes()); + } + + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + // 验证配置是否更新 + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + + } +} diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java new file mode 100644 index 000000000..9268a8f0b --- /dev/null +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/base/BaseZkTest.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.alipay.sofa.rpc.test.config.base; + +import com.alipay.sofa.rpc.context.RpcInternalContext; +import com.alipay.sofa.rpc.context.RpcInvokeContext; +import com.alipay.sofa.rpc.context.RpcRunningState; +import com.alipay.sofa.rpc.context.RpcRuntimeContext; +import org.apache.curator.test.TestingServer; +import org.junit.AfterClass; +import org.junit.BeforeClass; + +import java.io.IOException; + +/** + * @author Narziss + * @version BaseZkTest.java, v 0.1 2024年10月08日 10:20 Narziss + */ +public abstract class BaseZkTest { + protected static TestingServer server = null; + + @BeforeClass + public static void adBeforeClass() { + RpcRunningState.setUnitTestMode(true); + + try { + server = new TestingServer(2181, true); + server.start(); + } catch (Exception e) { + e.printStackTrace(); + } + } + + @AfterClass + public static void adAfterClass() { + RpcRuntimeContext.destroy(); + RpcInternalContext.removeContext(); + RpcInvokeContext.removeContext(); + + if (server != null) { + try { + server.stop(); + } catch (IOException e) { + e.printStackTrace(); + } + } + } +} \ No newline at end of file From 7a354b319e4b62976d7843b42c717bc8e12bd5a6 Mon Sep 17 00:00:00 2001 From: Narziss Date: Tue, 15 Oct 2024 10:10:54 +0800 Subject: [PATCH 4/4] Modify dynamic config test --- .../bootstrap/DefaultConsumerBootstrap.java | 53 +++++++++++-------- .../apollo/ApolloDynamicConfigManager.java | 18 +++---- .../nacos/NacosDynamicConfigManager.java | 20 ++++--- .../sofa/rpc/dynamic/DynamicConfigKeys.java | 12 ++--- .../rpc/dynamic/DynamicConfigManager.java | 19 +++++-- test/test-integration/pom.xml | 4 +- .../test/config/ApolloDynamicConfigTest.java | 28 ++++++++-- .../test/config/NacosDynamicConfigTest.java | 14 +++-- .../config/ZookeeperDynamicConfigTest.java | 28 +++++++--- 9 files changed, 128 insertions(+), 68 deletions(-) diff --git a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java index 9c4923243..2b661be9a 100644 --- a/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java +++ b/bootstrap/bootstrap-api/src/main/java/com/alipay/sofa/rpc/bootstrap/DefaultConsumerBootstrap.java @@ -45,7 +45,12 @@ import com.alipay.sofa.rpc.registry.Registry; import com.alipay.sofa.rpc.registry.RegistryFactory; -import java.util.*; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; import java.util.concurrent.CountDownLatch; @@ -167,7 +172,7 @@ public T refer() { } //接口级别动态配置参数 - final String dynamicUrl = getOrDefault(DynamicConfigKeys.DYNAMIC_URL); + final String dynamicUrl = getOrDefault(DynamicConfigKeys.CENTER_ADDRESS); if ( StringUtils.isNotBlank(dynamicUrl)) { //启用接口级别动态配置 final DynamicConfigManager dynamicManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl( @@ -453,38 +458,40 @@ private class ConsumerAttributeListener implements ConfigListener { private Map newValueMap = new HashMap<>(); // 动态配置项 - private List dynamicConfigKeys = Arrays.asList(RpcConstants.CONFIG_KEY_TIMEOUT, - RpcConstants.CONFIG_KEY_RETRIES, RpcConstants.CONFIG_KEY_LOADBALANCER); + private final Set dynamicConfigKeys = new HashSet<>(); ConsumerAttributeListener() { - + dynamicConfigKeys.add(RpcConstants.CONFIG_KEY_TIMEOUT); + dynamicConfigKeys.add(RpcConstants.CONFIG_KEY_RETRIES); + dynamicConfigKeys.add(RpcConstants.CONFIG_KEY_LOADBALANCER); } @Override public void process(ConfigChangedEvent event) { - for (String key : newValueMap.keySet()) { - newValueMap.put(key, ""); - } + // 清除上次的赋值,并保留赋值过的key + newValueMap.replaceAll((k, v) -> ""); if (!event.getChangeType().equals(ConfigChangeType.DELETED)) { // ADDED or MODIFIED - String[] lines = event.getContent().split("\n"); - for (String line : lines) { - String[] keyValue = line.split("=", 2); - if (keyValue.length == 2) { - String key = keyValue[0].trim(); - String value = keyValue[1].trim(); - for (String dynamicConfigKey : dynamicConfigKeys) { - if (key.equals(dynamicConfigKey) || key.endsWith("." + dynamicConfigKey)) { - newValueMap.put(key, value); - break; - } - } - } else { - LOGGER.warn("Malformed configuration line: {}", line); + parseConfigurationLines(event.getContent()); + } + attrUpdated(newValueMap); + } + + private void parseConfigurationLines(String content) { + String[] lines = content.split("\n"); + for (String line : lines) { + String[] keyValue = line.split("=", 2); + if (keyValue.length == 2) { + String key = keyValue[0].trim(); + String value = keyValue[1].trim(); + String tempKey = key.lastIndexOf(".") == -1 ? key : key.substring(key.lastIndexOf(".") + 1); + if (dynamicConfigKeys.contains(tempKey)) { + newValueMap.put(key, value); } + } else { + LOGGER.warn("Malformed configuration line: {}", line); } } - attrUpdated(newValueMap); } @Override diff --git a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java index f36a23d1c..c1553a3cf 100644 --- a/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java +++ b/config/config-apollo/src/main/java/com/alipay/sofa/rpc/dynamic/apollo/ApolloDynamicConfigManager.java @@ -29,6 +29,7 @@ import com.ctrip.framework.apollo.model.ConfigChange; import java.util.Collections; +import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -71,16 +72,10 @@ protected ApolloDynamicConfigManager(String appName, String remainUrl) { super(appName, remainUrl); System.setProperty(APOLLO_APPID_KEY, appName); System.setProperty(APOLLO_ADDR_KEY, APOLLO_PROTOCOL_PREFIX + getAddress()); - String params[] = getParams(); - if (params!= null && params.length > 0){ - for (String param : params) { - String[] keyValue = param.split("="); - if (keyValue.length == 2) { - if ("cluster".equals(keyValue[0])) { - System.setProperty(APOLLO_CLUSTER_KEY, keyValue[1]); - } - } - } + Map params = getParams(); + if (params != null && params.containsKey("cluster")) { + String clusterValue = (String)params.get("cluster"); + System.setProperty(APOLLO_CLUSTER_KEY, clusterValue); } config = ConfigService.getAppConfig(); } @@ -158,6 +153,9 @@ private ConfigChangeType getChangeType(ConfigChange change) { if (change.getChangeType() == PropertyChangeType.DELETED) { return ConfigChangeType.DELETED; } + if (change.getChangeType() == PropertyChangeType.ADDED) { + return ConfigChangeType.ADDED; + } return ConfigChangeType.MODIFIED; } diff --git a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java index fc6e92ca8..afb81188c 100644 --- a/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java +++ b/config/config-nacos/src/main/java/com/alipay/sofa/rpc/dynamic/nacos/NacosDynamicConfigManager.java @@ -73,17 +73,15 @@ protected NacosDynamicConfigManager(String appName, String remainUrl) { super(appName, remainUrl); group = appName; nacosConfig.put(PropertyKeyConst.SERVER_ADDR, getAddress()); - String params[] = getParams(); - if (params != null && params.length > 0) { - for (String param : params) { - String[] keyValue = param.split("="); - if (keyValue.length == 2) { - if ("username".equals(keyValue[0])) { - nacosConfig.put(PropertyKeyConst.USERNAME, keyValue[1]); - } else if ("password".equals(keyValue[0])) { - nacosConfig.put(PropertyKeyConst.PASSWORD, keyValue[1]); - } - } + Map params = getParams(); + if (params != null) { + if( params.containsKey("username")) { + String username = (String)params.get("username"); + nacosConfig.put(PropertyKeyConst.USERNAME, username); + } + if( params.containsKey("password")) { + String password = (String) params.get("password"); + nacosConfig.put(PropertyKeyConst.PASSWORD, password); } } try { diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java index 581c7c55e..754afe468 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigKeys.java @@ -29,13 +29,13 @@ public class DynamicConfigKeys { public static final String DEFAULT_NAMESPACE = "sofa-rpc"; - public static ConfigKey DYNAMIC_URL = ConfigKey + public static ConfigKey CENTER_ADDRESS = ConfigKey .build( - "dynamicUrl", + "sofa.rpc.config.center.address", " ", false, "The url of the dynamic configuration.", - new String[] { "dynamicUrl" }); + new String[] { "sofa_rpc_config_center_address" }); public static ConfigKey ZK_ADDRESS = ConfigKey .build( @@ -43,7 +43,7 @@ public class DynamicConfigKeys { "127.0.0.1:2181", false, "The address of Zookeeper configuration center.", - new String[] { "zookeeper_address" }); + new String[] { "sofa_rpc_config_center_zookeeper_address" }); public static ConfigKey NACOS_ADDRESS = ConfigKey .build( @@ -51,13 +51,13 @@ public class DynamicConfigKeys { "127.0.0.1:8848", false, "The address of Nacos configuration center.", - new String[] { "nacos_address" }); + new String[] { "sofa_rpc_config_center_nacos_address" }); public static ConfigKey APOLLO_ADDRESS = ConfigKey .build( "sofa.rpc.config.center.apollo.address", "127.0.0.1:8080", false, "The address of Apollo configuration center.", - new String[] { "apollo_address" }); + new String[] { "sofa_rpc_config_center_apollo_address" }); } \ No newline at end of file diff --git a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java index 7fda9f36d..49099d09e 100644 --- a/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java +++ b/core/api/src/main/java/com/alipay/sofa/rpc/dynamic/DynamicConfigManager.java @@ -20,6 +20,9 @@ import com.alipay.sofa.rpc.ext.Extensible; import com.alipay.sofa.rpc.listener.ConfigListener; +import java.util.HashMap; +import java.util.Map; + /** * * @author bystander @@ -32,15 +35,25 @@ public abstract class DynamicConfigManager { private String address; - private String params[]; + private Map params = new HashMap<>(); protected DynamicConfigManager(String appName, String remainUrl) { this.appName = appName; + parseRemainUrl(remainUrl); + } + + protected void parseRemainUrl(String remainUrl) { int queryIndex = remainUrl.indexOf("?"); this.address = (queryIndex > -1) ? remainUrl.substring(0, queryIndex) : remainUrl; if (queryIndex > -1 && queryIndex < remainUrl.length() - 1) { String query = remainUrl.substring(queryIndex + 1); - this.params = query.split("&"); + String[] paramPairs = query.split("&"); + for (String paramPair : paramPairs) { + String[] keyValue = paramPair.split("="); + if (keyValue.length == 2) { + this.params.put(keyValue[0], keyValue[1]); + } + } } } @@ -52,7 +65,7 @@ protected String getAddress() { return address; } - protected String[] getParams() { + protected Map getParams() { return params; } diff --git a/test/test-integration/pom.xml b/test/test-integration/pom.xml index 32adfe74b..6e3d254a3 100644 --- a/test/test-integration/pom.xml +++ b/test/test-integration/pom.xml @@ -264,13 +264,13 @@ com.alipay.sofa sofa-rpc-config-apollo - 5.13.1-SNAPSHOT + ${project.parent.version} test com.alipay.sofa sofa-rpc-config-zk - 5.13.1-SNAPSHOT + ${project.parent.version} test diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java index f72a5c362..e09241130 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ApolloDynamicConfigTest.java @@ -41,7 +41,7 @@ public class ApolloDynamicConfigTest { @Test public void testApolloDynamicConfig() throws Exception { - System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "apollo://127.0.0.1:8080"); + System.setProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey(), "apollo://127.0.0.1:8080?cluster=default"); ApplicationConfig clientApplication = new ApplicationConfig(); clientApplication.setAppName("demo"); @@ -56,7 +56,7 @@ public void testApolloDynamicConfig() throws Exception { // 获取接口对应的动态配置监听器 DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl - (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey())); Field field = ApolloDynamicConfigManager.class.getDeclaredField("watchListenerMap"); field.setAccessible(true); Map watchListenerMap = (Map) field @@ -64,13 +64,33 @@ public void testApolloDynamicConfig() throws Exception { ApolloDynamicConfigManager.ApolloListener apolloConfigListener = watchListenerMap.get(consumerConfig .getInterfaceId()); - // 测试配置更新 - String configValue = "timeout=5000\n.sayHello.timeout=6000"; + // 测试配置新增 + String configValue = "timeout=5000\n"; ConfigChange configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), null, configValue, PropertyChangeType.ADDED); Map changes= new HashMap<>(); changes.put(configChange.getPropertyName(), configChange); ConfigChangeEvent event = new ConfigChangeEvent("application",changes); apolloConfigListener.onChange(event); + Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + + // 测试配置修改 + String oldValue = configValue; + configValue = "timeout=5000\n.sayHello.timeout=6000"; + configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), oldValue, configValue, PropertyChangeType.MODIFIED); + changes= new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + event = new ConfigChangeEvent("application",changes); + apolloConfigListener.onChange(event); Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + + // 测试配置删除 + configChange = new ConfigChange("application", consumerConfig.getInterfaceId(), configValue, null, PropertyChangeType.DELETED); + changes= new HashMap<>(); + changes.put(configChange.getPropertyName(), configChange); + event = new ConfigChangeEvent("application",changes); + apolloConfigListener.onChange(event); + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey()); } } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java index a95ee2a39..bc6e5b0bb 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/NacosDynamicConfigTest.java @@ -38,7 +38,8 @@ public class NacosDynamicConfigTest { @Test public void testNacosDynamicConfig() throws Exception { - System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "nacos://127.0.0.1:8848"); + System.setProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey(), + "nacos://127.0.0.1:8848?username=nacos&password=nacos"); ApplicationConfig clientApplication = new ApplicationConfig(); clientApplication.setAppName("demo"); @@ -53,7 +54,7 @@ public void testNacosDynamicConfig() throws Exception { // 获取接口对应的动态配置监听器 DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl - (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey())); Field field = NacosDynamicConfigManager.class.getDeclaredField("watchListenerMap"); field.setAccessible(true); Map watchListenerMap = (Map) field @@ -61,12 +62,19 @@ public void testNacosDynamicConfig() throws Exception { NacosDynamicConfigManager.NacosConfigListener nacosConfigListener = watchListenerMap.get(consumerConfig .getInterfaceId()); - // 测试配置更新 + // 测试配置新增 String configValue = "timeout=5000"; nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + // 测试配置修改 configValue = "timeout=5000\n.sayHello.timeout=6000"; nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); Assert.assertEquals(6000, consumerConfig.getMethodTimeout("sayHello")); + // 测试配置删除 + configValue = ""; + nacosConfigListener.innerReceive(consumerConfig.getInterfaceId(), consumerConfig.getAppName(), configValue); + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey()); } } diff --git a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java index 224b1489f..1ce3973c7 100644 --- a/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java +++ b/test/test-integration/src/test/java/com/alipay/sofa/rpc/test/config/ZookeeperDynamicConfigTest.java @@ -22,6 +22,8 @@ import com.alipay.sofa.rpc.dynamic.DynamicConfigManager; import com.alipay.sofa.rpc.dynamic.DynamicConfigManagerFactory; import com.alipay.sofa.rpc.dynamic.zk.ZookeeperDynamicConfigManager; +import com.alipay.sofa.rpc.log.Logger; +import com.alipay.sofa.rpc.log.LoggerFactory; import com.alipay.sofa.rpc.test.HelloService; import com.alipay.sofa.rpc.test.config.base.BaseZkTest; import org.apache.curator.framework.CuratorFramework; @@ -37,9 +39,11 @@ */ public class ZookeeperDynamicConfigTest extends BaseZkTest { + Logger logger = LoggerFactory.getLogger(ZookeeperDynamicConfigTest.class); + @Test public void testZookeeperDynamicConfig() throws Exception { - System.setProperty(DynamicConfigKeys.DYNAMIC_URL.getKey(), "zookeeper://127.0.0.1:2181"); + System.setProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey(), "zookeeper://127.0.0.1:2181"); ApplicationConfig clientApplication = new ApplicationConfig(); clientApplication.setAppName("demo"); @@ -53,28 +57,40 @@ public void testZookeeperDynamicConfig() throws Exception { consumerConfig.refer(); DynamicConfigManager dynamicConfigManager = DynamicConfigManagerFactory.getDynamicManagerWithUrl - (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.DYNAMIC_URL.getKey())); + (clientApplication.getAppName(), System.getProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey())); Field field = ZookeeperDynamicConfigManager.class.getDeclaredField("zkClient"); field.setAccessible(true); CuratorFramework zkClient = (CuratorFramework) field.get(dynamicConfigManager); - // 创建或更新配置节点 + + // 新增或修改配置节点 if (zkClient.checkExists().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService") == null) { zkClient.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService", "timeout=5000".getBytes()); } else { - zkClient.setData().forPath("/sofa-rpc/config/demo/com.alipay.sofa.rpc.test.HelloService", + zkClient.setData().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService", "timeout=5000".getBytes()); } - try { Thread.sleep(3000); } catch (InterruptedException e) { - e.printStackTrace(); + logger.error(e.getMessage(), e); } // 验证配置是否更新 Assert.assertEquals(5000, consumerConfig.getMethodTimeout("sayHello")); + //删除配置节点 + zkClient.delete().forPath("/config/demo/com.alipay.sofa.rpc.test.HelloService"); + try { + Thread.sleep(3000); + } catch (InterruptedException e) { + logger.error(e.getMessage(), e); + } + // 验证配置是否删除 + Assert.assertEquals(-1, consumerConfig.getMethodTimeout("sayHello")); + + System.clearProperty(DynamicConfigKeys.CENTER_ADDRESS.getKey()); + } }