[HUDI-7269] Fallback to key based merge if positions are missing from log block (#11415)

Fall back to key-based merging if positions are missing from a log block when
doing a position-based read in the file group reader. Changes:

- Make the position-based buffer extend the key-based buffer so we can fall
   back to key-based buffering.
- Move some position-based logic from the base buffer into the position
   buffer, because that is the only place it is used.
- If a log block is found to have no positions, we call
   fallbackToKeyBasedBuffer() to convert the map to use keys instead of
   positions (see the sketch below). This conversion is not completely
   effective, because delete records may not have a key stored. We set the
   flag "needToDoHybridStrategy" to true and handle this case when merging
   with the base file.
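
A minimal sketch of the fallback, assuming simplified buffer and record types:
fallbackToKeyBasedBuffer and needToDoHybridStrategy are names from this change,
while the classes, fields, and map layout here are illustrative stand-ins for
the real buffers.

import java.util.HashMap;
import java.util.Map;

// Illustrative stand-in for a buffered log record; the real buffers store
// richer entries (record plus metadata) in an ExternalSpillableMap.
class BufferedRecord {
  final String recordKey; // null for delete records that carry no key
  BufferedRecord(String recordKey) {
    this.recordKey = recordKey;
  }
}

class KeyBasedBufferSketch {
  protected final Map<Object, BufferedRecord> records = new HashMap<>();
}

class PositionBasedBufferSketch extends KeyBasedBufferSketch {
  // Set when a delete record has no key, so merging with the base file must
  // use a hybrid position-plus-key lookup for the leftover entries.
  boolean needToDoHybridStrategy = false;

  // Called when a log block turns out to carry no position info: re-key the
  // buffered records from positions to record keys.
  void fallbackToKeyBasedBuffer() {
    Map<Object, BufferedRecord> rekeyed = new HashMap<>();
    records.forEach((position, record) -> {
      if (record.recordKey != null) {
        rekeyed.put(record.recordKey, record);
      } else {
        // Delete records may not store a key; keep them keyed by position
        // and resolve them later while merging with the base file.
        rekeyed.put(position, record);
        needToDoHybridStrategy = true;
      }
    });
    records.clear();
    records.putAll(rekeyed);
  }
}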

---------

Co-authored-by: Jonathan Vexler <=>
Co-authored-by: Sagar Sumit <[email protected]>
jonvex and codope authored Jun 11, 2024
1 parent 51ef709 commit 5143a98
Showing 22 changed files with 814 additions and 186 deletions.
OverwriteWithLatestSparkMerger.java (new file)
@@ -0,0 +1,46 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;

import java.io.IOException;

/**
* Spark merger that always chooses the newer record
*/
public class OverwriteWithLatestSparkMerger extends HoodieSparkRecordMerger {

@Override
public String getMergingStrategy() {
return OVERWRITE_MERGER_STRATEGY_UUID;
}

@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
return Option.of(Pair.of(newer, newSchema));
}

}
@@ -48,6 +48,7 @@

import static org.apache.hudi.common.model.HoodieRecord.RECORD_KEY_METADATA_FIELD;
import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.spark.sql.HoodieInternalRowUtils.getCachedSchema;

/**
@@ -65,6 +66,8 @@ public HoodieRecordMerger getRecordMerger(String mergerStrategy) {
switch (mergerStrategy) {
case DEFAULT_MERGER_STRATEGY_UUID:
return new HoodieSparkRecordMerger();
case OVERWRITE_MERGER_STRATEGY_UUID:
return new OverwriteWithLatestSparkMerger();
default:
throw new HoodieException("The merger strategy UUID is not supported: " + mergerStrategy);
}
HoodieAvroIndexedRecord.java
@@ -53,6 +53,14 @@ public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data) {
super(key, data);
}

public HoodieAvroIndexedRecord(HoodieKey key, IndexedRecord data, HoodieRecordLocation currentLocation) {
super(key, data, null, currentLocation, null);
}

public HoodieAvroIndexedRecord(IndexedRecord data, HoodieRecordLocation currentLocation) {
super(null, data, null, currentLocation, null);
}

public HoodieAvroIndexedRecord(
HoodieKey key,
IndexedRecord data,
HoodieRecordMerger.java
@@ -20,6 +20,7 @@

import org.apache.hudi.ApiMaturityLevel;
import org.apache.hudi.PublicAPIClass;
import org.apache.hudi.common.config.RecordMergeMode;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieRecord.HoodieRecordType;
import org.apache.hudi.common.table.HoodieTableConfig;
@@ -45,6 +46,8 @@ public interface HoodieRecordMerger extends Serializable {

String DEFAULT_MERGER_STRATEGY_UUID = "eeb8d96f-b1e4-49fd-bbf8-28ac514178e5";

String OVERWRITE_MERGER_STRATEGY_UUID = "ce9acb64-bde0-424c-9b91-f6ebba25356d";

/**
* This method converges combineAndGetUpdateValue and precombine from HoodiePayload.
* It'd be associative operation: f(a, f(b, c)) = f(f(a, b), c) (which we can translate as having 3 versions A, B, C
@@ -163,4 +166,18 @@ default String[] getMandatoryFieldsForMerging(HoodieTableConfig cfg) {
* The kind of merging strategy this recordMerger belongs to. An UUID represents merging strategy.
*/
String getMergingStrategy();

/**
* The record merge mode that corresponds to this record merger
*/
default RecordMergeMode getRecordMergeMode() {
switch (getMergingStrategy()) {
case DEFAULT_MERGER_STRATEGY_UUID:
return RecordMergeMode.EVENT_TIME_ORDERING;
case OVERWRITE_MERGER_STRATEGY_UUID:
return RecordMergeMode.OVERWRITE_WITH_LATEST;
default:
return RecordMergeMode.CUSTOM;
}
}
}
OverwriteWithLatestMerger.java (new file)
@@ -0,0 +1,49 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.hudi.common.model;

import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import org.apache.avro.Schema;

import java.io.IOException;

/**
* Avro Merger that always chooses the newer record
*/
public class OverwriteWithLatestMerger implements HoodieRecordMerger {

@Override
public Option<Pair<HoodieRecord, Schema>> merge(HoodieRecord older, Schema oldSchema, HoodieRecord newer, Schema newSchema, TypedProperties props) throws IOException {
return Option.of(Pair.of(newer, newSchema));
}

@Override
public HoodieRecord.HoodieRecordType getRecordType() {
return HoodieRecord.HoodieRecordType.AVRO;
}

@Override
public String getMergingStrategy() {
return OVERWRITE_MERGER_STRATEGY_UUID;
}
}
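
For intuition, a small driver sketch showing the contract both overwrite
mergers share: the newer record wins even when the older record carries a
larger event-time value. The driver class, schema, field names, and values are
illustrative, not part of this commit; the two-argument HoodieAvroIndexedRecord
constructor is the one added in this diff.

import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.generic.GenericData;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.OverwriteWithLatestMerger;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.Pair;

import java.io.IOException;

public class OverwriteMergerExample {
  public static void main(String[] args) throws IOException {
    Schema schema = SchemaBuilder.record("r").fields()
        .requiredString("id").requiredLong("ts").endRecord();

    GenericData.Record olderData = new GenericData.Record(schema);
    olderData.put("id", "k1");
    olderData.put("ts", 10L); // larger event time than the newer record
    GenericData.Record newerData = new GenericData.Record(schema);
    newerData.put("id", "k1");
    newerData.put("ts", 5L);

    HoodieKey key = new HoodieKey("k1", "partition");
    Option<Pair<HoodieRecord, Schema>> merged = new OverwriteWithLatestMerger().merge(
        new HoodieAvroIndexedRecord(key, olderData), schema,
        new HoodieAvroIndexedRecord(key, newerData), schema,
        new TypedProperties());

    // Unlike the default event-time merger, no ordering field is consulted:
    // the newer (latest-arriving) record wins unconditionally.
    System.out.println(merged.get().getLeft().getData()); // prints the ts=5 record
  }
}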
HoodieTableConfig.java
@@ -198,6 +198,17 @@ public class HoodieTableConfig extends HoodieConfig {
public static final ConfigProperty<String> RECORD_MERGER_STRATEGY = ConfigProperty
.key("hoodie.compaction.record.merger.strategy")
.defaultValue(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID)
.withInferFunction(cfg -> {
switch (RecordMergeMode.valueOf(cfg.getStringOrDefault(RECORD_MERGE_MODE))) {
case EVENT_TIME_ORDERING:
return Option.of(HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID);
case OVERWRITE_WITH_LATEST:
return Option.of(HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID);
case CUSTOM:
default:
return Option.empty();
}
})
.sinceVersion("0.13.0")
.withDocumentation("Id of merger strategy. Hudi will pick HoodieRecordMerger implementations in hoodie.datasource.write.record.merger.impls which has the same merger strategy id");

@@ -79,6 +79,7 @@
import java.util.stream.Stream;

import static org.apache.hudi.common.model.HoodieRecordMerger.DEFAULT_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.model.HoodieRecordMerger.OVERWRITE_MERGER_STRATEGY_UUID;
import static org.apache.hudi.common.table.HoodieTableConfig.INITIAL_VERSION;
import static org.apache.hudi.common.table.HoodieTableConfig.RECORD_MERGE_MODE;
import static org.apache.hudi.common.util.ConfigUtils.containsConfigProperty;
@@ -1307,6 +1308,9 @@ public Properties build() {
if (recordMergeMode != null) {
tableConfig.setValue(RECORD_MERGE_MODE, recordMergeMode.name());
}
if (recordMergerStrategy != null) {
tableConfig.setValue(HoodieTableConfig.RECORD_MERGER_STRATEGY, recordMergerStrategy);
}
}

if (null != tableCreateSchema) {
@@ -1415,20 +1419,25 @@ private void inferRecordMergeMode() {
boolean recordMergerStrategySet = null != recordMergerStrategy;

if (!recordMergerStrategySet
|| recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)) {
|| recordMergerStrategy.equals(DEFAULT_MERGER_STRATEGY_UUID)
|| recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID)) {
if (payloadClassNameSet) {
if (payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName())) {
recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
} else if (payloadClassName.equals(DefaultHoodieRecordPayload.class.getName())) {
recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
} else {
recordMergeMode = RecordMergeMode.CUSTOM;
}
} else if (payloadTypeSet) {
if (payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())) {
recordMergeMode = RecordMergeMode.OVERWRITE_WITH_LATEST;
recordMergerStrategy = OVERWRITE_MERGER_STRATEGY_UUID;
} else if (payloadType.equals(RecordPayloadType.HOODIE_AVRO_DEFAULT.name())) {
recordMergeMode = RecordMergeMode.EVENT_TIME_ORDERING;
recordMergerStrategy = DEFAULT_MERGER_STRATEGY_UUID;
} else {
recordMergeMode = RecordMergeMode.CUSTOM;
}
@@ -1458,6 +1467,9 @@ private void validateMergeConfigs() {
|| (payloadClassNameSet && payloadClassName.equals(OverwriteWithLatestAvroPayload.class.getName()))
|| (payloadTypeSet && payloadType.equals(RecordPayloadType.OVERWRITE_LATEST_AVRO.name())),
constructMergeConfigErrorMessage());
checkArgument(recordMergerStrategySet && recordMergerStrategy.equals(OVERWRITE_MERGER_STRATEGY_UUID),
"Record merger strategy (" + (recordMergerStrategySet ? recordMergerStrategy : "null")
+ ") should be consistent with the record merging mode OVERWRITE_WITH_LATEST");
break;
case EVENT_TIME_ORDERING:
checkArgument((!payloadClassNameSet && !payloadTypeSet)
HoodieBaseFileGroupRecordBuffer.java
@@ -28,7 +28,6 @@
import org.apache.hudi.common.table.HoodieTableMetaClient;
import org.apache.hudi.common.table.log.KeySpec;
import org.apache.hudi.common.table.log.block.HoodieDataBlock;
import org.apache.hudi.common.table.log.block.HoodieLogBlock;
import org.apache.hudi.common.util.DefaultSizeEstimator;
import org.apache.hudi.common.util.HoodieRecordSizeEstimator;
import org.apache.hudi.common.util.InternalSchemaCache;
@@ -39,26 +38,19 @@
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.ExternalSpillableMap;
import org.apache.hudi.common.util.collection.Pair;
import org.apache.hudi.exception.HoodieCorruptedDataException;
import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.exception.HoodieKeyException;
import org.apache.hudi.exception.HoodieValidationException;
import org.apache.hudi.internal.schema.InternalSchema;
import org.apache.hudi.internal.schema.action.InternalSchemaMerger;
import org.apache.hudi.internal.schema.convert.AvroInternalSchemaConverter;

import org.apache.avro.Schema;
import org.roaringbitmap.longlong.Roaring64NavigableMap;

import java.io.IOException;
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Function;

import static org.apache.hudi.common.engine.HoodieReaderContext.INTERNAL_META_SCHEMA;
@@ -97,6 +89,10 @@ public HoodieBaseFileGroupRecordBuffer(HoodieReaderContext<T> readerContext,
this.partitionPathFieldOpt = partitionPathFieldOpt;
this.recordMergeMode = getRecordMergeMode(payloadProps);
this.recordMerger = recordMerger;
//Custom merge mode should produce the same results for any merger so we won't fail if there is a mismatch
if (recordMerger.getRecordMergeMode() != this.recordMergeMode && this.recordMergeMode != RecordMergeMode.CUSTOM) {
throw new IllegalStateException("Record merger is " + recordMerger.getClass().getName() + " but merge mode is " + this.recordMergeMode);
}
this.payloadProps = payloadProps;
this.internalSchema = readerContext.getSchemaHandler().getInternalSchema();
this.hoodieTableMetaClient = hoodieTableMetaClient;
@@ -293,21 +289,28 @@ protected Option<Pair<T, Map<String, Object>>> doProcessNextDataRecord(T record,
protected Option<DeleteRecord> doProcessNextDeletedRecord(DeleteRecord deleteRecord,
Pair<Option<T>, Map<String, Object>> existingRecordMetadataPair) {
if (existingRecordMetadataPair != null) {
// Merge and store the merged record. The ordering val is taken to decide whether the same key record
// should be deleted or be kept. The old record is kept only if the DELETE record has smaller ordering val.
// For same ordering values, uses the natural order(arrival time semantics).
Comparable existingOrderingVal = readerContext.getOrderingValue(
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema,
payloadProps);
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
boolean chooseExisting = !deleteOrderingVal.equals(0)
&& ReflectionUtils.isSameClass(existingOrderingVal, deleteOrderingVal)
&& existingOrderingVal.compareTo(deleteOrderingVal) > 0;
if (chooseExisting) {
// The DELETE message is obsolete if the old message has greater orderingVal.
return Option.empty();
switch (recordMergeMode) {
case OVERWRITE_WITH_LATEST:
return Option.empty();
case EVENT_TIME_ORDERING:
case CUSTOM:
default:
Comparable existingOrderingVal = readerContext.getOrderingValue(
existingRecordMetadataPair.getLeft(), existingRecordMetadataPair.getRight(), readerSchema,
payloadProps);
if (isDeleteRecordWithNaturalOrder(existingRecordMetadataPair.getLeft(), existingOrderingVal)) {
return Option.empty();
}
Comparable deleteOrderingVal = deleteRecord.getOrderingValue();
// Checks the ordering value does not equal to 0
// because we use 0 as the default value which means natural order
boolean chooseExisting = !deleteOrderingVal.equals(0)
&& ReflectionUtils.isSameClass(existingOrderingVal, deleteOrderingVal)
&& existingOrderingVal.compareTo(deleteOrderingVal) > 0;
if (chooseExisting) {
// The DELETE message is obsolete if the old message has greater orderingVal.
return Option.empty();
}
}
}
// Do delete.
@@ -428,60 +431,6 @@ protected Option<T> merge(Option<T> older, Map<String, Object> olderInfoMap,
}
}

/**
* Filter a record for downstream processing when:
* 1. A set of pre-specified keys exists.
* 2. The key of the record is not contained in the set.
*/
protected boolean shouldSkip(T record, String keyFieldName, boolean isFullKey, Set<String> keys, Schema writerSchema) {
String recordKey = readerContext.getValue(record, writerSchema, keyFieldName).toString();
// Can not extract the record key, throw.
if (recordKey == null || recordKey.isEmpty()) {
throw new HoodieKeyException("Can not extract the key for a record");
}

// No keys are specified. Cannot skip at all.
if (keys.isEmpty()) {
return false;
}

// When the record key matches with one of the keys or key prefixes, can not skip.
if ((isFullKey && keys.contains(recordKey))
|| (!isFullKey && keys.stream().anyMatch(recordKey::startsWith))) {
return false;
}

// Otherwise, this record is not needed.
return true;
}

/**
* Extract the record positions from a log block header.
*
* @param logBlock
* @return
* @throws IOException
*/
protected static List<Long> extractRecordPositions(HoodieLogBlock logBlock) throws IOException {
List<Long> blockPositions = new ArrayList<>();

Roaring64NavigableMap positions = logBlock.getRecordPositions();
if (positions == null || positions.isEmpty()) {
throw new HoodieValidationException("No record position info is found when attempt to do position based merge.");
}

Iterator<Long> iterator = positions.iterator();
while (iterator.hasNext()) {
blockPositions.add(iterator.next());
}

if (blockPositions.isEmpty()) {
throw new HoodieCorruptedDataException("No positions are extracted.");
}

return blockPositions;
}

protected boolean hasNextBaseRecord(T baseRecord, Pair<Option<T>, Map<String, Object>> logRecordInfo) throws IOException {
Map<String, Object> metadata = readerContext.generateMetadataForRecord(
baseRecord, readerSchema);
(diffs for the remaining changed files are not shown)
