diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactory.java b/src/main/java/com/rabbitmq/client/ConnectionFactory.java index dc7ea88c7..f6f43f8eb 100644 --- a/src/main/java/com/rabbitmq/client/ConnectionFactory.java +++ b/src/main/java/com/rabbitmq/client/ConnectionFactory.java @@ -322,9 +322,9 @@ public ConnectionFactory setVirtualHost(String virtualHost) { public ConnectionFactory setUri(URI uri) throws URISyntaxException, NoSuchAlgorithmException, KeyManagementException { - if ("amqp".equals(uri.getScheme().toLowerCase())) { + if ("amqp".equalsIgnoreCase(uri.getScheme())) { // nothing special to do - } else if ("amqps".equals(uri.getScheme().toLowerCase())) { + } else if ("amqps".equalsIgnoreCase(uri.getScheme())) { setPort(DEFAULT_AMQP_OVER_SSL_PORT); // SSL context factory not set yet, we use the default one if (this.sslContextFactory == null) { @@ -1253,7 +1253,7 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres ConnectionParams params = params(executor); // set client-provided via a client property if (clientProvidedName != null) { - Map properties = new HashMap(params.getClientProperties()); + Map properties = new HashMap<>(params.getClientProperties()); properties.put("connection_name", clientProvidedName); params.setClientProperties(properties); } @@ -1277,16 +1277,14 @@ public Connection newConnection(ExecutorService executor, AddressResolver addres conn.start(); this.metricsCollector.newConnection(conn); return conn; - } catch (IOException e) { + } catch (IOException | TimeoutException e) { lastException = e; - } catch (TimeoutException te) { - lastException = te; } } if (lastException != null) { if (lastException instanceof IOException) { throw (IOException) lastException; - } else if (lastException instanceof TimeoutException) { + } else { throw (TimeoutException) lastException; } } @@ -1762,4 +1760,8 @@ public ConnectionFactory setTrafficListener(TrafficListener trafficListener) { this.trafficListener = trafficListener; return this; } + + public static ConnectionFactoryConfiguration configure() { + return null; + } } diff --git a/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java new file mode 100644 index 000000000..853253e77 --- /dev/null +++ b/src/main/java/com/rabbitmq/client/ConnectionFactoryConfiguration.java @@ -0,0 +1,207 @@ +package com.rabbitmq.client; + +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.CredentialsRefreshService; +import com.rabbitmq.client.impl.ErrorOnWriteListener; +import com.rabbitmq.client.impl.nio.ByteBufferFactory; +import com.rabbitmq.client.impl.nio.NioContext; +import com.rabbitmq.client.impl.nio.NioQueue; +import com.rabbitmq.client.impl.recovery.RecoveredQueueNameSupplier; +import com.rabbitmq.client.impl.recovery.TopologyRecoveryFilter; +import com.rabbitmq.client.observation.ObservationCollector; +import java.net.HttpURLConnection; +import java.net.URI; +import java.time.Duration; +import java.util.Map; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.function.Consumer; +import java.util.function.Function; +import java.util.function.Predicate; +import javax.net.SocketFactory; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSocketFactory; +import javax.net.ssl.TrustManager; + +public interface ConnectionFactoryConfiguration { + + ConnectionFactoryConfiguration host(String name); + ConnectionFactoryConfiguration port(int port); + ConnectionFactoryConfiguration username(String username); + ConnectionFactoryConfiguration password(String username); + ConnectionFactoryConfiguration virtualHost(String virtualHost); + ConnectionFactoryConfiguration uri(URI uri); + ConnectionFactoryConfiguration uri(String uri); + + ConnectionFactoryConfiguration requestedChannelMax(int requestedChannelMax); + ConnectionFactoryConfiguration requestedFrameMax(int requestedFrameMax); + ConnectionFactoryConfiguration requestedHeartbeat(Duration heartbeat); + ConnectionFactoryConfiguration connectionTimeout(Duration timeout); + ConnectionFactoryConfiguration handshakeTimeout(Duration timeout); + ConnectionFactoryConfiguration shutdownTimeout(Duration timeout); + ConnectionFactoryConfiguration channelRpcTimeout(Duration timeout); + + ConnectionFactoryConfiguration maxInboundMessageBodySize(int maxInboundMessageBodySize); + ConnectionFactoryConfiguration channelShouldCheckRpcResponseType(boolean channelShouldCheckRpcResponseType); + ConnectionFactoryConfiguration workPoolTimeout(Duration timeout); + + ConnectionFactoryConfiguration errorOnWriteListener(ErrorOnWriteListener errorOnWriteListener); + + ConnectionFactoryConfiguration trafficListener(TrafficListener trafficListener); + + // TODO provide helper for client properties + ConnectionFactoryConfiguration clientProperties(Map clientProperties); + ConnectionFactoryConfiguration clientProperty(String name, Object value); + + ConnectionFactoryConfiguration saslConfig(SaslConfig saslConfig); + + ConnectionFactoryConfiguration socketFactory(SocketFactory socketFactory); + + ConnectionFactoryConfiguration socketConfigurator(SocketConfigurator socketConfigurator); + + ConnectionFactoryConfiguration sharedExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration shutdownExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration heartbeatExecutor(ExecutorService executorService); + ConnectionFactoryConfiguration threadFactory(ThreadFactory threadFactory); + + ConnectionFactoryConfiguration exceptionHandler(ExceptionHandler exceptionHandler); + + ConnectionFactoryConfiguration metricsCollector(MetricsCollector metricsCollector); + ConnectionFactoryConfiguration observationCollector(ObservationCollector observationCollector); + + // TODO special configuration for credentials, especially for OAuth? + ConnectionFactoryConfiguration credentialsProvider(CredentialsProvider credentialsProvider); + ConnectionFactoryConfiguration credentialsRefreshService(CredentialsRefreshService credentialsRefreshService); + + NioConfiguration nio(); + + TlsConfiguration tls(); + + OAuth2Configuration oauth2(); + + ConnectionFactory create(); + + interface NioConfiguration { + + NioConfiguration readByteBufferSize(int readByteBufferSize); + + NioConfiguration writeByteBufferSize(int writeByteBufferSize); + + NioConfiguration nbIoThreads(int nbIoThreads); + + NioConfiguration writeEnqueuingTimeout(Duration writeEnqueuingTimeout); + + NioConfiguration writeQueueCapacity(int writeQueueCapacity); + + NioConfiguration executor(ExecutorService executorService); + + NioConfiguration threadFactory(ThreadFactory threadFactory); + + NioConfiguration socketChannelConfigurator(SocketChannelConfigurator configurator); + + NioConfiguration sslEngineConfigurator(SslEngineConfigurator configurator); + + NioConfiguration connectionShutdownExecutor(ExecutorService executorService); + + NioConfiguration byteBufferFactory(ByteBufferFactory byteBufferFactory); + + NioConfiguration writeQueueFactory(Function writeQueueFactory); + + ConnectionFactoryConfiguration configuration(); + + + } + + interface TlsConfiguration { + + TlsConfiguration hostnameVerification(); + + TlsConfiguration hostnameVerification(boolean hostnameVerification); + + TlsConfiguration sslContextFactory(SslContextFactory sslContextFactory); + + TlsConfiguration protocol(String protocol); + + TlsConfiguration trustManager(TrustManager trustManager); + + TlsConfiguration trustEverything(); + + TlsConfiguration sslContext(SSLContext sslContext); + + ConnectionFactoryConfiguration configuration(); + + } + + interface RecoveryConfiguration { + + RecoveryConfiguration enableConnectionRecovery(); + RecoveryConfiguration enableConnectionRecovery(boolean connectionRecovery); + + RecoveryConfiguration enableTopologyRecovery(); + RecoveryConfiguration enableTopologyRecovery(boolean connectionRecovery); + + RecoveryConfiguration topologyRecoveryExecutor(ExecutorService executorService); + + RecoveryConfiguration recoveryInterval(Duration interval); + + RecoveryConfiguration recoveryDelayHandler(RecoveryDelayHandler recoveryDelayHandler); + + RecoveryConfiguration topologyRecoveryFilter(TopologyRecoveryFilter topologyRecoveryFilter); + + RecoveryConfiguration recoveryTriggeringCondition(Predicate connectionRecoveryTriggeringCondition); + + RecoveryConfiguration recoveredQueueNameSupplier(RecoveredQueueNameSupplier recoveredQueueNameSupplier); + + ConnectionFactoryConfiguration configuration(); + } + + interface OAuth2Configuration { + + OAuth2Configuration tokenEndpointUri(String tokenEndpointUri); + + OAuth2Configuration clientId(String clientId); + + OAuth2Configuration clientSecret(String clientSecret); + + OAuth2Configuration grantType(String grantType); + + OAuth2Configuration parameter(String name, String value); + + OAuth2Configuration connectionConfigurator(Consumer connectionConfigurator); + + OAuth2TlsConfiguration tls(); + + OAuth2CredentialsRefreshConfiguration refresh(); + + ConnectionFactoryConfiguration configuration(); + } + + interface OAuth2TlsConfiguration { + + OAuth2TlsConfiguration hostnameVerifier(HostnameVerifier hostnameVerifier); + + OAuth2TlsConfiguration sslSocketFactory(SSLSocketFactory sslSocketFactory); + + OAuth2TlsConfiguration sslContext(SSLContext sslContext); + + OAuth2TlsConfiguration trustEverything(); + + OAuth2Configuration oauth2(); + + } + + interface OAuth2CredentialsRefreshConfiguration { + + OAuth2CredentialsRefreshConfiguration refreshDelayStrategy(Function refreshDelayStrategy); + + OAuth2CredentialsRefreshConfiguration approachingExpirationStrategy(Function approachingExpirationStrategy); + + OAuth2CredentialsRefreshConfiguration scheduler(ScheduledThreadPoolExecutor scheduler); + + OAuth2Configuration oauth2(); + + } + +} diff --git a/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java b/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java new file mode 100644 index 000000000..1db2c7e51 --- /dev/null +++ b/src/test/java/com/rabbitmq/client/ConnectionFactoryConfigurationDemo.java @@ -0,0 +1,78 @@ +package com.rabbitmq.client; + +import com.rabbitmq.client.impl.CredentialsProvider; +import com.rabbitmq.client.impl.CredentialsRefreshService; +import com.rabbitmq.client.impl.DefaultCredentialsRefreshService; +import com.rabbitmq.client.impl.OAuth2ClientCredentialsGrantCredentialsProvider; +import com.rabbitmq.client.impl.nio.NioParams; + +import javax.net.ssl.SSLContext; +import java.time.Duration; +import java.util.Collections; +import java.util.Map; + +import static com.rabbitmq.client.impl.DefaultCredentialsRefreshService.ratioRefreshDelayStrategy; + +public class ConnectionFactoryConfigurationDemo { + + public static void main(String[] args) throws Exception { + SSLContext sslContext = SSLContext.getDefault(); + + // historical configuration with ConnectionFactory setters + ConnectionFactory cf = new ConnectionFactory(); + cf.setUri("amqp://rabbitmq-1:5672/foo"); + cf.setChannelRpcTimeout(10_000); // unit? + Map clientProperties = Collections.singletonMap("foo", "bar"); + cf.setClientProperties(clientProperties); + cf.useSslProtocol("TLSv1.3", new TrustEverythingTrustManager()); + NioParams nioParams = new NioParams(); + nioParams.setNbIoThreads(4); + cf.setNioParams(nioParams); + + CredentialsProvider credentialsProvider = + new OAuth2ClientCredentialsGrantCredentialsProvider.OAuth2ClientCredentialsGrantCredentialsProviderBuilder() + .tokenEndpointUri("http://localhost:8080/uaa/oauth/token/") + .clientId("rabbit_client").clientSecret("rabbit_secret") + .grantType("password") + .parameter("username", "rabbit_super") + .parameter("password", "rabbit_super") + .tls() + .sslContext(sslContext) + .builder() + .build(); + cf.setCredentialsProvider(credentialsProvider); + CredentialsRefreshService refreshService = + new DefaultCredentialsRefreshService.DefaultCredentialsRefreshServiceBuilder() + .refreshDelayStrategy(ratioRefreshDelayStrategy(0.8)) + .build(); + cf.setCredentialsRefreshService(refreshService); + + // configuration with new configuration API + ConnectionFactory.configure() + .uri("amqp://rabbitmq-1:5672/foo") + .channelRpcTimeout(Duration.ofSeconds(10)) // Duration class instead of int + .clientProperty("foo", "bar") + .tls() // TLS configuration API + .protocol("TLSv1.3") + .trustEverything() + .configuration() // back to main configuration + .nio() // NIO configuration API + .nbIoThreads(4) + .configuration() // back to main configuration + .oauth2() // OAuth 2 configuration API + .tokenEndpointUri("http://localhost:8080/uaa/oauth/token/") + .clientId("rabbit_client").clientSecret("rabbit_secret") + .grantType("password") + .parameter("username", "rabbit_super") + .parameter("password", "rabbit_super") + .tls() // OAuth 2 TLS + .sslContext(sslContext) + .oauth2() + .refresh() // OAuth refresh configuration + .refreshDelayStrategy(ratioRefreshDelayStrategy(0.8)) + .oauth2() + .configuration() // back to main configuration + .create(); + } + +}