Skip to content

Commit

Permalink
增加distinct以及oraclehint支持
Browse files Browse the repository at this point in the history
  • Loading branch information
zhou-hao committed Apr 27, 2020
1 parent 5ac575d commit af2139a
Show file tree
Hide file tree
Showing 8 changed files with 205 additions and 11 deletions.
33 changes: 24 additions & 9 deletions src/main/java/org/jetlinks/reactor/ql/DefaultReactorQL.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ public class DefaultReactorQL implements ReactorQL {
private Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> orderBy;
private Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> limit;
private Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> offset;
private Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> distinct;
private Function<ReactorQLContext, Flux<ReactorQLRecord>> builder;


Expand All @@ -54,28 +55,32 @@ protected void prepare() {
groupBy = createGroupBy();
join = createJoin();
orderBy = createOrderBy();

distinct = createDistinct();
Function<ReactorQLContext, Flux<ReactorQLRecord>> fromMapper = FromFeature.createFromMapperByBody(metadata.getSql(), metadata);
PlainSelect select = metadata.getSql();
if (null != select.getGroupBy()) {
builder = ctx ->
limit.apply(
offset.apply(
orderBy.apply(
groupBy.apply(
where.apply(
join.apply(fromMapper.apply(ctx))))
distinct.apply(
orderBy.apply(
groupBy.apply(
where.apply(
join.apply(fromMapper.apply(ctx))))
)
)
)
);
} else {
builder = ctx ->
limit.apply(
offset.apply(
orderBy.apply(
columnMapper.apply(
where.apply(
join.apply(fromMapper.apply(ctx)))
distinct.apply(
orderBy.apply(
columnMapper.apply(
where.apply(
join.apply(fromMapper.apply(ctx)))
)
)
)
)
Expand All @@ -84,6 +89,16 @@ protected void prepare() {
}


protected Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> createDistinct() {
Distinct distinct;
if ((distinct = metadata.getSql().getDistinct()) == null) {
return Function.identity();
}
return metadata.getFeatureNow(FeatureId.Distinct.of(
metadata.getSetting("distinctBy").map(String::valueOf).orElse("default")
)).createDistinctMapper(distinct, metadata);
}

protected Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> createJoin() {
if (CollectionUtils.isEmpty(metadata.getSql().getJoins())) {
return Function.identity();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@

public interface ReactorQLMetadata {


<T extends Feature> Optional<T> getFeature(FeatureId<T> featureId);

default <T extends Feature> T getFeatureNow(FeatureId<T> featureId) {
Expand All @@ -21,6 +20,7 @@ default <T extends Feature> T getFeatureNow(FeatureId<T> featureId, Supplier<Str
.orElseThrow(() -> new UnsupportedOperationException("不支持的操作: " + errorMessage.get()));
}

Optional<Object> getSetting(String key);

PlainSelect getSql();

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/org/jetlinks/reactor/ql/feature/DistinctFeature.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
package org.jetlinks.reactor.ql.feature;

import net.sf.jsqlparser.statement.select.Distinct;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import reactor.core.publisher.Flux;

import java.util.function.Function;

public interface DistinctFeature extends Feature {

Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> createDistinctMapper(Distinct distinct, ReactorQLMetadata metadata);

}
9 changes: 9 additions & 0 deletions src/main/java/org/jetlinks/reactor/ql/feature/FeatureId.java
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,13 @@ static FeatureId<FromFeature> of(String type) {
}

}

interface Distinct {

FeatureId<DistinctFeature> defaultId = Distinct.of("default");

static FeatureId<DistinctFeature> of(String type) {
return FeatureId.of("distinct:".concat(type));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import org.jetlinks.reactor.ql.supports.agg.CollectListAggFeature;
import org.jetlinks.reactor.ql.supports.agg.MathAggFeature;
import org.jetlinks.reactor.ql.supports.agg.CountAggFeature;
import org.jetlinks.reactor.ql.supports.distinct.DefaultDistinctFeature;
import org.jetlinks.reactor.ql.supports.filter.*;
import org.jetlinks.reactor.ql.supports.from.FromTableFeature;
import org.jetlinks.reactor.ql.supports.from.FromValuesFeature;
Expand Down Expand Up @@ -39,6 +40,8 @@ public class DefaultReactorQLMetadata implements ReactorQLMetadata {

private final Map<String, Feature> features = new ConcurrentHashMap<>(globalFeatures);

private final Map<String, Object> settings = new ConcurrentHashMap<>();

static <T> void createCalculator(BiFunction<String, BiFunction<Number, Number, Object>, T> builder, Consumer<T> consumer) {

consumer.accept(builder.apply("+", CalculateUtils::add));
Expand Down Expand Up @@ -72,7 +75,7 @@ static <T> void createCalculator(BiFunction<String, BiFunction<Number, Number, O
}

static {

addGlobal(new DefaultDistinctFeature());
addGlobal(new SubSelectFromFeature());
addGlobal(new FromTableFeature());
addGlobal(new ZipSelectFeature());
Expand Down Expand Up @@ -260,6 +263,18 @@ public static void addGlobal(Feature feature) {
@SneakyThrows
public DefaultReactorQLMetadata(String sql) {
this.selectSql = ((PlainSelect) ((Select) CCJSqlParserUtil.parse(sql)).getSelectBody());
if (this.selectSql.getOracleHint() != null) {
String settings = this.selectSql.getOracleHint().getValue();
String[] arr = settings.split("[,]");
for (String set : arr) {
set = set.trim().replace("\n", "");
if (!set.contains("(")) {
this.settings.put(set, true);
} else {
this.settings.put(set.substring(0, set.indexOf("(")), set.substring(set.indexOf("(") + 1, set.length() - 1));
}
}
}
}

public DefaultReactorQLMetadata(PlainSelect selectSql) {
Expand All @@ -286,4 +301,9 @@ public void addFeature(Collection<Feature> features) {
public PlainSelect getSql() {
return selectSql;
}

@Override
public Optional<Object> getSetting(String key) {
return Optional.ofNullable(settings.get(key));
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
package org.jetlinks.reactor.ql.supports.distinct;

import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.statement.select.*;
import org.jetlinks.reactor.ql.ReactorQLMetadata;
import org.jetlinks.reactor.ql.ReactorQLRecord;
import org.jetlinks.reactor.ql.feature.DistinctFeature;
import org.jetlinks.reactor.ql.feature.FeatureId;
import org.jetlinks.reactor.ql.feature.ValueMapFeature;
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.util.function.Tuple2;
import reactor.util.function.Tuples;

import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class DefaultDistinctFeature implements DistinctFeature {
@Override
public Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> createDistinctMapper(Distinct distinct, ReactorQLMetadata metadata) {

List<SelectItem> items = distinct.getOnSelectItems();
if (items == null) {
return flux -> flux.distinct(ReactorQLRecord::getRecord);
}
List<Function<ReactorQLRecord, Mono<Object>>> keySelector = new ArrayList<>();
for (SelectItem item : items) {
item.accept(new SelectItemVisitor() {
@Override
public void visit(AllColumns allColumns) {
keySelector.add(record -> Mono.justOrEmpty(record.getRecord()));
}

@Override
public void visit(AllTableColumns allTableColumns) {
String tname = allTableColumns.getTable().getAlias() != null ? allTableColumns.getTable().getAlias().getName() : allTableColumns.getTable().getName();
keySelector.add(record -> Mono.justOrEmpty(record.getRecord(tname)));
}

@Override
public void visit(SelectExpressionItem selectExpressionItem) {
Expression expr = selectExpressionItem.getExpression();
Function<ReactorQLRecord, ? extends Publisher<?>> mapper = ValueMapFeature.createMapperNow(expr, metadata);
keySelector.add(record -> Mono.from(mapper.apply(record)));
}
});
}
return createDistinct(keySelector);
}

protected Function<Flux<ReactorQLRecord>, Flux<ReactorQLRecord>> createDistinct(List<Function<ReactorQLRecord, Mono<Object>>> keySelector) {
return flux -> flux
.flatMap(record -> Flux.fromIterable(keySelector)
.flatMap(mapper -> mapper.apply(record))
.collectList()
.map(list -> Tuples.of(list, record)))
.distinct(Tuple2::getT1)
.map(Tuple2::getT2);
}

@Override
public String getId() {
return FeatureId.Distinct.defaultId.getId();
}
}
52 changes: 52 additions & 0 deletions src/test/java/org/jetlinks/reactor/ql/ReactorQLTest.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package org.jetlinks.reactor.ql;

import lombok.SneakyThrows;
import net.sf.jsqlparser.parser.CCJSqlParserUtil;
import net.sf.jsqlparser.statement.Statement;
import org.hswebframework.utils.time.DateFormatter;
import org.jetlinks.reactor.ql.supports.map.SingleParameterFunctionMapFeature;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -1004,5 +1007,54 @@ void testQuo() {
.verifyComplete();
}

@Test
void testDistinct() {
ReactorQL.builder()
.sql("select distinct this from \"table\" ")
.build()
.start(Flux.just(1,2,3,3,4,5,6,6,6,7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(7)
.verifyComplete();
}

@Test
void testDistinctColumn() {
ReactorQL.builder()
.sql("select distinct on(this) this from \"table\" ")
.build()
.start(Flux.just(1,2,3,3,4,5,6,6,6,7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(7)
.verifyComplete();
}

@Test
void testDistinctOnAll() {
ReactorQL.builder()
.sql("select distinct on(*) this from \"table\" ")
.build()
.start(Flux.just(1,2,3,3,4,5,6,6,6,7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(7)
.verifyComplete();
}

@Test
void testDistinctOnTable() {
ReactorQL.builder()
.sql("select distinct on(t.*) this from \"table\" t ")
.build()
.start(Flux.just(1,2,3,3,4,5,6,6,6,7))
.doOnNext(System.out::println)
.as(StepVerifier::create)
.expectNextCount(7)
.verifyComplete();
}



}
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
package org.jetlinks.reactor.ql.supports;

import org.junit.jupiter.api.Test;

import static org.junit.jupiter.api.Assertions.*;

class DefaultReactorQLMetadataTest {

@Test
void testSettingByOracleHint() {
DefaultReactorQLMetadata metadata = new DefaultReactorQLMetadata("select /*+ distinctBy(bloom),ignoreError */ * from test");

assertEquals(metadata.getSetting("distinctBy").orElse(null), "bloom");
assertEquals(metadata.getSetting("ignoreError").orElse(null), true);

}
}

0 comments on commit af2139a

Please sign in to comment.