Skip to content

Commit

Permalink
feat: support lettuce cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
mr3 authored and DanLi39 committed Jan 17, 2024
1 parent f230eb0 commit 7510425
Show file tree
Hide file tree
Showing 31 changed files with 406 additions and 723 deletions.
Original file line number Diff line number Diff line change
@@ -1,31 +1,31 @@
package io.arex.inst.redis.common.lettuce;
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.redis.common.RedisExtractor;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.runtime.model.ArexConstants;
import io.arex.inst.runtime.serializer.Serializer;
import io.arex.inst.runtime.util.TypeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;
import java.util.function.Function;


public class FluxConsumer {
public class FluxRecordFunction implements Function<Flux<?>, Flux<?>> {

private final RedisExtractor extractor;
private final Function<FluxResult, Void> executor;
private final TraceTransmitter traceTransmitter;

public FluxConsumer(RedisExtractor extractor) {
public FluxRecordFunction(Function<FluxResult, Void> executor) {

Check warning on line 20 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L20

Added line #L20 was not covered by tests
this.traceTransmitter = TraceTransmitter.create();
this.extractor = extractor;
this.executor = executor;

Check warning on line 22 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L22

Added line #L22 was not covered by tests
}

public Flux<?> accept(Flux<?> responseFlux) {
@Override
public Flux<?> apply(Flux<?> responseFlux) {
// use a list to record all elements
List<FluxUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();
List<FluxReplayUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();

Check warning on line 28 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L28

Added line #L28 was not covered by tests
AtomicInteger index = new AtomicInteger(1);
String responseType = TypeUtil.getName(responseFlux);
return responseFlux
Expand All @@ -37,7 +37,7 @@ public Flux<?> accept(Flux<?> responseFlux) {
}
})
// add error to list
.doOnError(error -> {
.doOnError(error -> {

Check warning on line 40 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L40

Added line #L40 was not covered by tests
try (TraceTransmitter tm = traceTransmitter.transmit()) {
fluxElementMockerResults.add(
getFluxElementMockerResult(index.getAndIncrement(), error));
Expand All @@ -46,13 +46,14 @@ public Flux<?> accept(Flux<?> responseFlux) {
.doFinally(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults);
extractor.record(fluxResult);
executor.apply(fluxResult);

Check warning on line 49 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L49

Added line #L49 was not covered by tests
}
});
}

private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {

private FluxReplayUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {
String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER);
return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element));
return new FluxReplayUtil.FluxElementResult(index, content, TypeUtil.getName(element));

Check warning on line 57 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxRecordFunction.java#L57

Added line #L57 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,27 @@
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import io.arex.agent.bootstrap.model.MockResult;
import io.arex.agent.bootstrap.util.CollectionUtil;
import io.arex.inst.runtime.serializer.Serializer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import io.arex.agent.bootstrap.util.CollectionUtil;

public class FluxUtil {

public class FluxReplayUtil{

Check warning on line 15 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/FluxReplayUtil.java#L15

Added line #L15 was not covered by tests

static final String FLUX_FROM_ITERATOR = "reactor.core.publisher.FluxIterable-";
static final String FLUX_FROM_ARRAY = "reactor.core.publisher.FluxArray-";
static final String FLUX_FROM_STREAM = "reactor.core.publisher.FluxStream-";

private FluxUtil() {
}


public static Flux<?> restore(Object fluxObj) {
if(fluxObj == null){
if (fluxObj == null) {
return Flux.empty();
}
FluxResult fluxResult = (FluxResult) fluxObj;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import java.util.function.Function;
import reactor.core.publisher.Mono;

public class MonoRecordFunction implements Function<Mono<?>, Mono<?>> {

private final Function<Object, Void> executor;
private final TraceTransmitter traceTransmitter;

public MonoRecordFunction(Function<Object, Void> executor) {
this.traceTransmitter = TraceTransmitter.create();
this.executor = executor;
}

Check warning on line 15 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java#L12-L15

Added lines #L12 - L15 were not covered by tests

@Override
public Mono<?> apply(Mono<?> responseMono) {
return responseMono
.doOnSuccess(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
executor.apply(result);

Check warning on line 22 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java#L19-L22

Added lines #L19 - L22 were not covered by tests
}
})
.doOnError(error -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
executor.apply(error);

Check warning on line 27 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java#L24-L27

Added lines #L24 - L27 were not covered by tests
}
});

Check warning on line 29 in arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/common/arex-common/src/main/java/io/arex/inst/common/util/MonoRecordFunction.java#L29

Added line #L29 was not covered by tests
}
}
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
package io.arex.inst.common.util;

import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ARRAY;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ITERATOR;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_STREAM;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ARRAY;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ITERATOR;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_STREAM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.arex.inst.common.util.FluxUtil.FluxElementResult;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.runtime.util.TypeUtil;
import java.lang.reflect.Type;
import java.util.ArrayList;
Expand All @@ -22,8 +22,8 @@ void FluxRecory() {
List<FluxElementResult> list = new ArrayList<>();
FluxResult fluxResult = new FluxResult(null, list);
// flux is empty
assertNotNull(FluxUtil.restore(null));
Flux<?> result = FluxUtil.restore(fluxResult);
assertNotNull(FluxReplayUtil.restore(null));
Flux<?> result = FluxReplayUtil.restore(fluxResult);
assertNotNull(result);

// flux is not empty
Expand All @@ -34,22 +34,22 @@ void FluxRecory() {

// Flux.just()
fluxResult = new FluxResult(null, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-");

// Flux.fromIterable()
fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR);

// Flux.fromArray()
fluxResult = new FluxResult(FLUX_FROM_ARRAY, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY);

// Flux.fromStream()
fluxResult = new FluxResult(FLUX_FROM_STREAM, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@
import io.arex.agent.bootstrap.util.ArrayUtils;
import io.arex.agent.bootstrap.util.StringUtil;
import io.arex.agent.thirdparty.util.time.DateFormatUtils;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.dynamic.common.listener.FluxConsumer;
import io.arex.inst.common.util.FluxRecordFunction;
import io.arex.inst.common.util.FluxReplayUtil;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.common.util.MonoRecordFunction;
import io.arex.inst.dynamic.common.listener.ListenableFutureAdapter;
import io.arex.inst.dynamic.common.listener.MonoConsumer;
import io.arex.inst.dynamic.common.listener.ResponseConsumer;
import io.arex.inst.runtime.config.Config;
import io.arex.inst.runtime.context.ArexContext;
Expand All @@ -33,6 +34,7 @@
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.function.Function;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

Expand Down Expand Up @@ -97,11 +99,19 @@ public Object recordResponse(Object response) {
}
// Compatible with not import package reactor-core
if (MONO.equals(methodReturnType) && response instanceof Mono<?>) {
return new MonoConsumer(this).accept((Mono<?>) response);
Function<Object, Void> executor = result -> {
this.recordResponse(result);
return null;

Check warning on line 104 in arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java#L103-L104

Added lines #L103 - L104 were not covered by tests
};
return new MonoRecordFunction(executor).apply((Mono<?>) response);
}

if (FLUX.equals(methodReturnType) && response instanceof Flux<?>) {
return new FluxConsumer(this).accept((Flux<?>) response);
Function<FluxResult, Void> executor = result -> {
this.recordResponse(result);
return null;

Check warning on line 112 in arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java

View check run for this annotation

Codecov / codecov/patch

arex-instrumentation/dynamic/arex-dynamic-common/src/main/java/io/arex/inst/dynamic/common/DynamicClassExtractor.java#L111-L112

Added lines #L111 - L112 were not covered by tests
};
return new FluxRecordFunction(executor).apply((Flux<?>) response);
}

this.result = response;
Expand Down Expand Up @@ -321,7 +331,7 @@ Object restoreResponse(Object result) {
if (result instanceof Throwable) {
return Flux.error((Throwable) result);
}
return FluxUtil.restore(result);
return FluxReplayUtil.restore(result);
}
return result;
}
Expand Down

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@
import io.arex.agent.bootstrap.model.ArexMocker;
import io.arex.agent.bootstrap.model.Mocker.Target;
import io.arex.agent.thirdparty.util.time.DateFormatUtils;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.dynamic.common.listener.MonoConsumer;
import io.arex.inst.common.util.FluxReplayUtil;
import io.arex.inst.common.util.MonoRecordFunction;
import io.arex.inst.runtime.config.ConfigBuilder;
import io.arex.inst.runtime.context.ArexContext;
import io.arex.inst.runtime.context.ContextManager;
Expand All @@ -28,6 +28,7 @@
import java.util.TimeZone;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
Expand Down Expand Up @@ -122,14 +123,18 @@ void resetMonoResponse() {

// exception
Mono<?> result = monoExceptionTest();
MonoConsumer monoConsumer = new MonoConsumer(extractor);
result = monoConsumer.accept(result);
Function<Object, Void> executor = res -> {
extractor.recordResponse(res);
return null;
};
MonoRecordFunction monoRecordFunction = new MonoRecordFunction(executor);
result = monoRecordFunction.apply(result);
result.subscribe();
assertTrue(nonNull.test(result));

// normal
result = monoTest();
result = monoConsumer.accept(result);
result = monoRecordFunction.apply(result);
result.subscribe();
assertTrue(nonNull.test(result));
} catch (NoSuchMethodException e) {
Expand Down Expand Up @@ -294,8 +299,8 @@ void restoreResponseTest() throws NoSuchMethodException, ExecutionException, Int
Throwable.class);
DynamicClassExtractor fluxTestExtractor = new DynamicClassExtractor(testReturnFlux, new Object[]{"mock"},
"#val", null);
List<FluxUtil.FluxElementResult> list = new ArrayList<>();
FluxUtil.FluxResult fluxResult = new FluxUtil.FluxResult(null, list);
List<FluxReplayUtil.FluxElementResult> list = new ArrayList<>();
FluxReplayUtil.FluxResult fluxResult = new FluxReplayUtil.FluxResult(null, list);
Object fluxNormalTest = fluxTestExtractor.restoreResponse(fluxResult);
assertNull(((Flux<?>) fluxNormalTest).blockFirst());

Expand Down
Loading

0 comments on commit 7510425

Please sign in to comment.