From 0c5bd02794cbc09cf9be6fc3aa18d199b74b3382 Mon Sep 17 00:00:00 2001 From: Andriy Redko Date: Sun, 22 Dec 2024 12:51:19 -0500 Subject: [PATCH] CXF-8629: AsyncHTTPConduit (hc5) should support chunked request / response. Add test cases with auto-redirect --- .../asyncclient/hc5/AsyncHTTPConduit.java | 5 +- .../hc5/URLConnectionAsyncHTTPConduit.java | 5 +- .../cxf/systest/hc5/jaxrs/FileStore.java | 58 +++- .../jaxrs/JAXRSAsyncClientChunkingTest.java | 150 +++++++++- .../apache/cxf/systest/jaxrs/FileStore.java | 78 ++++- .../jaxrs/JAXRSAsyncClientChunkingTest.java | 273 ++++++++++++++++++ 6 files changed, 552 insertions(+), 17 deletions(-) create mode 100644 systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientChunkingTest.java diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java index 52739f60991..c990ecf7197 100644 --- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/AsyncHTTPConduit.java @@ -696,7 +696,10 @@ protected synchronized HttpResponse getHttpResponse() throws IOException { } protected void handleResponseAsync() throws IOException { - isAsync = true; + // The response hasn't been handled yet, should be handled asynchronously + if (httpResponse == null) { + isAsync = true; + } } protected void closeInputStream() throws IOException { diff --git a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/URLConnectionAsyncHTTPConduit.java b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/URLConnectionAsyncHTTPConduit.java index 09c8dd2a459..900c778adad 100644 --- a/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/URLConnectionAsyncHTTPConduit.java +++ b/rt/transports/http-hc5/src/main/java/org/apache/cxf/transport/http/asyncclient/hc5/URLConnectionAsyncHTTPConduit.java @@ -699,7 +699,10 @@ protected synchronized HttpResponse getHttpResponse() throws IOException { } protected void handleResponseAsync() throws IOException { - isAsync = true; + // The response hasn't been handled yet, should be handled asynchronously + if (httpResponse == null) { + isAsync = true; + } } protected void closeInputStream() throws IOException { diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/FileStore.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/FileStore.java index 11e36a38135..5a0ebd76eae 100644 --- a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/FileStore.java +++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/FileStore.java @@ -29,6 +29,7 @@ import jakarta.activation.DataHandler; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.QueryParam; @@ -38,8 +39,10 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.ResponseBuilder; import jakarta.ws.rs.core.Response.Status; import jakarta.ws.rs.core.StreamingOutput; +import jakarta.ws.rs.core.UriBuilder; import jakarta.ws.rs.core.UriInfo; import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.helpers.IOUtils; @@ -54,7 +57,7 @@ public class FileStore { @POST @Path("/stream") @Consumes("*/*") - public Response addBook(@QueryParam("chunked") boolean chunked, InputStream in) throws IOException { + public Response addFile(@QueryParam("chunked") boolean chunked, InputStream in) throws IOException { String transferEncoding = headers.getHeaderString("Transfer-Encoding"); if (chunked != Objects.equals("chunked", transferEncoding)) { @@ -75,11 +78,11 @@ public void write(OutputStream out) throws IOException, WebApplicationException return Response.ok(Arrays.copyOf(content, content.length / 10)).build(); } } - } + } @POST @Consumes("multipart/form-data") - public void addBook(@QueryParam("chunked") boolean chunked, + public void addFile(@QueryParam("chunked") boolean chunked, @Suspended final AsyncResponse response, @Context final UriInfo uri, final MultipartBody body) { String transferEncoding = headers.getHeaderString("Transfer-Encoding"); @@ -136,4 +139,53 @@ public void write(OutputStream os) throws IOException, WebApplicationException { } } } + + @GET + @Consumes("multipart/form-data") + public void getFile(@QueryParam("chunked") boolean chunked, @QueryParam("filename") String source, + @Suspended final AsyncResponse response) { + + if (StringUtils.isEmpty(source)) { + response.resume(Response.status(Status.BAD_REQUEST).build()); + return; + } + + try { + if (!store.containsKey(source)) { + response.resume(Response.status(Status.NOT_FOUND).build()); + return; + } + + final byte[] content = store.get(source); + if (response.isSuspended()) { + final StreamingOutput stream = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException, WebApplicationException { + if (chunked) { + // Make sure we have enough data for chunking to kick in + for (int i = 0; i < 10; ++i) { + os.write(content); + } + } else { + os.write(content); + } + } + }; + response.resume(Response.ok().entity(stream).build()); + } + + } catch (final Exception ex) { + response.resume(Response.serverError().build()); + } + } + + @GET + @Path("/redirect") + public Response redirectFile(@Context UriInfo uriInfo) { + final UriBuilder builder = uriInfo.getBaseUriBuilder().path(getClass()); + uriInfo.getQueryParameters(true).forEach((p, v) -> builder.queryParam(p, v.get(0))); + + final ResponseBuilder response = Response.status(303).header("Location", builder.build()); + return response.build(); + } } diff --git a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientChunkingTest.java b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientChunkingTest.java index 0f687385345..53080752db4 100644 --- a/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientChunkingTest.java +++ b/systests/transport-hc5/src/test/java/org/apache/cxf/systest/hc5/jaxrs/JAXRSAsyncClientChunkingTest.java @@ -26,12 +26,20 @@ import java.util.Collection; import java.util.List; import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; import jakarta.ws.rs.client.Entity; import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.MultivaluedMap; import jakarta.ws.rs.core.Response; import org.apache.cxf.interceptor.LoggingInInterceptor; +import org.apache.cxf.interceptor.LoggingMessage; import org.apache.cxf.interceptor.LoggingOutInterceptor; import org.apache.cxf.jaxrs.client.ClientConfiguration; import org.apache.cxf.jaxrs.client.WebClient; @@ -40,6 +48,7 @@ import org.apache.cxf.jaxrs.impl.MetadataMap; import org.apache.cxf.jaxrs.model.AbstractResourceInfo; import org.apache.cxf.jaxrs.provider.MultipartProvider; +import org.apache.cxf.message.Message; import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; import org.apache.cxf.transport.http.asyncclient.hc5.AsyncHTTPConduit; @@ -50,6 +59,7 @@ import static org.hamcrest.CoreMatchers.equalTo; import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.startsWith; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -57,9 +67,12 @@ public class JAXRSAsyncClientChunkingTest extends AbstractBusClientServerTestBase { private static final String PORT = allocatePort(FileStoreServer.class); private final Boolean chunked; + private final Boolean autoRedirect; + private final ConcurrentMap ids = new ConcurrentHashMap<>(); - public JAXRSAsyncClientChunkingTest(Boolean chunked) { + public JAXRSAsyncClientChunkingTest(Boolean chunked, Boolean autoRedirect) { this.chunked = chunked; + this.autoRedirect = autoRedirect; } @BeforeClass @@ -69,9 +82,14 @@ public static void startServers() throws Exception { createStaticBus(); } - @Parameters(name = "{0}") - public static Collection data() { - return Arrays.asList(new Boolean[] {Boolean.FALSE, Boolean.TRUE}); + @Parameters(name = "chunked {0}, auto-redirect {1}") + public static Collection data() { + return Arrays.asList(new Boolean[][] { + {Boolean.FALSE /* chunked */, Boolean.FALSE /* autoredirect */}, + {Boolean.FALSE /* chunked */, Boolean.TRUE /* autoredirect */}, + {Boolean.TRUE /* chunked */, Boolean.FALSE /* autoredirect */}, + {Boolean.TRUE /* chunked */, Boolean.TRUE /* autoredirect */}, + }); } @Test @@ -82,17 +100,18 @@ public void testMultipartChunking() { final ClientConfiguration config = WebClient.getConfig(webClient); config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); configureLogging(config); + final String filename = "keymanagers.jks"; try { - final String filename = "keymanagers.jks"; final MultivaluedMap headers = new MetadataMap<>(); headers.add("Content-ID", filename); headers.add("Content-Type", "application/binary"); - headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + filename); + headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + autoRedirect + "_" + filename); final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers); final MultipartBody entity = new MultipartBody(att); - try (Response response = webClient.header("Content-Type", "multipart/form-data").post(entity)) { + try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).post(entity)) { assertThat(response.getStatus(), equalTo(201)); assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); assertThat(response.getEntity(), not(equalTo(null))); @@ -100,6 +119,42 @@ public void testMultipartChunking() { } finally { webClient.close(); } + + assertRedirect(chunked + "_" + autoRedirect + "_" + filename); + } + + @Test + public void testMultipartChunkingAsync() throws InterruptedException, ExecutionException, TimeoutException { + final String url = "http://localhost:" + PORT + "/file-store"; + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final String filename = "keymanagers.jks"; + try { + final MultivaluedMap headers = new MetadataMap<>(); + headers.add("Content-ID", filename); + headers.add("Content-Type", "application/binary"); + headers.add("Content-Disposition", "attachment; filename=" + chunked + + "_" + autoRedirect + "_async_" + filename); + final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers); + final Entity entity = Entity.entity(new MultipartBody(att), + MediaType.MULTIPART_FORM_DATA_TYPE); + try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).async() + .post(entity).get(10, TimeUnit.SECONDS)) { + assertThat(response.getStatus(), equalTo(201)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertRedirect(chunked + "_" + autoRedirect + "_" + filename); } @Test @@ -110,6 +165,7 @@ public void testStreamChunking() throws IOException { final ClientConfiguration config = WebClient.getConfig(webClient); config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); configureLogging(config); final byte[] bytes = new byte [32 * 1024]; @@ -126,13 +182,89 @@ public void testStreamChunking() throws IOException { } finally { webClient.close(); } + + assertNoDuplicateLogging(); } - + + @Test + public void testStreamChunkingAsync() throws IOException, InterruptedException, + ExecutionException, TimeoutException { + final String url = "http://localhost:" + PORT + "/file-store/stream"; + final WebClient webClient = WebClient.create(url).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final byte[] bytes = new byte [32 * 1024]; + final Random random = new Random(); + random.nextBytes(bytes); + + try (InputStream in = new ByteArrayInputStream(bytes)) { + final Entity entity = Entity.entity(in, MediaType.APPLICATION_OCTET_STREAM); + try (Response response = webClient.async().post(entity).get(10, TimeUnit.SECONDS)) { + assertThat(response.getStatus(), equalTo(200)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertNoDuplicateLogging(); + } + + private void assertRedirect(String filename) { + final String url = "http://localhost:" + PORT + "/file-store/redirect"; + + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())) + .query("chunked", chunked) + .query("filename", filename); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + try { + try (Response response = webClient.get()) { + if (autoRedirect) { + assertThat(response.getStatus(), equalTo(200)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } else { + assertThat(response.getStatus(), equalTo(303)); + assertThat(response.getHeaderString("Location"), + startsWith("http://localhost:" + PORT + "/file-store")); + } + } + } finally { + webClient.close(); + } + + assertNoDuplicateLogging(); + } + + private void assertNoDuplicateLogging() { + ids.forEach((id, counter) -> assertThat("Duplicate client logging for message " + id, + counter.get(), equalTo(1))); + } + private void configureLogging(final ClientConfiguration config) { final LoggingOutInterceptor out = new LoggingOutInterceptor(); out.setShowMultipartContent(false); - final LoggingInInterceptor in = new LoggingInInterceptor(); + final LoggingInInterceptor in = new LoggingInInterceptor() { + @Override + protected void logging(Logger logger, Message message) { + super.logging(logger, message); + final String id = (String) message.get(LoggingMessage.ID_KEY); + ids.computeIfAbsent(id, key -> new AtomicInteger()).incrementAndGet(); + } + }; in.setShowBinaryContent(false); config.getInInterceptors().add(in); diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/FileStore.java b/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/FileStore.java index fe64e82ffde..a1f7d336621 100644 --- a/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/FileStore.java +++ b/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/FileStore.java @@ -29,6 +29,7 @@ import jakarta.activation.DataHandler; import jakarta.ws.rs.Consumes; +import jakarta.ws.rs.GET; import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.QueryParam; @@ -38,8 +39,10 @@ import jakarta.ws.rs.core.Context; import jakarta.ws.rs.core.HttpHeaders; import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.Response.ResponseBuilder; import jakarta.ws.rs.core.Response.Status; import jakarta.ws.rs.core.StreamingOutput; +import jakarta.ws.rs.core.UriBuilder; import jakarta.ws.rs.core.UriInfo; import org.apache.cxf.common.util.StringUtils; import org.apache.cxf.helpers.IOUtils; @@ -54,7 +57,7 @@ public class FileStore { @POST @Path("/stream") @Consumes("*/*") - public Response addBook(@QueryParam("chunked") boolean chunked, InputStream in) throws IOException { + public Response addFile(@QueryParam("chunked") boolean chunked, InputStream in) throws IOException { String transferEncoding = headers.getHeaderString("Transfer-Encoding"); if (chunked != Objects.equals("chunked", transferEncoding)) { @@ -79,9 +82,9 @@ public void write(OutputStream out) throws IOException, WebApplicationException @POST @Consumes("multipart/form-data") - public void addBook(@QueryParam("chunked") boolean chunked, + public void addFile(@QueryParam("chunked") boolean chunked, @Suspended final AsyncResponse response, @Context final UriInfo uri, final MultipartBody body) { - + String transferEncoding = headers.getHeaderString("Transfer-Encoding"); if (chunked != Objects.equals("chunked", transferEncoding)) { response.resume(Response.status(Status.EXPECTATION_FAILED).build()); @@ -109,6 +112,26 @@ public void addBook(@QueryParam("chunked") boolean chunked, response.resume(Response.status(Status.CONFLICT).build()); return; } + + if (response.isSuspended()) { + final StreamingOutput stream = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException, WebApplicationException { + if (chunked) { + // Make sure we have enough data for chunking to kick in + for (int i = 0; i < 10; ++i) { + os.write(content); + } + } else { + os.write(content); + } + } + }; + response.resume(Response.created(uri.getRequestUriBuilder() + .path(source).build()).entity(stream) + .build()); + } + } catch (final Exception ex) { response.resume(Response.serverError().build()); } @@ -124,4 +147,53 @@ public void addBook(@QueryParam("chunked") boolean chunked, response.resume(Response.status(Status.BAD_REQUEST).build()); } } + + @GET + @Consumes("multipart/form-data") + public void getFile(@QueryParam("chunked") boolean chunked, @QueryParam("filename") String source, + @Suspended final AsyncResponse response) { + + if (StringUtils.isEmpty(source)) { + response.resume(Response.status(Status.BAD_REQUEST).build()); + return; + } + + try { + if (!store.containsKey(source)) { + response.resume(Response.status(Status.NOT_FOUND).build()); + return; + } + + final byte[] content = store.get(source); + if (response.isSuspended()) { + final StreamingOutput stream = new StreamingOutput() { + @Override + public void write(OutputStream os) throws IOException, WebApplicationException { + if (chunked) { + // Make sure we have enough data for chunking to kick in + for (int i = 0; i < 10; ++i) { + os.write(content); + } + } else { + os.write(content); + } + } + }; + response.resume(Response.ok().entity(stream).build()); + } + + } catch (final Exception ex) { + response.resume(Response.serverError().build()); + } + } + + @GET + @Path("/redirect") + public Response redirectFile(@Context UriInfo uriInfo) { + final UriBuilder builder = uriInfo.getBaseUriBuilder().path(getClass()); + uriInfo.getQueryParameters(true).forEach((p, v) -> builder.queryParam(p, v.get(0))); + + final ResponseBuilder response = Response.status(303).header("Location", builder.build()); + return response.build(); + } } diff --git a/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientChunkingTest.java b/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientChunkingTest.java new file mode 100644 index 00000000000..8d754780d5a --- /dev/null +++ b/systests/transports/src/test/java/org/apache/cxf/systest/jaxrs/JAXRSAsyncClientChunkingTest.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.cxf.systest.jaxrs; + +import java.io.ByteArrayInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.util.Arrays; +import java.util.Collection; +import java.util.List; +import java.util.Random; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.logging.Logger; + +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.MultivaluedMap; +import jakarta.ws.rs.core.Response; +import org.apache.cxf.interceptor.LoggingInInterceptor; +import org.apache.cxf.interceptor.LoggingMessage; +import org.apache.cxf.interceptor.LoggingOutInterceptor; +import org.apache.cxf.jaxrs.client.ClientConfiguration; +import org.apache.cxf.jaxrs.client.WebClient; +import org.apache.cxf.jaxrs.ext.multipart.Attachment; +import org.apache.cxf.jaxrs.ext.multipart.MultipartBody; +import org.apache.cxf.jaxrs.impl.MetadataMap; +import org.apache.cxf.jaxrs.model.AbstractResourceInfo; +import org.apache.cxf.jaxrs.provider.MultipartProvider; +import org.apache.cxf.message.Message; +import org.apache.cxf.testutil.common.AbstractBusClientServerTestBase; +import org.apache.cxf.transport.http.asyncclient.AsyncHTTPConduit; + +import org.junit.BeforeClass; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized.Parameters; + +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.CoreMatchers.startsWith; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +@RunWith(value = org.junit.runners.Parameterized.class) +public class JAXRSAsyncClientChunkingTest extends AbstractBusClientServerTestBase { + private static final String PORT = allocatePort(FileStoreServer.class); + private final Boolean chunked; + private final Boolean autoRedirect; + private final ConcurrentMap ids = new ConcurrentHashMap<>(); + + public JAXRSAsyncClientChunkingTest(Boolean chunked, Boolean autoRedirect) { + this.chunked = chunked; + this.autoRedirect = autoRedirect; + } + + @BeforeClass + public static void startServers() throws Exception { + AbstractResourceInfo.clearAllMaps(); + assertTrue("server did not launch correctly", launchServer(new FileStoreServer(PORT))); + createStaticBus(); + } + + @Parameters(name = "chunked {0}, auto-redirect {1}") + public static Collection data() { + return Arrays.asList(new Boolean[][] { + {Boolean.FALSE /* chunked */, Boolean.FALSE /* autoredirect */}, + {Boolean.FALSE /* chunked */, Boolean.TRUE /* autoredirect */}, + {Boolean.TRUE /* chunked */, Boolean.FALSE /* autoredirect */}, + {Boolean.TRUE /* chunked */, Boolean.TRUE /* autoredirect */}, + }); + } + + @Test + public void testMultipartChunking() { + final String url = "http://localhost:" + PORT + "/file-store"; + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final String filename = "keymanagers.jks"; + try { + final MultivaluedMap headers = new MetadataMap<>(); + headers.add("Content-ID", filename); + headers.add("Content-Type", "application/binary"); + headers.add("Content-Disposition", "attachment; filename=" + chunked + "_" + autoRedirect + "_" + filename); + final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers); + final MultipartBody entity = new MultipartBody(att); + try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).post(entity)) { + assertThat(response.getStatus(), equalTo(201)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertRedirect(chunked + "_" + autoRedirect + "_" + filename); + } + + @Test + public void testMultipartChunkingAsync() throws InterruptedException, ExecutionException, TimeoutException { + final String url = "http://localhost:" + PORT + "/file-store"; + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final String filename = "keymanagers.jks"; + try { + final MultivaluedMap headers = new MetadataMap<>(); + headers.add("Content-ID", filename); + headers.add("Content-Type", "application/binary"); + headers.add("Content-Disposition", "attachment; filename=" + chunked + + "_" + autoRedirect + "_async_" + filename); + final Attachment att = new Attachment(getClass().getResourceAsStream("/" + filename), headers); + final Entity entity = Entity.entity(new MultipartBody(att), + MediaType.MULTIPART_FORM_DATA_TYPE); + try (Response response = webClient.header("Content-Type", MediaType.MULTIPART_FORM_DATA).async() + .post(entity).get(10, TimeUnit.SECONDS)) { + assertThat(response.getStatus(), equalTo(201)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertRedirect(chunked + "_" + autoRedirect + "_" + filename); + } + + @Test + public void testStreamChunking() throws IOException { + final String url = "http://localhost:" + PORT + "/file-store/stream"; + final WebClient webClient = WebClient.create(url).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final byte[] bytes = new byte [32 * 1024]; + final Random random = new Random(); + random.nextBytes(bytes); + + try (InputStream in = new ByteArrayInputStream(bytes)) { + final Entity entity = Entity.entity(in, MediaType.APPLICATION_OCTET_STREAM); + try (Response response = webClient.post(entity)) { + assertThat(response.getStatus(), equalTo(200)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertNoDuplicateLogging(); + } + + @Test + public void testStreamChunkingAsync() throws IOException, InterruptedException, + ExecutionException, TimeoutException { + final String url = "http://localhost:" + PORT + "/file-store/stream"; + final WebClient webClient = WebClient.create(url).query("chunked", chunked); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + final byte[] bytes = new byte [32 * 1024]; + final Random random = new Random(); + random.nextBytes(bytes); + + try (InputStream in = new ByteArrayInputStream(bytes)) { + final Entity entity = Entity.entity(in, MediaType.APPLICATION_OCTET_STREAM); + try (Response response = webClient.async().post(entity).get(10, TimeUnit.SECONDS)) { + assertThat(response.getStatus(), equalTo(200)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } + } finally { + webClient.close(); + } + + assertNoDuplicateLogging(); + } + + private void assertRedirect(String filename) { + final String url = "http://localhost:" + PORT + "/file-store/redirect"; + + final WebClient webClient = WebClient.create(url, List.of(new MultipartProvider())) + .query("chunked", chunked) + .query("filename", filename); + + final ClientConfiguration config = WebClient.getConfig(webClient); + config.getBus().setProperty(AsyncHTTPConduit.USE_ASYNC, true); + config.getHttpConduit().getClient().setAllowChunking(chunked); + config.getHttpConduit().getClient().setAutoRedirect(autoRedirect); + configureLogging(config); + + try { + try (Response response = webClient.get()) { + if (autoRedirect) { + assertThat(response.getStatus(), equalTo(200)); + assertThat(response.getHeaderString("Transfer-Encoding"), equalTo(chunked ? "chunked" : null)); + assertThat(response.getEntity(), not(equalTo(null))); + } else { + assertThat(response.getStatus(), equalTo(303)); + assertThat(response.getHeaderString("Location"), + startsWith("http://localhost:" + PORT + "/file-store")); + } + } + } finally { + webClient.close(); + } + + assertNoDuplicateLogging(); + } + + private void assertNoDuplicateLogging() { + ids.forEach((id, counter) -> assertThat("Duplicate client logging for message " + id, + counter.get(), equalTo(1))); + } + + private void configureLogging(final ClientConfiguration config) { + final LoggingOutInterceptor out = new LoggingOutInterceptor(); + out.setShowMultipartContent(false); + + final LoggingInInterceptor in = new LoggingInInterceptor() { + @Override + protected void logging(Logger logger, Message message) { + super.logging(logger, message); + final String id = (String) message.get(LoggingMessage.ID_KEY); + ids.computeIfAbsent(id, key -> new AtomicInteger()).incrementAndGet(); + } + }; + in.setShowBinaryContent(false); + + config.getInInterceptors().add(in); + config.getOutInterceptors().add(out); + } +}