From a47352af86df855996c915d7e524237dead7243b Mon Sep 17 00:00:00 2001 From: zhouhao Date: Wed, 10 Jan 2024 10:40:07 +0800 Subject: [PATCH] =?UTF-8?q?feat:=20=E5=A2=9E=E5=8A=A0count=E7=9A=84distinc?= =?UTF-8?q?t=E5=92=8Cunique=E6=94=AF=E6=8C=81.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- pom.xml | 2 +- .../ql/supports/DefaultReactorQLMetadata.java | 6 +++ .../ql/supports/agg/CountAggFeature.java | 33 +++++++++++- .../jetlinks/reactor/ql/ReactorQLTest.java | 51 +++++++++++++++++-- 4 files changed, 87 insertions(+), 5 deletions(-) diff --git a/pom.xml b/pom.xml index 0e89454..b9bcec8 100644 --- a/pom.xml +++ b/pom.xml @@ -44,7 +44,7 @@ zh_CN 1.8 ${java.version} - 2020.0.31 + 2020.0.38 diff --git a/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java b/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java index f19ed21..5265808 100644 --- a/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java +++ b/src/main/java/org/jetlinks/reactor/ql/supports/DefaultReactorQLMetadata.java @@ -547,6 +547,12 @@ public DefaultReactorQLMetadata(String sql) { init(); } + @SneakyThrows + public DefaultReactorQLMetadata(PlainSelect selectSql) { + this.selectSql = selectSql; + init(); + } + @SneakyThrows public DefaultReactorQLMetadata(ReactorQLMetadata source, PlainSelect selectSql) { this.selectSql = selectSql; diff --git a/src/main/java/org/jetlinks/reactor/ql/supports/agg/CountAggFeature.java b/src/main/java/org/jetlinks/reactor/ql/supports/agg/CountAggFeature.java index ad5ee03..b5b7576 100644 --- a/src/main/java/org/jetlinks/reactor/ql/supports/agg/CountAggFeature.java +++ b/src/main/java/org/jetlinks/reactor/ql/supports/agg/CountAggFeature.java @@ -10,7 +10,9 @@ import org.reactivestreams.Publisher; import reactor.core.publisher.Flux; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.Function; +import java.util.stream.Collectors; public class CountAggFeature implements ValueAggMapFeature { @@ -28,7 +30,36 @@ public Function, Flux> createMapper(Expression exp Expression expr = function.getParameters().getExpressions().get(0); - Function> mapper = ValueMapFeature.createMapperNow(expr, metadata); + Function> mapper = ValueMapFeature.createMapperNow(expr, metadata); + + //去重记数 + if (function.isDistinct()) { + return flux -> flux + .flatMap(mapper) + .distinct() + .count() + .cast(Object.class) + .flux(); + } + //统计唯一值的个数 + if (function.isUnique()) { + return flux -> flux + .flatMap(mapper) + .collect(Collectors.groupingBy(Function.identity(), + ConcurrentHashMap::new, + Collectors.counting())) + .map(map -> { + long count = 0; + for (Long value : map.values()) { + if (value == 1) { + count++; + } + } + return count; + }) + .flux() + .cast(Object.class); + } return flux -> flux .flatMap(mapper) diff --git a/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java b/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java index 52e4da6..844c8b9 100644 --- a/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java +++ b/src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java @@ -5,7 +5,6 @@ import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.Test; import reactor.core.publisher.Flux; -import reactor.function.Consumer3; import reactor.test.StepVerifier; import java.math.BigDecimal; @@ -17,7 +16,9 @@ import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; import java.util.function.BiConsumer; +import java.util.function.Function; import java.util.stream.Collectors; class ReactorQLTest { @@ -1187,15 +1188,59 @@ void testQuo() { } @Test - void testDistinctCount(){ + void testDistinctCount() { ReactorQL.builder() .sql("select distinct_count(this) t from \"table\" ") .build() .start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7)) .doOnNext(System.out::println) .as(StepVerifier::create) - .expectNext(Collections.singletonMap("t",7L)) + .expectNext(Collections.singletonMap("t", 7L)) .verifyComplete(); + + ReactorQL.builder() + .sql("select count(distinct this) t from \"table\" ") + .build() + .start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7)) + .doOnNext(System.out::println) + .as(StepVerifier::create) + .expectNext(Collections.singletonMap("t", 7L)) + .verifyComplete(); + } + + @Test + void testUniqueCount() { + + ReactorQL.builder() + .sql("select count(unique this) t from \"table\" ") + .build() + .start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7)) + .doOnNext(System.out::println) + .as(StepVerifier::create) + .expectNext(Collections.singletonMap("t", 5L)) + .verifyComplete(); + + Duration time = Flux + .range(0, 1000000) + .collect(Collectors.groupingBy(Function.identity(), + ConcurrentHashMap::new, + Collectors.counting())) + .as(StepVerifier::create) + .expectNextCount(1) + .verifyComplete(); + + System.out.println(time); + + time = ReactorQL + .builder() + .sql("select count(unique this) t from \"table\" ") + .build() + .start(Flux.range(0, 1000000)) + .doOnNext(System.out::println) + .as(StepVerifier::create) + .expectNext(Collections.singletonMap("t", 1000000L)) + .verifyComplete(); + System.out.println(time); } @Test