Skip to content

Commit

Permalink
fix sourceIT
Browse files Browse the repository at this point in the history
  • Loading branch information
DongLiang-0 committed Sep 4, 2024
1 parent a369925 commit 71f37d7
Showing 1 changed file with 17 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,18 @@
import org.apache.doris.flink.container.ContainerUtils;
import org.apache.doris.flink.datastream.DorisSourceFunction;
import org.apache.doris.flink.deserialization.SimpleListDeserializationSchema;
import org.apache.doris.flink.exception.DorisRuntimeException;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Properties;
import java.util.Set;

/** DorisSource ITCase. */
public class DorisSourceITCase extends AbstractITCaseService {
Expand Down Expand Up @@ -280,7 +283,8 @@ public void testTableSourceFilterAndProjectionPushDown() throws Exception {
}

@Test
public void testTableSourceFilterWithUnionAll() throws Exception {
public void testTableSourceFilterWithUnionAll() {
LOG.info("starting to execute testTableSourceFilterWithUnionAll case.");
initializeTable(TABLE_READ_TBL_PUSH_DOWN_WITH_UNION_ALL);
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
Expand All @@ -303,20 +307,25 @@ public void testTableSourceFilterWithUnionAll() throws Exception {
getDorisUsername(),
getDorisPassword());
tEnv.executeSql(sourceDDL);
TableResult tableResult =
tEnv.executeSql(
" SELECT * FROM doris_source_filter_with_union_all where age = '18'"
+ " UNION ALL "
+ "SELECT * FROM doris_source_filter_with_union_all where age = '10' order by age");
String querySql =
" SELECT * FROM doris_source_filter_with_union_all where age = '18'"
+ " UNION ALL "
+ "SELECT * FROM doris_source_filter_with_union_all where age = '10'";
TableResult tableResult = tEnv.executeSql(querySql);

List<String> actual = new ArrayList<>();
try (CloseableIterator<Row> iterator = tableResult.collect()) {
while (iterator.hasNext()) {
actual.add(iterator.next().toString());
}
} catch (Exception e) {
LOG.error("Failed to execute sql. sql={}", querySql, e);
throw new DorisRuntimeException(e);
}
Set<String> expected = new HashSet<>(Arrays.asList("+I[flink, 10]", "+I[doris, 18]"));
for (String a : actual) {
Assert.assertTrue(expected.contains(a));
}
String[] expected = new String[] {"+I[flink, 10]", "+I[doris, 18]"};
checkResult("testTableSourceFilterWithUnionAll", expected, actual.toArray());
}

private void checkResult(String testName, Object[] expected, Object[] actual) {
Expand Down

0 comments on commit 71f37d7

Please sign in to comment.