Re-add 1.18 and 1.19 changes because they inherit base class changes
mxm committed Jan 28, 2025
1 parent 5719395 commit d3d4e46
Showing 10 changed files with 138 additions and 14 deletions.
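The diff below only touches the Flink 1.18 and 1.19 modules; the base test class that the changed tests extend is not part of this commit. As a rough, hypothetical sketch of the inherited hooks these overrides are compiled against (signatures are inferred from the @Override annotations in the diff; the class name DataTestSketch and the default method body are assumptions, not the real code):

// Hypothetical sketch of the inherited DataTest hooks; not part of this diff.
import java.io.IOException;
import java.util.List;
import org.apache.iceberg.Schema;
import org.apache.iceberg.data.Record;

public abstract class DataTestSketch {

  // Existing entry point: each format-specific test generates its own random records.
  protected abstract void writeAndValidate(Schema schema) throws IOException;

  // Newer entry point: the caller supplies the records, for example rows with a null
  // in a required field, and the format-specific subclass writes and re-reads them.
  protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
    throw new UnsupportedOperationException("Not implemented for this format");
  }
}

Judging by the ORC override below, the base class also declares a testWriteNullValueForRequiredType test that feeds records with nulls in required fields through this hook and expects the write to fail, which is why TestFlinkOrcReaderWriter disables it.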
@@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD

@Override
protected Object get(RowData struct, int index) {
// Be sure to check for null values, even if the field is required. Without an explicit null
// check, BinaryRowData will ignore the null flag and parse random bytes as actual values.
// This will produce incorrect writes instead of failing with a NullPointerException.
if (struct.isNullAt(index)) {
return null;
}
return fieldGetter[index].getFieldOrNull(struct);
}
}
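The new comment above is the crux of this change: RowData.createFieldGetter skips the isNullAt check when a field's logical type is declared NOT NULL, so a BinaryRowData whose null bit is set still yields a non-null, meaningless value. A minimal, self-contained sketch of that behavior using only Flink's own APIs (the class name NullFlagSketch and the example values are illustrative, not taken from the commit):

import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.table.types.logical.VarCharType;

public class NullFlagSketch {
  public static void main(String[] args) {
    // Both fields are declared NOT NULL, but the second one is null in the data.
    RowType rowType =
        RowType.of(new IntType(false), new VarCharType(false, VarCharType.MAX_LENGTH));
    BinaryRowData row =
        new RowDataSerializer(rowType).toBinaryRow(GenericRowData.of(42, null));

    // For a non-nullable type, createFieldGetter returns a getter without a null check.
    RowData.FieldGetter getter =
        RowData.createFieldGetter(new VarCharType(false, VarCharType.MAX_LENGTH), 1);

    System.out.println(row.isNullAt(1));            // true: the null bit is set
    System.out.println(getter.getFieldOrNull(row)); // non-null: the bytes in that slot are
                                                    // decoded as a value instead of null
  }
}

With the explicit struct.isNullAt(index) check, the null reaches the Parquet value writer and a write of a required field fails with a NullPointerException instead of silently persisting a bogus value, which is the trade-off the comment describes.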
@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
writeAndValidate(schema, expectedRecords, NUM_RECORDS);
writeAndValidate(schema, expectedRecords);
}

protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
throws IOException {
@Override
protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < numRecord; i++) {
for (int i = 0; i < expectedRecords.size(); i++) {
assertThat(rows).hasNext();
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
}
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
.build()) {
Iterator<RowData> expected = expectedRows.iterator();
Iterator<Record> records = reader.iterator();
for (int i = 0; i < numRecord; i += 1) {
for (int i = 0; i < expectedRecords.size(); i += 1) {
assertThat(records).hasNext();
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
}
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
1643811742000L,
10.24d));

writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
writeAndValidate(SCHEMA_NUM_TYPE, expected);
}
}
@@ -39,14 +39,20 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Disabled;

public class TestFlinkOrcReaderWriter extends DataTest {
private static final int NUM_RECORDS = 100;

@Override
protected void writeAndValidate(Schema schema) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
writeAndValidate(schema, expectedRecords);
}

@Override
protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

File recordsFile = File.createTempFile("junit", null, temp.toFile());
@@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException {
assertThat(records).isExhausted();
}
}

@Override
@Disabled("ORC file format supports null values even for required fields")
public void testWriteNullValueForRequiredType() {
super.testWriteNullValueForRequiredType();
}
}
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
schema);
}

@Override
protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
List<RowData> binaryRowList = Lists.newArrayList();
for (Record record : expectedData) {
RowData rowData = RowDataConverter.convert(schema, record);
BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
binaryRowList.add(binaryRow);
}
writeAndValidate(binaryRowList, schema);
}
}
@@ -304,6 +304,9 @@ static class RowDataWriter extends GenericOrcWriters.StructWriter<RowData> {

@Override
protected Object get(RowData struct, int index) {
if (struct.isNullAt(index)) {
return null;
}
return fieldGetters.get(index).getFieldOrNull(struct);
}
}
@@ -498,6 +498,12 @@ private static class RowDataWriter extends ParquetValueWriters.StructWriter<RowD

@Override
protected Object get(RowData struct, int index) {
// Be sure to check for null values, even if the field is required. Without an explicit null
// check, BinaryRowData will ignore the null flag and parse random bytes as actual values.
// This will produce incorrect writes instead of failing with a NullPointerException.
if (struct.isNullAt(index)) {
return null;
}
return fieldGetter[index].getFieldOrNull(struct);
}
}
@@ -67,13 +67,13 @@ public abstract class AbstractTestFlinkAvroReaderWriter extends DataTest {
@Override
protected void writeAndValidate(Schema schema) throws IOException {
List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1991L);
writeAndValidate(schema, expectedRecords, NUM_RECORDS);
writeAndValidate(schema, expectedRecords);
}

protected abstract Avro.ReadBuilder createAvroReadBuilder(File recordsFile, Schema schema);

private void writeAndValidate(Schema schema, List<Record> expectedRecords, int numRecord)
throws IOException {
@Override
protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

@@ -93,7 +93,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
try (CloseableIterable<RowData> reader = createAvroReadBuilder(recordsFile, schema).build()) {
Iterator<Record> expected = expectedRecords.iterator();
Iterator<RowData> rows = reader.iterator();
for (int i = 0; i < numRecord; i++) {
for (int i = 0; i < expectedRecords.size(); i++) {
assertThat(rows).hasNext();
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, expected.next(), rows.next());
}
@@ -120,7 +120,7 @@ private void writeAndValidate(Schema schema, List<Record> expectedRecords, int n
.build()) {
Iterator<RowData> expected = expectedRows.iterator();
Iterator<Record> records = reader.iterator();
for (int i = 0; i < numRecord; i += 1) {
for (int i = 0; i < expectedRecords.size(); i += 1) {
assertThat(records).hasNext();
TestHelpers.assertRowData(schema.asStruct(), flinkSchema, records.next(), expected.next());
}
@@ -177,6 +177,6 @@ public void testNumericTypes() throws IOException {
1643811742000L,
10.24d));

writeAndValidate(SCHEMA_NUM_TYPE, expected, 2);
writeAndValidate(SCHEMA_NUM_TYPE, expected);
}
}
@@ -39,14 +39,20 @@
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.orc.ORC;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.Disabled;

public class TestFlinkOrcReaderWriter extends DataTest {
private static final int NUM_RECORDS = 100;

@Override
protected void writeAndValidate(Schema schema) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<Record> expectedRecords = RandomGenericData.generate(schema, NUM_RECORDS, 1990L);
writeAndValidate(schema, expectedRecords);
}

@Override
protected void writeAndValidate(Schema schema, List<Record> expectedRecords) throws IOException {
RowType flinkSchema = FlinkSchemaUtil.convert(schema);
List<RowData> expectedRows = Lists.newArrayList(RandomRowData.convert(schema, expectedRecords));

File recordsFile = File.createTempFile("junit", null, temp.toFile());
@@ -104,4 +110,10 @@ protected void writeAndValidate(Schema schema) throws IOException {
assertThat(records).isExhausted();
}
}

@Override
@Disabled("ORC file format supports null values even for required fields")
public void testWriteNullValueForRequiredType() {
super.testWriteNullValueForRequiredType();
}
}
@@ -24,7 +24,10 @@
import java.io.IOException;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.List;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.binary.BinaryRowData;
import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
import org.apache.flink.table.types.logical.LogicalType;
import org.apache.iceberg.Files;
import org.apache.iceberg.Schema;
@@ -33,10 +36,12 @@
import org.apache.iceberg.data.Record;
import org.apache.iceberg.data.parquet.GenericParquetReaders;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.RowDataConverter;
import org.apache.iceberg.flink.TestHelpers;
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.junit.jupiter.api.io.TempDir;

public class TestFlinkParquetWriter extends DataTest {
@@ -91,4 +96,16 @@ protected void writeAndValidate(Schema schema) throws IOException {
schema, NUM_RECORDS, 21124, NUM_RECORDS / 20)),
schema);
}

@Override
protected void writeAndValidate(Schema schema, List<Record> expectedData) throws IOException {
RowDataSerializer rowDataSerializer = new RowDataSerializer(FlinkSchemaUtil.convert(schema));
List<RowData> binaryRowList = Lists.newArrayList();
for (Record record : expectedData) {
RowData rowData = RowDataConverter.convert(schema, record);
BinaryRowData binaryRow = rowDataSerializer.toBinaryRow(rowData);
binaryRowList.add(binaryRow);
}
writeAndValidate(binaryRowList, schema);
}
}
@@ -18,6 +18,7 @@
*/
package org.apache.iceberg.flink.sink;

import static org.apache.iceberg.flink.TestFixtures.DATABASE;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

@@ -27,20 +28,24 @@
import java.util.Map;
import org.apache.flink.api.dag.Transformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.Row;
import org.apache.flink.util.ExceptionUtils;
import org.apache.iceberg.DistributionMode;
import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Parameter;
import org.apache.iceberg.ParameterizedTestExtension;
import org.apache.iceberg.Parameters;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.FlinkWriteOptions;
import org.apache.iceberg.flink.MiniFlinkClusterExtension;
import org.apache.iceberg.flink.SimpleDataUtil;
@@ -51,6 +56,9 @@
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Maps;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Assume;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.TestTemplate;
import org.junit.jupiter.api.extension.ExtendWith;
@@ -414,6 +422,49 @@ void testOperatorsUidNameWitUidSuffix() throws Exception {
assertThat(secondTransformation.getName()).isEqualTo("data-ingestion");
}

@TestTemplate
void testErrorOnNullForRequiredField() throws Exception {
Assume.assumeFalse(
"ORC file format supports null values even for required fields.", format == FileFormat.ORC);

Schema icebergSchema =
new Schema(
Types.NestedField.required(1, "id2", Types.IntegerType.get()),
Types.NestedField.required(2, "data2", Types.StringType.get()));
Table table2 =
CATALOG_EXTENSION
.catalog()
.createTable(
TableIdentifier.of(DATABASE, "t2"),
icebergSchema,
PartitionSpec.unpartitioned(),
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name()));

// Null out a required field
List<Row> rows = List.of(Row.of(42, null));

env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<Row> dataStream =
env.addSource(createBoundedSource(rows), ROW_TYPE_INFO).uid("mySourceId");

TableSchema flinkSchema = FlinkSchemaUtil.toSchema(icebergSchema);
IcebergSink.forRow(dataStream, flinkSchema)
.table(table2)
.tableLoader(tableLoader)
.tableSchema(flinkSchema)
.writeParallelism(parallelism)
.distributionMode(DistributionMode.HASH)
.append();

try {
env.execute();
Assert.fail();
} catch (JobExecutionException e) {
assertThat(ExceptionUtils.findThrowable(e, t -> t instanceof NullPointerException)).isPresent();
}
}

private void testWriteRow(TableSchema tableSchema, DistributionMode distributionMode)
throws Exception {
List<Row> rows = createRows("");
