From a6db4e05d943fec019522c9c08ff0b64125cbc5d Mon Sep 17 00:00:00 2001 From: Allison Portis Date: Mon, 3 Feb 2025 12:01:10 -0800 Subject: [PATCH] [Kernel][Metrics][PR#7] Support ScanReport to log metrics for a Scan operation (#4068) #### Which Delta project/connector is this regarding? - [ ] Spark - [ ] Standalone - [ ] Flink - [X] Kernel - [ ] Other (fill in here) ## Description Adds `ScanReport` for reporting metrics for a Scan operation. We record a `ScanReport` either after all the scan files have successfully been consumed (and the iterator closed), or if an exception is thrown while reading/filtering/preparing the scan files to be returned to the connector. This is done within the `hasNext` and `next` methods on the returned iterator, since that is when all of the kernel work/evaluation happens (the iterator is lazily loaded). We only record a report for failures that happen within Kernel; if there are failures within the connector code, no report will be emitted. We also add support for serializing `ScanReport` in this PR. ## How was this patch tested? Adds unit tests. ## Does this PR introduce _any_ user-facing changes? No. --- .../io/delta/kernel/expressions/Literal.java | 13 + .../kernel/internal/ScanBuilderImpl.java | 15 +- .../io/delta/kernel/internal/ScanImpl.java | 174 ++++++- .../delta/kernel/internal/SnapshotImpl.java | 4 +- .../metrics/MetricsReportSerializers.java | 18 +- .../kernel/internal/metrics/ScanMetrics.java | 97 ++++ .../internal/metrics/ScanReportImpl.java | 108 ++++ .../replay/ActiveAddFilesIterator.java | 29 +- .../kernel/internal/replay/LogReplay.java | 8 +- .../internal/replay/LogReplayMetrics.java | 97 ---- .../kernel/metrics/ScanMetricsResult.java | 74 +++ .../io/delta/kernel/metrics/ScanReport.java | 83 +++ .../MetricsReportSerializerSuite.scala | 119 ++++- .../engine/LoggingMetricsReporter.java | 4 + .../ActiveAddFilesLogReplayMetricsSuite.scala | 192 ------- .../metrics/MetricsReportTestUtils.scala | 10 + .../defaults/metrics/ScanReportSuite.scala | 488 ++++++++++++++++++ 17 files changed, 1194 insertions(+), 339 deletions(-) create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanMetrics.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanReportImpl.java delete mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplayMetrics.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanMetricsResult.java create mode 100644 kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanReport.java delete mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ActiveAddFilesLogReplayMetricsSuite.scala create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/ScanReportSuite.scala diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java index 90e27741c29..5bc9bb6a2db 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/expressions/Literal.java @@ -22,6 +22,7 @@ import java.math.BigDecimal; import java.util.Collections; import java.util.List; +import java.util.Objects; /** * A literal value.
@@ -247,4 +248,16 @@ public String toString() { public List getChildren() { return Collections.emptyList(); } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + Literal other = (Literal) o; + return Objects.equals(dataType, other.dataType) && Objects.equals(value, other.value); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java index d725f8dbf4c..17e872dd0ef 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanBuilderImpl.java @@ -23,6 +23,7 @@ import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.replay.LogReplay; +import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructType; import java.util.Optional; @@ -34,6 +35,7 @@ public class ScanBuilderImpl implements ScanBuilder { private final Metadata metadata; private final StructType snapshotSchema; private final LogReplay logReplay; + private final SnapshotReport snapshotReport; private StructType readSchema; private Optional predicate; @@ -43,7 +45,8 @@ public ScanBuilderImpl( Protocol protocol, Metadata metadata, StructType snapshotSchema, - LogReplay logReplay) { + LogReplay logReplay, + SnapshotReport snapshotReport) { this.dataPath = dataPath; this.protocol = protocol; this.metadata = metadata; @@ -51,6 +54,7 @@ public ScanBuilderImpl( this.logReplay = logReplay; this.readSchema = snapshotSchema; this.predicate = Optional.empty(); + this.snapshotReport = snapshotReport; } @Override @@ -72,6 +76,13 @@ public ScanBuilder withReadSchema(StructType readSchema) { @Override public Scan build() { return new ScanImpl( - snapshotSchema, readSchema, protocol, metadata, logReplay, predicate, dataPath); + snapshotSchema, + readSchema, + protocol, + metadata, + logReplay, + predicate, + dataPath, + snapshotReport); } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java index 96e93cd02e6..b695717b607 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java @@ -30,10 +30,15 @@ import io.delta.kernel.internal.actions.Protocol; import io.delta.kernel.internal.data.ScanStateRow; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.metrics.ScanMetrics; +import io.delta.kernel.internal.metrics.ScanReportImpl; +import io.delta.kernel.internal.metrics.Timer; import io.delta.kernel.internal.replay.LogReplay; import io.delta.kernel.internal.skipping.DataSkippingPredicate; import io.delta.kernel.internal.skipping.DataSkippingUtils; import io.delta.kernel.internal.util.*; +import io.delta.kernel.metrics.ScanReport; +import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.types.StructField; import io.delta.kernel.types.StructType; import io.delta.kernel.utils.CloseableIterator; @@ -56,9 +61,12 @@ public class ScanImpl implements Scan { private final Metadata metadata; private final LogReplay logReplay; private final Path dataPath; + private final Optional filter; private final Optional> partitionAndDataFilters; private final Supplier> partitionColToStructFieldMap; private boolean 
accessedScanFiles; + private final SnapshotReport snapshotReport; + private final ScanMetrics scanMetrics = new ScanMetrics(); public ScanImpl( StructType snapshotSchema, @@ -67,12 +75,14 @@ public ScanImpl( Metadata metadata, LogReplay logReplay, Optional filter, - Path dataPath) { + Path dataPath, + SnapshotReport snapshotReport) { this.snapshotSchema = snapshotSchema; this.readSchema = readSchema; this.protocol = protocol; this.metadata = metadata; this.logReplay = logReplay; + this.filter = filter; this.partitionAndDataFilters = splitFilters(filter); this.dataPath = dataPath; this.partitionColToStructFieldMap = @@ -82,6 +92,7 @@ public ScanImpl( .filter(field -> partitionColNames.contains(field.getName().toLowerCase(Locale.ROOT))) .collect(toMap(field -> field.getName().toLowerCase(Locale.ROOT), identity())); }; + this.snapshotReport = snapshotReport; } /** @@ -118,30 +129,60 @@ public CloseableIterator getScanFiles( boolean hasDataSkippingFilter = dataSkippingFilter.isPresent(); boolean shouldReadStats = hasDataSkippingFilter || includeStats; - // Get active AddFiles via log replay - // If there is a partition predicate, construct a predicate to prune checkpoint files - // while constructing the table state. - CloseableIterator scanFileIter = - logReplay.getAddFilesAsColumnarBatches( - engine, - shouldReadStats, - getPartitionsFilters() - .map( - predicate -> - rewritePartitionPredicateOnCheckpointFileSchema( - predicate, partitionColToStructFieldMap.get()))); - - // Apply partition pruning - scanFileIter = applyPartitionPruning(engine, scanFileIter); - - // Apply data skipping - if (hasDataSkippingFilter) { - // there was a usable data skipping filter --> apply data skipping - scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get()); - } + Timer.Timed planningDuration = scanMetrics.totalPlanningTimer.start(); + // ScanReportReporter stores the current context and can be invoked (in the future) with + // `reportError` or `reportSuccess` to stop the planning duration timer and push a report to + // the engine + ScanReportReporter reportReporter = + (exceptionOpt, isFullyConsumed) -> { + planningDuration.stop(); + ScanReport scanReport = + new ScanReportImpl( + dataPath.toString() /* tablePath */, + logReplay.getVersion() /* table version */, + snapshotSchema, + snapshotReport.getReportUUID(), + filter, + readSchema, + getPartitionsFilters() /* partitionPredicate */, + dataSkippingFilter.map(p -> p), + isFullyConsumed, + scanMetrics, + exceptionOpt); + engine.getMetricsReporters().forEach(reporter -> reporter.report(scanReport)); + }; + + try { + // Get active AddFiles via log replay + // If there is a partition predicate, construct a predicate to prune checkpoint files + // while constructing the table state. 
+ CloseableIterator scanFileIter = + logReplay.getAddFilesAsColumnarBatches( + engine, + shouldReadStats, + getPartitionsFilters() + .map( + predicate -> + rewritePartitionPredicateOnCheckpointFileSchema( + predicate, partitionColToStructFieldMap.get())), + scanMetrics); + + // Apply partition pruning + scanFileIter = applyPartitionPruning(engine, scanFileIter); + + // Apply data skipping + if (hasDataSkippingFilter) { + // there was a usable data skipping filter --> apply data skipping + scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get()); + } + + // TODO when !includeStats drop the stats column if present before returning + return wrapWithMetricsReporting(scanFileIter, reportReporter); - // TODO when !includeStats drop the stats column if present before returning - return scanFileIter; + } catch (Exception e) { + reportReporter.reportError(e); + throw e; + } } @Override @@ -309,4 +350,89 @@ private CloseableIterator applyDataSkipping( filteredScanFileBatch.getData(), Optional.of(newSelectionVector)); }); } + + /** + * Wraps a scan file iterator such that we emit {@link ScanReport} to the engine upon success and + * failure. Since most of our scan building code is lazily executed (it occurs as + * maps/filters over an iterator), potential errors don't occur just within `getScanFiles` + * execution, but rather may occur as the returned iterator is consumed. Similarly, we cannot + * report a successful scan until the iterator has been fully consumed and the log read/filtered, + * etc. This means we cannot report the successful scan within `getScanFiles` but rather must + * report after the iterator has been consumed. + * + *
<p>
This method wraps an inner scan file iterator with an outer iterator wrapper that reports + * {@link ScanReport}s as needed. It reports a failed {@link ScanReport} in the case of any + * exceptions originating from the inner iterator's `next` and `hasNext` implementations. It + * reports a complete or incomplete {@link ScanReport} when the iterator is closed. + */ + private CloseableIterator wrapWithMetricsReporting( + CloseableIterator scanIter, ScanReportReporter reporter) { + return new CloseableIterator() { + + /* Whether this iterator has already reported an error */ + private boolean errorReported = false; + + @Override + public void close() throws IOException { + try { + // If a ScanReport has already been pushed in the case of an exception, don't double report + if (!errorReported) { + if (!scanIter.hasNext()) { + // The entire scan file iterator has been successfully consumed; report a complete Scan + reporter.reportCompleteScan(); + } else { + // The scan file iterator has NOT been fully consumed before being closed. + // We have no way of knowing the reason why; this could be due to an exception in the + // connector code, or intentional early termination such as for a LIMIT query + reporter.reportIncompleteScan(); + } + } + } finally { + scanIter.close(); + } + } + + @Override + public boolean hasNext() { + return wrapWithErrorReporting(() -> scanIter.hasNext()); + } + + @Override + public FilteredColumnarBatch next() { + return wrapWithErrorReporting(() -> scanIter.next()); + } + + private T wrapWithErrorReporting(Supplier s) { + try { + return s.get(); + } catch (Exception e) { + reporter.reportError(e); + errorReported = true; + throw e; + } + } + }; + } + + /** + * Defines methods to report {@link ScanReport} to the engine. This allows us to avoid ambiguous + * lambdas/anonymous classes as well as reuse the defined default methods.
+ */ + private interface ScanReportReporter { + + default void reportError(Exception e) { + report(Optional.of(e), false /* isFullyConsumed */); + } + + default void reportCompleteScan() { + report(Optional.empty(), true /* isFullyConsumed */); + } + + default void reportIncompleteScan() { + report(Optional.empty(), false /* isFullyConsumed */); + } + + /** Given an optional exception, reports a {@link ScanReport} to the engine */ + void report(Optional exceptionOpt, boolean isFullyConsumed); + } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java index 3054e28760c..199fce56440 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/SnapshotImpl.java @@ -118,8 +118,8 @@ public StructType getSchema() { @Override public ScanBuilder getScanBuilder() { - // TODO when we add ScanReport we will pass the SnapshotReport downstream here - return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(), logReplay); + return new ScanBuilderImpl( + dataPath, protocol, metadata, getSchema(), logReplay, snapshotReport); } /////////////////// diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java index faa99697f24..156ad0bc767 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/MetricsReportSerializers.java @@ -20,9 +20,12 @@ import com.fasterxml.jackson.databind.module.SimpleModule; import com.fasterxml.jackson.databind.ser.std.ToStringSerializer; import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import io.delta.kernel.expressions.Predicate; import io.delta.kernel.metrics.MetricsReport; +import io.delta.kernel.metrics.ScanReport; import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.metrics.TransactionReport; +import io.delta.kernel.types.StructType; /** Defines JSON serializers for {@link MetricsReport} types */ public final class MetricsReportSerializers { @@ -41,6 +44,15 @@ public static String serializeSnapshotReport(SnapshotReport snapshotReport) return OBJECT_MAPPER.writeValueAsString(snapshotReport); } + /** + * Serializes a {@link ScanReport} to a JSON string + * + * @throws JsonProcessingException + */ + public static String serializeScanReport(ScanReport scanReport) throws JsonProcessingException { + return OBJECT_MAPPER.writeValueAsString(scanReport); + } + /** * Serializes a {@link TransactionReport} to a JSON string * @@ -59,7 +71,11 @@ public static String serializeTransactionReport(TransactionReport transactionRep new ObjectMapper() .registerModule(new Jdk8Module()) // To support Optional .registerModule( // Serialize Exception using toString() - new SimpleModule().addSerializer(Exception.class, new ToStringSerializer())); + new SimpleModule().addSerializer(Exception.class, new ToStringSerializer())) + .registerModule( // Serialize StructType using toString + new SimpleModule().addSerializer(StructType.class, new ToStringSerializer())) + .registerModule( // Serialize Predicate using toString + new SimpleModule().addSerializer(Predicate.class, new ToStringSerializer())); private MetricsReportSerializers() {} } diff --git 
a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanMetrics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanMetrics.java new file mode 100644 index 00000000000..cb198ace531 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanMetrics.java @@ -0,0 +1,97 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.internal.metrics; + +import io.delta.kernel.metrics.ScanMetricsResult; + +/** + * Stores the metrics for an ongoing scan. These metrics are updated and recorded throughout the + * scan using this class. + * + *
<p>
At report time, we create an immutable {@link ScanMetricsResult} from an instance of {@link + * ScanMetrics} to capture the metrics collected during the scan. The {@link ScanMetricsResult} + * interface exposes getters for any metrics collected in this class. + */ +public class ScanMetrics { + + public final Timer totalPlanningTimer = new Timer(); + + public final Counter addFilesCounter = new Counter(); + + public final Counter addFilesFromDeltaFilesCounter = new Counter(); + + public final Counter activeAddFilesCounter = new Counter(); + + public final Counter duplicateAddFilesCounter = new Counter(); + + public final Counter removeFilesFromDeltaFilesCounter = new Counter(); + + public ScanMetricsResult captureScanMetricsResult() { + return new ScanMetricsResult() { + + final long totalPlanningDurationNs = totalPlanningTimer.totalDurationNs(); + final long numAddFilesSeen = addFilesCounter.value(); + final long numAddFilesSeenFromDeltaFiles = addFilesFromDeltaFilesCounter.value(); + final long numActiveAddFiles = activeAddFilesCounter.value(); + final long numDuplicateAddFiles = duplicateAddFilesCounter.value(); + final long numRemoveFilesSeenFromDeltaFiles = removeFilesFromDeltaFilesCounter.value(); + + @Override + public long getTotalPlanningDurationNs() { + return totalPlanningDurationNs; + } + + @Override + public long getNumAddFilesSeen() { + return numAddFilesSeen; + } + + @Override + public long getNumAddFilesSeenFromDeltaFiles() { + return numAddFilesSeenFromDeltaFiles; + } + + @Override + public long getNumActiveAddFiles() { + return numActiveAddFiles; + } + + @Override + public long getNumDuplicateAddFiles() { + return numDuplicateAddFiles; + } + + @Override + public long getNumRemoveFilesSeenFromDeltaFiles() { + return numRemoveFilesSeenFromDeltaFiles; + } + }; + } + + @Override + public String toString() { + return String.format( + "ScanMetrics(totalPlanningTimer=%s, addFilesCounter=%s, addFilesFromDeltaFilesCounter=%s," + + " activeAddFilesCounter=%s, duplicateAddFilesCounter=%s, " + + "removeFilesFromDeltaFilesCounter=%s", + totalPlanningTimer, + addFilesCounter, + addFilesFromDeltaFilesCounter, + activeAddFilesCounter, + duplicateAddFilesCounter, + removeFilesFromDeltaFilesCounter); + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanReportImpl.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanReportImpl.java new file mode 100644 index 00000000000..11571e9a195 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/metrics/ScanReportImpl.java @@ -0,0 +1,108 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.internal.metrics; + +import static java.util.Objects.requireNonNull; + +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.metrics.ScanMetricsResult; +import io.delta.kernel.metrics.ScanReport; +import io.delta.kernel.types.StructType; +import java.util.Optional; +import java.util.UUID; + +/** A basic POJO implementation of {@link ScanReport} for creating them */ +public class ScanReportImpl extends DeltaOperationReportImpl implements ScanReport { + + private final long tableVersion; + private final StructType tableSchema; + private final UUID snapshotReportUUID; + private final Optional filter; + private final StructType readSchema; + private final Optional partitionPredicate; + private final Optional dataSkippingFilter; + private final boolean isFullyConsumed; + private final ScanMetricsResult scanMetricsResult; + + public ScanReportImpl( + String tablePath, + long tableVersion, + StructType tableSchema, + UUID snapshotReportUUID, + Optional filter, + StructType readSchema, + Optional partitionPredicate, + Optional dataSkippingFilter, + boolean isFullyConsumed, + ScanMetrics scanMetrics, + Optional exception) { + super(tablePath, exception); + this.tableVersion = tableVersion; + this.tableSchema = requireNonNull(tableSchema); + this.snapshotReportUUID = requireNonNull(snapshotReportUUID); + this.filter = requireNonNull(filter); + this.readSchema = requireNonNull(readSchema); + this.partitionPredicate = requireNonNull(partitionPredicate); + this.dataSkippingFilter = requireNonNull(dataSkippingFilter); + this.isFullyConsumed = isFullyConsumed; + this.scanMetricsResult = requireNonNull(scanMetrics).captureScanMetricsResult(); + } + + @Override + public long getTableVersion() { + return tableVersion; + } + + @Override + public StructType getTableSchema() { + return tableSchema; + } + + @Override + public UUID getSnapshotReportUUID() { + return snapshotReportUUID; + } + + @Override + public Optional getFilter() { + return filter; + } + + @Override + public StructType getReadSchema() { + return readSchema; + } + + @Override + public Optional getPartitionPredicate() { + return partitionPredicate; + } + + @Override + public Optional getDataSkippingFilter() { + return dataSkippingFilter; + } + + @Override + public boolean getIsFullyConsumed() { + return isFullyConsumed; + } + + @Override + public ScanMetricsResult getScanMetrics() { + return scanMetricsResult; + } +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java index 05477ccfb64..fe053c19cdc 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/ActiveAddFilesIterator.java @@ -34,6 +34,7 @@ import io.delta.kernel.internal.InternalScanFileUtils; import io.delta.kernel.internal.actions.DeletionVectorDescriptor; import io.delta.kernel.internal.fs.Path; +import io.delta.kernel.internal.metrics.ScanMetrics; import io.delta.kernel.internal.replay.LogReplayUtils.UniqueFileActionTuple; import io.delta.kernel.internal.util.Utils; import io.delta.kernel.types.StringType; @@ -72,18 +73,20 @@ public class ActiveAddFilesIterator implements CloseableIterator iter, Path tableRoot) { + ActiveAddFilesIterator( + Engine engine, CloseableIterator iter, Path tableRoot, ScanMetrics metrics) { this.engine = engine; this.tableRoot = tableRoot; 
this.iter = iter; this.tombstonesFromJson = new HashSet<>(); this.addFilesFromJson = new HashSet<>(); this.next = Optional.empty(); + this.metrics = metrics; } @Override @@ -175,7 +178,7 @@ private void prepareNext() { .map(DeletionVectorDescriptor::getUniqueId); final UniqueFileActionTuple key = new UniqueFileActionTuple(pathAsUri, dvId); tombstonesFromJson.add(key); - metrics.incNumTombstonesSeen(); + metrics.removeFilesFromDeltaFilesCounter.increment(); } } @@ -192,9 +195,9 @@ private void prepareNext() { continue; // selectionVector will be `false` at rowId by default } - metrics.incNumAddFilesSeen(); + metrics.addFilesCounter.increment(); if (!isFromCheckpoint) { - metrics.incNumAddFilesSeenFromDeltaFiles(); + metrics.addFilesFromDeltaFilesCounter.increment(); } final String path = getAddFilePath(addsVector, rowId); @@ -219,10 +222,10 @@ private void prepareNext() { if (!alreadyDeleted) { doSelect = true; selectionVectorBuffer[rowId] = true; - metrics.incNumActiveAddFiles(); + metrics.activeAddFilesCounter.increment(); } } else { - metrics.incNumDuplicateAddFiles(); + metrics.duplicateAddFilesCounter.increment(); } if (!doSelect) { @@ -288,12 +291,4 @@ public static DeletionVectorDescriptor getRemoveFileDV(ColumnVector removeFileVe return DeletionVectorDescriptor.fromColumnVector( removeFileVector.getChild(REMOVE_FILE_DV_ORDINAL), rowId); } - - /** - * Returns the metrics for the log replay. Currently used in tests only. Caution: The metrics - * should be fetched only after the iterator is closed, to avoid reading incomplete metrics. - */ - public LogReplayMetrics getMetrics() { - return metrics; - } } diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java index e3d0213884e..fd00ad9a2a1 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplay.java @@ -31,6 +31,7 @@ import io.delta.kernel.internal.checkpoints.SidecarFile; import io.delta.kernel.internal.fs.Path; import io.delta.kernel.internal.lang.Lazy; +import io.delta.kernel.internal.metrics.ScanMetrics; import io.delta.kernel.internal.metrics.SnapshotMetrics; import io.delta.kernel.internal.snapshot.LogSegment; import io.delta.kernel.internal.snapshot.SnapshotHint; @@ -179,14 +180,17 @@ public long getVersion() { * */ public CloseableIterator getAddFilesAsColumnarBatches( - Engine engine, boolean shouldReadStats, Optional checkpointPredicate) { + Engine engine, + boolean shouldReadStats, + Optional checkpointPredicate, + ScanMetrics scanMetrics) { final CloseableIterator addRemoveIter = new ActionsIterator( engine, logSegment.allLogFilesReversed(), getAddRemoveReadSchema(shouldReadStats), checkpointPredicate); - return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath); + return new ActiveAddFilesIterator(engine, addRemoveIter, dataPath, scanMetrics); } //////////////////// diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplayMetrics.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplayMetrics.java deleted file mode 100644 index f0befacdac7..00000000000 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/replay/LogReplayMetrics.java +++ /dev/null @@ -1,97 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. 
- * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.internal.replay; - -/** - * Class capturing various metrics during log replay. This class can be used in places where actions - * from the logs are replayed, and we want to capture some metrics. - */ -public class LogReplayMetrics { - /** Number of `AddFile` actions seen in log replay both from checkpoint files and delta files. */ - private long numAddFilesSeen = 0; - - /** Number of `AddFile` actions seen in log replay from delta files. */ - private long numAddFilesSeenFromDeltaFiles = 0; - - /** Number of active `AddFile`s that survived (i.e. belong to the table state) the log replay. */ - private long numActiveAddFiles = 0; - - /** - * Number of `AddFile`s that are duplicates. Same `AddFile` (with the same path and DV) can be - * present in multiple commit files. This happens when stats collection is run on the table in - * which the same `AddFile` will be added with `stats` without removing it first. - */ - private long numDuplicateAddFiles = 0; - - /** Number of `RemoveFile`s seen in log replay both from delta files (not from checkpoint). */ - private long numTombstonesSeen = 0; - - public void incNumAddFilesSeen() { - numAddFilesSeen++; - } - - public void incNumAddFilesSeenFromDeltaFiles() { - numAddFilesSeenFromDeltaFiles++; - } - - public void incNumActiveAddFiles() { - numActiveAddFiles++; - } - - public void incNumDuplicateAddFiles() { - numDuplicateAddFiles++; - } - - public void incNumTombstonesSeen() { - numTombstonesSeen++; - } - - public long getNumAddFilesSeen() { - return numAddFilesSeen; - } - - public long getNumAddFilesSeenFromDeltaFiles() { - return numAddFilesSeenFromDeltaFiles; - } - - public long getNumActiveAddFiles() { - return numActiveAddFiles; - } - - public long getNumDuplicateAddFiles() { - return numDuplicateAddFiles; - } - - public long getNumTombstonesSeen() { - return numTombstonesSeen; - } - - /** Returns a summary of the metrics. */ - @Override - public String toString() { - return String.format( - "Number of AddFiles seen: %d\n" - + "Number of AddFiles seen from delta files: %d\n" - + "Number of active AddFiles: %d\n" - + "Number of duplicate AddFiles: %d\n" - + "Number of tombstones seen: %d\n", - numAddFilesSeen, - numAddFilesSeenFromDeltaFiles, - numActiveAddFiles, - numDuplicateAddFiles, - numTombstonesSeen); - } -} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanMetricsResult.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanMetricsResult.java new file mode 100644 index 00000000000..9d52e019120 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanMetricsResult.java @@ -0,0 +1,74 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.metrics; + +import com.fasterxml.jackson.annotation.JsonPropertyOrder; + +/** Stores the metrics results for a {@link ScanReport} */ +@JsonPropertyOrder({ + "totalPlanningDurationNs", + "numAddFilesSeen", + "numAddFilesSeenFromDeltaFiles", + "numActiveAddFiles", + "numDuplicateAddFiles", + "numRemoveFilesSeenFromDeltaFiles" +}) +public interface ScanMetricsResult { + + /** + * Returns the total duration to find, filter, and consume the scan files. This begins at the + * request for the scan files and terminates once all the scan files have been consumed and the + * scan file iterator closed. It includes reading the _delta_log, log replay, filtering + * optimizations, and any work from the connector before closing the scan file iterator. + * + * @return the total duration to find, filter, and consume the scan files + */ + long getTotalPlanningDurationNs(); + + /** + * @return the number of AddFile actions seen during log replay (from both checkpoint and delta + * files). For a failed or incomplete scan this metric may be incomplete. + */ + long getNumAddFilesSeen(); + + /** + * @return the number of AddFile actions seen during log replay from delta files only. For a + * failed or incomplete scan this metric may be incomplete. + */ + long getNumAddFilesSeenFromDeltaFiles(); + + /** + * @return the number of active AddFile actions that survived log replay (i.e. belong to the table + * state). For a failed or incomplete scan this metric may be incomplete. + */ + long getNumActiveAddFiles(); + + /** + * Returns the number of duplicate AddFile actions seen during log replay. The same AddFile (same + * path and DV) can be present in multiple commit files when stats collection is run on the table. + * In this case, the same AddFile will be added with stats without removing the original. + * + * @return the number of AddFile actions seen during log replay that are duplicates. For a failed + * or incomplete scan this metric may be incomplete. + */ + long getNumDuplicateAddFiles(); + + /** + * @return the number of RemoveFiles seen in log replay (only from delta files). For a failed or + * incomplete scan this metric may be incomplete. + */ + long getNumRemoveFilesSeenFromDeltaFiles(); +} diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanReport.java b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanReport.java new file mode 100644 index 00000000000..21412b16d63 --- /dev/null +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/metrics/ScanReport.java @@ -0,0 +1,83 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.metrics; + +import com.fasterxml.jackson.annotation.JsonPropertyOrder; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import io.delta.kernel.expressions.Predicate; +import io.delta.kernel.types.StructType; +import java.util.Optional; +import java.util.UUID; + +/** Defines the metadata and metrics for a Scan {@link MetricsReport} */ +@JsonSerialize(as = ScanReport.class) +@JsonPropertyOrder({ + "tablePath", + "operationType", + "reportUUID", + "exception", + "tableVersion", + "tableSchema", + "snapshotReportUUID", + "filter", + "readSchema", + "partitionPredicate", + "dataSkippingFilter", + "isFullyConsumed", + "scanMetrics" +}) +public interface ScanReport extends DeltaOperationReport { + + /** @return the version of the table in this scan */ + long getTableVersion(); + + /** @return the schema of the table for this scan */ + StructType getTableSchema(); + + /** + * @return the {@link SnapshotReport#getReportUUID} for the snapshot this scan was created from + */ + UUID getSnapshotReportUUID(); + + /** @return the filter provided when building the scan */ + Optional getFilter(); + + /** @return the read schema provided when building the scan */ + StructType getReadSchema(); + + /** @return the part of {@link ScanReport#getFilter()} that was used for partition pruning */ + Optional getPartitionPredicate(); + + /** @return the filter used for data skipping using the file statistics */ + Optional getDataSkippingFilter(); + + /** + * Whether the scan file iterator had been fully consumed when it was closed. The iterator may be + * closed early (before being fully consumed) either due to an exception originating within + * connector code or intentionally (such as for a LIMIT query). 
+ * + * @return whether the scan file iterator had been fully consumed when it was closed + */ + boolean getIsFullyConsumed(); + + /** @return the metrics for this scan */ + ScanMetricsResult getScanMetrics(); + + @Override + default String getOperationType() { + return "Scan"; + } +} diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala index d88cbbab4bb..5d6b57c18f3 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/metrics/MetricsReportSerializerSuite.scala @@ -15,9 +15,11 @@ */ package io.delta.kernel.internal.metrics -import java.util.Optional +import java.util.{Optional, UUID} -import io.delta.kernel.metrics.{SnapshotReport, TransactionReport} +import io.delta.kernel.expressions.{Column, Literal, Predicate} +import io.delta.kernel.metrics.{ScanReport, SnapshotReport, TransactionReport} +import io.delta.kernel.types.{IntegerType, StructType} import org.scalatest.funsuite.AnyFunSuite class MetricsReportSerializerSuite extends AnyFunSuite { @@ -189,4 +191,117 @@ class MetricsReportSerializerSuite extends AnyFunSuite { ) testTransactionReport(transactionReport2) } + + + private def testScanReport(scanReport: ScanReport): Unit = { + val exception: Optional[String] = scanReport.getException().map(_.toString) + val filter: Optional[String] = scanReport.getFilter.map(_.toString) + val partitionPredicate: Optional[String] = scanReport.getPartitionPredicate().map(_.toString) + val dataSkippingFilter: Optional[String] = scanReport.getDataSkippingFilter().map(_.toString) + val scanMetrics = scanReport.getScanMetrics + + val expectedJson = + s""" + |{"tablePath":"${scanReport.getTablePath()}", + |"operationType":"Scan", + |"reportUUID":"${scanReport.getReportUUID()}", + |"exception":${optionToString(exception)}, + |"tableVersion":${scanReport.getTableVersion()}, + |"tableSchema":"${scanReport.getTableSchema()}", + |"snapshotReportUUID":"${scanReport.getSnapshotReportUUID}", + |"filter":${optionToString(filter)}, + |"readSchema":"${scanReport.getReadSchema}", + |"partitionPredicate":${optionToString(partitionPredicate)}, + |"dataSkippingFilter":${optionToString(dataSkippingFilter)}, + |"isFullyConsumed":${scanReport.getIsFullyConsumed}, + |"scanMetrics":{ + |"totalPlanningDurationNs":${scanMetrics.getTotalPlanningDurationNs}, + |"numAddFilesSeen":${scanMetrics.getNumAddFilesSeen}, + |"numAddFilesSeenFromDeltaFiles":${scanMetrics.getNumAddFilesSeenFromDeltaFiles}, + |"numActiveAddFiles":${scanMetrics.getNumActiveAddFiles}, + |"numDuplicateAddFiles":${scanMetrics.getNumDuplicateAddFiles}, + |"numRemoveFilesSeenFromDeltaFiles":${scanMetrics.getNumRemoveFilesSeenFromDeltaFiles} + |} + |} + |""".stripMargin.replaceAll("\n", "") + assert(expectedJson == MetricsReportSerializers.serializeScanReport(scanReport)) + } + + test("ScanReport serializer") { + val snapshotReportUUID = UUID.randomUUID() + val tableSchema = new StructType() + .add("part", IntegerType.INTEGER) + .add("id", IntegerType.INTEGER) + val partitionPredicate = new Predicate(">", new Column("part"), Literal.ofInt(1)) + val exception = new RuntimeException("something something failed") + + // Initialize transaction metrics and record some values + val scanMetrics = new ScanMetrics() + scanMetrics.totalPlanningTimer.record(200) + 
scanMetrics.addFilesCounter.increment(100) + scanMetrics.addFilesFromDeltaFilesCounter.increment(90) + scanMetrics.activeAddFilesCounter.increment(10) + scanMetrics.removeFilesFromDeltaFilesCounter.increment(10) + + val scanReport1 = new ScanReportImpl( + "/table/path", + 1, + tableSchema, + snapshotReportUUID, + Optional.of(partitionPredicate), + new StructType().add("id", IntegerType.INTEGER), + Optional.of(partitionPredicate), + Optional.empty(), + true, + scanMetrics, + Optional.of(exception) + ) + + // Manually check expected JSON + val expectedJson = + s""" + |{"tablePath":"/table/path", + |"operationType":"Scan", + |"reportUUID":"${scanReport1.getReportUUID}", + |"exception":"$exception", + |"tableVersion":1, + |"tableSchema":"struct(StructField(name=part,type=integer,nullable=true,metadata={}), + | StructField(name=id,type=integer,nullable=true,metadata={}))", + |"snapshotReportUUID":"$snapshotReportUUID", + |"filter":"(column(`part`) > 1)", + |"readSchema":"struct(StructField(name=id,type=integer,nullable=true,metadata={}))", + |"partitionPredicate":"(column(`part`) > 1)", + |"dataSkippingFilter":null, + |"isFullyConsumed":true, + |"scanMetrics":{ + |"totalPlanningDurationNs":200, + |"numAddFilesSeen":100, + |"numAddFilesSeenFromDeltaFiles":90, + |"numActiveAddFiles":10, + |"numDuplicateAddFiles":0, + |"numRemoveFilesSeenFromDeltaFiles":10 + |} + |} + |""".stripMargin.replaceAll("\n", "") + assert(expectedJson == MetricsReportSerializers.serializeScanReport(scanReport1)) + + // Check with test function + testScanReport(scanReport1) + + // Empty options for all possible fields (version, providedTimestamp and exception) + val scanReport2 = new ScanReportImpl( + "/table/path", + 1, + tableSchema, + snapshotReportUUID, + Optional.empty(), + tableSchema, + Optional.empty(), + Optional.empty(), + false, // isFullyConsumed + new ScanMetrics(), + Optional.empty() + ) + testScanReport(scanReport2) + } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java index 46737795db7..a10f995ee14 100644 --- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/engine/LoggingMetricsReporter.java @@ -19,6 +19,7 @@ import io.delta.kernel.engine.MetricsReporter; import io.delta.kernel.internal.metrics.MetricsReportSerializers; import io.delta.kernel.metrics.MetricsReport; +import io.delta.kernel.metrics.ScanReport; import io.delta.kernel.metrics.SnapshotReport; import io.delta.kernel.metrics.TransactionReport; import org.slf4j.Logger; @@ -39,6 +40,9 @@ public void report(MetricsReport report) { logger.info( "SnapshotReport = {}", MetricsReportSerializers.serializeSnapshotReport((SnapshotReport) report)); + } else if (report instanceof ScanReport) { + logger.info( + "ScanReport = {}", MetricsReportSerializers.serializeScanReport((ScanReport) report)); } else if (report instanceof TransactionReport) { logger.info( "TransactionReport = {}", diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ActiveAddFilesLogReplayMetricsSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ActiveAddFilesLogReplayMetricsSuite.scala deleted file mode 100644 index 6acc3ae4941..00000000000 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/ActiveAddFilesLogReplayMetricsSuite.scala +++ /dev/null @@ 
-1,192 +0,0 @@ -/* - * Copyright (2024) The Delta Lake Project Authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package io.delta.kernel.defaults - -import io.delta.kernel.Table -import io.delta.kernel.defaults.engine.DefaultEngine -import io.delta.kernel.defaults.utils.{TestRow, TestUtils} -import io.delta.kernel.engine.Engine -import io.delta.kernel.internal.replay.ActiveAddFilesIterator -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.Path -import org.apache.spark.sql.delta.DeltaLog -import org.apache.spark.sql.delta.sources.DeltaSQLConf -import org.apache.spark.sql.delta.stats.StatisticsCollection -import org.scalatest.funsuite.AnyFunSuite - -/** - * Test suite to test the metrics captured during log replay to find the active `AddFile`s - * in a table snapshot. - */ -class ActiveAddFilesLogReplayMetricsSuite extends AnyFunSuite with TestUtils { - - test("active add files log replay metrics: only delta files") { - withTmpDirAndEngine { (path, engine) => - for (_ <- 0 to 9) { - appendCommit(path) - } - loadAndCheckLogReplayMetrics( - engine, - path, - expNumAddFilesSeen = 20, // each commit creates 2 files - expNumAddFilesSeenFromDeltaFiles = 20, - expNumActiveAddFiles = 20) - } - } - - - Seq(true, false).foreach { multipartCheckpoint => - val checkpointStr = if (multipartCheckpoint) "multipart " else "" - test(s"active add files log replay metrics: ${checkpointStr}checkpoint + delta files") { - withTmpDirAndEngine { (path, engine) => - for (_ <- 0 to 3) { - appendCommit(path) - } - checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000) - for (_ <- 4 to 9) { - appendCommit(path) - } - - loadAndCheckLogReplayMetrics( - engine, - path, - expNumAddFilesSeen = 20, // each commit creates 2 files - expNumAddFilesSeenFromDeltaFiles = 12, // checkpoint is created at version 3 - expNumActiveAddFiles = 20) - } - } - } - - Seq(true, false).foreach { multipartCheckpoint => - val checkpointStr = if (multipartCheckpoint) "multipart " else "" - test(s"active add files log replay metrics: ${checkpointStr}checkpoint + " + - s"delta files + tombstones") { - withTmpDirAndEngine { (path, engine) => - for (_ <- 0 to 3) { - appendCommit(path) - } // has 8 add files - deleteCommit(path) // version 4 - deletes 4 files and adds 1 file - checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000) // version 4 - appendCommit(path) // version 5 - adds 2 files - deleteCommit(path) // version 6 - deletes 1 file and adds 1 file - appendCommit(path) // version 7 - adds 2 files - appendCommit(path) // version 8 - adds 2 files - deleteCommit(path) // version 9 - deletes 2 files and adds 1 file - - loadAndCheckLogReplayMetrics( - engine, - path, - expNumAddFilesSeen = 5 /* checkpoint */ + 8, /* delta */ - expNumAddFilesSeenFromDeltaFiles = 8, - expNumActiveAddFiles = 10, - expNumTombstonesSeen = 3 - ) - } - } - } - - Seq(true, false).foreach { multipartCheckpoint => - val checkpointStr = if (multipartCheckpoint) "multipart " else "" - 
test(s"active add files log replay metrics: ${checkpointStr}checkpoint + delta files +" + - s" tombstones + duplicate adds") { - withTmpDirAndEngine { (path, engine) => - for (_ <- 0 to 1) { - appendCommit(path) - } // activeAdds = 4 - deleteCommit(path) // ver 2 - deletes 2 files and adds 1 file, activeAdds = 3 - checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000) // version 2 - appendCommit(path) // ver 3 - adds 2 files, activeAdds = 5 - recomputeStats(path) // ver 4 - adds the same 5 add files again, activeAdds = 5, dupes = 5 - deleteCommit(path) // ver 5 - removes 1 file and adds 1 file, activeAdds = 5, dupes = 5 - appendCommit(path) // ver 6 - adds 2 files, activeAdds = 7, dupes = 4 - recomputeStats(path) // ver 7 - adds the same 7 add files again, activeAdds = 7, dupes = 12 - deleteCommit(path) // ver 8 - removes 1 file and adds 1 files, activeAdds = 7, dupes = 12 - - loadAndCheckLogReplayMetrics( - engine, - path, - expNumAddFilesSeen = 3 /* checkpoint */ + 18, /* delta */ - expNumAddFilesSeenFromDeltaFiles = 18, - expNumActiveAddFiles = 7, - expNumTombstonesSeen = 2, - expNumDuplicateAddFiles = 12 - ) - } - } - } - - ///////////////////////// - // Test Helper Methods // - ///////////////////////// - def loadAndCheckLogReplayMetrics( - engine: Engine, - tablePath: String, - expNumAddFilesSeen: Long, - expNumAddFilesSeenFromDeltaFiles: Long, - expNumActiveAddFiles: Long, - expNumDuplicateAddFiles: Long = 0L, - expNumTombstonesSeen: Long = 0L): Unit = { - - val scanFileIter = Table.forPath(engine, tablePath) - .getLatestSnapshot(engine) - .getScanBuilder() - .build() - .getScanFiles(engine) - - // this will trigger the log replay, consumes actions and closes the iterator - scanFileIter.toSeq - - val metrics = scanFileIter.asInstanceOf[ActiveAddFilesIterator].getMetrics - assert(metrics.getNumAddFilesSeen == expNumAddFilesSeen) - assert(metrics.getNumAddFilesSeenFromDeltaFiles == expNumAddFilesSeenFromDeltaFiles) - assert(metrics.getNumActiveAddFiles == expNumActiveAddFiles) - assert(metrics.getNumDuplicateAddFiles == expNumDuplicateAddFiles) - assert(metrics.getNumTombstonesSeen == expNumTombstonesSeen) - - - val expResults = spark.sql(s"SELECT * FROM delta.`$tablePath`").collect().map(TestRow(_)) - checkTable(tablePath, expResults) - } - - def withTmpDirAndEngine(f: (String, Engine) => Unit): Unit = { - val engine = DefaultEngine.create(new Configuration() { - { - // Set the batch sizes to small so that we get to test the multiple batch scenarios. 
- set("delta.kernel.default.parquet.reader.batch-size", "2"); - set("delta.kernel.default.json.reader.batch-size", "2"); - } - }) - withTempDir { dir => f(dir.getAbsolutePath, engine) } - } - - def appendCommit(path: String): Unit = - spark.range(10).repartition(2).write.format("delta").mode("append").save(path) - - def deleteCommit(path: String): Unit = { - spark.sql("DELETE FROM delta.`%s` WHERE id = 5".format(path)) - } - - def recomputeStats(path: String): Unit = { - val deltaLog = DeltaLog.forTable(spark, new Path(path)) - StatisticsCollection.recompute(spark, deltaLog, catalogTable = None) - } - - def checkpoint(path: String, actionsPerFile: Int): Unit = { - withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> actionsPerFile.toString) { - DeltaLog.forTable(spark, path).checkpoint() - } - } -} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/MetricsReportTestUtils.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/MetricsReportTestUtils.scala index bb4f779dde2..1f115a56a49 100644 --- a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/MetricsReportTestUtils.scala +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/MetricsReportTestUtils.scala @@ -19,9 +19,11 @@ import java.util import scala.collection.mutable.ArrayBuffer +import io.delta.kernel.defaults.engine.DefaultEngine import io.delta.kernel.defaults.utils.TestUtils import io.delta.kernel.engine._ import io.delta.kernel.metrics.MetricsReport +import org.apache.hadoop.conf.Configuration /** * Test utilities for testing the Kernel-API created [[MetricsReports]]s. @@ -31,6 +33,14 @@ import io.delta.kernel.metrics.MetricsReport */ trait MetricsReportTestUtils extends TestUtils { + override lazy val defaultEngine = DefaultEngine.create(new Configuration() { + { + // Set the batch sizes to small so that we get to test the multiple batch scenarios. + set("delta.kernel.default.parquet.reader.batch-size", "2"); + set("delta.kernel.default.json.reader.batch-size", "2"); + } + }) + // For now this just uses the default engine since we have no need to override it, if we would // like to use a specific engine in the future for other tests we can simply add another arg here /** diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/ScanReportSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/ScanReportSuite.scala new file mode 100644 index 00000000000..30126936b05 --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/metrics/ScanReportSuite.scala @@ -0,0 +1,488 @@ +/* + * Copyright (2025) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults.metrics + +import java.util.Collections + +import io.delta.kernel._ +import io.delta.kernel.data.FilteredColumnarBatch +import io.delta.kernel.engine._ +import io.delta.kernel.expressions.{Column, Literal, Predicate} +import io.delta.kernel.internal.data.GenericRow +import io.delta.kernel.internal.fs.Path +import io.delta.kernel.internal.metrics.Timer +import io.delta.kernel.internal.util.{FileNames, Utils} +import io.delta.kernel.metrics.{ScanReport, SnapshotReport} +import io.delta.kernel.types.{IntegerType, LongType, StructType} +import io.delta.kernel.utils.CloseableIterator + +import org.apache.spark.sql.functions.col +import org.scalatest.funsuite.AnyFunSuite + +import org.apache.spark.sql.delta.DeltaLog +import org.apache.spark.sql.delta.sources.DeltaSQLConf +import org.apache.spark.sql.delta.stats.StatisticsCollection + +class ScanReportSuite extends AnyFunSuite with MetricsReportTestUtils { + + /** + * Creates a [[Scan]] using `getScan` and then requests and consumes the scan files. Uses a custom + * engine to collect emitted metrics reports (exactly 1 [[ScanReport]] and 1 [[SnapshotReport]] is + * expected). Also times and returns the duration it takes to consume the scan files. + * + * @param getScan function to generate a [[Scan]] given an engine + * @param expectException whether we expect consuming the scan files to throw an exception, which + * if so, is caught and returned with the other results + * @return (ScanReport, durationToConsumeScanFiles, SnapshotReport, ExceptionIfThrown) + */ + def getScanAndSnapshotReport( + getScan: Engine => Scan, + expectException: Boolean, + consumeScanFiles: CloseableIterator[FilteredColumnarBatch] => Unit + ): (ScanReport, Long, SnapshotReport, Option[Exception]) = { + val timer = new Timer() + + val (metricsReports, exception) = collectMetricsReports( + engine => { + val scan = getScan(engine) + // Time the actual operation + timer.timeCallable(() => consumeScanFiles(scan.getScanFiles(engine))) + }, + expectException + ) + + val scanReports = metricsReports.filter(_.isInstanceOf[ScanReport]) + assert(scanReports.length == 1, "Expected exactly 1 ScanReport") + val snapshotReports = metricsReports.filter(_.isInstanceOf[SnapshotReport]) + assert(snapshotReports.length == 1, "Expected exactly 1 SnapshotReport") + (scanReports.head.asInstanceOf[ScanReport], timer.totalDurationNs(), + snapshotReports.head.asInstanceOf[SnapshotReport], exception) + } + + /** + * Given a table path, constructs the latest snapshot, and uses it to generate a Scan with the + * provided filter and readSchema (if provided). Consumes the scan files from the scan and + * collects the emitted [[ScanReport]] and checks that the report is as expected. 
+ * + * @param path table path to query + * @param expectException whether we expect consuming the scan files to throw an exception + * @param expectedNumAddFiles expected number of add files seen + * @param expectedNumAddFilesFromDeltaFiles expected number of add files seen from delta files + * @param expectedNumActiveAddFiles expected number of active add files + * @param expectedNumDuplicateAddFiles expected number of duplicate add files seen + * @param expectedNumRemoveFilesSeenFromDeltaFiles expected number of remove files seen + * @param expectedPartitionPredicate expected partition predicate + * @param expectedDataSkippingFilter expected data skipping filter + * @param filter filter to build the scan with + * @param readSchema read schema to build the scan with + * @param consumeScanFiles function to consume scan file iterator + */ + // scalastyle:off + def checkScanReport( + path: String, + expectException: Boolean, + expectedNumAddFiles: Long, + expectedNumAddFilesFromDeltaFiles: Long, + expectedNumActiveAddFiles: Long, + expectedNumDuplicateAddFiles: Long = 0, + expectedNumRemoveFilesSeenFromDeltaFiles: Long = 0, + expectedPartitionPredicate: Option[Predicate] = None, + expectedDataSkippingFilter: Option[Predicate] = None, + expectedIsFullyConsumed: Boolean = true, + filter: Option[Predicate] = None, + readSchema: Option[StructType] = None, + // toSeq triggers log replay, consumes the actions and closes the iterator + consumeScanFiles: CloseableIterator[FilteredColumnarBatch] => Unit = iter => iter.toSeq + ): Unit = { + // scalastyle:on + // We need to save the snapshotSchema to check against the generated scan report + // In order to use the utils to collect the reports, we need to generate the snapshot in a anon + // fx, thus we save the snapshotSchema as a side-effect + var snapshotSchema: StructType = null + + val (scanReport, durationNs, snapshotReport, exceptionOpt) = getScanAndSnapshotReport( + engine => { + val snapshot = Table.forPath(engine, path).getLatestSnapshot(engine) + snapshotSchema = snapshot.getSchema() + var scanBuilder = snapshot.getScanBuilder() + if (filter.nonEmpty) { + scanBuilder = scanBuilder.withFilter(filter.get) + } + if (readSchema.nonEmpty) { + scanBuilder = scanBuilder.withReadSchema(readSchema.get) + } + scanBuilder.build() + }, + expectException, + consumeScanFiles + ) + + // Verify contents + assert(scanReport.getTablePath == defaultEngine.getFileSystemClient.resolvePath(path)) + assert(scanReport.getOperationType == "Scan") + exceptionOpt match { + case Some(e) => + assert(scanReport.getException().isPresent) + assert(scanReport.getException().get().getClass == e.getClass) + assert(scanReport.getException().get().getMessage == e.getMessage) + case None => assert(!scanReport.getException().isPresent) + } + assert(scanReport.getReportUUID != null) + + assert(snapshotReport.getVersion.isPresent, + "Version should be present for success SnapshotReport") + assert(scanReport.getTableVersion() == snapshotReport.getVersion.get()) + assert(scanReport.getTableSchema() == snapshotSchema) + assert(scanReport.getSnapshotReportUUID == snapshotReport.getReportUUID) + assert(scanReport.getFilter.toScala == filter) + assert(scanReport.getReadSchema == readSchema.getOrElse(snapshotSchema)) + assert(scanReport.getPartitionPredicate.toScala == expectedPartitionPredicate) + assert(scanReport.getIsFullyConsumed == expectedIsFullyConsumed) + + (scanReport.getDataSkippingFilter.toScala, expectedDataSkippingFilter) match { + case (Some(found), Some(expected)) => + 
+        assert(found.getName == expected.getName && found.getChildren == expected.getChildren)
+      case (found, expected) => assert(found == expected)
+    }
+
+    // Since we cannot know the actual duration of the scan, we sanity check that it is > 0 and
+    // less than the total operation duration
+    assert(scanReport.getScanMetrics.getTotalPlanningDurationNs > 0)
+    assert(scanReport.getScanMetrics.getTotalPlanningDurationNs < durationNs)
+
+    assert(scanReport.getScanMetrics.getNumAddFilesSeen == expectedNumAddFiles)
+    assert(scanReport.getScanMetrics.getNumAddFilesSeenFromDeltaFiles ==
+      expectedNumAddFilesFromDeltaFiles)
+    assert(scanReport.getScanMetrics.getNumActiveAddFiles == expectedNumActiveAddFiles)
+    assert(scanReport.getScanMetrics.getNumDuplicateAddFiles == expectedNumDuplicateAddFiles)
+    assert(scanReport.getScanMetrics.getNumRemoveFilesSeenFromDeltaFiles ==
+      expectedNumRemoveFilesSeenFromDeltaFiles)
+  }
+
+  test("ScanReport: basic case with no extra parameters") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // Set up delta table with 1 add file
+      spark.range(10).write.format("delta").mode("append").save(path)
+
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = 1,
+        expectedNumAddFilesFromDeltaFiles = 1,
+        expectedNumActiveAddFiles = 1
+      )
+    }
+  }
+
+  test("ScanReport: basic case with read schema") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // Set up delta table with 1 add file
+      spark.range(10).withColumn("c2", col("id") % 2)
+        .write.format("delta").mode("append").save(path)
+
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = 1,
+        expectedNumAddFilesFromDeltaFiles = 1,
+        expectedNumActiveAddFiles = 1,
+        readSchema = Some(new StructType().add("id", LongType.LONG))
+      )
+    }
+  }
+
+  test("ScanReport: different filter scenarios") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // Set up partitioned table
+      spark.range(10).withColumn("part", col("id") % 2)
+        .write.format("delta").partitionBy("part").save(path)
+
+      val partFilter = new Predicate("=", new Column("part"), Literal.ofLong(0))
+      val dataFilter = new Predicate("<=", new Column("id"), Literal.ofLong(0))
+      val expectedSkippingFilter = new Predicate(
+        "<=", new Column(Array("minValues", "id")), Literal.ofLong(0))
+
+      // The below metrics are incremented during log replay before any filtering happens and
+      // thus should be the same for all of the following test cases
+      val expectedNumAddFiles = 2
+      val expectedNumAddFilesFromDeltaFiles = 2
+      val expectedNumActiveAddFiles = 2
+
+      // No filter - 2 add files, one for each partition
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = expectedNumAddFiles,
+        expectedNumAddFilesFromDeltaFiles = expectedNumAddFilesFromDeltaFiles,
+        expectedNumActiveAddFiles = expectedNumActiveAddFiles
+      )
+
+      // With partition filter
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = expectedNumAddFiles,
+        expectedNumAddFilesFromDeltaFiles = expectedNumAddFilesFromDeltaFiles,
+        expectedNumActiveAddFiles = expectedNumActiveAddFiles,
+        filter = Some(partFilter),
+        expectedPartitionPredicate = Some(partFilter)
+      )
+
+      // With data filter
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = expectedNumAddFiles,
+        expectedNumAddFilesFromDeltaFiles = expectedNumAddFilesFromDeltaFiles,
+        expectedNumActiveAddFiles = expectedNumActiveAddFiles,
+        filter = Some(dataFilter),
+        expectedDataSkippingFilter = Some(expectedSkippingFilter)
+      )
+
+      // With data and partition filter
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = expectedNumAddFiles,
+        expectedNumAddFilesFromDeltaFiles = expectedNumAddFilesFromDeltaFiles,
+        expectedNumActiveAddFiles = expectedNumActiveAddFiles,
+        filter = Some(new Predicate("AND", partFilter, dataFilter)),
+        expectedDataSkippingFilter = Some(expectedSkippingFilter),
+        expectedPartitionPredicate = Some(partFilter)
+      )
+    }
+  }
+
+  test("ScanReport: close scan file iterator early") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // Set up delta table with 2 add files
+      spark.range(10).write.format("delta").mode("append").save(path)
+      spark.range(10).write.format("delta").mode("append").save(path)
+
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = 1,
+        expectedNumAddFilesFromDeltaFiles = 1,
+        expectedNumActiveAddFiles = 1,
+        expectedIsFullyConsumed = false,
+        // Read one batch of scan files and then close the iterator before it is exhausted
+        consumeScanFiles = iter => {
+          assert(iter.hasNext)
+          iter.next()
+          iter.close()
+        }
+      )
+    }
+  }
+
+  //////////////////
+  // Error cases ///
+  //////////////////
+
+  test("ScanReport error case - unrecognized partition filter") {
+    // Thrown during partition pruning when the expression handler cannot evaluate the filter.
+    // Because partition pruning happens within a `map` on the iterator, this is caught and
+    // reported within `map`
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // Set up partitioned table
+      spark.range(10).withColumn("part", col("id") % 2)
+        .write.format("delta").partitionBy("part").save(path)
+
+      val partFilter = new Predicate("foo", new Column("part"), Literal.ofLong(0))
+
+      checkScanReport(
+        path,
+        expectException = true,
+        expectedNumAddFiles = 0,
+        expectedNumAddFilesFromDeltaFiles = 0,
+        expectedNumActiveAddFiles = 0,
+        expectedIsFullyConsumed = false,
+        filter = Some(partFilter),
+        expectedPartitionPredicate = Some(partFilter)
+      )
+    }
+  }
+
+  test("ScanReport error case - error reading the log files") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+
+      // We set up a table with a gibberish json file at version 0 and a valid json file at
+      // version 1 that contains the P&M.
+      // This is so the snapshot loading will succeed, and we will only fail when trying to load
+      // the scan files.
+      // This exception is thrown from within the `hasNext` method on the iterator since that is
+      // when we load the actions from the log files. This exception is caught and reported within
+      // `hasNext`
+      spark.range(10).write.format("delta").save(path)
+      // Update protocol and metadata (so that version 1 has both P&M present)
+      spark.sql(
+        s"ALTER TABLE delta.`$path` SET TBLPROPERTIES ('delta.columnMapping.mode' = 'name')")
+      // Overwrite json file with gibberish (this will have a schema mismatch issue for `add`)
+      val gibberishRow = new GenericRow(
+        new StructType().add("add", IntegerType.INTEGER),
+        Collections.singletonMap(0, Integer.valueOf(0))
+      )
+      defaultEngine.getJsonHandler.writeJsonFileAtomically(
+        FileNames.deltaFile(new Path(tempDir.toString, "_delta_log"), 0),
+        Utils.singletonCloseableIterator(gibberishRow),
+        true
+      )
+
+      checkScanReport(
+        path,
+        expectException = true,
+        expectedNumAddFiles = 0,
+        expectedNumAddFilesFromDeltaFiles = 0,
+        expectedNumActiveAddFiles = 0,
+        expectedIsFullyConsumed = false
+      )
+    }
+  }
+
+  ///////////////////////////////
+  // Log replay metrics tests ///
+  ///////////////////////////////
+
+  test("active add files log replay metrics: only delta files") {
+    withTempDir { tempDir =>
+      val path = tempDir.getCanonicalPath
+      for (_ <- 0 to 9) {
+        appendCommit(path)
+      }
+
+      checkScanReport(
+        path,
+        expectException = false,
+        expectedNumAddFiles = 20, // each commit creates 2 files
+        expectedNumAddFilesFromDeltaFiles = 20,
+        expectedNumActiveAddFiles = 20
+      )
+    }
+  }
+
+  Seq(true, false).foreach { multipartCheckpoint =>
+    val checkpointStr = if (multipartCheckpoint) "multipart " else ""
+    test(s"active add files log replay metrics: ${checkpointStr}checkpoint + delta files") {
+      withTempDir { tempDir =>
+        val path = tempDir.getCanonicalPath
+        for (_ <- 0 to 3) {
+          appendCommit(path)
+        }
+        checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000)
+        for (_ <- 4 to 9) {
+          appendCommit(path)
+        }
+
+        checkScanReport(
+          path,
+          expectException = false,
+          expectedNumAddFiles = 20, // each commit creates 2 files
+          expectedNumAddFilesFromDeltaFiles = 12, // checkpoint is created at version 3
+          expectedNumActiveAddFiles = 20
+        )
+      }
+    }
+  }
+
+  Seq(true, false).foreach { multipartCheckpoint =>
+    val checkpointStr = if (multipartCheckpoint) "multipart " else ""
+    test(s"active add files log replay metrics: ${checkpointStr}checkpoint + " +
+      s"delta files + tombstones") {
+      withTempDir { tempDir =>
+        val path = tempDir.getCanonicalPath
+        for (_ <- 0 to 3) {
+          appendCommit(path)
+        } // has 8 add files
+        deleteCommit(path) // version 4 - deletes 4 files and adds 1 file
+        checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000) // version 4
+        appendCommit(path) // version 5 - adds 2 files
+        deleteCommit(path) // version 6 - deletes 1 file and adds 1 file
+        appendCommit(path) // version 7 - adds 2 files
+        appendCommit(path) // version 8 - adds 2 files
+        deleteCommit(path) // version 9 - deletes 2 files and adds 1 file
+
+        checkScanReport(
+          path,
+          expectException = false,
+          expectedNumAddFiles = 5 /* checkpoint */ + 8, /* delta */
+          expectedNumAddFilesFromDeltaFiles = 8,
+          expectedNumActiveAddFiles = 10,
+          expectedNumRemoveFilesSeenFromDeltaFiles = 3
+        )
+      }
+    }
+  }
+
+  Seq(true, false).foreach { multipartCheckpoint =>
+    val checkpointStr = if (multipartCheckpoint) "multipart " else ""
+    test(s"active add files log replay metrics: ${checkpointStr}checkpoint + delta files +" +
+      s" tombstones + duplicate adds") {
+      withTempDir { tempDir =>
+        val path = tempDir.getCanonicalPath
+        for (_ <- 0 to 1) {
+          appendCommit(path)
+        } // activeAdds = 4
+        deleteCommit(path) // ver 2 - deletes 2 files and adds 1 file, activeAdds = 3
+        checkpoint(path, actionsPerFile = if (multipartCheckpoint) 2 else 1000000) // version 2
+        appendCommit(path) // ver 3 - adds 2 files, activeAdds = 5
+        recomputeStats(path) // ver 4 - adds the same 5 add files again, activeAdds = 5, dupes = 5
+        deleteCommit(path) // ver 5 - removes 1 file and adds 1 file, activeAdds = 5, dupes = 5
+        appendCommit(path) // ver 6 - adds 2 files, activeAdds = 7, dupes = 5
+        recomputeStats(path) // ver 7 - adds the same 7 add files again, activeAdds = 7, dupes = 12
+        deleteCommit(path) // ver 8 - removes 1 file and adds 1 file, activeAdds = 7, dupes = 12
+
+        checkScanReport(
+          path,
+          expectException = false,
+          expectedNumAddFiles = 3 /* checkpoint */ + 18, /* delta */
+          expectedNumAddFilesFromDeltaFiles = 18,
+          expectedNumActiveAddFiles = 7,
+          expectedNumDuplicateAddFiles = 12,
+          expectedNumRemoveFilesSeenFromDeltaFiles = 2
+        )
+      }
+    }
+  }
+
+  /////////////////////////////////////////////
+  // Helpers for testing log replay metrics ///
+  /////////////////////////////////////////////
+
+  def appendCommit(path: String): Unit =
+    spark.range(10).repartition(2).write.format("delta").mode("append").save(path)
+
+  def deleteCommit(path: String): Unit = {
+    spark.sql("DELETE FROM delta.`%s` WHERE id = 5".format(path))
+  }
+
+  def recomputeStats(path: String): Unit = {
+    val deltaLog = DeltaLog.forTable(spark, new org.apache.hadoop.fs.Path(path))
+    StatisticsCollection.recompute(spark, deltaLog, catalogTable = None)
+  }
+
+  def checkpoint(path: String, actionsPerFile: Int): Unit = {
+    withSQLConf(DeltaSQLConf.DELTA_CHECKPOINT_PART_SIZE.key -> actionsPerFile.toString) {
+      DeltaLog.forTable(spark, path).checkpoint()
+    }
+  }
+}
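+
+// A minimal sketch of the two consumption paths the tests above exercise, assuming the kernel
+// types imported at the top of this file. The object and method names below are illustrative
+// only and are not referenced by any test.
+object ScanReportConsumptionSketch {
+
+  import io.delta.kernel.data.FilteredColumnarBatch
+  import io.delta.kernel.utils.CloseableIterator
+
+  // Drain the iterator, then close it: the emitted ScanReport should record
+  // getIsFullyConsumed == true along with the full set of log replay metrics.
+  def consumeFully(iter: CloseableIterator[FilteredColumnarBatch]): Unit = {
+    try {
+      while (iter.hasNext) iter.next() // log replay happens lazily inside hasNext/next
+    } finally {
+      iter.close()
+    }
+  }
+
+  // Read only the first batch and then close: the emitted ScanReport should record
+  // getIsFullyConsumed == false, as in the "close scan file iterator early" test above.
+  def closeEarly(iter: CloseableIterator[FilteredColumnarBatch]): Unit = {
+    if (iter.hasNext) iter.next()
+    iter.close()
+  }
+}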