diff --git a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistry.java b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistry.java index 278fadd25..a9fdafbf0 100644 --- a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistry.java +++ b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistry.java @@ -20,6 +20,7 @@ import com.alipay.sofa.rpc.client.ProviderInfo; import com.alipay.sofa.rpc.common.struct.NamedThreadFactory; import com.alipay.sofa.rpc.common.utils.CommonUtils; +import com.alipay.sofa.rpc.common.utils.StringUtils; import com.alipay.sofa.rpc.config.ConsumerConfig; import com.alipay.sofa.rpc.config.ProviderConfig; import com.alipay.sofa.rpc.config.RegistryConfig; @@ -184,6 +185,10 @@ protected PublishServiceRequest buildPublishServiceRequest(String serviceName, S providerMetaInfo.setVersion(VERSION); providerMetaInfo.setProperties(providerInfo.getStaticAttrs()); publishServiceRequest.setProviderMetaInfo(providerMetaInfo); + String group = providerInfo.getStaticAttrs().get(SofaRegistryConstants.SOFA_GROUP_KEY); + if (StringUtils.isNotBlank(group)) { + publishServiceRequest.setGroup(group); + } return publishServiceRequest; } @@ -233,6 +238,10 @@ protected UnPublishServiceRequest buildUnPublishServiceRequest(String serviceNam UnPublishServiceRequest unPublishServiceRequest = new UnPublishServiceRequest(); unPublishServiceRequest.setServiceName(serviceName); unPublishServiceRequest.setProtocolType(providerInfo.getProtocolType()); + String group = providerInfo.getStaticAttr(SofaRegistryConstants.SOFA_GROUP_KEY); + if (StringUtils.isNotBlank(group)) { + unPublishServiceRequest.setGroup(group); + } return unPublishServiceRequest; } @@ -303,6 +312,11 @@ protected SubscribeServiceRequest buildSubscribeServiceRequest(ConsumerConfig co SubscribeServiceRequest subscribeRequest = new SubscribeServiceRequest(); subscribeRequest.setServiceName(key); subscribeRequest.setProtocolType(consumerConfig.getProtocol()); + subscribeRequest.setProperties(consumerConfig.getParameters()); + String group = consumerConfig.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY); + if (StringUtils.isNotBlank(group)) { + subscribeRequest.setGroup(group); + } return subscribeRequest; } @@ -368,6 +382,10 @@ protected UnSubscribeServiceRequest buildUnSubscribeServiceRequest(ConsumerConfi String key = MeshRegistryHelper.buildMeshKey(config, config.getProtocol()); unsubscribeRequest.setServiceName(key); unsubscribeRequest.setProtocolType(config.getProtocol()); + String group = config.getParameter(SofaRegistryConstants.SOFA_GROUP_KEY); + if (StringUtils.isNotBlank(group)) { + unsubscribeRequest.setGroup(group); + } return unsubscribeRequest; } diff --git a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/PublishServiceRequest.java b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/PublishServiceRequest.java index 8911eb9fa..895aa8241 100644 --- a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/PublishServiceRequest.java +++ b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/PublishServiceRequest.java @@ -32,6 +32,8 @@ public class PublishServiceRequest { private boolean onlyPublishInCloud; + private String group; + public String getServiceName() { return serviceName; } @@ -64,6 +66,14 @@ public void setOnlyPublishInCloud(boolean onlyPublishInCloud) { this.onlyPublishInCloud = onlyPublishInCloud; } + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + @Override public String toString() { final StringBuffer sb = new StringBuffer("PublishServiceRequest{"); @@ -71,6 +81,7 @@ public String toString() { sb.append(", protocolType='").append(protocolType).append('\''); sb.append(", providerMetaInfo=").append(providerMetaInfo); sb.append(", onlyPublishInCloud=").append(onlyPublishInCloud); + sb.append(", group='").append(group).append('\''); sb.append('}'); return sb.toString(); } diff --git a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/SubscribeServiceRequest.java b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/SubscribeServiceRequest.java index 27fbc5f22..118a71846 100644 --- a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/SubscribeServiceRequest.java +++ b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/SubscribeServiceRequest.java @@ -16,25 +16,31 @@ */ package com.alipay.sofa.rpc.registry.mesh.model; +import java.util.Map; + /** * @author bystander * @version $Id: PublishServiceRequest.java, v 0.1 2018年04月03日 11:27 AM bystander Exp $ */ public class SubscribeServiceRequest { - private String serviceName; + private String serviceName; //这个值是类似DEFAULT/XFIRE这种,也有可能是tr - private String protocolType; + private String protocolType; //this should be xxx-pool.alipay.com or xxx.alipay.com,can be null - private String targetAppAddress; + private String targetAppAddress; + + private boolean vipEnforce; + + private boolean vipOnly; - private boolean vipEnforce; + private boolean localCloudFirst; - private boolean vipOnly; + private String group; - private boolean localCloudFirst; + private Map properties; public String getServiceName() { return serviceName; @@ -84,6 +90,22 @@ public void setLocalCloudFirst(boolean localCloudFirst) { this.localCloudFirst = localCloudFirst; } + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + + public Map getProperties() { + return properties; + } + + public void setProperties(Map properties) { + this.properties = properties; + } + @Override public String toString() { final StringBuffer sb = new StringBuffer("SubscribeServiceRequest{"); @@ -93,6 +115,7 @@ public String toString() { sb.append(", vipEnforce=").append(vipEnforce); sb.append(", vipOnly=").append(vipOnly); sb.append(", localCloudFirst=").append(localCloudFirst); + sb.append(", group='").append(group).append('\''); sb.append('}'); return sb.toString(); } diff --git a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnPublishServiceRequest.java b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnPublishServiceRequest.java index 7cfd3ad32..577fe2e50 100644 --- a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnPublishServiceRequest.java +++ b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnPublishServiceRequest.java @@ -27,6 +27,8 @@ public class UnPublishServiceRequest { //这个值是类似DEFAULT/XFIRE这种,也有可能是tr private String protocolType; + private String group; + public String getServiceName() { return serviceName; } @@ -43,6 +45,14 @@ public void setProtocolType(String protocolType) { this.protocolType = protocolType; } + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + @Override public String toString() { final StringBuffer sb = new StringBuffer("UnPublishServiceRequest{"); diff --git a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnSubscribeServiceRequest.java b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnSubscribeServiceRequest.java index 1cf0e8f2c..86150c591 100644 --- a/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnSubscribeServiceRequest.java +++ b/registry/registry-mesh/src/main/java/com/alipay/sofa/rpc/registry/mesh/model/UnSubscribeServiceRequest.java @@ -30,6 +30,8 @@ public class UnSubscribeServiceRequest { //这个值是类似DEFAULT/XFIRE这种,也有可能是tr private String protocolType; + private String group; + public String getServiceName() { return serviceName; } @@ -54,12 +56,21 @@ public void setProtocolType(String protocolType) { this.protocolType = protocolType; } + public String getGroup() { + return group; + } + + public void setGroup(String group) { + this.group = group; + } + @Override public String toString() { final StringBuffer sb = new StringBuffer("UnSubscribeServiceRequest{"); sb.append("serviceName='").append(serviceName).append('\''); sb.append(", targetAppAddress='").append(targetAppAddress).append('\''); sb.append(", protocolType='").append(protocolType).append('\''); + sb.append(", group='").append(group).append('\''); sb.append('}'); return sb.toString(); } diff --git a/registry/registry-mesh/src/test/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistryTest.java b/registry/registry-mesh/src/test/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistryTest.java index ca478aa54..c896c55b0 100644 --- a/registry/registry-mesh/src/test/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistryTest.java +++ b/registry/registry-mesh/src/test/java/com/alipay/sofa/rpc/registry/mesh/MeshRegistryTest.java @@ -167,6 +167,8 @@ public void testOnlyPublish() throws InterruptedException { public void testAll() throws Exception { int timeoutPerSub = 1000; + Map parameter = new HashMap<>(); + parameter.put(SofaRegistryConstants.SOFA_GROUP_KEY, "SOFA_TEST"); ServerConfig serverConfig = new ServerConfig() .setProtocol("bolt") @@ -183,7 +185,8 @@ public void testAll() throws Exception { .setSerialization("hessian2") .setServer(serverConfig) .setWeight(222) - .setTimeout(3000); + .setTimeout(3000) + .setParameters(parameter); // 注册 registry.register(provider); @@ -196,7 +199,8 @@ public void testAll() throws Exception { .setSubscribe(true) .setSerialization("java") .setInvokeType("sync") - .setTimeout(4444); + .setTimeout(4444) + .setParameters(parameter); String tag0 = MeshRegistryHelper.buildMeshKey(provider, serverConfig.getProtocol()); String tag1 = MeshRegistryHelper.buildMeshKey(consumer, consumer.getProtocol()); @@ -205,6 +209,7 @@ public void testAll() throws Exception { PublishServiceRequest publishServiceRequest = registry.buildPublishServiceRequest(tag0, serverConfig.getProtocol(), providerInfo, "test-server"); Assert.assertEquals(serverConfig.getProtocol(), publishServiceRequest.getProtocolType()); + Assert.assertEquals("SOFA_TEST", publishServiceRequest.getGroup()); // 订阅 MeshRegistryTest.MockProviderInfoListener providerInfoListener = new MeshRegistryTest.MockProviderInfoListener(); @@ -216,6 +221,8 @@ public void testAll() throws Exception { Assert.assertTrue(ps.toString(), ps.size() == 1); SubscribeServiceRequest subscribeServiceRequest = registry.buildSubscribeServiceRequest(consumer); Assert.assertEquals(consumer.getProtocol(), subscribeServiceRequest.getProtocolType()); + Assert.assertEquals("SOFA_TEST", subscribeServiceRequest.getGroup()); + Assert.assertNotNull(subscribeServiceRequest.getProperties()); // 反注册 CountDownLatch latch = new CountDownLatch(1); @@ -226,6 +233,7 @@ public void testAll() throws Exception { Assert.assertTrue(ps.size() == 1); UnPublishServiceRequest unPublishServiceRequest = registry.buildUnPublishServiceRequest(tag0, providerInfo); Assert.assertEquals(serverConfig.getProtocol(), unPublishServiceRequest.getProtocolType()); + Assert.assertEquals("SOFA_TEST", unPublishServiceRequest.getGroup()); // 一次发2个端口的再次注册 latch = new CountDownLatch(1); @@ -246,7 +254,8 @@ public void testAll() throws Exception { .setSubscribe(true) .setSerialization("java") .setInvokeType("sync") - .setTimeout(4444); + .setTimeout(4444) + .setParameters(parameter); CountDownLatch latch2 = new CountDownLatch(1); MeshRegistryTest.MockProviderInfoListener providerInfoListener2 = new MeshRegistryTest.MockProviderInfoListener(); providerInfoListener2.setCountDownLatch(latch2); @@ -261,6 +270,7 @@ public void testAll() throws Exception { registry.unSubscribe(consumer); UnSubscribeServiceRequest unSubscribeServiceRequest = registry.buildUnSubscribeServiceRequest(consumer); Assert.assertEquals(consumer.getProtocol(), unSubscribeServiceRequest.getProtocolType()); + Assert.assertEquals("SOFA_TEST", unSubscribeServiceRequest.getGroup()); // 批量反注册,判断订阅者2的数据 latch = new CountDownLatch(1);