[HUDI-6508] Support compilation on Java 11
yihua committed Jun 25, 2024
1 parent 3414166 commit 027690b
Showing 7 changed files with 183 additions and 72 deletions.
167 changes: 155 additions & 12 deletions .github/workflows/bot.yml
@@ -245,12 +245,6 @@ jobs:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
@@ -285,15 +279,13 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Java FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

@@ -308,6 +300,49 @@ jobs:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.4"
sparkModules: "hudi-spark-datasource/hudi-spark3.4.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
uses: actions/setup-java@v3
with:
java-version: '8'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Scala UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Scala FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java11-17-java-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
@@ -317,10 +352,65 @@ jobs:

steps:
- uses: actions/checkout@v3
- name: Set up JDK 8
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '8'
java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn clean install -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DskipTests=true $MVN_ARGS -am -pl "hudi-examples/hudi-examples-spark,hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES"
- name: Set up JDK 17
uses: actions/setup-java@v3
with:
java-version: '17'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Quickstart Test
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl hudi-examples/hudi-examples-spark $MVN_ARGS
- name: Java UT - Common & Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DwildcardSuites=skipScalaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Java FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

test-spark-java11-17-scala-tests:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: "scala-2.12"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"
- scalaProfile: "scala-2.13"
sparkProfile: "spark3.5"
sparkModules: "hudi-spark-datasource/hudi-spark3.5.x"

steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
@@ -342,15 +432,13 @@ jobs:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Punit-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "hudi-common,$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS
- name: Scala FT - Spark
env:
SCALA_PROFILE: ${{ matrix.scalaProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_MODULES: ${{ matrix.sparkModules }}
if: ${{ !endsWith(env.SPARK_PROFILE, '3.2') }} # skip test spark 3.2 as it's covered by Azure CI
run:
mvn test -Pfunctional-tests -Pjava17 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -Dtest=skipJavaTests -DfailIfNoTests=false -pl "$SPARK_COMMON_MODULES,$SPARK_MODULES" $MVN_ARGS

@@ -525,6 +613,61 @@ jobs:
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh $HUDI_VERSION openjdk17
validate-bundles-java11:
runs-on: ubuntu-latest
strategy:
matrix:
include:
- scalaProfile: 'scala-2.13'
flinkProfile: 'flink1.18'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
- scalaProfile: 'scala-2.12'
flinkProfile: 'flink1.18'
sparkProfile: 'spark3.5'
sparkRuntime: 'spark3.5.0'
steps:
- uses: actions/checkout@v3
- name: Set up JDK 11
uses: actions/setup-java@v3
with:
java-version: '11'
distribution: 'temurin'
architecture: x64
cache: maven
- name: Build Project
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SCALA_PROFILE: ${{ matrix.scalaProfile }}
run: |
if [ "$SCALA_PROFILE" == "scala-2.13" ]; then
mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true -pl packaging/hudi-hadoop-mr-bundle,packaging/hudi-spark-bundle,packaging/hudi-utilities-bundle,packaging/hudi-utilities-slim-bundle -am
else
mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$SPARK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true
# TODO remove the sudo below. It's a needed workaround as detailed in HUDI-5708.
sudo chown -R "$USER:$(id -g -n)" hudi-platform-service/hudi-metaserver/target/generated-sources
mvn clean package -T 2 -D"$SCALA_PROFILE" -D"$FLINK_PROFILE" -DdeployArtifacts=true -DskipTests=true $MVN_ARGS -Dmaven.javadoc.skip=true -pl packaging/hudi-flink-bundle -am -Davro.version=1.10.0
fi
- name: IT - Bundle Validation - OpenJDK 11
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: ${{ matrix.scalaProfile }}
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh hudi_docker_java11 $HUDI_VERSION openjdk11
- name: IT - Bundle Validation - OpenJDK 17
env:
FLINK_PROFILE: ${{ matrix.flinkProfile }}
SPARK_PROFILE: ${{ matrix.sparkProfile }}
SPARK_RUNTIME: ${{ matrix.sparkRuntime }}
SCALA_PROFILE: ${{ matrix.scalaProfile }}
run: |
HUDI_VERSION=$(mvn help:evaluate -Dexpression=project.version -q -DforceStdout)
./packaging/bundle-validation/ci_run.sh hudi_docker_java17 $HUDI_VERSION openjdk17
integration-tests:
runs-on: ubuntu-latest
strategy:
(next changed file; name not shown in this view)
@@ -74,6 +74,7 @@
import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -687,9 +688,10 @@ public void testHandleUpdateWithMultiplePartitions() throws Exception {
BaseSparkDeltaCommitActionExecutor actionExecutor = new SparkDeleteDeltaCommitActionExecutor(context(), cfg, hoodieTable,
newDeleteTime, HoodieJavaRDD.of(deleteRDD));
actionExecutor.getUpsertPartitioner(new WorkloadProfile(buildProfile(deleteRDD)));
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> deleteStatus = jsc().parallelize(Arrays.asList(1))
.map(x -> (Iterator<List<WriteStatus>>)
actionExecutor.handleUpdate(partitionPath, fileId, fewRecordsForDelete.iterator()))
.map(Transformations::flatten).collect();

// Verify there are errors because records are from multiple partitions (but handleUpdate is invoked for
// specific partition)
(next changed file; name not shown in this view)
@@ -79,6 +79,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Properties;
@@ -464,9 +465,9 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
final List<HoodieRecord> inserts = dataGen.generateInsertsWithHoodieAvroPayload(instantTime, 100);
BaseSparkCommitActionExecutor actionExecutor = new SparkInsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(inserts));
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> {
return actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> ws = jsc.parallelize(Arrays.asList(1)).map(x -> (Iterator<List<WriteStatus>>)
actionExecutor.handleInsert(UUID.randomUUID().toString(), inserts.iterator()))
.map(Transformations::flatten).collect();

WriteStatus writeStatus = ws.get(0).get(0);
String fileId = writeStatus.getFileId();
@@ -478,9 +479,10 @@ public void testInsertUpsertWithHoodieAvroPayload() throws Exception {
table = (HoodieSparkCopyOnWriteTable) HoodieSparkTable.create(config, context, HoodieTableMetaClient.reload(metaClient));
BaseSparkCommitActionExecutor newActionExecutor = new SparkUpsertCommitActionExecutor(context, config, table,
instantTime, context.parallelize(updates));
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1)).map(x -> {
return newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator());
}).map(Transformations::flatten).collect();
final List<List<WriteStatus>> updateStatus = jsc.parallelize(Arrays.asList(1))
.map(x -> (Iterator<List<WriteStatus>>)
newActionExecutor.handleUpdate(partitionPath, fileId, updates.iterator()))
.map(Transformations::flatten).collect();
assertEquals(updates.size() - numRecordsInPartition, updateStatus.get(0).get(0).getTotalErrorRecords());
}

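Both test changes above follow the same pattern: the block-bodied lambda passed to the RDD map call is rewritten as an expression lambda with an explicit Iterator&lt;List&lt;WriteStatus&gt;&gt; cast. The executor variables are declared with raw types, so handleUpdate/handleInsert would otherwise surface a raw Iterator to the surrounding map call. Below is a minimal, self-contained sketch of that idea using plain java.util.function instead of Spark; RawExecutor and its handleInsert are hypothetical stand-ins, not Hudi APIs.

import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LambdaCastSketch {

  // Stand-in for a raw-typed executor: because the variable/class is used without
  // type arguments, handleInsert is seen by callers as returning a raw Iterator.
  @SuppressWarnings("rawtypes")
  static class RawExecutor {
    Iterator handleInsert(List<String> records) {
      return Arrays.asList(records).iterator(); // really an Iterator<List<String>>
    }
  }

  @SuppressWarnings("unchecked")
  public static void main(String[] args) {
    RawExecutor executor = new RawExecutor();
    List<String> records = Arrays.asList("a", "b");

    // Expression lambda with an explicit cast: the function's result type is pinned
    // to Iterator<List<String>>, so downstream code that flattens the iterator sees
    // the element type it expects instead of a raw Iterator.
    Function<Integer, Iterator<List<String>>> fn =
        x -> (Iterator<List<String>>) executor.handleInsert(records);

    List<String> first = fn.apply(1).next();
    System.out.println(first); // [a, b]
  }
}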
(next changed file; name not shown in this view)
@@ -61,7 +61,6 @@
import java.util.Map;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -305,20 +304,14 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
) {
ParquetMetadata metadata = readMetadata(conf, parquetFilePath);

// NOTE: This collector has to have fully specialized generic type params since
// Java 1.8 struggles to infer them
Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, List<HoodieColumnRangeMetadata<Comparable>>>> groupingByCollector =
Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName);

// Collect stats from all individual Parquet blocks
Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
(Map<String, List<HoodieColumnRangeMetadata<Comparable>>>) metadata.getBlocks().stream().sequential()
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream = metadata.getBlocks().stream().sequential()
.flatMap(blockMetaData ->
blockMetaData.getColumns().stream()
.filter(f -> cols.contains(f.getPath().toDotString()))
.map(columnChunkMetaData -> {
Statistics stats = columnChunkMetaData.getStatistics();
return HoodieColumnRangeMetadata.<Comparable>create(
return (HoodieColumnRangeMetadata<Comparable>) HoodieColumnRangeMetadata.<Comparable>create(
parquetFilePath.getName(),
columnChunkMetaData.getPath().toDotString(),
convertToNativeJavaType(
@@ -335,8 +328,10 @@ public List<HoodieColumnRangeMetadata<Comparable>> readRangeFromParquetMetadata(
columnChunkMetaData.getTotalSize(),
columnChunkMetaData.getTotalUncompressedSize());
})
)
.collect(groupingByCollector);
);

Map<String, List<HoodieColumnRangeMetadata<Comparable>>> columnToStatsListMap =
hoodieColumnRangeMetadataStream.collect(Collectors.groupingBy(HoodieColumnRangeMetadata::getColumnName));

// Combine those into file-level statistics
// NOTE: Inlining this var makes javac (1.8) upset (due to its inability to infer
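Rather than pre-declaring a fully specialized Collector (the workaround the deleted NOTE describes for Java 8 type inference), readRangeFromParquetMetadata now names the mapped stream as a typed local variable and collects it with a plain Collectors.groupingBy. A small self-contained sketch of that shape; ColumnRange is a simplified, hypothetical stand-in for HoodieColumnRangeMetadata&lt;Comparable&gt;.

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class GroupingSketch {

  // Simplified stand-in for a per-block column range/statistics record.
  static class ColumnRange {
    final String columnName;
    final Comparable<?> minValue;

    ColumnRange(String columnName, Comparable<?> minValue) {
      this.columnName = columnName;
      this.minValue = minValue;
    }

    String getColumnName() {
      return columnName;
    }
  }

  public static void main(String[] args) {
    List<ColumnRange> perBlockStats = Arrays.asList(
        new ColumnRange("id", 1),
        new ColumnRange("id", 5),
        new ColumnRange("ts", 100L));

    // Naming the intermediate stream gives the compiler a concrete element type,
    // so the groupingBy collector's generics no longer need to be spelled out.
    Stream<ColumnRange> statsStream = perBlockStats.stream().sequential();

    Map<String, List<ColumnRange>> byColumn =
        statsStream.collect(Collectors.groupingBy(ColumnRange::getColumnName));

    System.out.println(byColumn.keySet()); // e.g. [id, ts] (map order not guaranteed)
  }
}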
(next changed file; name not shown in this view)
@@ -104,7 +104,6 @@
import java.util.UUID;
import java.util.function.BiFunction;
import java.util.function.Function;
import java.util.stream.Collector;
import java.util.stream.Collectors;
import java.util.stream.Stream;

@@ -238,27 +237,25 @@ class ColumnStats {
});
});

Collector<HoodieColumnRangeMetadata<Comparable>, ?, Map<String, HoodieColumnRangeMetadata<Comparable>>> collector =
Collectors.toMap(colRangeMetadata -> colRangeMetadata.getColumnName(), Function.identity());

return (Map<String, HoodieColumnRangeMetadata<Comparable>>) targetFields.stream()
.map(field -> {
Stream<HoodieColumnRangeMetadata<Comparable>> hoodieColumnRangeMetadataStream =
targetFields.stream().map(field -> {
ColumnStats colStats = allColumnStats.get(field.name());
return HoodieColumnRangeMetadata.<Comparable>create(
filePath,
field.name(),
colStats == null ? null : coerceToComparable(field.schema(), colStats.minValue),
colStats == null ? null : coerceToComparable(field.schema(), colStats.maxValue),
colStats == null ? 0 : colStats.nullCount,
colStats == null ? 0 : colStats.valueCount,
colStats == null ? 0L : colStats.nullCount,
colStats == null ? 0L : colStats.valueCount,
// NOTE: Size and compressed size statistics are set to 0 to make sure we're not
// mixing up those provided by Parquet with the ones from other encodings,
// since those are not directly comparable
0,
0
0L,
0L
);
})
.collect(collector);
});
return hoodieColumnRangeMetadataStream.collect(
Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()));
}

/**
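The column-stats aggregation gets the same restructuring: the pre-built toMap Collector is replaced by a typed Stream local collected with Collectors.toMap(HoodieColumnRangeMetadata::getColumnName, Function.identity()). The switch from 0 to 0L also suggests (an assumption here, since the actual create signature isn't shown) that the count and size arguments are boxed Long rather than primitive long, in which case a bare 0 would autobox to Integer and not compile. A hedged sketch of both points using a hypothetical factory:

import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class ToMapSketch {

  // Hypothetical stand-in for a column-range factory: the counts are declared as
  // Long, so callers must pass 0L -- a bare 0 would autobox to Integer and fail
  // to compile against this signature.
  static class ColumnRange {
    final String columnName;
    final Long nullCount;
    final Long valueCount;

    private ColumnRange(String columnName, Long nullCount, Long valueCount) {
      this.columnName = columnName;
      this.nullCount = nullCount;
      this.valueCount = valueCount;
    }

    static ColumnRange create(String columnName, Long nullCount, Long valueCount) {
      return new ColumnRange(columnName, nullCount, valueCount);
    }

    String getColumnName() {
      return columnName;
    }
  }

  public static void main(String[] args) {
    List<String> fields = Arrays.asList("id", "ts");

    // Typed intermediate stream, then a plain toMap collect keyed by column name.
    Stream<ColumnRange> ranges =
        fields.stream().map(name -> ColumnRange.create(name, 0L, 0L));

    Map<String, ColumnRange> byName =
        ranges.collect(Collectors.toMap(ColumnRange::getColumnName, Function.identity()));

    System.out.println(byName.keySet()); // e.g. [id, ts] (map order not guaranteed)
  }
}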
14 changes: 0 additions & 14 deletions hudi-examples/hudi-examples-common/pom.xml
@@ -40,20 +40,6 @@
</resources>

<plugins>
<plugin>
<groupId>net.alchim31.maven</groupId>
<artifactId>scala-maven-plugin</artifactId>
<executions>
<execution>
<id>scala-compile-first</id>
<phase>process-resources</phase>
<goals>
<goal>add-source</goal>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
(1 more changed file not expanded in this view)
