[Kernel][Metrics][PR#7] Support ScanReport to log metrics for a Scan operation (#4068)


#### Which Delta project/connector is this regarding?

- [ ] Spark
- [ ] Standalone
- [ ] Flink
- [X] Kernel
- [ ] Other (fill in here)

## Description

Adds `ScanReport` for reporting metrics for a Scan operation.

We record a `ScanReport` either after all the scan files have been successfully
consumed (and the iterator closed), or when an exception is thrown while
reading/filtering/preparing the scan files to be returned to the connector.
This is done within the `hasNext` and `next` methods of the returned iterator,
since that is where all of the Kernel work/evaluation happens (the iterator is
lazily loaded). We only record a report for failures that happen within Kernel;
if a failure originates in connector code, no report is emitted.

We also add support for serializing `ScanReport` in this PR.

## How was this patch tested?

Adds unit tests.

## Does this PR introduce _any_ user-facing changes?

No.
allisonport-db authored Feb 3, 2025
1 parent 23e4287 commit a6db4e0
Showing 17 changed files with 1,194 additions and 339 deletions.
Literal.java
@@ -22,6 +22,7 @@
import java.math.BigDecimal;
import java.util.Collections;
import java.util.List;
+import java.util.Objects;

/**
* A literal value.
@@ -247,4 +248,16 @@ public String toString() {
  public List<Expression> getChildren() {
    return Collections.emptyList();
  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Literal other = (Literal) o;
+    return Objects.equals(dataType, other.dataType) && Objects.equals(value, other.value);
+  }
}
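
For context, the `equals` override above makes literals comparable by value (e.g., when asserting on predicates in tests). A minimal sketch of the behavior, assuming the existing `Literal.ofInt`/`Literal.ofLong` factories:

```java
Literal a = Literal.ofInt(5);
Literal b = Literal.ofInt(5);
boolean sameValue = a.equals(b);                  // true: same data type and value
boolean crossType = a.equals(Literal.ofLong(5L)); // false: different data types (integer vs. long)
```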
ScanBuilderImpl.java
@@ -23,6 +23,7 @@
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.fs.Path;
import io.delta.kernel.internal.replay.LogReplay;
+import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructType;
import java.util.Optional;

@@ -34,6 +35,7 @@ public class ScanBuilderImpl implements ScanBuilder {
private final Metadata metadata;
private final StructType snapshotSchema;
private final LogReplay logReplay;
+  private final SnapshotReport snapshotReport;

private StructType readSchema;
private Optional<Predicate> predicate;
@@ -43,14 +45,16 @@ public ScanBuilderImpl(
Protocol protocol,
Metadata metadata,
StructType snapshotSchema,
-      LogReplay logReplay) {
+      LogReplay logReplay,
+      SnapshotReport snapshotReport) {
this.dataPath = dataPath;
this.protocol = protocol;
this.metadata = metadata;
this.snapshotSchema = snapshotSchema;
this.logReplay = logReplay;
this.readSchema = snapshotSchema;
this.predicate = Optional.empty();
+    this.snapshotReport = snapshotReport;
}

@Override
@@ -72,6 +76,13 @@ public ScanBuilder withReadSchema(StructType readSchema) {
@Override
public Scan build() {
return new ScanImpl(
-        snapshotSchema, readSchema, protocol, metadata, logReplay, predicate, dataPath);
+        snapshotSchema,
+        readSchema,
+        protocol,
+        metadata,
+        logReplay,
+        predicate,
+        dataPath,
+        snapshotReport);
}
}
174 changes: 150 additions & 24 deletions kernel/kernel-api/src/main/java/io/delta/kernel/internal/ScanImpl.java
@@ -30,10 +30,15 @@
import io.delta.kernel.internal.actions.Protocol;
import io.delta.kernel.internal.data.ScanStateRow;
import io.delta.kernel.internal.fs.Path;
+import io.delta.kernel.internal.metrics.ScanMetrics;
+import io.delta.kernel.internal.metrics.ScanReportImpl;
+import io.delta.kernel.internal.metrics.Timer;
import io.delta.kernel.internal.replay.LogReplay;
import io.delta.kernel.internal.skipping.DataSkippingPredicate;
import io.delta.kernel.internal.skipping.DataSkippingUtils;
import io.delta.kernel.internal.util.*;
+import io.delta.kernel.metrics.ScanReport;
+import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.types.StructField;
import io.delta.kernel.types.StructType;
import io.delta.kernel.utils.CloseableIterator;
@@ -56,9 +61,12 @@ public class ScanImpl implements Scan {
private final Metadata metadata;
private final LogReplay logReplay;
private final Path dataPath;
+  private final Optional<Predicate> filter;
private final Optional<Tuple2<Predicate, Predicate>> partitionAndDataFilters;
private final Supplier<Map<String, StructField>> partitionColToStructFieldMap;
private boolean accessedScanFiles;
+  private final SnapshotReport snapshotReport;
+  private final ScanMetrics scanMetrics = new ScanMetrics();

public ScanImpl(
StructType snapshotSchema,
@@ -67,12 +75,14 @@ public ScanImpl(
Metadata metadata,
LogReplay logReplay,
Optional<Predicate> filter,
-      Path dataPath) {
+      Path dataPath,
+      SnapshotReport snapshotReport) {
this.snapshotSchema = snapshotSchema;
this.readSchema = readSchema;
this.protocol = protocol;
this.metadata = metadata;
this.logReplay = logReplay;
+    this.filter = filter;
this.partitionAndDataFilters = splitFilters(filter);
this.dataPath = dataPath;
this.partitionColToStructFieldMap =
@@ -82,6 +92,7 @@
.filter(field -> partitionColNames.contains(field.getName().toLowerCase(Locale.ROOT)))
.collect(toMap(field -> field.getName().toLowerCase(Locale.ROOT), identity()));
};
+    this.snapshotReport = snapshotReport;
}

/**
@@ -118,30 +129,60 @@ public CloseableIterator<FilteredColumnarBatch> getScanFiles(
boolean hasDataSkippingFilter = dataSkippingFilter.isPresent();
boolean shouldReadStats = hasDataSkippingFilter || includeStats;

-    // Get active AddFiles via log replay
-    // If there is a partition predicate, construct a predicate to prune checkpoint files
-    // while constructing the table state.
-    CloseableIterator<FilteredColumnarBatch> scanFileIter =
-        logReplay.getAddFilesAsColumnarBatches(
-            engine,
-            shouldReadStats,
-            getPartitionsFilters()
-                .map(
-                    predicate ->
-                        rewritePartitionPredicateOnCheckpointFileSchema(
-                            predicate, partitionColToStructFieldMap.get())));
-
-    // Apply partition pruning
-    scanFileIter = applyPartitionPruning(engine, scanFileIter);
-
-    // Apply data skipping
-    if (hasDataSkippingFilter) {
-      // there was a usable data skipping filter --> apply data skipping
-      scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get());
-    }
+    Timer.Timed planningDuration = scanMetrics.totalPlanningTimer.start();
+    // ScanReportReporter stores the current context and can be invoked (in the future) with
+    // `reportError` or `reportSuccess` to stop the planning duration timer and push a report to
+    // the engine
+    ScanReportReporter reportReporter =
+        (exceptionOpt, isFullyConsumed) -> {
+          planningDuration.stop();
+          ScanReport scanReport =
+              new ScanReportImpl(
+                  dataPath.toString() /* tablePath */,
+                  logReplay.getVersion() /* table version */,
+                  snapshotSchema,
+                  snapshotReport.getReportUUID(),
+                  filter,
+                  readSchema,
+                  getPartitionsFilters() /* partitionPredicate */,
+                  dataSkippingFilter.map(p -> p),
+                  isFullyConsumed,
+                  scanMetrics,
+                  exceptionOpt);
+          engine.getMetricsReporters().forEach(reporter -> reporter.report(scanReport));
+        };
+
+    try {
+      // Get active AddFiles via log replay
+      // If there is a partition predicate, construct a predicate to prune checkpoint files
+      // while constructing the table state.
+      CloseableIterator<FilteredColumnarBatch> scanFileIter =
+          logReplay.getAddFilesAsColumnarBatches(
+              engine,
+              shouldReadStats,
+              getPartitionsFilters()
+                  .map(
+                      predicate ->
+                          rewritePartitionPredicateOnCheckpointFileSchema(
+                              predicate, partitionColToStructFieldMap.get())),
+              scanMetrics);
+
+      // Apply partition pruning
+      scanFileIter = applyPartitionPruning(engine, scanFileIter);
+
+      // Apply data skipping
+      if (hasDataSkippingFilter) {
+        // there was a usable data skipping filter --> apply data skipping
+        scanFileIter = applyDataSkipping(engine, scanFileIter, dataSkippingFilter.get());
+      }
+
-    // TODO when !includeStats drop the stats column if present before returning
-    return scanFileIter;
+      // TODO when !includeStats drop the stats column if present before returning
+      return wrapWithMetricsReporting(scanFileIter, reportReporter);
+    } catch (Exception e) {
+      reportReporter.reportError(e);
+      throw e;
+    }
}

@Override
@@ -309,4 +350,89 @@ private CloseableIterator<FilteredColumnarBatch> applyDataSkipping(
filteredScanFileBatch.getData(), Optional.of(newSelectionVector));
});
}

+  /**
+   * Wraps a scan file iterator such that we emit {@link ScanReport} to the engine upon success
+   * and failure. Since most of our scan-building code is lazily executed (it occurs as
+   * maps/filters over an iterator), potential errors don't occur just within `getScanFiles`
+   * execution, but rather may occur as the returned iterator is consumed. Similarly, we cannot
+   * report a successful scan until the iterator has been fully consumed and the log
+   * read/filtered etc. This means we cannot report the successful scan within `getScanFiles`,
+   * but rather must report after the iterator has been consumed.
+   *
+   * <p>This method wraps an inner scan file iterator with an outer iterator wrapper that reports
+   * {@link ScanReport}s as needed. It reports a failed {@link ScanReport} in the case of any
+   * exceptions originating from the inner iterator's `next` and `hasNext` impl. It reports a
+   * complete or incomplete {@link ScanReport} when the iterator is closed.
+   */
+  private CloseableIterator<FilteredColumnarBatch> wrapWithMetricsReporting(
+      CloseableIterator<FilteredColumnarBatch> scanIter, ScanReportReporter reporter) {
+    return new CloseableIterator<FilteredColumnarBatch>() {
+
+      /* Whether this iterator has reported an error report */
+      private boolean errorReported = false;
+
+      @Override
+      public void close() throws IOException {
+        try {
+          // If a ScanReport has already been pushed in the case of an exception, don't double
+          // report
+          if (!errorReported) {
+            if (!scanIter.hasNext()) {
+              // The entire scan file iterator has been successfully consumed; report a complete
+              // Scan
+              reporter.reportCompleteScan();
+            } else {
+              // The scan file iterator has NOT been fully consumed before being closed.
+              // We have no way of knowing the reason why; this could be due to an exception in
+              // the connector code, or intentional early termination such as for a LIMIT query.
+              reporter.reportIncompleteScan();
+            }
+          }
+        } finally {
+          scanIter.close();
+        }
+      }
+
+      @Override
+      public boolean hasNext() {
+        return wrapWithErrorReporting(() -> scanIter.hasNext());
+      }
+
+      @Override
+      public FilteredColumnarBatch next() {
+        return wrapWithErrorReporting(() -> scanIter.next());
+      }
+
+      private <T> T wrapWithErrorReporting(Supplier<T> s) {
+        try {
+          return s.get();
+        } catch (Exception e) {
+          reporter.reportError(e);
+          errorReported = true;
+          throw e;
+        }
+      }
+    };
+  }
+
+  /**
+   * Defines methods to report {@link ScanReport} to the engine. This allows us to avoid
+   * ambiguous lambdas/anonymous classes as well as reuse the defined default methods.
+   */
+  private interface ScanReportReporter {
+
+    default void reportError(Exception e) {
+      report(Optional.of(e), false /* isFullyConsumed */);
+    }
+
+    default void reportCompleteScan() {
+      report(Optional.empty(), true /* isFullyConsumed */);
+    }
+
+    default void reportIncompleteScan() {
+      report(Optional.empty(), false /* isFullyConsumed */);
+    }
+
+    /** Given an optional exception, reports a {@link ScanReport} to the engine */
+    void report(Optional<Exception> exceptionOpt, boolean isFullyConsumed);
+  }
}
SnapshotImpl.java
@@ -118,8 +118,8 @@ public StructType getSchema() {

@Override
public ScanBuilder getScanBuilder() {
-    // TODO when we add ScanReport we will pass the SnapshotReport downstream here
-    return new ScanBuilderImpl(dataPath, protocol, metadata, getSchema(), logReplay);
+    return new ScanBuilderImpl(
+        dataPath, protocol, metadata, getSchema(), logReplay, snapshotReport);
}

///////////////////
MetricsReportSerializers.java
@@ -20,9 +20,12 @@
import com.fasterxml.jackson.databind.module.SimpleModule;
import com.fasterxml.jackson.databind.ser.std.ToStringSerializer;
import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import io.delta.kernel.expressions.Predicate;
import io.delta.kernel.metrics.MetricsReport;
+import io.delta.kernel.metrics.ScanReport;
import io.delta.kernel.metrics.SnapshotReport;
import io.delta.kernel.metrics.TransactionReport;
+import io.delta.kernel.types.StructType;

/** Defines JSON serializers for {@link MetricsReport} types */
public final class MetricsReportSerializers {
@@ -41,6 +44,15 @@ public static String serializeSnapshotReport(SnapshotReport snapshotReport)
return OBJECT_MAPPER.writeValueAsString(snapshotReport);
}

+  /**
+   * Serializes a {@link ScanReport} to a JSON string
+   *
+   * @throws JsonProcessingException
+   */
+  public static String serializeScanReport(ScanReport scanReport) throws JsonProcessingException {
+    return OBJECT_MAPPER.writeValueAsString(scanReport);
+  }
+
/**
* Serializes a {@link TransactionReport} to a JSON string
*
@@ -59,7 +71,11 @@ public static String serializeTransactionReport(TransactionReport transactionRep
new ObjectMapper()
.registerModule(new Jdk8Module()) // To support Optional
.registerModule( // Serialize Exception using toString()
-            new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()));
+            new SimpleModule().addSerializer(Exception.class, new ToStringSerializer()))
+        .registerModule( // Serialize StructType using toString
+            new SimpleModule().addSerializer(StructType.class, new ToStringSerializer()))
+        .registerModule( // Serialize Predicate using toString
+            new SimpleModule().addSerializer(Predicate.class, new ToStringSerializer()));

private MetricsReportSerializers() {}
}
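
As a usage sketch (not part of this PR), an engine-provided `MetricsReporter` could log each `ScanReport` as JSON via this serializer. The `LoggingMetricsReporter` class below is hypothetical, and the `io.delta.kernel.engine.MetricsReporter` import location is an assumption:

```java
import com.fasterxml.jackson.core.JsonProcessingException;
import io.delta.kernel.engine.MetricsReporter; // assumed package for the reporter interface
import io.delta.kernel.internal.metrics.MetricsReportSerializers;
import io.delta.kernel.metrics.MetricsReport;
import io.delta.kernel.metrics.ScanReport;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Hypothetical reporter that logs ScanReports as JSON; other report types are elided. */
public class LoggingMetricsReporter implements MetricsReporter {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingMetricsReporter.class);

  @Override
  public void report(MetricsReport report) {
    if (report instanceof ScanReport) {
      try {
        LOG.info(
            "ScanReport: {}", MetricsReportSerializers.serializeScanReport((ScanReport) report));
      } catch (JsonProcessingException e) {
        LOG.warn("Failed to serialize ScanReport", e);
      }
    }
  }
}
```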
(Remaining changed files not shown.)
