diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java index 3ba937d04b0..f5526fa57fd 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStream.java @@ -283,8 +283,10 @@ public void resetOut(OutputStream out, boolean copyOldContent) throws IOExceptio } } finally { streamList.remove(currentStream); + // we are not backed by file anymore, unregister from the cleaner if (cachedOutputStreamCleaner != null) { cachedOutputStreamCleaner.unregister(currentStream); + cachedOutputStreamCleaner.unregister(this); } deleteTempFile(); inmem = true; diff --git a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java index 3d6361a4c95..dbef316afbe 100644 --- a/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/CachedOutputStreamCleaner.java @@ -40,4 +40,12 @@ public interface CachedOutputStreamCleaner { * Unregister the stream instance from the clean up (closed properly) */ void register(Closeable closeable); + + /** + * The exact or approximate (depending on the implementation) size of the cleaner queue + * @return exact or approximate (depending on the implementation) size of the cleaner queue + */ + default int size() { + return 0; + } } diff --git a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java index 5c8a1768146..df033936a32 100644 --- a/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java +++ b/core/src/main/java/org/apache/cxf/io/DelayedCachedOutputStreamCleaner.java @@ -52,19 +52,24 @@ private interface DelayedCleaner extends CachedOutputStreamCleaner, Closeable { @Override default void register(Closeable closeable) { } - + @Override default void unregister(Closeable closeable) { } - + @Override default void close() { } - + @Override default void clean() { } - + + @Override + default int size() { + return 0; + } + default void forceClean() { } } @@ -101,18 +106,23 @@ public void clean() { queue.drainTo(closeables); clean(closeables); } - + @Override public void forceClean() { clean(queue); } - + @Override public void close() { timer.cancel(); queue.clear(); } - + + @Override + public int size() { + return queue.size(); + } + private void clean(Collection closeables) { final Iterator iterator = closeables.iterator(); while (iterator.hasNext()) { @@ -167,7 +177,8 @@ public boolean equals(Object obj) { } final DelayedCloseable other = (DelayedCloseable) obj; - return Objects.equals(closeable, other.closeable); + // because of the broken Liskov Substitution Principle, check in two ways to find equal streams + return Objects.equals(other.closeable, closeable) || Objects.equals(closeable, other.closeable); } } @@ -224,6 +235,11 @@ public void unregister(Closeable closeable) { cleaner.unregister(closeable); } + @Override + public int size() { + return cleaner.size(); + } + @Override public void clean() { cleaner.clean(); diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java index 6c81b20d1dd..01c188c96f8 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/RESTLoggingTest.java @@ -19,17 +19,21 @@ package org.apache.cxf.jaxrs.client.logging; +import java.io.Closeable; import java.io.IOException; import java.io.InputStream; import java.net.MalformedURLException; import java.util.Collections; import java.util.List; +import java.util.concurrent.atomic.LongAdder; +import org.apache.cxf.Bus; import org.apache.cxf.endpoint.Server; import org.apache.cxf.ext.logging.AbstractLoggingInterceptor; import org.apache.cxf.ext.logging.LoggingFeature; import org.apache.cxf.ext.logging.event.EventType; import org.apache.cxf.ext.logging.event.LogEvent; +import org.apache.cxf.io.CachedOutputStreamCleaner; import org.apache.cxf.jaxrs.JAXRSServerFactoryBean; import org.apache.cxf.jaxrs.client.JAXRSClientFactoryBean; import org.apache.cxf.jaxrs.client.WebClient; @@ -39,7 +43,10 @@ import org.junit.Test; import static org.awaitility.Awaitility.await; +import static org.hamcrest.CoreMatchers.equalTo; +import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertEquals; public class RESTLoggingTest { @@ -57,6 +64,61 @@ public void testSlf4j() throws IOException { Assert.assertEquals("test1", result); } + @Test + public void testCacheCleanUp() throws Exception { + LoggingFeature loggingFeature = new LoggingFeature(); + loggingFeature.setInMemThreshold(1); // To activate usage of the CachedOutputStream + + Server server = createService(SERVICE_URI, new TestServiceRest(), loggingFeature); + server.start(); + + try { + final JAXRSClientFactoryBean bean = new JAXRSClientFactoryBean(); + bean.setAddress(SERVICE_URI); + bean.setTransportId(LocalTransportFactory.TRANSPORT_ID); + + final LongAdder registers = new LongAdder(); + final WebClient client = bean.createWebClient(); + final Bus bus = bean.getBus(); + + // See please https://issues.apache.org/jira/browse/CXF-9110 + final CachedOutputStreamCleaner cleaner = bus.getExtension(CachedOutputStreamCleaner.class); + bus.setExtension(new CachedOutputStreamCleaner() { + @Override + public void clean() { + cleaner.clean(); + } + + @Override + public void unregister(Closeable closeable) { + cleaner.unregister(closeable); + } + + @Override + public void register(Closeable closeable) { + cleaner.register(closeable); + registers.increment(); + } + + @Override + public int size() { + return cleaner.size(); + } + }, CachedOutputStreamCleaner.class); + + String response = null; + for (int i = 0; i < 1_000; i++) { // ~2...5 seconds of the execution + response = client.post("DATA", String.class); + } + assertEquals("DATA", response); + + assertThat(registers.longValue(), equalTo(3000L)); + assertThat(cleaner.size(), equalTo(0)); + } finally { + server.destroy(); + } + } + @Test public void testBinary() throws IOException, InterruptedException { LoggingFeature loggingFeature = new LoggingFeature(); diff --git a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java index ab237bc7102..1ef575e970e 100644 --- a/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java +++ b/rt/rs/client/src/test/java/org/apache/cxf/jaxrs/client/logging/TestServiceRest.java @@ -19,6 +19,7 @@ package org.apache.cxf.jaxrs.client.logging; import jakarta.ws.rs.GET; +import jakarta.ws.rs.POST; import jakarta.ws.rs.Path; import jakarta.ws.rs.PathParam; @@ -29,4 +30,9 @@ public String echo(@PathParam("msg") String msg) { return msg; } + @POST + public String post(String msg) { + return msg; + } } +