Skip to content

Commit

Permalink
feat: 增加count的distinct和unique支持.
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Jan 10, 2024
1 parent 69491ae commit a47352a
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@
<project.build.locales>zh_CN</project.build.locales>
<java.version>1.8</java.version>
<project.build.jdk>${java.version}</project.build.jdk>
<reactor.version>2020.0.31</reactor.version>
<reactor.version>2020.0.38</reactor.version>
</properties>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -547,6 +547,12 @@ public DefaultReactorQLMetadata(String sql) {
init();
}

@SneakyThrows
public DefaultReactorQLMetadata(PlainSelect selectSql) {
this.selectSql = selectSql;
init();
}

@SneakyThrows
public DefaultReactorQLMetadata(ReactorQLMetadata source, PlainSelect selectSql) {
this.selectSql = selectSql;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;

import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import java.util.stream.Collectors;

public class CountAggFeature implements ValueAggMapFeature {

Expand All @@ -28,7 +30,36 @@ public Function<Flux<ReactorQLRecord>, Flux<Object>> createMapper(Expression exp

Expression expr = function.getParameters().getExpressions().get(0);

Function<ReactorQLRecord,Publisher<?>> mapper = ValueMapFeature.createMapperNow(expr, metadata);
Function<ReactorQLRecord, Publisher<?>> mapper = ValueMapFeature.createMapperNow(expr, metadata);

//去重记数
if (function.isDistinct()) {
return flux -> flux
.flatMap(mapper)
.distinct()
.count()
.cast(Object.class)
.flux();
}
//统计唯一值的个数
if (function.isUnique()) {
return flux -> flux
.flatMap(mapper)
.collect(Collectors.groupingBy(Function.identity(),
ConcurrentHashMap::new,
Collectors.counting()))
.map(map -> {
long count = 0;
for (Long value : map.values()) {
if (value == 1) {
count++;
}
}
return count;
})
.flux()
.cast(Object.class);
}

return flux -> flux
.flatMap(mapper)
Expand Down
51 changes: 48 additions & 3 deletions src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Flux;
import reactor.function.Consumer3;
import reactor.test.StepVerifier;

import java.math.BigDecimal;
Expand All @@ -17,7 +16,9 @@
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiConsumer;
import java.util.function.Function;
import java.util.stream.Collectors;

class ReactorQLTest {
Expand Down Expand Up @@ -1187,15 +1188,59 @@ void testQuo() {
}

@Test
void testDistinctCount(){
void testDistinctCount() {
ReactorQL.builder()
.sql("select distinct_count(this) t from \"table\" ")
.build()
.start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNext(Collections.singletonMap("t",7L))
.expectNext(Collections.singletonMap("t", 7L))
.verifyComplete();

ReactorQL.builder()
.sql("select count(distinct this) t from \"table\" ")
.build()
.start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNext(Collections.singletonMap("t", 7L))
.verifyComplete();
}

@Test
void testUniqueCount() {

ReactorQL.builder()
.sql("select count(unique this) t from \"table\" ")
.build()
.start(Flux.just(1, 2, 3, 3, 4, 5, 6, 6, 6, 7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNext(Collections.singletonMap("t", 5L))
.verifyComplete();

Duration time = Flux
.range(0, 1000000)
.collect(Collectors.groupingBy(Function.identity(),
ConcurrentHashMap::new,
Collectors.counting()))
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();

System.out.println(time);

time = ReactorQL
.builder()
.sql("select count(unique this) t from \"table\" ")
.build()
.start(Flux.range(0, 1000000))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNext(Collections.singletonMap("t", 1000000L))
.verifyComplete();
System.out.println(time);
}

@Test
Expand Down

0 comments on commit a47352a

Please sign in to comment.