Skip to content

Commit

Permalink
feat: support lettuce cluster (#368)
Browse files Browse the repository at this point in the history
* feat: support lettuce cluster

* feat: support lettuce cluster

---------

Co-authored-by: lucas-myx <[email protected]>
Co-authored-by: mr3 <[email protected]>
  • Loading branch information
3 people authored Jan 17, 2024
1 parent 664afac commit c5c1b1e
Show file tree
Hide file tree
Showing 66 changed files with 8,668 additions and 3,893 deletions.
Original file line number Diff line number Diff line change
@@ -1,30 +1,32 @@
package io.arex.inst.dynamic.common.listener;
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import io.arex.inst.common.util.FluxUtil;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.dynamic.common.DynamicClassExtractor;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.runtime.model.ArexConstants;
import io.arex.inst.runtime.serializer.Serializer;
import io.arex.inst.runtime.util.TypeUtil;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.UnaryOperator;
import reactor.core.publisher.Flux;
import java.util.function.Function;

public class FluxConsumer {

public class FluxRecordFunction implements UnaryOperator<Flux<?>> {

private final Function<FluxResult, Void> executor;
private final TraceTransmitter traceTransmitter;
private final DynamicClassExtractor extractor;

public FluxConsumer(DynamicClassExtractor extractor) {
public FluxRecordFunction(Function<FluxResult, Void> executor) {
this.traceTransmitter = TraceTransmitter.create();
this.extractor = extractor;
this.executor = executor;
}

public Flux<?> accept(Flux<?> responseFlux) {
@Override
public Flux<?> apply(Flux<?> responseFlux) {
// use a list to record all elements
List<FluxUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();
List<FluxReplayUtil.FluxElementResult> fluxElementMockerResults = new ArrayList<>();
AtomicInteger index = new AtomicInteger(1);
String responseType = TypeUtil.getName(responseFlux);
return responseFlux
Expand All @@ -36,7 +38,7 @@ public Flux<?> accept(Flux<?> responseFlux) {
}
})
// add error to list
.doOnError(error -> {
.doOnError(error -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
fluxElementMockerResults.add(
getFluxElementMockerResult(index.getAndIncrement(), error));
Expand All @@ -45,13 +47,14 @@ public Flux<?> accept(Flux<?> responseFlux) {
.doFinally(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
FluxResult fluxResult = new FluxResult(responseType, fluxElementMockerResults);
extractor.recordResponse(fluxResult);
executor.apply(fluxResult);
}
});
}

private FluxUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {

private FluxReplayUtil.FluxElementResult getFluxElementMockerResult(int index, Object element) {
String content = Serializer.serialize(element, ArexConstants.GSON_SERIALIZER);
return new FluxUtil.FluxElementResult(index, content, TypeUtil.getName(element));
return new FluxReplayUtil.FluxElementResult(index, content, TypeUtil.getName(element));
}
}
Original file line number Diff line number Diff line change
@@ -1,24 +1,24 @@
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.util.CollectionUtil;
import io.arex.inst.runtime.serializer.Serializer;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import reactor.core.publisher.Flux;
import io.arex.agent.bootstrap.util.CollectionUtil;

public class FluxUtil {

public class FluxReplayUtil{

static final String FLUX_FROM_ITERATOR = "reactor.core.publisher.FluxIterable-";
static final String FLUX_FROM_ARRAY = "reactor.core.publisher.FluxArray-";
static final String FLUX_FROM_STREAM = "reactor.core.publisher.FluxStream-";

private FluxUtil() {
}


public static Flux<?> restore(Object fluxObj) {
if(fluxObj == null){
if (fluxObj == null) {
return Flux.empty();
}
FluxResult fluxResult = (FluxResult) fluxObj;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
package io.arex.inst.common.util;

import io.arex.agent.bootstrap.ctx.TraceTransmitter;
import java.util.function.Function;
import java.util.function.UnaryOperator;
import reactor.core.publisher.Mono;

public class MonoRecordFunction implements UnaryOperator<Mono<?>> {

private final Function<Object, Void> executor;
private final TraceTransmitter traceTransmitter;

public MonoRecordFunction(Function<Object, Void> executor) {
this.traceTransmitter = TraceTransmitter.create();
this.executor = executor;
}

@Override
public Mono<?> apply(Mono<?> responseMono) {
return responseMono
.doOnSuccess(result -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
executor.apply(result);
}
})
.doOnError(error -> {
try (TraceTransmitter tm = traceTransmitter.transmit()) {
executor.apply(error);
}
});
}
}
Original file line number Diff line number Diff line change
@@ -1,36 +1,27 @@
package io.arex.inst.dynamic.common.listener;
package io.arex.inst.common.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.arex.inst.dynamic.common.DynamicClassExtractor;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.runtime.config.ConfigBuilder;
import io.arex.inst.runtime.context.ContextManager;
import java.lang.reflect.Method;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Flux;

public class FluxConsumerTest {

static DynamicClassExtractor extractor;
static FluxConsumer fluxConsumer;
public class FluxRecordFuntionTest {

static FluxRecordFunction fluxRecordFunction;
@BeforeAll
static void setUp() {
Method testWithArexMock;
try {
testWithArexMock = FluxConsumerTest.class.getDeclaredMethod("testWithArexMock", String.class);
} catch (NoSuchMethodException e) {
testWithArexMock = null;
}
final Object[] args = {"errorSerialize"};
extractor = new DynamicClassExtractor(testWithArexMock, args);
fluxConsumer = new FluxConsumer(extractor);
Mockito.mockStatic(ContextManager.class);
ConfigBuilder.create("test").enableDebug(true).build();
Function<FluxResult, Void> executor = result -> null;
fluxRecordFunction = new FluxRecordFunction(executor);
}

@AfterAll
Expand Down Expand Up @@ -67,18 +58,19 @@ private static void testNormalFlux() {
// doFinally performs some operations that have nothing to do with the value of the element.
// If the doFinally operator is called multiple times, doFinally will be executed once at the end of each sequence.
.doFinally(System.out::println);
Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);
// record content: 1,2,3,4,5
subscribe.subscribe();
// record content: 1
assertEquals(blockFirst.blockFirst(), 1);
}

private static void testEmptyFlux() {

Flux flux = Flux.empty();
Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);
// record content: 1,2,3,4,5
subscribe.subscribe();
// record content: 1
Expand All @@ -97,8 +89,8 @@ private static void testFluxOnErrorResume() {
// returns an alternate Flux sequence when a Flux error occurs,
.onErrorResume(t -> Flux.just(7, 8, 9));

Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);

// record content: 1,7,8,9
subscribe.subscribe();
Expand All @@ -115,9 +107,9 @@ private static void testFluxOnError() {
})
.doOnError(t -> System.out.println("error" + ":" + t));

Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux blockLast = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);
Flux blockLast = fluxRecordFunction.apply(flux);

// record content: 1,2,RuntimeException
subscribe.subscribe();
Expand All @@ -136,9 +128,9 @@ private static void testFluxOnErrorContinue() {
})
.onErrorContinue((t, o) -> System.out.println("error" + ":" + t))
.doOnNext(val -> System.out.println("val" + ":" + val));
Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux blockLast = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);
Flux blockLast = fluxRecordFunction.apply(flux);

// record content: 1,2,4,5
subscribe.subscribe();
Expand All @@ -150,8 +142,8 @@ private static void testFluxOnErrorContinue() {

private static void testFluxError() {
Flux flux = Flux.error(new RuntimeException("error"));
Flux subscribe = fluxConsumer.accept(flux);
Flux blockFirst = fluxConsumer.accept(flux);
Flux subscribe = fluxRecordFunction.apply(flux);
Flux blockFirst = fluxRecordFunction.apply(flux);
// record content: RuntimeException
subscribe.subscribe();
// record content: RuntimeException
Expand All @@ -162,4 +154,8 @@ public String testWithArexMock(String val) {
return val + "testWithArexMock";
}

private void record(Flux<?> responseFlux) {
System.out.println(responseFlux.hashCode());
}

}
Original file line number Diff line number Diff line change
@@ -1,29 +1,27 @@
package io.arex.inst.common.util;

import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ARRAY;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_ITERATOR;
import static io.arex.inst.common.util.FluxUtil.FLUX_FROM_STREAM;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ARRAY;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_ITERATOR;
import static io.arex.inst.common.util.FluxReplayUtil.FLUX_FROM_STREAM;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;

import io.arex.inst.common.util.FluxUtil.FluxElementResult;
import io.arex.inst.common.util.FluxUtil.FluxResult;
import io.arex.inst.common.util.FluxReplayUtil.FluxElementResult;
import io.arex.inst.common.util.FluxReplayUtil.FluxResult;
import io.arex.inst.runtime.util.TypeUtil;
import java.lang.reflect.Type;
import java.util.ArrayList;
import java.util.List;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;

public class FluxUtilTest {
public class FluxReplayUtilTest {

@Test
void FluxRecory() {
List<FluxElementResult> list = new ArrayList<>();
FluxResult fluxResult = new FluxResult(null, list);
// flux is empty
assertNotNull(FluxUtil.restore(null));
Flux<?> result = FluxUtil.restore(fluxResult);
assertNotNull(FluxReplayUtil.restore(null));
Flux<?> result = FluxReplayUtil.restore(fluxResult);
assertNotNull(result);

// flux is not empty
Expand All @@ -34,22 +32,22 @@ void FluxRecory() {

// Flux.just()
fluxResult = new FluxResult(null, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),"reactor.core.publisher.FluxJust-java.util.ArrayList-");

// Flux.fromIterable()
fluxResult = new FluxResult(FLUX_FROM_ITERATOR, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ITERATOR);

// Flux.fromArray()
fluxResult = new FluxResult(FLUX_FROM_ARRAY, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_ARRAY);

// Flux.fromStream()
fluxResult = new FluxResult(FLUX_FROM_STREAM, list);
result = FluxUtil.restore(fluxResult);
result = FluxReplayUtil.restore(fluxResult);
assertEquals(TypeUtil.getName(result),FLUX_FROM_STREAM);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.arex.inst.common.util;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import java.util.function.Function;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
import reactor.core.publisher.Mono;

public class MonoRecordFuntionTest {

static Function<Object, Void> executor;

@BeforeAll
static void setUp() {
executor = Mockito.mock(Function.class);
}

@AfterAll
static void tearDown() {
executor = null;
Mockito.clearAllCaches();
}

@Test
void monoConsumer() {
Mono<String> mono = Mono.just("test");
Mono<String> monoResult = (Mono<String>) new MonoRecordFunction(executor).apply(mono);
assertEquals("test", monoResult.block());

Mono<Exception> mono2 = Mono.error(new Exception("exception"));
Mono<String> monoResult2 = (Mono<String>) new MonoRecordFunction(executor).apply(mono2);
assertThrows(Exception.class, () -> monoResult2.block());
}

}
Loading

0 comments on commit c5c1b1e

Please sign in to comment.