Added test for correct sink interface usage
Arek Burdach committed Oct 3, 2024
1 parent 213668f commit e0d23f3
Showing 3 changed files with 43 additions and 14 deletions.
2 changes: 1 addition & 1 deletion docs/docs/flink-writes.md
@@ -380,7 +380,7 @@
The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLIN
had some limitations - for example it created a lot of small files when writing to it. This problem is called
the `small-file-compaction` problem in
the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
-The default `FlinkSink` implementation available in `iceberg-flink` module builds its own `StreamOperator`s chain ends with `DiscardingSink`.
+The default `FlinkSink` implementation available in `iceberg-flink` module builds its own chain of `StreamOperator`s terminated by `DiscardingSink`.
However, in the same module, there is also `IcebergSink` which is based on the SinkV2 API.
The SinkV2 based `IcebergSink` is currently an experimental feature.

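For context, a minimal sketch of how a DataStream job would pick one of the two sinks described above. It assumes the builder entry points of the `iceberg-flink` module: `FlinkSink.forRowData(...)` is the default path, and the experimental `IcebergSink` is assumed to expose a matching `forRowData(...)`/`tableLoader(...)` builder for the SinkV2-based path. The class and helper names below are illustrative only, and exact builder methods may differ between Iceberg versions.

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;
import org.apache.iceberg.flink.sink.IcebergSink;

public class SinkChoiceSketch {

  // Illustrative helper: appends `rows` to an Iceberg table through either sink implementation.
  static void append(DataStream<RowData> rows, TableLoader tableLoader, boolean useV2Sink) {
    if (useV2Sink) {
      // Experimental SinkV2-based path: the resulting plan ends in a SinkTransformation.
      IcebergSink.forRowData(rows)
          .tableLoader(tableLoader)
          .append();
    } else {
      // Default path: FlinkSink builds its own chain of StreamOperators and terminates it
      // with a DiscardingSink, which shows up as a LegacySinkTransformation in the plan.
      FlinkSink.forRowData(rows)
          .tableLoader(tableLoader)
          .append();
    }
  }
}
```

The test added in this commit asserts the same distinction by checking whether the planner's top-level `Transformation` is a `SinkTransformation` (SinkV2) or a `LegacySinkTransformation` (the `DiscardingSink`-terminated chain).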
@@ -40,7 +40,6 @@

public class TestFlinkTableSink extends CatalogTestBase {

-  private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
private static final String TABLE_NAME = "test_table";
private TableEnvironment tEnv;
private Table icebergTable;
@@ -31,6 +31,8 @@
import java.util.stream.IntStream;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -157,6 +159,27 @@ public void clean() throws Exception {
catalog.close();
}

+  @TestTemplate
+  public void testUsedFlinkSinkInterface() {
+    String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList());
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
+    if (useV2Sink) {
+      assertThat(sink).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
+    } else {
+      assertThat(sink)
+          .as("Should use custom chain of StreamOperators terminated by DiscardingSink")
+          .isInstanceOf(LegacySinkTransformation.class);
+    }
+  }
+
@TestTemplate
public void testWriteParallelism() {
List<Row> dataSet =
@@ -176,18 +199,25 @@ public void testWriteParallelism() {
"INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
TABLE, SOURCE_TABLE);
ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
-    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
-    Transformation<?> committer = dummySink.getInputs().get(0);
-    Transformation<?> writer = committer.getInputs().get(0);
-
-    assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
-    writer
-        .getInputs()
-        .forEach(
-            input ->
-                assertThat(input.getParallelism())
-                    .as("Should have the expected parallelism.")
-                    .isEqualTo(isStreamingJob ? 2 : 4));
+    Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
+    if (useV2Sink) {
+      assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
+      Transformation<?> writerInput = sink.getInputs().get(0);
+      assertThat(writerInput.getParallelism())
+          .as("Should have the expected parallelism.")
+          .isEqualTo(isStreamingJob ? 2 : 4);
+    } else {
+      Transformation<?> committer = sink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      assertThat(writer.getParallelism())
+          .as("Should have the expected 1 parallelism.")
+          .isEqualTo(1);
+      Transformation<?> writerInput = writer.getInputs().get(0);
+      assertThat(writerInput.getParallelism())
+          .as("Should have the expected parallelism.")
+          .isEqualTo(isStreamingJob ? 2 : 4);
+    }
}

@TestTemplate