Skip to content

Commit

Permalink
[Refactor] Refactor Range<PartitionKey> and PListCell into PCell for …
Browse files Browse the repository at this point in the history
…better abstraction (#53725)

Signed-off-by: shuming.li <[email protected]>
  • Loading branch information
LiShuMing committed Feb 4, 2025
1 parent 6ea8767 commit 0eeec5e
Show file tree
Hide file tree
Showing 34 changed files with 742 additions and 840 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.google.common.collect.Range;
import com.google.common.collect.Sets;
import com.google.gson.annotations.SerializedName;
import com.starrocks.analysis.DescriptorTable.ReferencedPartitionInfo;
Expand Down Expand Up @@ -73,6 +72,7 @@
import com.starrocks.sql.analyzer.Scope;
import com.starrocks.sql.analyzer.SelectAnalyzer;
import com.starrocks.sql.ast.UserIdentity;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.optimizer.CachingMvPlanContextBuilder;
import com.starrocks.sql.optimizer.MvRewritePreprocessor;
Expand Down Expand Up @@ -1530,14 +1530,14 @@ public Map<Table, List<Column>> getRefBaseTablePartitionColumns() {
* we also need to consider other ref table partitions(p0); otherwise, the mv's final result will lose data.
*/
public boolean isCalcPotentialRefreshPartition(List<TableWithPartitions> baseChangedPartitionNames,
Map<Table, Map<String, Range<PartitionKey>>> refBaseTableRangePartitionMap,
Map<Table, Map<String, PCell>> refBaseTablePartitionToCells,
Set<String> mvPartitions,
Map<String, Range<PartitionKey>> mvPartitionNameToRangeMap) {
Map<String, PCell> mvPartitionToCells) {
List<PRangeCell> mvSortedPartitionRanges =
TableWithPartitions.getSortedPartitionRanges(mvPartitionNameToRangeMap, mvPartitions);
TableWithPartitions.getSortedPartitionRanges(mvPartitionToCells, mvPartitions);
for (TableWithPartitions baseTableWithPartition : baseChangedPartitionNames) {
Map<String, Range<PartitionKey>> baseRangePartitionMap =
refBaseTableRangePartitionMap.get(baseTableWithPartition.getTable());
Map<String, PCell> baseRangePartitionMap =
refBaseTablePartitionToCells.get(baseTableWithPartition.getTable());
List<PRangeCell> baseSortedPartitionRanges =
baseTableWithPartition.getSortedPartitionRanges(baseRangePartitionMap);
for (PRangeCell basePartitionRange : baseSortedPartitionRanges) {
Expand Down Expand Up @@ -2141,4 +2141,6 @@ public List<Column> getBaseSchema() {
}
return schema;
}


}
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public class MvBaseTableUpdateInfo {
// The partition names of base table that have been updated
private final Set<String> toRefreshPartitionNames = Sets.newHashSet();
// The mapping of partition name to partition cell (a range cell or a list cell)
private final Map<String, PCell> nameToPartKeys = Maps.newHashMap();
private final Map<String, PCell> partitonToCells = Maps.newHashMap();

// If the base table is a mv, needs to record the mapping of mv partition name to partition range
private final Map<String, PCell> mvPartitionNameToCellMap = Maps.newHashMap();
Expand All @@ -52,8 +52,8 @@ public Set<String> getToRefreshPartitionNames() {
return toRefreshPartitionNames;
}

public Map<String, PCell> getNameToPartKeys() {
return nameToPartKeys;
public Map<String, PCell> getPartitonToCells() {
return partitonToCells;
}

/**
Expand All @@ -71,22 +71,22 @@ public void addToRefreshPartitionNames(Set<String> toRefreshPartitionNames) {
*/
public void addRangePartitionKeys(String partitionName,
Range<PartitionKey> rangePartitionKey) {
nameToPartKeys.put(partitionName, new PRangeCell(rangePartitionKey));
partitonToCells.put(partitionName, new PRangeCell(rangePartitionKey));
}

/**
* Add partition name that needs to be refreshed and its associated list partition key
*/
public void addListPartitionKeys(Map<String, PListCell> listPartitionKeys) {
nameToPartKeys.putAll(listPartitionKeys);
public void addPartitionCells(Map<String, PCell> cells) {
partitonToCells.putAll(cells);
}

/**
* Get the partition name with its associated range partition key when the mv is range partitioned.
*/
public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
Map<String, Range<PartitionKey>> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PRangeCell);
PRangeCell rangeCell = (PRangeCell) e.getValue();
result.put(e.getKey(), rangeCell.getRange());
Expand All @@ -99,7 +99,7 @@ public Map<String, Range<PartitionKey>> getPartitionNameWithRanges() {
*/
public Map<String, PListCell> getPartitionNameWithLists() {
Map<String, PListCell> result = Maps.newHashMap();
for (Map.Entry<String, PCell> e : nameToPartKeys.entrySet()) {
for (Map.Entry<String, PCell> e : partitonToCells.entrySet()) {
Preconditions.checkState(e.getValue() instanceof PListCell);
PListCell listCell = (PListCell) e.getValue();
result.put(e.getKey(), listCell);
Expand All @@ -111,7 +111,7 @@ public Map<String, PListCell> getPartitionNameWithLists() {
public String toString() {
return "BaseTableRefreshInfo{" +
", toRefreshPartitionNames=" + toRefreshPartitionNames +
", nameToPartKeys=" + nameToPartKeys +
", nameToPartKeys=" + partitonToCells +
'}';
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
import com.starrocks.catalog.mv.MVTimelinessRangePartitionArbiter;
import com.starrocks.common.AnalysisException;
import com.starrocks.common.util.DebugUtil;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.UnsupportedException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand All @@ -34,8 +34,8 @@
import java.util.Optional;
import java.util.Set;

import static com.starrocks.connector.PartitionUtil.getMVPartitionNameWithList;
import static com.starrocks.connector.PartitionUtil.getMVPartitionNameWithRange;
import static com.starrocks.connector.PartitionUtil.getMVPartitionToCells;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
Expand Down Expand Up @@ -198,9 +198,9 @@ public static MvBaseTableUpdateInfo getMvBaseTableUpdateInfo(MaterializedView mv
List<Column> refPartitionColumns = refBaseTablePartitionColumns.get(baseTable);
PartitionInfo mvPartitionInfo = mv.getPartitionInfo();
if (mvPartitionInfo.isListPartition()) {
Map<String, PListCell> mvPartitionNameWithList = getMVPartitionNameWithList(baseTable,
Map<String, PCell> mvPartitionNameWithList = getMVPartitionToCells(baseTable,
refPartitionColumns, updatedPartitionNamesList);
baseTableUpdateInfo.addListPartitionKeys(mvPartitionNameWithList);
baseTableUpdateInfo.addPartitionCells(mvPartitionNameWithList);
baseTableUpdateInfo.addToRefreshPartitionNames(mvPartitionNameWithList.keySet());
} else if (mvPartitionInfo.isRangePartition()) {
Preconditions.checkArgument(refPartitionColumns.size() == 1,
Expand Down
28 changes: 28 additions & 0 deletions fe/fe-core/src/main/java/com/starrocks/catalog/OlapTable.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@
import com.starrocks.sql.ast.IndexDef.IndexType;
import com.starrocks.sql.ast.PartitionValue;
import com.starrocks.sql.common.MetaUtils;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PListCell;
import com.starrocks.sql.common.PRangeCell;
import com.starrocks.sql.common.SyncPartitionUtils;
import com.starrocks.sql.optimizer.rule.mv.MVUtils;
import com.starrocks.sql.optimizer.statistics.IDictManager;
Expand Down Expand Up @@ -3676,4 +3678,30 @@ public long getLastCollectProfileTime() {
public void updateLastCollectProfileTime() {
this.lastCollectProfileTime = System.currentTimeMillis();
}

/**
 * Builds a map from partition name to its partition cell for this table.
 *
 * <p>For a range-partitioned table every range is wrapped into a {@link PRangeCell}; here
 * {@code partitionColumnsOpt} is only validated (it must be empty or hold exactly one column).
 * For a list-partitioned table the cells come from {@code getListPartitionItems(partitionColumnsOpt)},
 * so an empty {@code partitionColumnsOpt} means the cells are built with all partition columns.
 *
 * @param partitionColumnsOpt the partition columns to build cells for, or empty for all partition columns
 * @return a new map of partition name to cell; {@code null} when the table is unpartitioned, when the
 *         underlying range/list partition map is {@code null}, or when the partition type is neither
 *         range nor list
 */
public Map<String, PCell> getPartitionCells(Optional<List<Column>> partitionColumnsOpt) {
    final PartitionInfo info = this.getPartitionInfo();
    if (info.isUnPartitioned()) {
        return null;
    }

    if (info.isRangePartition()) {
        Preconditions.checkArgument(partitionColumnsOpt.isEmpty() || partitionColumnsOpt.get().size() == 1);
        final Map<String, Range<PartitionKey>> nameToRange = getRangePartitionMap();
        if (nameToRange == null) {
            return null;
        }
        // Wrap each raw range into the common PCell abstraction.
        return nameToRange.entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, entry -> new PRangeCell(entry.getValue())));
    }

    if (info.isListPartition()) {
        final Map<String, PListCell> nameToListCell = getListPartitionItems(partitionColumnsOpt);
        if (nameToListCell == null) {
            return null;
        }
        // Re-collect so that the value type widens from PListCell to PCell.
        return nameToListCell.entrySet()
                .stream()
                .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
    }

    // Neither unpartitioned, range nor list: nothing to report.
    return null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package com.starrocks.catalog.mv;

import com.google.common.base.Preconditions;
import com.google.common.collect.Maps;
import com.google.common.collect.Sets;
import com.starrocks.catalog.BaseTableInfo;
Expand All @@ -22,22 +23,27 @@
import com.starrocks.catalog.MvBaseTableUpdateInfo;
import com.starrocks.catalog.MvUpdateInfo;
import com.starrocks.catalog.OlapTable;
import com.starrocks.catalog.PartitionInfo;
import com.starrocks.catalog.Table;
import com.starrocks.catalog.TableProperty;
import com.starrocks.common.AnalysisException;
import com.starrocks.sql.common.PCell;
import com.starrocks.sql.common.PartitionDiff;
import com.starrocks.sql.common.PartitionDiffResult;
import com.starrocks.sql.common.PartitionDiffer;
import com.starrocks.sql.optimizer.rule.transformation.materialization.MvUtils;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;

import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;
import java.util.stream.Collectors;

import static com.starrocks.catalog.MvRefreshArbiter.getMvBaseTableUpdateInfo;
import static com.starrocks.catalog.MvRefreshArbiter.needsToRefreshTable;
import static com.starrocks.sql.optimizer.OptimizerTraceUtil.logMVPrepare;

/**
* {@link MVTimelinessArbiter} is the base class of all materialized view timeliness arbiters which is used to determine the mv's
Expand All @@ -50,6 +56,8 @@ public abstract class MVTimelinessArbiter {

// the materialized view to check
protected final MaterializedView mv;
// partition differ used to sync base table partition infos and to compute the partition diff
protected PartitionDiffer differ;
// whether is query rewrite or mv refresh
protected final boolean isQueryRewrite;

Expand Down Expand Up @@ -77,10 +85,11 @@ public MVTimelinessArbiter(MaterializedView mv, boolean isQueryRewrite) {
* @return : partitioned materialized view's all need updated partition names.
*/
public MvUpdateInfo getMVTimelinessUpdateInfo(TableProperty.QueryRewriteConsistencyMode mode) throws AnalysisException {
if (mode == TableProperty.QueryRewriteConsistencyMode.LOOSE) {
return getMVTimelinessUpdateInfoInLoose();
} else {
return getMVTimelinessUpdateInfoInChecked();
switch (mode) {
case LOOSE:
return getMVTimelinessUpdateInfoInLoose();
default:
return getMVTimelinessUpdateInfoInChecked();
}
}

Expand All @@ -90,12 +99,6 @@ public MvUpdateInfo getMVTimelinessUpdateInfo(TableProperty.QueryRewriteConsiste
*/
protected abstract MvUpdateInfo getMVTimelinessUpdateInfoInChecked() throws AnalysisException;

/**
* In Loose mode, do not need to check mv partition's data is consistent with base table's partition's data.
* Only need to check the mv partition existence.
*/
protected abstract MvUpdateInfo getMVTimelinessUpdateInfoInLoose();

/**
* Determine the refresh type of the materialized view.
* @param refBaseTablePartitionCols ref base table partition infos
Expand Down Expand Up @@ -181,14 +184,18 @@ protected Map<Table, Set<String>> collectBaseTableUpdatePartitionNames(Map<Table
* @param baseTableUpdateInfoMap base table update info from MvTimelinessInfo
* @return the base table to its changed partition and cell map if it's mv, empty else
*/
protected void collectExtraBaseTableChangedPartitions(
Map<Table, MvBaseTableUpdateInfo> baseTableUpdateInfoMap,
Consumer<Map.Entry<Table, Map<String, PCell>>> consumer) {
protected void collectExtraBaseTableChangedPartitions(Map<Table, MvBaseTableUpdateInfo> baseTableUpdateInfoMap,
Map<Table, Map<String, PCell>> basePartitionNameToRangeMap) {
Map<Table, Map<String, PCell>> extraChangedPartitions = baseTableUpdateInfoMap.entrySet().stream()
.filter(e -> !e.getValue().getMvPartitionNameToCellMap().isEmpty())
.collect(Collectors.toMap(e -> e.getKey(), e -> e.getValue().getMvPartitionNameToCellMap()));
for (Map.Entry<Table, Map<String, PCell>> entry : extraChangedPartitions.entrySet()) {
consumer.accept(entry);
Table baseTable = entry.getKey();
Preconditions.checkState(basePartitionNameToRangeMap.containsKey(baseTable));
Map<String, PCell> refBaseTablePartitionRangeMap = basePartitionNameToRangeMap.get(baseTable);
Map<String, PCell> basePartitionNameToRanges = entry.getValue();
basePartitionNameToRanges.entrySet().forEach(e ->
refBaseTablePartitionRangeMap.put(e.getKey(), e.getValue()));
}
}

Expand All @@ -206,13 +213,63 @@ protected void addEmptyPartitionsToRefresh(MvUpdateInfo mvUpdateInfo) {
});
}

/**
 * Syncs the base tables' partition infos through {@code differ} and returns them as a new map.
 *
 * @param mv the materialized view whose partition info decides whether any syncing is done
 * @return a new map of base table to its (partition name to cell) map; {@code null} when {@code mv}
 *         is unpartitioned or when {@code CollectionUtils.sizeIsEmpty} reports the synced result as empty
 */
public Map<Table, Map<String, PCell>> syncBaseTablePartitions(MaterializedView mv) {
    if (mv.getPartitionInfo().isUnPartitioned()) {
        return null;
    }

    final Map<Table, Map<String, PCell>> synced = differ.syncBaseTablePartitionInfos();
    if (CollectionUtils.sizeIsEmpty(synced)) {
        return null;
    }

    // Copy the outer map only; the per-table inner maps are passed through as they are.
    return synced.entrySet()
            .stream()
            .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

/**
 * Computes the partition diff for the given base table partition cells through {@code differ}.
 *
 * <p>Any exception raised while checking the partition type or computing the diff is logged
 * as a warning and turned into a {@code null} result.
 *
 * @param mv the materialized view to compute the diff for
 * @param basePartitionNameToRangeMap base table to its (partition name to cell) map
 * @return the computed diff; {@code null} when {@code mv} is unpartitioned, when the differ returns
 *         no result, or when the computation throws
 */
public PartitionDiff getChangedPartitionDiff(MaterializedView mv,
                                             Map<Table, Map<String, PCell>> basePartitionNameToRangeMap) {
    final PartitionInfo info = mv.getPartitionInfo();
    PartitionDiff changed = null;
    try {
        if (!info.isUnPartitioned()) {
            final PartitionDiffResult diffResult = differ.computePartitionDiff(null, basePartitionNameToRangeMap);
            if (diffResult != null) {
                changed = diffResult.diff;
            } else {
                logMVPrepare(mv, "Partitioned mv compute list diff failed");
            }
        }
    } catch (Exception e) {
        LOG.warn("Materialized view compute partition difference with base table failed.", e);
    }
    return changed;
}

/**
* TODO: Optimize performance in loose/force_mv mode
* TODO: in loose mode, ignore partition that both exists in baseTable and mv
* In Loose mode, do not need to check mv partition's data is consistent with base table's partition's data.
* Only need to check the mv partition existence.
*/
protected void collectBaseTableUpdatePartitionNamesInLoose(MvUpdateInfo mvUpdateInfo) {
Map<Table, List<Column>> refBaseTableAndColumns = mv.getRefBaseTablePartitionColumns();
// collect & update mv's to refresh partitions based on base table's partition changes
collectBaseTableUpdatePartitionNames(refBaseTableAndColumns, mvUpdateInfo);
public MvUpdateInfo getMVTimelinessUpdateInfoInLoose() {
MvUpdateInfo mvUpdateInfo = new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.PARTIAL,
TableProperty.QueryRewriteConsistencyMode.LOOSE);
Map<Table, Map<String, PCell>> refBaseTablePartitionMap = syncBaseTablePartitions(mv);
if (refBaseTablePartitionMap == null) {
logMVPrepare(mv, "Sync base table partition infos failed");
return new MvUpdateInfo(MvUpdateInfo.MvToRefreshType.FULL);
}

PartitionDiff diff = getChangedPartitionDiff(mv, refBaseTablePartitionMap);
if (diff == null) {
return null;
}
Map<String, PCell> adds = diff.getAdds();
if (!CollectionUtils.sizeIsEmpty(adds)) {
adds.keySet().stream().forEach(mvPartitionName ->
mvUpdateInfo.getMvToRefreshPartitionNames().add(mvPartitionName));
}
addEmptyPartitionsToRefresh(mvUpdateInfo);
return mvUpdateInfo;
}
}
Loading

0 comments on commit 0eeec5e

Please sign in to comment.