diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java index 69f3c878..21b59ddb 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/AmqpConnection.java @@ -24,6 +24,7 @@ import io.netty.handler.timeout.IdleState; import io.netty.handler.timeout.IdleStateEvent; import io.netty.handler.timeout.IdleStateHandler; +import io.streamnative.pulsar.handlers.amqp.utils.VirtualHostUtil; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -36,14 +37,12 @@ import lombok.extern.log4j.Log4j2; import org.apache.bookkeeper.util.collections.ConcurrentLongLongHashMap; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.pulsar.broker.authentication.AuthenticationProvider; import org.apache.pulsar.broker.authentication.AuthenticationState; -import org.apache.pulsar.broker.namespace.LookupOptions; import org.apache.pulsar.broker.service.ServerCnx; import org.apache.pulsar.common.api.AuthData; import org.apache.pulsar.common.naming.NamespaceName; -import org.apache.pulsar.common.naming.TopicDomain; -import org.apache.pulsar.common.naming.TopicName; import org.apache.pulsar.common.util.collections.ConcurrentLongHashMap; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.ErrorCodes; @@ -324,26 +323,17 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap assertState(ConnectionState.AWAIT_OPEN); - boolean isDefaultNamespace = false; String virtualHostStr = AMQShortString.toString(virtualHost); - if ((virtualHostStr != null) && virtualHostStr.charAt(0) == '/') { - virtualHostStr = virtualHostStr.substring(1); - if (StringUtils.isEmpty(virtualHostStr)){ - virtualHostStr = DEFAULT_NAMESPACE; - isDefaultNamespace = true; - } + Pair pair; + if (virtualHostStr == null + || (pair = VirtualHostUtil.getTenantAndNamespace(virtualHostStr, amqpConfig.getAmqpTenant())) == null) { + sendConnectionClose(ErrorCodes.NOT_ALLOWED, String.format( + "The virtualHost [%s] configuration is incorrect. For example: tenant/namespace or namespace", + virtualHostStr), 0); + return; } - NamespaceName namespaceName = NamespaceName.get(amqpConfig.getAmqpTenant(), virtualHostStr); - if (isDefaultNamespace) { - // avoid the namespace public/default is not owned in standalone mode - TopicName topic = TopicName.get(TopicDomain.persistent.value(), - namespaceName, "__lookup__"); - LookupOptions lookupOptions = LookupOptions.builder().authoritative(true).build(); - getPulsarService().getNamespaceService().getBrokerServiceUrlAsync(topic, lookupOptions); - } - // Policies policies = getPolicies(namespaceName); -// if (policies != null) { + NamespaceName namespaceName = NamespaceName.get(pair.getLeft(), pair.getRight()); this.namespaceName = namespaceName; MethodRegistry methodRegistry = getMethodRegistry(); @@ -351,10 +341,6 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap writeFrame(responseBody.generateFrame(0)); state = ConnectionState.OPEN; amqpBrokerService.getConnectionContainer().addConnection(namespaceName, this); -// } else { -// sendConnectionClose(ErrorCodes.NOT_FOUND, -// "Unknown virtual host: '" + virtualHostStr + "'", 0); -// } } @Override diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java index 69f6e4d2..345d9de1 100644 --- a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/proxy/ProxyConnection.java @@ -23,9 +23,9 @@ import io.netty.util.concurrent.Future; import io.netty.util.concurrent.FutureListener; import io.streamnative.pulsar.handlers.amqp.AmqpBrokerDecoder; -import io.streamnative.pulsar.handlers.amqp.AmqpConnection; import io.streamnative.pulsar.handlers.amqp.AmqpProtocolHandler; import io.streamnative.pulsar.handlers.amqp.AopVersion; +import io.streamnative.pulsar.handlers.amqp.utils.VirtualHostUtil; import java.util.ArrayList; import java.util.HashMap; import java.util.List; @@ -42,11 +42,14 @@ import org.apache.qpid.server.QpidException; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.common.ServerPropertyNames; +import org.apache.qpid.server.protocol.ErrorCodes; import org.apache.qpid.server.protocol.ProtocolVersion; import org.apache.qpid.server.protocol.v0_8.AMQShortString; import org.apache.qpid.server.protocol.v0_8.FieldTable; import org.apache.qpid.server.protocol.v0_8.transport.AMQDataBlock; +import org.apache.qpid.server.protocol.v0_8.transport.AMQFrame; import org.apache.qpid.server.protocol.v0_8.transport.AMQMethodBody; +import org.apache.qpid.server.protocol.v0_8.transport.ConnectionCloseBody; import org.apache.qpid.server.protocol.v0_8.transport.ConnectionTuneBody; import org.apache.qpid.server.protocol.v0_8.transport.MethodRegistry; import org.apache.qpid.server.protocol.v0_8.transport.ProtocolInitiation; @@ -74,6 +77,9 @@ public class ProxyConnection extends ChannelInboundHandlerAdapter implements private LookupHandler lookupHandler; private AMQShortString virtualHost; private String vhost; + private String tenant; + private volatile int currentClassId; + private volatile int currentMethodId; private List connectMsgList = new ArrayList<>(); @@ -88,6 +94,7 @@ public ProxyConnection(ProxyService proxyService) throws PulsarClientException { log.info("ProxyConnection init ..."); this.proxyService = proxyService; this.proxyConfig = proxyService.getProxyConfig(); + this.tenant = proxyConfig.getAmqpTenant(); brokerDecoder = new AmqpBrokerDecoder(this); protocolVersion = ProtocolVersion.v0_91; methodRegistry = new MethodRegistry(protocolVersion); @@ -230,13 +237,15 @@ public void receiveConnectionOpen(AMQShortString virtualHost, AMQShortString cap this.virtualHost = virtualHost; state = State.RedirectLookup; String virtualHostStr = AMQShortString.toString(virtualHost); - if ((virtualHostStr != null) && virtualHostStr.charAt(0) == '/') { - virtualHostStr = virtualHostStr.substring(1); - if (org.apache.commons.lang.StringUtils.isEmpty(virtualHostStr)){ - virtualHostStr = AmqpConnection.DEFAULT_NAMESPACE; - } + Pair pair; + if (virtualHostStr == null || (pair = VirtualHostUtil.getTenantAndNamespace(virtualHostStr, tenant)) == null) { + sendConnectionClose(ErrorCodes.NOT_ALLOWED, String.format( + "The virtualHost [%s] configuration is incorrect. For example: tenant/namespace or namespace", + virtualHostStr)); + return; } - vhost = virtualHostStr; + tenant = pair.getLeft(); + vhost = pair.getRight(); handleConnect(new AtomicInteger(5)); } @@ -248,7 +257,7 @@ public void handleConnect(AtomicInteger retryTimes) { return; } try { - NamespaceName namespaceName = NamespaceName.get(proxyConfig.getAmqpTenant(), vhost); + NamespaceName namespaceName = NamespaceName.get(tenant, vhost); String topic = TopicName.get(TopicDomain.persistent.value(), namespaceName, "__lookup__").toString(); @@ -349,6 +358,8 @@ public void setCurrentMethod(int classId, int methodId) { if (log.isDebugEnabled()) { log.debug("ProxyConnection - [setCurrentMethod] classId: {}, methodId: {}", classId, methodId); } + currentClassId = classId; + currentMethodId = methodId; } @Override @@ -359,6 +370,11 @@ public boolean ignoreAllButCloseOk() { return false; } + public void sendConnectionClose(int errorCode, String message) { + writeFrame(new AMQFrame(0, new ConnectionCloseBody(getProtocolVersion(), + errorCode, AMQShortString.validValueOf(message), currentClassId, currentMethodId))); + } + public synchronized void writeFrame(AMQDataBlock frame) { if (log.isDebugEnabled()) { log.debug("send: " + frame); diff --git a/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/utils/VirtualHostUtil.java b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/utils/VirtualHostUtil.java new file mode 100644 index 00000000..27286041 --- /dev/null +++ b/amqp-impl/src/main/java/io/streamnative/pulsar/handlers/amqp/utils/VirtualHostUtil.java @@ -0,0 +1,39 @@ +/** + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.streamnative.pulsar.handlers.amqp.utils; + +import io.streamnative.pulsar.handlers.amqp.AmqpConnection; +import java.util.StringTokenizer; +import org.apache.commons.lang3.tuple.Pair; + +/** + * VirtualHostUtil. + */ +public class VirtualHostUtil { + + private static final String L = "/"; + + public static Pair getTenantAndNamespace(String virtualHostStr, String tenant) { + String virtualHost = virtualHostStr.trim(); + if (L.equals(virtualHost)) { + return Pair.of(tenant, AmqpConnection.DEFAULT_NAMESPACE); + } + StringTokenizer tokenizer = new StringTokenizer(virtualHost, L, false); + return switch (tokenizer.countTokens()) { + case 1 -> Pair.of(tenant, tokenizer.nextToken()); + case 2 -> Pair.of(tokenizer.nextToken(), tokenizer.nextToken()); + default -> null; + }; + } +}