From e0d23f37f980a230b3902cdd1553af0eee5422ec Mon Sep 17 00:00:00 2001
From: Arek Burdach <>
Date: Thu, 3 Oct 2024 12:45:53 +0200
Subject: [PATCH] Added test for correct sink interface usage

---
 docs/docs/flink-writes.md                 |  2 +-
 .../iceberg/flink/TestFlinkTableSink.java |  1 -
 .../flink/TestFlinkTableSinkExtended.java | 54 ++++++++++++++-----
 3 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/docs/docs/flink-writes.md b/docs/docs/flink-writes.md
index 2671ec0309e2..0da6c08e4763 100644
--- a/docs/docs/flink-writes.md
+++ b/docs/docs/flink-writes.md
@@ -380,7 +380,7 @@ The previous [SinkV1 interface](https://cwiki.apache.org/confluence/display/FLIN
 had some limitations - for example it created a lot of small files when writing to it.
 This problem is called the `small-file-compaction` problem in the [FLIP-191 document](https://cwiki.apache.org/confluence/display/FLINK/FLIP-191%3A+Extend+unified+Sink+interface+to+support+small+file+compaction).
 
-The default `FlinkSink` implementation available in `iceberg-flink` module builds its own `StreamOperator`s chain ends with `DiscardingSink`.
+The default `FlinkSink` implementation available in the `iceberg-flink` module builds its own chain of `StreamOperator`s terminated by `DiscardingSink`.
 However, in the same module, there is also `IcebergSink` which is based on the SinkV2 API.
 The SinkV2 based `IcebergSink` is currently an experimental feature.
 
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
index c5a9dd2d8960..24822a3725ae 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSink.java
@@ -40,7 +40,6 @@ public class TestFlinkTableSink extends CatalogTestBase {
-  private static final String SOURCE_TABLE = "default_catalog.default_database.bounded_source";
   private static final String TABLE_NAME = "test_table";
   private TableEnvironment tEnv;
   private Table icebergTable;
diff --git a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
index fe50cfa0557d..da596959cf70 100644
--- a/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
+++ b/flink/v1.20/flink/src/test/java/org/apache/iceberg/flink/TestFlinkTableSinkExtended.java
@@ -31,6 +31,8 @@
 import java.util.stream.IntStream;
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.LegacySinkTransformation;
+import org.apache.flink.streaming.api.transformations.SinkTransformation;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
@@ -157,6 +159,27 @@ public void clean() throws Exception {
     catalog.close();
   }
 
+  @TestTemplate
+  public void testUsedFlinkSinkInterface() {
+    String dataId = BoundedTableFactory.registerDataSet(Collections.emptyList());
+    sql(
+        "CREATE TABLE %s(id INT NOT NULL, data STRING NOT NULL)"
+            + " WITH ('connector'='BoundedSource', 'data-id'='%s')",
+        SOURCE_TABLE, dataId);
+
+    PlannerBase planner = (PlannerBase) ((TableEnvironmentImpl) getTableEnv()).getPlanner();
+    String insertSQL = String.format("INSERT INTO %s SELECT * FROM %s", TABLE, SOURCE_TABLE);
+    ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
+    Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
+    if (useV2Sink) {
+      assertThat(sink).as("Should use SinkV2 API").isInstanceOf(SinkTransformation.class);
+    } else {
+      assertThat(sink)
+          .as("Should use custom chain of StreamOperators terminated by DiscardingSink")
+          .isInstanceOf(LegacySinkTransformation.class);
+    }
+  }
+
   @TestTemplate
   public void testWriteParallelism() {
     List<Row> dataSet =
@@ -176,18 +199,25 @@ public void testWriteParallelism() {
             "INSERT INTO %s /*+ OPTIONS('write-parallelism'='1') */ SELECT * FROM %s",
             TABLE, SOURCE_TABLE);
     ModifyOperation operation = (ModifyOperation) planner.getParser().parse(insertSQL).get(0);
-    Transformation<?> dummySink = planner.translate(Collections.singletonList(operation)).get(0);
-    Transformation<?> committer = dummySink.getInputs().get(0);
-    Transformation<?> writer = committer.getInputs().get(0);
-
-    assertThat(writer.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
-    writer
-        .getInputs()
-        .forEach(
-            input ->
-                assertThat(input.getParallelism())
-                    .as("Should have the expected parallelism.")
-                    .isEqualTo(isStreamingJob ? 2 : 4));
+    Transformation<?> sink = planner.translate(Collections.singletonList(operation)).get(0);
+    if (useV2Sink) {
+      assertThat(sink.getParallelism()).as("Should have the expected 1 parallelism.").isEqualTo(1);
+      Transformation<?> writerInput = sink.getInputs().get(0);
+      assertThat(writerInput.getParallelism())
+          .as("Should have the expected parallelism.")
+          .isEqualTo(isStreamingJob ? 2 : 4);
+    } else {
+      Transformation<?> committer = sink.getInputs().get(0);
+      Transformation<?> writer = committer.getInputs().get(0);
+
+      assertThat(writer.getParallelism())
+          .as("Should have the expected 1 parallelism.")
+          .isEqualTo(1);
+      Transformation<?> writerInput = writer.getInputs().get(0);
+      assertThat(writerInput.getParallelism())
+          .as("Should have the expected parallelism.")
+          .isEqualTo(isStreamingJob ? 2 : 4);
+    }
   }
 
   @TestTemplate