From 7510425645af14396924ef2e53b1ca4f085a2173 Mon Sep 17 00:00:00 2001 From: mr3 Date: Tue, 16 Jan 2024 10:52:37 +0800 Subject: [PATCH] feat: support lettuce cluster --- .../inst/common/util/FluxRecordFunction.java} | 29 +-- .../{FluxUtil.java => FluxReplayUtil.java} | 13 +- .../inst/common/util/MonoRecordFunction.java | 31 ++++ .../arex/inst/common/util/FluxUtilTest.java | 22 +-- .../dynamic/common/DynamicClassExtractor.java | 22 ++- .../dynamic/common/listener/FluxConsumer.java | 57 ------ .../dynamic/common/listener/MonoConsumer.java | 36 ---- .../common/DynamicClassExtractorTest.java | 19 +- .../common/listener/FluxConsumerTest.java | 165 ------------------ .../RedisClusterAsyncCommandsImplWrapper.java | 152 ++++++++-------- .../cluster/RedisClusterClientExtractor.java | 18 -- ...disClusterReactiveCommandsImplWrapper.java | 37 ++-- .../RedisClusterClientInstrumentation.java | 10 +- ...isClusterAsyncCommandsImplWrapperTest.java | 22 +-- .../RedisReactiveCommandsImplWrapperTest.java | 6 +- .../RedisClusterAsyncCommandsImplWrapper.java | 142 +++++++-------- .../cluster/RedisClusterClientExtractor.java | 17 -- ...disClusterReactiveCommandsImplWrapper.java | 30 ++-- .../RedisClusterClientInstrumentation.java | 12 +- ...isClusterAsyncCommandsImplWrapperTest.java | 22 +-- .../RedisReactiveCommandsImplWrapperTest.java | 4 +- .../redis/common/RedisConnectionManager.java | 13 +- .../common/lettuce/LettuceClusterUtil.java | 42 +++++ .../redis/common/lettuce/MonoConsumer.java | 35 ---- ...ResultUtil.java => ReactorStreamUtil.java} | 24 ++- .../wrapper/RedisReactiveCommandWrapper.java | 43 ++--- .../common/RedisConnectionManagerTest.java | 6 + .../common/lettuce/FluxConsumerTest.java | 39 ----- .../common/lettuce/MonoConsumerTest.java | 39 ----- ...ilTest.java => ReactorStreamUtilTest.java} | 18 +- .../RedisReactiveCommandWrapperTest.java | 4 +- 31 files changed, 406 insertions(+), 723 deletions(-) rename arex-instrumentation/{redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/FluxConsumer.java => common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java} (66%) rename arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/{FluxUtil.java => FluxReplayUtil.java} (93%) create mode 100644 arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java delete mode 100644 arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java delete mode 100644 arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java delete mode 100644 arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java delete mode 100644 arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterClientExtractor.java delete mode 100644 arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterClientExtractor.java create mode 100644 arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/LettuceClusterUtil.java delete mode 100644 arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/MonoConsumer.java rename arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/{RedisClusterReactiveResultUtil.java => ReactorStreamUtil.java} (67%) delete mode 100644 arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/FluxConsumerTest.java delete mode 100644 arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/MonoConsumerTest.java rename arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/{RedisClusterReactiveResultUtilTest.java => ReactorStreamUtilTest.java} (86%) diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/FluxConsumer.java b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java similarity index 66% rename from arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/FluxConsumer.java rename to arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java index 48b4a3e78..c1b7930a8 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/FluxConsumer.java +++ b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java @@ -1,9 +1,7 @@ -package io.arex.inst.redis.common.lettuce; +package io.arex.inst.common.util; import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.inst.common.util.FluxUtil; -import io.arex.inst.common.util.FluxUtil.FluxResult; -import io.arex.inst.redis.common.RedisExtractor; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; import io.arex.inst.runtime.model.ArexConstants; import io.arex.inst.runtime.serializer.Serializer; import io.arex.inst.runtime.util.TypeUtil; @@ -11,21 +9,23 @@ import java.util.List; import java.util.concurrent.atomic.AtomicInteger; import reactor.core.publisher.Flux; +import java.util.function.Function; -public class FluxConsumer { +public class FluxRecordFunction implements Function, Flux> { - private final RedisExtractor extractor; + private final Function executor; private final TraceTransmitter traceTransmitter; - public FluxConsumer(RedisExtractor extractor) { + public FluxRecordFunction(Function executor) { this.traceTransmitter = TraceTransmitter.create(); - this.extractor = extractor; + this.executor = executor; } - public Flux accept(Flux responseFlux) { + @Override + public Flux apply(Flux responseFlux) { // use a list to record all elements - List fluxElementMockerResults = new ArrayList<>(); + List fluxElementMockerResults = new ArrayList<>(); AtomicInteger index = new AtomicInteger(1); String responseType = TypeUtil.getName(responseFlux); return responseFlux @@ -37,7 +37,7 @@ public Flux accept(Flux responseFlux) { } }) // add error to list - .doOnError(error -> { + .doOnError(error -> { try (TraceTransmitter tm = traceTransmitter.transmit()) { fluxElementMockerResults.add( getFluxElementMockerResult(index.getAndIncrement(), error)); @@ -46,13 +46,14 @@ public Flux accept(Flux responseFlux) { .doFinally(result -> { try (TraceTransmitter tm = traceTransmitter.transmit()) { FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults); - extractor.record(fluxResult); + executor.apply(fluxResult); } }); } - private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) { + + private FluxReplayUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) { String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER); - return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element)); + return new FluxReplayUtil.FluxElementResult(index, content, TypeUtil.getName(element)); } } diff --git a/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java similarity index 93% rename from arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java rename to arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java index 9d3ed3aa3..087885c29 100644 --- a/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxUtil.java +++ b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java @@ -1,24 +1,27 @@ package io.arex.inst.common.util; +import io.arex.agent.bootstrap.ctx.TraceTransmitter; +import io.arex.agent.bootstrap.model.MockResult; +import io.arex.agent.bootstrap.util.CollectionUtil; import io.arex.inst.runtime.serializer.Serializer; import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.List; +import java.util.function.Function; import reactor.core.publisher.Flux; -import io.arex.agent.bootstrap.util.CollectionUtil; -public class FluxUtil { + +public class FluxReplayUtil{ static final String FLUX_FROM_ITERATOR = "reactor.core.publisher.FluxIterable-"; static final String FLUX_FROM_ARRAY = "reactor.core.publisher.FluxArray-"; static final String FLUX_FROM_STREAM = "reactor.core.publisher.FluxStream-"; - private FluxUtil() { - } + public static Flux restore(Object fluxObj) { - if(fluxObj == null){ + if (fluxObj == null) { return Flux.empty(); } FluxResult fluxResult = (FluxResult) fluxObj; diff --git a/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java new file mode 100644 index 000000000..b727103b4 --- /dev/null +++ b/arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java @@ -0,0 +1,31 @@ +package io.arex.inst.common.util; + +import io.arex.agent.bootstrap.ctx.TraceTransmitter; +import java.util.function.Function; +import reactor.core.publisher.Mono; + +public class MonoRecordFunction implements Function, Mono> { + + private final Function executor; + private final TraceTransmitter traceTransmitter; + + public MonoRecordFunction(Function executor) { + this.traceTransmitter = TraceTransmitter.create(); + this.executor = executor; + } + + @Override + public Mono apply(Mono responseMono) { + return responseMono + .doOnSuccess(result -> { + try (TraceTransmitter tm = traceTransmitter.transmit()) { + executor.apply(result); + } + }) + .doOnError(error -> { + try (TraceTransmitter tm = traceTransmitter.transmit()) { + executor.apply(error); + } + }); + } +} diff --git a/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java b/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java index 79f912817..02b92c5a0 100644 --- a/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java +++ b/arex-instrumentation/common/arex-common/src/test/java/io/arex/inst/common/util/FluxUtilTest.java @@ -1,13 +1,13 @@ package io.arex.inst.common.util; -import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ARRAY; -import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ITERATOR; -import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_STREAM; +import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ARRAY; +import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ITERATOR; +import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_STREAM; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertNotNull; -import io.arex.inst.common.util.FluxUtil.FluxElementResult; -import io.arex.inst.common.util.FluxUtil.FluxResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; import io.arex.inst.runtime.util.TypeUtil; import java.lang.reflect.Type; import java.util.ArrayList; @@ -22,8 +22,8 @@ void FluxRecory() { List list = new ArrayList<>(); FluxResult fluxResult = new FluxResult(null, list); // flux is empty - assertNotNull(FluxUtil.restore(null)); - Flux result = FluxUtil.restore(fluxResult); + assertNotNull(FluxReplayUtil.restore(null)); + Flux result = FluxReplayUtil.restore(fluxResult); assertNotNull(result); // flux is not empty @@ -34,22 +34,22 @@ void FluxRecory() { // Flux.just() fluxResult = new FluxResult(null, list); - result = FluxUtil.restore(fluxResult); + result = FluxReplayUtil.restore(fluxResult); assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-"); // Flux.fromIterable() fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list); - result = FluxUtil.restore(fluxResult); + result = FluxReplayUtil.restore(fluxResult); assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR); // Flux.fromArray() fluxResult = new FluxResult(FLUX_FROM_ARRAY, list); - result = FluxUtil.restore(fluxResult); + result = FluxReplayUtil.restore(fluxResult); assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY); // Flux.fromStream() fluxResult = new FluxResult(FLUX_FROM_STREAM, list); - result = FluxUtil.restore(fluxResult); + result = FluxReplayUtil.restore(fluxResult); assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM); } } diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java index ccb483f7b..0f8fea413 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java @@ -8,10 +8,11 @@ import io.arex.agent.bootstrap.util.ArrayUtils; import io.arex.agent.bootstrap.util.StringUtil; import io.arex.agent.thirdparty.util.time.DateFormatUtils; -import io.arex.inst.common.util.FluxUtil; -import io.arex.inst.dynamic.common.listener.FluxConsumer; +import io.arex.inst.common.util.FluxRecordFunction; +import io.arex.inst.common.util.FluxReplayUtil; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; +import io.arex.inst.common.util.MonoRecordFunction; import io.arex.inst.dynamic.common.listener.ListenableFutureAdapter; -import io.arex.inst.dynamic.common.listener.MonoConsumer; import io.arex.inst.dynamic.common.listener.ResponseConsumer; import io.arex.inst.runtime.config.Config; import io.arex.inst.runtime.context.ArexContext; @@ -33,6 +34,7 @@ import java.util.Objects; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Future; +import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; @@ -97,11 +99,19 @@ public Object recordResponse(Object response) { } // Compatible with not import package reactor-core if (MONO.equals(methodReturnType) && response instanceof Mono) { - return new MonoConsumer(this).accept((Mono) response); + Function executor = result -> { + this.recordResponse(result); + return null; + }; + return new MonoRecordFunction(executor).apply((Mono) response); } if (FLUX.equals(methodReturnType) && response instanceof Flux) { - return new FluxConsumer(this).accept((Flux) response); + Function executor = result -> { + this.recordResponse(result); + return null; + }; + return new FluxRecordFunction(executor).apply((Flux) response); } this.result = response; @@ -321,7 +331,7 @@ Object restoreResponse(Object result) { if (result instanceof Throwable) { return Flux.error((Throwable) result); } - return FluxUtil.restore(result); + return FluxReplayUtil.restore(result); } return result; } diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java deleted file mode 100644 index 82703d9de..000000000 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/FluxConsumer.java +++ /dev/null @@ -1,57 +0,0 @@ -package io.arex.inst.dynamic.common.listener; - -import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.inst.common.util.FluxUtil; -import io.arex.inst.common.util.FluxUtil.FluxResult; -import io.arex.inst.dynamic.common.DynamicClassExtractor; -import io.arex.inst.runtime.model.ArexConstants; -import io.arex.inst.runtime.serializer.Serializer; -import io.arex.inst.runtime.util.TypeUtil; -import java.util.ArrayList; -import java.util.List; -import java.util.concurrent.atomic.AtomicInteger; -import reactor.core.publisher.Flux; - -public class FluxConsumer { - - private final TraceTransmitter traceTransmitter; - private final DynamicClassExtractor extractor; - - public FluxConsumer(DynamicClassExtractor extractor) { - this.traceTransmitter = TraceTransmitter.create(); - this.extractor = extractor; - } - - public Flux accept(Flux responseFlux) { - // use a list to record all elements - List fluxElementMockerResults = new ArrayList<>(); - AtomicInteger index = new AtomicInteger(1); - String responseType = TypeUtil.getName(responseFlux); - return responseFlux - // add element to list - .doOnNext(element -> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - fluxElementMockerResults.add( - getFluxElementMockerResult(index.getAndIncrement(), element)); - } - }) - // add error to list - .doOnError(error -> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - fluxElementMockerResults.add( - getFluxElementMockerResult(index.getAndIncrement(), error)); - } - }) - .doFinally(result -> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults); - extractor.recordResponse(fluxResult); - } - }); - } - - private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) { - String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER); - return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element)); - } -} diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java deleted file mode 100644 index 6ea63154d..000000000 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/listener/MonoConsumer.java +++ /dev/null @@ -1,36 +0,0 @@ -package io.arex.inst.dynamic.common.listener; - -import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.inst.dynamic.common.DynamicClassExtractor; -import reactor.core.publisher.Mono; - -public class MonoConsumer { - - private final TraceTransmitter traceTransmitter; - private final DynamicClassExtractor extractor; - - public MonoConsumer(DynamicClassExtractor extractor) { - this.traceTransmitter = TraceTransmitter.create(); - this.extractor = extractor; - } - - /** - * support for Mono type recording - * @param responseMono - * @return - */ - public Mono accept(Mono responseMono) { - return responseMono - .doOnSuccess(result -> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.recordResponse(result); - } - }) - .doOnError(error-> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.recordResponse(error); - } - }); - } - -} diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java index 8b655757b..130f338e2 100644 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java +++ b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/DynamicClassExtractorTest.java @@ -5,8 +5,8 @@ import io.arex.agent.bootstrap.model.ArexMocker; import io.arex.agent.bootstrap.model.Mocker.Target; import io.arex.agent.thirdparty.util.time.DateFormatUtils; -import io.arex.inst.common.util.FluxUtil; -import io.arex.inst.dynamic.common.listener.MonoConsumer; +import io.arex.inst.common.util.FluxReplayUtil; +import io.arex.inst.common.util.MonoRecordFunction; import io.arex.inst.runtime.config.ConfigBuilder; import io.arex.inst.runtime.context.ArexContext; import io.arex.inst.runtime.context.ContextManager; @@ -28,6 +28,7 @@ import java.util.TimeZone; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.function.Function; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ -122,14 +123,18 @@ void resetMonoResponse() { // exception Mono result = monoExceptionTest(); - MonoConsumer monoConsumer = new MonoConsumer(extractor); - result = monoConsumer.accept(result); + Function executor = res -> { + extractor.recordResponse(res); + return null; + }; + MonoRecordFunction monoRecordFunction = new MonoRecordFunction(executor); + result = monoRecordFunction.apply(result); result.subscribe(); assertTrue(nonNull.test(result)); // normal result = monoTest(); - result = monoConsumer.accept(result); + result = monoRecordFunction.apply(result); result.subscribe(); assertTrue(nonNull.test(result)); } catch (NoSuchMethodException e) { @@ -294,8 +299,8 @@ void restoreResponseTest() throws NoSuchMethodException, ExecutionException, Int Throwable.class); DynamicClassExtractor fluxTestExtractor = new DynamicClassExtractor(testReturnFlux, new Object[]{"mock"}, "#val", null); - List list = new ArrayList<>(); - FluxUtil.FluxResult fluxResult = new FluxUtil.FluxResult(null, list); + List list = new ArrayList<>(); + FluxReplayUtil.FluxResult fluxResult = new FluxReplayUtil.FluxResult(null, list); Object fluxNormalTest = fluxTestExtractor.restoreResponse(fluxResult); assertNull(((Flux) fluxNormalTest).blockFirst()); diff --git a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java b/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java deleted file mode 100644 index bef4fd909..000000000 --- a/arex-instrumentation/dynamic/arex-dynamic-common/src/test/java/io/arex/inst/dynamic/common/listener/FluxConsumerTest.java +++ /dev/null @@ -1,165 +0,0 @@ -package io.arex.inst.dynamic.common.listener; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNull; -import static org.junit.jupiter.api.Assertions.assertThrows; -import io.arex.inst.dynamic.common.DynamicClassExtractor; -import io.arex.inst.runtime.config.ConfigBuilder; -import io.arex.inst.runtime.context.ContextManager; -import java.lang.reflect.Method; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import reactor.core.publisher.Flux; - -public class FluxConsumerTest { - - static DynamicClassExtractor extractor; - static FluxConsumer fluxConsumer; - - @BeforeAll - static void setUp() { - Method testWithArexMock; - try { - testWithArexMock = FluxConsumerTest.class.getDeclaredMethod("testWithArexMock", String.class); - } catch (NoSuchMethodException e) { - testWithArexMock = null; - } - final Object[] args = {"errorSerialize"}; - extractor = new DynamicClassExtractor(testWithArexMock, args); - fluxConsumer = new FluxConsumer(extractor); - Mockito.mockStatic(ContextManager.class); - ConfigBuilder.create("test").enableDebug(true).build(); - } - - @AfterAll - static void tearDown() { - Mockito.clearAllCaches(); - } - - - @Test - void record() { - - // Record empty flux - testEmptyFlux(); - - // Record Exception - testFluxError(); - - // Normal conditions without exceptions or errors, all elements are recorded. - testNormalFlux(); - - // Elements before the error occurs and all elements in alternate Flux sequenceare when error occurs are recorded. - testFluxOnErrorResume(); - - // Except for the element when the error occurs, all other elements are recorded - testFluxOnErrorContinue(); - - // Elements before the error occurs and the exception are recorded (Flux terminates when exception is thrown). - testFluxOnError(); - } - - private static void testNormalFlux() { - Flux flux = Flux.just(1, 2, 3, 4, 5) - .doOnNext(val -> System.out.println("val" + ":" + val)) - // doFinally performs some operations that have nothing to do with the value of the element. - // If the doFinally operator is called multiple times, doFinally will be executed once at the end of each sequence. - .doFinally(System.out::println); - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - // record content: 1,2,3,4,5 - subscribe.subscribe(); - // record content: 1 - assertEquals(blockFirst.blockFirst(), 1); - } - - private static void testEmptyFlux() { - Flux flux = Flux.empty(); - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - // record content: 1,2,3,4,5 - subscribe.subscribe(); - // record content: 1 - assertNull(blockFirst.blockFirst()); - } - - - private static void testFluxOnErrorResume() { - Flux flux = Flux.just(1, 2) - .doOnNext(val -> { - if (val.equals(2)) { - throw new RuntimeException("error"); - } - }) - .doOnError(t -> System.out.println("error" + ":" + t)) - // returns an alternate Flux sequence when a Flux error occurs, - .onErrorResume(t -> Flux.just(7, 8, 9)); - - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - - // record content: 1,7,8,9 - subscribe.subscribe(); - // record content: 1 - assertEquals(blockFirst.blockFirst(), 1); - } - - private static void testFluxOnError() { - final Flux flux = Flux.just(1, 2, 3, 4, 5) - .doOnNext(val -> { - if (val.equals(3)) { - throw new RuntimeException("error"); - } - }) - .doOnError(t -> System.out.println("error" + ":" + t)); - - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - Flux blockLast = fluxConsumer.accept(flux); - - // record content: 1,2,RuntimeException - subscribe.subscribe(); - // record content: 1 - assertEquals(blockFirst.blockFirst(), 1); - // record content: RuntimeException - assertThrows(RuntimeException.class, () -> blockLast.blockLast()); - } - - private static void testFluxOnErrorContinue() { - Flux flux = Flux.just(1, 2, 3, 4, 5) - .doOnNext(val -> { - if (val.equals(3)) { - throw new RuntimeException("error"); - } - }) - .onErrorContinue((t, o) -> System.out.println("error" + ":" + t)) - .doOnNext(val -> System.out.println("val" + ":" + val)); - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - Flux blockLast = fluxConsumer.accept(flux); - - // record content: 1,2,4,5 - subscribe.subscribe(); - // record content: 1 - assertEquals(blockFirst.blockFirst(), 1); - // record content: 5 - assertEquals(blockLast.blockLast(), 5); - } - - private static void testFluxError() { - Flux flux = Flux.error(new RuntimeException("error")); - Flux subscribe = fluxConsumer.accept(flux); - Flux blockFirst = fluxConsumer.accept(flux); - // record content: RuntimeException - subscribe.subscribe(); - // record content: RuntimeException - assertThrows(RuntimeException.class, () -> blockFirst.blockFirst()); - } - - public String testWithArexMock(String val) { - return val + "testWithArexMock"; - } - -} diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapper.java b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapper.java index f635f858f..138933e09 100644 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapper.java +++ b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapper.java @@ -1,9 +1,7 @@ package io.arex.inst.lettuce.v5.cluster; -import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.agent.bootstrap.model.MockResult; import io.arex.inst.redis.common.RedisConnectionManager; -import io.arex.inst.redis.common.RedisExtractor; +import io.arex.inst.redis.common.lettuce.LettuceClusterUtil; import io.arex.inst.redis.common.lettuce.RedisCommandBuilderImpl; import io.arex.inst.redis.common.lettuce.wrapper.RedisCommandWrapper; import io.arex.inst.runtime.context.ContextManager; @@ -16,7 +14,6 @@ import io.lettuce.core.output.KeyStreamingChannel; import io.lettuce.core.output.KeyValueStreamingChannel; import io.lettuce.core.output.ValueStreamingChannel; -import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.Command; import java.util.Arrays; import java.util.Date; @@ -30,7 +27,7 @@ public class RedisClusterAsyncCommandsImplWrapper extends RedisAdvancedClusterAsyncCommandsImpl { private String redisUri; - private RedisCommandWrapper redisCommandWrapper; + private final RedisCommandWrapper redisCommandWrapper; private final RedisCommandBuilderImpl commandBuilder; public RedisClusterAsyncCommandsImplWrapper(StatefulRedisClusterConnection connection, @@ -387,137 +384,128 @@ public RedisFuture zcard(K key) { return redisCommandWrapper.zcard(this, getRedisUri(), key); } - // The following methods are special handling in Redis cluster @Override public RedisFuture del(K... keys) { return del(Arrays.asList(keys)); } + @Override - public RedisFuture del( - Iterable keys) { - Command cmd = commandBuilder.del(keys); + public RedisFuture del(Iterable keys) { + Command cmd = commandBuilder.del(keys); if (ContextManager.needReplay()) { - return replay("DEL",keys.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("DEL", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.del(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "DEL"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "DEL", getRedisUri()); } return resultFuture; } + @Override - public RedisFuture exists( K... keys) { - return exists( Arrays.asList(keys)); + public RedisFuture exists(K... keys) { + return exists(Arrays.asList(keys)); } @Override - public RedisFuture exists( - Iterable keys) { - Command cmd = commandBuilder.exists(keys); + public RedisFuture exists(Iterable keys) { + Command cmd = commandBuilder.exists(keys); if (ContextManager.needReplay()) { - return replay("EXISTS",keys.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("EXISTS", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.exists(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "EXISTS"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "EXISTS", getRedisUri()); } return resultFuture; - } @Override - public RedisFuture> keys( K pattern) { - Command cmd = commandBuilder.keys(pattern); + public RedisFuture> keys(K pattern) { + Command> cmd = commandBuilder.keys(pattern); if (ContextManager.needReplay()) { - return replay("KEYS", pattern.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("KEYS", pattern.toString(), cmd, getRedisUri()); } RedisFuture> resultFuture = super.keys(pattern); if (ContextManager.needRecord()) { - clusterAsyncRecord( pattern.toString(), resultFuture, "KEYS"); + LettuceClusterUtil.clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS", getRedisUri()); } return resultFuture; } @Override - public RedisFuture keys( - KeyStreamingChannel channel, K pattern) { - Command cmd = commandBuilder.keys(channel, pattern); + public RedisFuture keys(KeyStreamingChannel channel, K pattern) { + Command cmd = commandBuilder.keys(channel, pattern); if (ContextManager.needReplay()) { - return replay("KEYS", pattern.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("KEYS", pattern.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.keys(channel, pattern); if (ContextManager.needRecord()) { - clusterAsyncRecord( pattern.toString(), resultFuture, "KEYS"); + LettuceClusterUtil.clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS", getRedisUri()); } return resultFuture; } @Override - public RedisFuture>> mget( - K... keys) { - return mget( Arrays.asList(keys)); + public RedisFuture>> mget(K... keys) { + return mget(Arrays.asList(keys)); } @Override - public RedisFuture>> mget( - Iterable keys) { - Command cmd = commandBuilder.mgetKeyValue(keys); + public RedisFuture>> mget(Iterable keys) { + Command>> cmd = commandBuilder.mgetKeyValue(keys); if (ContextManager.needReplay()) { - return replay("MGET", keys.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("MGET", keys.toString(), cmd, getRedisUri()); } RedisFuture>> resultFuture = super.mget(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord( keys.toString(), resultFuture, "MGET"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "MGET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture mget( - KeyValueStreamingChannel channel, K... keys) { - return mget( channel, Arrays.asList(keys)); + public RedisFuture mget(KeyValueStreamingChannel channel, K... keys) { + return mget(channel, Arrays.asList(keys)); } @Override - public RedisFuture mget( - KeyValueStreamingChannel channel, Iterable keys) { - Command cmd = commandBuilder.mget(channel, keys); + public RedisFuture mget(KeyValueStreamingChannel channel, Iterable keys) { + Command cmd = commandBuilder.mget(channel, keys); if (ContextManager.needReplay()) { - return replay("MGET", keys.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("MGET", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.mget(channel, keys); if (ContextManager.needRecord()) { - clusterAsyncRecord( keys.toString(), resultFuture, "MGET"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "MGET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture mset( - Map map) { - Command cmd = commandBuilder.mset(map); + public RedisFuture mset(Map map) { + Command cmd = commandBuilder.mset(map); if (ContextManager.needReplay()) { - return replay("MSET", map.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("MSET", map.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.mset(map); if (ContextManager.needRecord()) { - clusterAsyncRecord( map.toString(), resultFuture, "MSET"); + LettuceClusterUtil.clusterAsyncRecord(map.toString(), resultFuture, "MSET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture msetnx( - Map map) { - Command cmd = commandBuilder.msetnx(map); + public RedisFuture msetnx(Map map) { + Command cmd = commandBuilder.msetnx(map); if (ContextManager.needReplay()) { - return replay("MSETNX", map.toString(), cmd); + LettuceClusterUtil.clusterAsynReplay("MSETNX", map.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.msetnx(map); if (ContextManager.needRecord()) { - clusterAsyncRecord( map.toString(), resultFuture, "MSETNX"); + LettuceClusterUtil.clusterAsyncRecord(map.toString(), resultFuture, "MSETNX", getRedisUri()); } return resultFuture; } @@ -528,33 +516,33 @@ private String getRedisUri() { } return redisUri; } - - public RedisFuture replay(String methodName, String key, Command cmd) { - RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); - MockResult mockResult = extractor.replay(); - AsyncCommand asyncCommand = new AsyncCommand<>(cmd); - if (mockResult.notIgnoreMockResult()) { - if (mockResult.getThrowable() != null) { - asyncCommand.completeExceptionally(mockResult.getThrowable()); - } else { - asyncCommand.complete(mockResult.getResult()); - } - } - return asyncCommand; - } - - public void clusterAsyncRecord(String key, RedisFuture resultFuture, String methodName) { - try (TraceTransmitter traceTransmitter = TraceTransmitter.create()) { - resultFuture.whenComplete((v, throwable) -> { - RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); - traceTransmitter.transmit(); - if (throwable != null) { - extractor.record(throwable); - } else { - extractor.record(v); - } - }); - } - } +// +// public RedisFuture LettuceClusterUtil.clusterAsynReplay(String methodName, String key, Command cmd) { +// RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); +// MockResult mockResult = extractor.replay(); +// AsyncCommand asyncCommand = new AsyncCommand<>(cmd); +// if (mockResult.notIgnoreMockResult()) { +// if (mockResult.getThrowable() != null) { +// asyncCommand.completeExceptionally(mockResult.getThrowable()); +// } else { +// asyncCommand.complete((R) mockResult.getResult()); +// } +// } +// return asyncCommand; +// } + +// public void clusterAsyncRecord(String key, RedisFuture resultFuture, String methodName) { +// try (TraceTransmitter traceTransmitter = TraceTransmitter.create()) { +// resultFuture.whenComplete((v, throwable) -> { +// RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); +// traceTransmitter.transmit(); +// if (throwable != null) { +// extractor.record(throwable); +// } else { +// extractor.record(v); +// } +// }); +// } +// } } diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterClientExtractor.java b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterClientExtractor.java deleted file mode 100644 index 3f1f273f8..000000000 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterClientExtractor.java +++ /dev/null @@ -1,18 +0,0 @@ -package io.arex.inst.lettuce.v5.cluster; - -import io.arex.inst.redis.common.RedisConnectionManager; -import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; - -import java.util.concurrent.CompletableFuture; - -public class RedisClusterClientExtractor { - - private RedisClusterClientExtractor() {} - - public static void addConnection(CompletableFuture> connectionFuture, Iterable redisURIs) { - connectionFuture.thenAccept(connection -> - // take first uri - RedisConnectionManager.add(connection.hashCode(), redisURIs.iterator().next().toString())); - } -} diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterReactiveCommandsImplWrapper.java b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterReactiveCommandsImplWrapper.java index 50152fa07..3859aeae6 100644 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterReactiveCommandsImplWrapper.java +++ b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/RedisClusterReactiveCommandsImplWrapper.java @@ -1,7 +1,7 @@ package io.arex.inst.lettuce.v5.cluster; import io.arex.inst.redis.common.RedisConnectionManager; -import io.arex.inst.redis.common.lettuce.RedisClusterReactiveResultUtil; +import io.arex.inst.redis.common.lettuce.ReactorStreamUtil; import io.arex.inst.redis.common.lettuce.wrapper.RedisReactiveCommandWrapper; import io.arex.inst.runtime.context.ContextManager; import io.lettuce.core.KeyValue; @@ -33,7 +33,8 @@ public class RedisClusterReactiveCommandsImplWrapper extends RedisAdvanced * @param connection the connection to operate on. * @param codec the codec for command encoding. */ - public RedisClusterReactiveCommandsImplWrapper(StatefulRedisClusterConnection connection, RedisCodec codec) { + public RedisClusterReactiveCommandsImplWrapper(StatefulRedisClusterConnection connection, + RedisCodec codec) { super(connection, codec); this.reactiveCommandsWrapper = new RedisReactiveCommandWrapper<>(codec); } @@ -82,6 +83,7 @@ public Mono getbit(K key, long offset) { public Mono getrange(K key, long start, long end) { return reactiveCommandsWrapper.getrange(this, getRedisUri(), key, start, end); } + @Override public Mono getset(K key, V value) { return reactiveCommandsWrapper.getset(this, getRedisUri(), key, value); @@ -103,7 +105,7 @@ public Mono hget(K key, K field) { } @Override - public Mono> hgetall(K key) { + public Mono> hgetall(K key) { return reactiveCommandsWrapper.hgetallMono(this, getRedisUri(), key); } @@ -131,6 +133,7 @@ public Flux hkeys(K key) { public Mono hkeys(KeyStreamingChannel channel, K key) { return reactiveCommandsWrapper.hkeys(this, getRedisUri(), channel, key); } + @Override public Mono hlen(K key) { return reactiveCommandsWrapper.hlen(this, getRedisUri(), key); @@ -392,11 +395,11 @@ public Mono del(K... keys) { @Override public Mono del(Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "DEL", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "DEL", keys.toString(), null); } Mono result = super.del(keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(), result, "DEL", keys.toString(), + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(), result, "DEL", keys.toString(), null); } return result; @@ -410,12 +413,12 @@ public Mono exists(K... keys) { @Override public Mono exists(Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "EXIST", keys.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "EXIST", keys.toString(), null); } Mono result = super.exists(keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(), result, "EXIST", keys.toString(), + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(), result, "EXIST", keys.toString(), null); } return result; @@ -424,11 +427,11 @@ public Mono exists(Iterable keys) { @Override public Flux keys(K pattern) { if (ContextManager.needReplay()) { - return (Flux) RedisClusterReactiveResultUtil.fluxReplay(getRedisUri(), "KEYS", pattern.toString(), null); + return (Flux) ReactorStreamUtil.fluxReplay(getRedisUri(), "KEYS", pattern.toString(), null); } Flux result = super.keys(pattern); if (ContextManager.needRecord()) { - return (Flux) RedisClusterReactiveResultUtil.fluxRecord(getRedisUri(), result, "KEYS", pattern.toString(), + return (Flux) ReactorStreamUtil.fluxRecord(getRedisUri(), result, "KEYS", pattern.toString(), null); } return result; @@ -437,12 +440,12 @@ public Flux keys(K pattern) { @Override public Mono keys(KeyStreamingChannel channel, K pattern) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "KEYS", pattern.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "KEYS", pattern.toString(), null); } Mono result = super.keys(channel, pattern); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(), result, "KEYS", pattern.toString(), + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(), result, "KEYS", pattern.toString(), null); } return result; @@ -451,12 +454,12 @@ public Mono keys(KeyStreamingChannel channel, K pattern) { @Override public Flux> mget(Iterable keys) { if (ContextManager.needReplay()) { - return (Flux>) RedisClusterReactiveResultUtil.fluxReplay(getRedisUri(), "MGET", + return (Flux>) ReactorStreamUtil.fluxReplay(getRedisUri(), "MGET", keys.toString(), null); } Flux> result = super.mget(keys); if (ContextManager.needRecord()) { - return (Flux>) RedisClusterReactiveResultUtil.fluxRecord(getRedisUri(), result, "MGET", + return (Flux>) ReactorStreamUtil.fluxRecord(getRedisUri(), result, "MGET", keys.toString(), null); } return result; @@ -470,11 +473,11 @@ public Mono mget(KeyValueStreamingChannel channel, K... keys) { @Override public Mono mget(KeyValueStreamingChannel channel, Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "MGET", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "MGET", keys.toString(), null); } Mono result = super.mget(channel, keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(), result, "MGET", keys.toString(), + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(), result, "MGET", keys.toString(), null); } return result; @@ -483,12 +486,12 @@ public Mono mget(KeyValueStreamingChannel channel, Iterable keys) @Override public Mono msetnx(Map map) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "MSETNX", map.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "MSETNX", map.toString(), null); } Mono result = super.msetnx(map); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(), result, "MSETNX", + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(), result, "MSETNX", map.toString(), null); } return result; diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/inst/RedisClusterClientInstrumentation.java b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/inst/RedisClusterClientInstrumentation.java index f9969d2f3..f629b1c84 100644 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/inst/RedisClusterClientInstrumentation.java +++ b/arex-instrumentation/redis/arex-lettuce-v5/src/main/java/io/arex/inst/lettuce/v5/cluster/inst/RedisClusterClientInstrumentation.java @@ -2,9 +2,8 @@ import io.arex.inst.extension.MethodInstrumentation; import io.arex.inst.extension.TypeInstrumentation; -import io.arex.inst.lettuce.v5.cluster.RedisClusterClientExtractor; +import io.arex.inst.redis.common.RedisConnectionManager; import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.RedisClusterClient; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import net.bytebuddy.asm.Advice; import net.bytebuddy.description.method.MethodDescription; @@ -13,6 +12,7 @@ import java.util.Collections; import java.util.List; import java.util.concurrent.CompletableFuture; + import static net.bytebuddy.matcher.ElementMatchers.named; import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; @@ -20,6 +20,7 @@ * RedisClusterClientInstrumentation */ public class RedisClusterClientInstrumentation extends TypeInstrumentation { + @Override protected ElementMatcher typeMatcher() { return named("io.lettuce.core.cluster.RedisClusterClient"); @@ -27,17 +28,18 @@ protected ElementMatcher typeMatcher() { @Override public List methodAdvices() { - ElementMatcher matcher = namedOneOf("connectAsync","connectClusterAsync"); + ElementMatcher matcher = namedOneOf("connectAsync", "connectClusterAsync"); return Collections.singletonList( new MethodInstrumentation(matcher, NewStatefulRedisConnectionAdvice.class.getName())); } public static class NewStatefulRedisConnectionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( @Advice.Return(readOnly = false) CompletableFuture> connectionFuture, @Advice.FieldValue("initialUris") Iterable redisURIs) { - RedisClusterClientExtractor.addConnection(connectionFuture, redisURIs); + RedisConnectionManager.addClusterConnection(connectionFuture, redisURIs); } } } diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapperTest.java b/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapperTest.java index cfcee06f6..2cf1738ec 100644 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapperTest.java +++ b/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/cluster/RedisClusterAsyncCommandsImplWrapperTest.java @@ -111,17 +111,17 @@ void dispatch(Runnable mocker, Predicate> predicate, MockResult m } } - @Test - void testRecord(){ - AsyncCommand command = new AsyncCommand(cmd); - try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, - (extractor, context) -> { - Mockito.doNothing().when(extractor).record(any()); - })) { - command.complete("mock"); - target.clusterAsyncRecord("key", command, "field"); - } - } +// @Test +// void testRecord(){ +// AsyncCommand command = new AsyncCommand(cmd); +// try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, +// (extractor, context) -> { +// Mockito.doNothing().when(extractor).record(any()); +// })) { +// command.complete("mock"); +// target.clusterAsyncRecord("key", command, "field"); +// } +// } static Stream dispatchCase() { Runnable mocker1 = () -> { diff --git a/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/standalone/RedisReactiveCommandsImplWrapperTest.java b/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/standalone/RedisReactiveCommandsImplWrapperTest.java index c11f55bc5..1dbe64512 100644 --- a/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/standalone/RedisReactiveCommandsImplWrapperTest.java +++ b/arex-instrumentation/redis/arex-lettuce-v5/src/test/java/io/arex/inst/lettuce/v5/standalone/RedisReactiveCommandsImplWrapperTest.java @@ -6,7 +6,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil; +import io.arex.inst.common.util.FluxReplayUtil; import io.arex.inst.redis.common.RedisConnectionManager; import io.arex.inst.redis.common.RedisExtractor; import io.arex.inst.runtime.context.ContextManager; @@ -64,7 +64,7 @@ static void setUp() { //mock static class Mockito.mockStatic(ContextManager.class); Mockito.mockStatic(RedisConnectionManager.class); - Mockito.mockStatic(FluxUtil.class); + Mockito.mockStatic(FluxReplayUtil.class); //mock object connection = Mockito.mock(StatefulRedisConnection.class); @@ -126,7 +126,7 @@ static Stream dispatchCase() { Mockito.when(RedisConnectionManager.getRedisUri(anyInt())).thenReturn(""); Mockito.when(ContextManager.needReplay()).thenReturn(true); Mockito.when(cmd.getType()).thenReturn(Mockito.mock(ProtocolKeyword.class)); - Mockito.when(FluxUtil.restore(any())).thenReturn(Flux.empty()); + Mockito.when(FluxReplayUtil.restore(any())).thenReturn(Flux.empty()); }; Runnable mocker2 = () -> { Mockito.when(ContextManager.needReplay()).thenReturn(false); diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapper.java b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapper.java index 73a04da1d..2db99345a 100644 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapper.java +++ b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapper.java @@ -1,9 +1,7 @@ package io.arex.inst.lettuce.v6.cluster; -import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.agent.bootstrap.model.MockResult; import io.arex.inst.redis.common.RedisConnectionManager; -import io.arex.inst.redis.common.RedisExtractor; +import io.arex.inst.redis.common.lettuce.LettuceClusterUtil; import io.arex.inst.redis.common.lettuce.RedisCommandBuilderImpl; import io.arex.inst.redis.common.lettuce.wrapper.RedisCommandWrapper; import io.arex.inst.runtime.context.ContextManager; @@ -16,7 +14,6 @@ import io.lettuce.core.output.KeyStreamingChannel; import io.lettuce.core.output.KeyValueStreamingChannel; import io.lettuce.core.output.ValueStreamingChannel; -import io.lettuce.core.protocol.AsyncCommand; import io.lettuce.core.protocol.Command; import java.util.Arrays; import java.util.Date; @@ -113,7 +110,7 @@ public RedisFuture> hgetall(K key) { @Override public RedisFuture hgetall(KeyValueStreamingChannel channel, K key) { - return redisCommandWrapper.hgetall(this, getRedisUri(),channel, key); + return redisCommandWrapper.hgetall(this, getRedisUri(), channel, key); } @Override @@ -133,7 +130,7 @@ public RedisFuture> hkeys(K key) { @Override public RedisFuture hkeys(KeyStreamingChannel channel, K key) { - return redisCommandWrapper.hkeys(this, getRedisUri(), channel,key); + return redisCommandWrapper.hkeys(this, getRedisUri(), channel, key); } @Override @@ -148,7 +145,7 @@ public RedisFuture>> hmget(K key, K... fields) { @Override public RedisFuture hmget(KeyValueStreamingChannel channel, K key, K... fields) { - return redisCommandWrapper.hmget(this, getRedisUri(), channel,key, fields); + return redisCommandWrapper.hmget(this, getRedisUri(), channel, key, fields); } @Override @@ -178,7 +175,7 @@ public RedisFuture> hvals(K key) { @Override public RedisFuture hvals(ValueStreamingChannel channel, K key) { - return redisCommandWrapper.hvals(this, getRedisUri(),channel, key); + return redisCommandWrapper.hvals(this, getRedisUri(), channel, key); } @Override @@ -399,15 +396,14 @@ public RedisFuture del(K... keys) { } @Override - public RedisFuture del( - Iterable keys) { - Command cmd = commandBuilder.del(keys); + public RedisFuture del(Iterable keys) { + Command cmd = commandBuilder.del(keys); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("DEL", keys.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("DEL", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.del(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "DEL"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "DEL", getRedisUri()); } return resultFuture; } @@ -418,15 +414,14 @@ public RedisFuture exists(K... keys) { } @Override - public RedisFuture exists( - Iterable keys) { - Command cmd = commandBuilder.exists(keys); + public RedisFuture exists(Iterable keys) { + Command cmd = commandBuilder.exists(keys); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("EXISTS", keys.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("EXISTS", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.exists(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "EXISTS"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "EXISTS", getRedisUri()); } return resultFuture; @@ -434,95 +429,88 @@ public RedisFuture exists( @Override public RedisFuture> keys(K pattern) { - Command cmd = commandBuilder.keys(pattern); + Command> cmd = commandBuilder.keys(pattern); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("KEYS", pattern.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("KEYS", pattern.toString(), cmd, getRedisUri()); } RedisFuture> resultFuture = super.keys(pattern); if (ContextManager.needRecord()) { - clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS"); + LettuceClusterUtil.clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS", getRedisUri()); } return resultFuture; } @Override - public RedisFuture keys( - KeyStreamingChannel channel, K pattern) { - Command cmd = commandBuilder.keys(channel, pattern); + public RedisFuture keys(KeyStreamingChannel channel, K pattern) { + Command cmd = commandBuilder.keys(channel, pattern); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("KEYS", pattern.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("KEYS", pattern.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.keys(channel, pattern); if (ContextManager.needRecord()) { - clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS"); + LettuceClusterUtil.clusterAsyncRecord(pattern.toString(), resultFuture, "KEYS", getRedisUri()); } return resultFuture; } @Override - public RedisFuture>> mget( - K... keys) { + public RedisFuture>> mget(K... keys) { return mget(Arrays.asList(keys)); } @Override - public RedisFuture>> mget( - Iterable keys) { - Command cmd = commandBuilder.mgetKeyValue(keys); + public RedisFuture>> mget(Iterable keys) { + Command>> cmd = commandBuilder.mgetKeyValue(keys); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("MGET", keys.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("MGET", keys.toString(), cmd, getRedisUri()); } RedisFuture>> resultFuture = super.mget(keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "MGET"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "MGET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture mget( - KeyValueStreamingChannel channel, K... keys) { + public RedisFuture mget(KeyValueStreamingChannel channel, K... keys) { return mget(channel, Arrays.asList(keys)); } @Override - public RedisFuture mget( - KeyValueStreamingChannel channel, Iterable keys) { - Command cmd = commandBuilder.mget(channel, keys); + public RedisFuture mget(KeyValueStreamingChannel channel, Iterable keys) { + Command cmd = commandBuilder.mget(channel, keys); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("MGET", keys.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("MGET", keys.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.mget(channel, keys); if (ContextManager.needRecord()) { - clusterAsyncRecord(keys.toString(), resultFuture, "MGET"); + LettuceClusterUtil.clusterAsyncRecord(keys.toString(), resultFuture, "MGET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture mset( - Map map) { - Command cmd = commandBuilder.mset(map); + public RedisFuture mset(Map map) { + Command cmd = commandBuilder.mset(map); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("MSET", map.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("MSET", map.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.mset(map); if (ContextManager.needRecord()) { - clusterAsyncRecord(map.toString(), resultFuture, "MSET"); + LettuceClusterUtil.clusterAsyncRecord(map.toString(), resultFuture, "MSET", getRedisUri()); } return resultFuture; } @Override - public RedisFuture msetnx( - Map map) { - Command cmd = commandBuilder.msetnx(map); + public RedisFuture msetnx(Map map) { + Command cmd = commandBuilder.msetnx(map); if (ContextManager.needReplay()) { - return redisClusterAsynReplay("MSETNX", map.toString(), cmd); + return LettuceClusterUtil.clusterAsynReplay("MSETNX", map.toString(), cmd, getRedisUri()); } RedisFuture resultFuture = super.msetnx(map); if (ContextManager.needRecord()) { - clusterAsyncRecord(map.toString(), resultFuture, "MSETNX"); + LettuceClusterUtil.clusterAsyncRecord(map.toString(), resultFuture, "MSETNX", getRedisUri()); } return resultFuture; } @@ -534,31 +522,31 @@ private String getRedisUri() { return redisUri; } - private RedisFuture redisClusterAsynReplay(String methodName, String key, Command cmd) { - RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); - MockResult mockResult = extractor.replay(); - AsyncCommand asyncCommand = new AsyncCommand<>(cmd); - if (mockResult.notIgnoreMockResult()) { - if (mockResult.getThrowable() != null) { - asyncCommand.completeExceptionally(mockResult.getThrowable()); - } else { - asyncCommand.complete(mockResult.getResult()); - } - } - return asyncCommand; - } - - public void clusterAsyncRecord(String key, RedisFuture resultFuture, String methodName) { - try (TraceTransmitter traceTransmitter = TraceTransmitter.create()) { - resultFuture.whenComplete((v, throwable) -> { - RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); - traceTransmitter.transmit(); - if (throwable != null) { - extractor.record(throwable); - } else { - extractor.record(v); - } - }); - } - } +// private RedisFuture LettuceClusterUtil.clusterAsynReplay(String methodName, String key, Command cmd) { +// RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); +// MockResult mockResult = extractor.replay(); +// AsyncCommand asyncCommand = new AsyncCommand<>(cmd); +// if (mockResult.notIgnoreMockResult()) { +// if (mockResult.getThrowable() != null) { +// asyncCommand.completeExceptionally(mockResult.getThrowable()); +// } else { +// asyncCommand.complete(mockResult.getResult()); +// } +// } +// return asyncCommand; +// } +// +// public void LettuceClusterUtil.clusterAsyncRecord(String key, RedisFuture resultFuture, String methodName) { +// try (TraceTransmitter traceTransmitter = TraceTransmitter.create()) { +// resultFuture.whenComplete((v, throwable) -> { +// RedisExtractor extractor = new RedisExtractor(getRedisUri(), methodName, key, null); +// traceTransmitter.transmit(); +// if (throwable != null) { +// extractor.record(throwable); +// } else { +// extractor.record(v); +// } +// }); +// } +// } } diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterClientExtractor.java b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterClientExtractor.java deleted file mode 100644 index 9dff7caf2..000000000 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterClientExtractor.java +++ /dev/null @@ -1,17 +0,0 @@ -package io.arex.inst.lettuce.v6.cluster; - -import io.arex.inst.redis.common.RedisConnectionManager; -import io.lettuce.core.RedisURI; -import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; -import java.util.concurrent.CompletableFuture; - -public class RedisClusterClientExtractor { - - private RedisClusterClientExtractor() {} - - public static void addConnection(CompletableFuture> connectionFuture, Iterable redisURIs) { - connectionFuture.thenAccept(connection -> - // take first uri - RedisConnectionManager.add(connection.hashCode(), redisURIs.iterator().next().toString())); - } -} diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterReactiveCommandsImplWrapper.java b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterReactiveCommandsImplWrapper.java index 7f1af1507..ee05981a5 100644 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterReactiveCommandsImplWrapper.java +++ b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/RedisClusterReactiveCommandsImplWrapper.java @@ -1,7 +1,7 @@ package io.arex.inst.lettuce.v6.cluster; import io.arex.inst.redis.common.RedisConnectionManager; -import io.arex.inst.redis.common.lettuce.RedisClusterReactiveResultUtil; +import io.arex.inst.redis.common.lettuce.ReactorStreamUtil; import io.arex.inst.redis.common.lettuce.wrapper.RedisReactiveCommandWrapper; import io.arex.inst.runtime.context.ContextManager; import io.lettuce.core.GetExArgs; @@ -437,11 +437,11 @@ public Mono del(K... keys) { @Override public Mono del(Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "DEL", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "DEL", keys.toString(), null); } Mono result = super.del(keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(),result, "DEL", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(),result, "DEL", keys.toString(), null); } return result; } @@ -454,12 +454,12 @@ public Mono exists(K... keys) { @Override public Mono exists(Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "EXIST", keys.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "EXIST", keys.toString(), null); } Mono result = super.exists(keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(),result, "EXIST", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(),result, "EXIST", keys.toString(), null); } return result; } @@ -467,11 +467,11 @@ public Mono exists(Iterable keys) { @Override public Flux keys(K pattern) { if (ContextManager.needReplay()) { - return (Flux) RedisClusterReactiveResultUtil.fluxReplay(getRedisUri(), "KEYS", pattern.toString(), null); + return (Flux) ReactorStreamUtil.fluxReplay(getRedisUri(), "KEYS", pattern.toString(), null); } Flux result = super.keys(pattern); if (ContextManager.needRecord()) { - return (Flux) RedisClusterReactiveResultUtil.fluxRecord(getRedisUri(),result, "KEYS", pattern.toString(), null); + return (Flux) ReactorStreamUtil.fluxRecord(getRedisUri(),result, "KEYS", pattern.toString(), null); } return result; } @@ -479,12 +479,12 @@ public Flux keys(K pattern) { @Override public Mono keys(KeyStreamingChannel channel, K pattern) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "KEYS", pattern.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "KEYS", pattern.toString(), null); } Mono result = super.keys(channel,pattern); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(),result, "KEYS", pattern.toString(), null); + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(),result, "KEYS", pattern.toString(), null); } return result; } @@ -492,12 +492,12 @@ public Mono keys(KeyStreamingChannel channel, K pattern) { @Override public Flux> mget(Iterable keys) { if (ContextManager.needReplay()) { - return (Flux>) RedisClusterReactiveResultUtil.fluxReplay(getRedisUri(), "MGET", + return (Flux>) ReactorStreamUtil.fluxReplay(getRedisUri(), "MGET", keys.toString(), null); } Flux> result = super.mget(keys); if (ContextManager.needRecord()) { - return (Flux>) RedisClusterReactiveResultUtil.fluxRecord(getRedisUri(),result, "MGET", keys.toString(), null); + return (Flux>) ReactorStreamUtil.fluxRecord(getRedisUri(),result, "MGET", keys.toString(), null); } return result; } @@ -510,11 +510,11 @@ public Mono mget(KeyValueStreamingChannel channel, K... keys) { @Override public Mono mget(KeyValueStreamingChannel channel, Iterable keys) { if (ContextManager.needReplay()) { - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "MGET", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "MGET", keys.toString(), null); } Mono result = super.mget(channel, keys); if (ContextManager.needRecord()) { - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(),result, "MGET", keys.toString(), null); + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(),result, "MGET", keys.toString(), null); } return result; } @@ -522,12 +522,12 @@ public Mono mget(KeyValueStreamingChannel channel, Iterable keys) @Override public Mono msetnx(Map map) { if(ContextManager.needReplay()){ - return (Mono) RedisClusterReactiveResultUtil.monoReplay(getRedisUri(), "MSETNX", map.toString(), + return (Mono) ReactorStreamUtil.monoReplay(getRedisUri(), "MSETNX", map.toString(), null); } Mono result = super.msetnx(map); if(ContextManager.needRecord()){ - return (Mono) RedisClusterReactiveResultUtil.monoRecord(getRedisUri(),result, "MSETNX", map.toString(), null); + return (Mono) ReactorStreamUtil.monoRecord(getRedisUri(),result, "MSETNX", map.toString(), null); } return result; } diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/inst/RedisClusterClientInstrumentation.java b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/inst/RedisClusterClientInstrumentation.java index 5ada29224..a3fd16f27 100644 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/inst/RedisClusterClientInstrumentation.java +++ b/arex-instrumentation/redis/arex-lettuce-v6/src/main/java/io/arex/inst/lettuce/v6/cluster/inst/RedisClusterClientInstrumentation.java @@ -4,7 +4,7 @@ import static net.bytebuddy.matcher.ElementMatchers.namedOneOf; import io.arex.inst.extension.MethodInstrumentation; import io.arex.inst.extension.TypeInstrumentation; -import io.arex.inst.lettuce.v6.cluster.RedisClusterClientExtractor; +import io.arex.inst.redis.common.RedisConnectionManager; import io.lettuce.core.RedisURI; import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import java.util.Collections; @@ -19,6 +19,7 @@ * RedisClusterClientInstrumentation */ public class RedisClusterClientInstrumentation extends TypeInstrumentation { + @Override protected ElementMatcher typeMatcher() { return named("io.lettuce.core.cluster.RedisClusterClient"); @@ -26,7 +27,7 @@ protected ElementMatcher typeMatcher() { @Override public List methodAdvices() { - ElementMatcher matcher = namedOneOf("connectAsync","connectClusterAsync"); + ElementMatcher matcher = namedOneOf("connectAsync", "connectClusterAsync"); return Collections.singletonList( new MethodInstrumentation(matcher, NewStatefulRedisConnectionAdvice.class.getName())); @@ -34,11 +35,12 @@ public List methodAdvices() { public static class NewStatefulRedisConnectionAdvice { + @Advice.OnMethodExit(suppress = Throwable.class) public static void onExit( - @Advice.Return(readOnly = false) CompletableFuture> connectionFuture, - @Advice.FieldValue("initialUris") Iterable redisURIs) { - RedisClusterClientExtractor.addConnection(connectionFuture, redisURIs); + @Advice.Return(readOnly = false) CompletableFuture> connectionFuture, + @Advice.FieldValue("initialUris") Iterable redisURIs) { + RedisConnectionManager.addClusterConnection(connectionFuture, redisURIs); } } } diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapperTest.java b/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapperTest.java index 8e245c9be..d21454d5b 100644 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapperTest.java +++ b/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/cluster/RedisClusterAsyncCommandsImplWrapperTest.java @@ -116,17 +116,17 @@ void dispatch(Runnable mocker, Predicate> predicate, MockResult m } } - @Test - void testRecord(){ - AsyncCommand command = new AsyncCommand(cmd); - try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, - (extractor, context) -> { - Mockito.doNothing().when(extractor).record(any()); - })) { - command.complete("mock"); - target.clusterAsyncRecord("key", command, "field"); - } - } +// @Test +// void testRecord(){ +// AsyncCommand command = new AsyncCommand(cmd); +// try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, +// (extractor, context) -> { +// Mockito.doNothing().when(extractor).record(any()); +// })) { +// command.complete("mock"); +// target.clusterAsyncRecord("key", command, "field"); +// } +// } static Stream dispatchCase() { Runnable mocker1 = () -> { diff --git a/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/standalone/RedisReactiveCommandsImplWrapperTest.java b/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/standalone/RedisReactiveCommandsImplWrapperTest.java index f7148c444..c0ae26f8a 100644 --- a/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/standalone/RedisReactiveCommandsImplWrapperTest.java +++ b/arex-instrumentation/redis/arex-lettuce-v6/src/test/java/io/arex/inst/lettuce/v6/standalone/RedisReactiveCommandsImplWrapperTest.java @@ -6,7 +6,7 @@ import static org.mockito.ArgumentMatchers.anyInt; import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil; +import io.arex.inst.common.util.FluxReplayUtil; import io.arex.inst.redis.common.RedisConnectionManager; import io.arex.inst.redis.common.RedisExtractor; import io.arex.inst.runtime.context.ContextManager; @@ -68,7 +68,7 @@ static void setUp() { //mock static class Mockito.mockStatic(ContextManager.class); Mockito.mockStatic(RedisConnectionManager.class); - Mockito.mockStatic(FluxUtil.class); + Mockito.mockStatic(FluxReplayUtil.class); //mock object connection = Mockito.mock(StatefulRedisConnection.class); diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/RedisConnectionManager.java b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/RedisConnectionManager.java index 51f2114d0..75b53974e 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/RedisConnectionManager.java +++ b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/RedisConnectionManager.java @@ -1,11 +1,18 @@ package io.arex.inst.redis.common; +import io.arex.agent.bootstrap.util.StringUtil; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; import java.util.Map; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; public final class RedisConnectionManager { private static final Map REDIS_URI_MAP = new ConcurrentHashMap<>(); + private RedisConnectionManager() { + } + public static void add(int connectionHash, String redisUri) { REDIS_URI_MAP.put(connectionHash, redisUri); } @@ -15,6 +22,10 @@ public static String getRedisUri(int connectionHash) { return REDIS_URI_MAP.get(connectionHash); } - private RedisConnectionManager() { + public static void addClusterConnection(CompletableFuture> connectionFuture, Iterable redisURIs) { + connectionFuture.thenAccept(connection -> + // take first uri + RedisConnectionManager.add(connection.hashCode(), StringUtil.join(redisURIs,","))); } + } diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/LettuceClusterUtil.java b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/LettuceClusterUtil.java new file mode 100644 index 000000000..56582ef77 --- /dev/null +++ b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/LettuceClusterUtil.java @@ -0,0 +1,42 @@ +package io.arex.inst.redis.common.lettuce; + +import io.arex.agent.bootstrap.ctx.TraceTransmitter; +import io.arex.agent.bootstrap.model.MockResult; +import io.arex.inst.redis.common.RedisExtractor; +import io.lettuce.core.RedisFuture; +import io.lettuce.core.protocol.AsyncCommand; +import io.lettuce.core.protocol.Command; + +public class LettuceClusterUtil { + + private LettuceClusterUtil(){} + + public static RedisFuture clusterAsynReplay(String methodName, String key, Command cmd, + String redisUri) { + RedisExtractor extractor = new RedisExtractor(redisUri, methodName, key, null); + MockResult mockResult = extractor.replay(); + AsyncCommand asyncCommand = new AsyncCommand<>(cmd); + if (mockResult.notIgnoreMockResult()) { + if (mockResult.getThrowable() != null) { + asyncCommand.completeExceptionally(mockResult.getThrowable()); + } else { + asyncCommand.complete((R) mockResult.getResult()); + } + } + return asyncCommand; + } + + public static void clusterAsyncRecord(String key, RedisFuture resultFuture, String methodName, String redisUri) { + try (TraceTransmitter traceTransmitter = TraceTransmitter.create()) { + resultFuture.whenComplete((v, throwable) -> { + RedisExtractor extractor = new RedisExtractor(redisUri, methodName, key, null); + traceTransmitter.transmit(); + if (throwable != null) { + extractor.record(throwable); + } else { + extractor.record(v); + } + }); + } + } +} diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/MonoConsumer.java b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/MonoConsumer.java deleted file mode 100644 index 452e61745..000000000 --- a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/MonoConsumer.java +++ /dev/null @@ -1,35 +0,0 @@ -package io.arex.inst.redis.common.lettuce; - -import io.arex.agent.bootstrap.ctx.TraceTransmitter; -import io.arex.inst.redis.common.RedisExtractor; -import reactor.core.publisher.Mono; - -public class MonoConsumer { - - private final TraceTransmitter traceTransmitter; - private final RedisExtractor extractor; - - public MonoConsumer(RedisExtractor extractor) { - this.traceTransmitter = TraceTransmitter.create(); - this.extractor = extractor; - } - - /** - * support for Mono type recording - * @param responseMono - * @return - */ - public Mono accept(Mono responseMono) { - return responseMono - .doOnSuccess(result -> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.record(result); - } - }) - .doOnError(error-> { - try (TraceTransmitter tm = traceTransmitter.transmit()) { - extractor.record(error); - } - }); - } -} diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtil.java b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtil.java similarity index 67% rename from arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtil.java rename to arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtil.java index 929dd50e3..a2e2618ae 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtil.java +++ b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtil.java @@ -1,19 +1,27 @@ package io.arex.inst.redis.common.lettuce; import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil; +import io.arex.inst.common.util.FluxRecordFunction; +import io.arex.inst.common.util.FluxReplayUtil; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; +import io.arex.inst.common.util.MonoRecordFunction; import io.arex.inst.redis.common.RedisExtractor; +import java.util.function.Function; import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class RedisClusterReactiveResultUtil { +public class ReactorStreamUtil { - private RedisClusterReactiveResultUtil() { + private ReactorStreamUtil() { } public static Mono monoRecord(String redisUri,Mono monoResult,String methodName,String key,String field) { RedisExtractor extractor = new RedisExtractor(redisUri, methodName, key, field); - return new MonoConsumer(extractor).accept(monoResult); + Function executor = result -> { + extractor.record(result); + return null; + }; + return new MonoRecordFunction(executor).apply(monoResult); } public static Mono monoReplay(String redisUri,String methodName,String key,String field) { @@ -30,7 +38,11 @@ public static Mono monoReplay(String redisUri,String methodName,String key,St public static Flux fluxRecord(String redisUri,Flux fluxResult,String methodName,String key,String field) { RedisExtractor extractor = new RedisExtractor(redisUri, methodName, key, field); - return new FluxConsumer(extractor).accept(fluxResult); + Function executor = result -> { + extractor.record(result); + return null; + }; + return new FluxRecordFunction(executor).apply(fluxResult); } public static Flux fluxReplay(String redisUri,String methodName,String key,String field) { @@ -40,7 +52,7 @@ public static Flux fluxReplay(String redisUri,String methodName,String key,St if (mockResult.getThrowable() != null) { return Flux.error(mockResult.getThrowable()); } - return FluxUtil.restore(mockResult.getResult()); + return FluxReplayUtil.restore(mockResult.getResult()); } return Flux.empty(); } diff --git a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapper.java b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapper.java index f682588c2..dc95846c9 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapper.java +++ b/arex-instrumentation/redis/arex-redis-common/src/main/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapper.java @@ -1,11 +1,7 @@ package io.arex.inst.redis.common.lettuce.wrapper; -import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil; -import io.arex.inst.redis.common.RedisExtractor; import io.arex.inst.redis.common.RedisKeyUtil; -import io.arex.inst.redis.common.lettuce.FluxConsumer; -import io.arex.inst.redis.common.lettuce.MonoConsumer; +import io.arex.inst.redis.common.lettuce.ReactorStreamUtil; import io.arex.inst.redis.common.lettuce.RedisCommandBuilderImpl; import io.arex.inst.runtime.context.ContextManager; import io.lettuce.core.AbstractRedisReactiveCommands; @@ -695,21 +691,11 @@ public Mono createMono(AbstractRedisReactiveCommands redisReactiveC Supplier> commandSupplier, String key, String field) { if (ContextManager.needReplay()) { - RedisExtractor extractor = - new RedisExtractor(redisUri, commandSupplier.get().getType().name(), key, field); - MockResult mockResult = extractor.replay(); - if (mockResult.notIgnoreMockResult()) { - if (mockResult.getThrowable() != null) { - return Mono.error(mockResult.getThrowable()); - } - return Mono.justOrEmpty((T) mockResult.getResult()); - } + return (Mono) ReactorStreamUtil.monoReplay(redisUri, commandSupplier.get().getType().name(), key, field); } Mono monoResult = redisReactiveCommands.createMono(commandSupplier); if (ContextManager.needRecord()) { - RedisExtractor extractor = - new RedisExtractor(redisUri, commandSupplier.get().getType().name(), key, field); - return (Mono) new MonoConsumer(extractor).accept(monoResult); + return (Mono) ReactorStreamUtil.monoRecord(redisUri, monoResult, commandSupplier.get().getType().name(), key, field); } return monoResult; } @@ -726,21 +712,20 @@ public Flux createDissolvingFlux(AbstractRedisReactiveCommands r String key, String field) { if (ContextManager.needReplay()) { - RedisExtractor extractor = - new RedisExtractor(redisUri, commandSupplier.get().getType().name(), key, field); - MockResult mockResult = extractor.replay(); - if (mockResult.notIgnoreMockResult()) { - if (mockResult.getThrowable() != null) { - return Flux.error(mockResult.getThrowable()); - } - return (Flux)FluxUtil.restore(mockResult.getResult()); - } + return (Flux) ReactorStreamUtil.fluxReplay(redisUri, commandSupplier.get().getType().name(), key, field); +// RedisExtractor extractor = +// new RedisExtractor(redisUri, commandSupplier.get().getType().name(), key, field); +// MockResult mockResult = extractor.replay(); +// if (mockResult.notIgnoreMockResult()) { +// if (mockResult.getThrowable() != null) { +// return Flux.error(mockResult.getThrowable()); +// } +// return (Flux)FluxReplayUtil.restore(mockResult.getResult()); +// } } Flux fluxResult = redisReactiveCommands.createDissolvingFlux(commandSupplier); if (ContextManager.needRecord()) { - RedisExtractor extractor = - new RedisExtractor(redisUri, commandSupplier.get().getType().name(), key, field); - return (Flux) new FluxConsumer(extractor).accept(fluxResult); + ReactorStreamUtil.fluxRecord(redisUri, fluxResult, commandSupplier.get().getType().name(), key, field); } return fluxResult; } diff --git a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/RedisConnectionManagerTest.java b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/RedisConnectionManagerTest.java index 3e55e0bfc..9ed73dca5 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/RedisConnectionManagerTest.java +++ b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/RedisConnectionManagerTest.java @@ -2,6 +2,12 @@ import static org.junit.jupiter.api.Assertions.*; +import com.google.common.collect.Lists; +import io.lettuce.core.RedisURI; +import io.lettuce.core.cluster.api.StatefulRedisClusterConnection; +import java.util.Arrays; +import java.util.concurrent.CompletableFuture; +import org.checkerframework.checker.units.qual.K; import org.junit.jupiter.api.Test; class RedisConnectionManagerTest { diff --git a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/FluxConsumerTest.java b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/FluxConsumerTest.java deleted file mode 100644 index b2cb3278e..000000000 --- a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/FluxConsumerTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.arex.inst.redis.common.lettuce; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import io.arex.inst.redis.common.RedisExtractor; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import reactor.core.publisher.Flux; - -public class FluxConsumerTest { - - static RedisExtractor extractor; - - @BeforeAll - static void setUp() { - extractor = Mockito.mock(RedisExtractor.class); - } - - @AfterAll - static void tearDown() { - extractor = null; - Mockito.clearAllCaches(); - } - - @Test - void fluxConsumer() { - Mockito.doNothing().when(extractor).record(any()); - Flux flux = Flux.just("test"); - Flux fluxResult = (Flux) new FluxConsumer(extractor).accept(flux); - assertEquals("test", fluxResult.blockFirst()); - - Flux flux2 = Flux.error(new Exception("exception")); - Flux fluxResult2 = (Flux) new FluxConsumer(extractor).accept(flux2); - assertThrows(Exception.class, () -> fluxResult2.blockFirst()); - } -} diff --git a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/MonoConsumerTest.java b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/MonoConsumerTest.java deleted file mode 100644 index 5c08fbe82..000000000 --- a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/MonoConsumerTest.java +++ /dev/null @@ -1,39 +0,0 @@ -package io.arex.inst.redis.common.lettuce; - -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertThrows; -import static org.mockito.ArgumentMatchers.any; -import io.arex.inst.redis.common.RedisExtractor; -import org.junit.jupiter.api.AfterAll; -import org.junit.jupiter.api.BeforeAll; -import org.junit.jupiter.api.Test; -import org.mockito.Mockito; -import reactor.core.publisher.Mono; - -public class MonoConsumerTest { - - static RedisExtractor extractor; - - @BeforeAll - static void setUp() { - extractor = Mockito.mock(RedisExtractor.class); - } - - @AfterAll - static void tearDown() { - extractor = null; - Mockito.clearAllCaches(); - } - - @Test - void monoConsumer() { - Mockito.doNothing().when(extractor).record(any()); - Mono mono = Mono.just("test"); - Mono monoResult = (Mono) new MonoConsumer(extractor).accept(mono); - assertEquals("test", monoResult.block()); - - Mono mono2 = Mono.error(new Exception("exception")); - Mono monoResult2 = (Mono) new MonoConsumer(extractor).accept(mono2); - assertThrows(Exception.class, () -> monoResult2.block()); - } -} diff --git a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtilTest.java b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtilTest.java similarity index 86% rename from arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtilTest.java rename to arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtilTest.java index 851bde466..82eb2e438 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/RedisClusterReactiveResultUtilTest.java +++ b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/ReactorStreamUtilTest.java @@ -7,8 +7,8 @@ import static org.junit.jupiter.params.provider.Arguments.arguments; import static org.mockito.ArgumentMatchers.any; import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil.FluxElementResult; -import io.arex.inst.common.util.FluxUtil.FluxResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; import io.arex.inst.redis.common.RedisConnectionManager; import io.arex.inst.redis.common.RedisExtractor; import io.arex.inst.redis.common.RedisExtractor.RedisMultiKey; @@ -27,7 +27,7 @@ import reactor.core.publisher.Flux; import reactor.core.publisher.Mono; -public class RedisClusterReactiveResultUtilTest { +public class ReactorStreamUtilTest { static RedisExtractor extractor; @@ -46,11 +46,11 @@ static void tearDown() { void monoRecord() { try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, (extractor, context) -> { - System.out.println("mock RedisClusterReactiveResultUtil monoRecord"); + System.out.println("mock ReactorStreamUtil monoRecord"); Mockito.doNothing().when(extractor).record(any()); })) { Mono mono = Mono.just("test"); - RedisClusterReactiveResultUtil.monoRecord("127.1.1.0", mono, "test", "key", "field"); + ReactorStreamUtil.monoRecord("127.1.1.0", mono, "test", "key", "field"); String result = mono.block(); assertNotNull(result); } @@ -60,11 +60,11 @@ void monoRecord() { void fluxRecord() { try (MockedConstruction mocked = Mockito.mockConstruction(RedisExtractor.class, (extractor, context) -> { - System.out.println("mock RedisClusterReactiveResultUtil monoRecord"); + System.out.println("mock ReactorStreamUtil monoRecord"); Mockito.doNothing().when(extractor).record(any()); })) { Flux flux = Flux.just("test"); - RedisClusterReactiveResultUtil.fluxRecord("127.1.1.0", flux, "test", "key", "field"); + ReactorStreamUtil.fluxRecord("127.1.1.0", flux, "test", "key", "field"); String result = flux.blockFirst(); assertNotNull(result); } @@ -86,7 +86,7 @@ void monoReplay(Predicate> predicate, MockResult mockResult) { (extractor, context) -> { Mockito.when(extractor.replay()).thenReturn(mockResult); })) { - Mono result = RedisClusterReactiveResultUtil.monoReplay(RedisConnectionManager.getRedisUri(0), "test", + Mono result = ReactorStreamUtil.monoReplay(RedisConnectionManager.getRedisUri(0), "test", "key", "field"); assertTrue(predicate.test(result)); } @@ -99,7 +99,7 @@ void fluxReplay(Predicate> predicate, MockResult mockResult) { (extractor, context) -> { Mockito.when(extractor.replay()).thenReturn(mockResult); })) { - Flux result = RedisClusterReactiveResultUtil.fluxReplay(RedisConnectionManager.getRedisUri(0), "test", + Flux result = ReactorStreamUtil.fluxReplay(RedisConnectionManager.getRedisUri(0), "test", "key", "field"); assertTrue(predicate.test(result)); } diff --git a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapperTest.java b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapperTest.java index 3556bf30d..132d5b3da 100644 --- a/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapperTest.java +++ b/arex-instrumentation/redis/arex-redis-common/src/test/java/io/arex/inst/redis/common/lettuce/wrapper/RedisReactiveCommandWrapperTest.java @@ -7,8 +7,8 @@ import static org.mockito.ArgumentMatchers.anyInt; import io.arex.agent.bootstrap.model.MockResult; -import io.arex.inst.common.util.FluxUtil.FluxElementResult; -import io.arex.inst.common.util.FluxUtil.FluxResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult; +import io.arex.inst.common.util.FluxReplayUtil.FluxResult; import io.arex.inst.redis.common.RedisConnectionManager; import io.arex.inst.redis.common.RedisExtractor; import io.arex.inst.redis.common.lettuce.RedisCommandBuilderImpl;