From 5cc7b09062892e997d7f5b3c4a84b73fb454c1da Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 24 Jan 2025 10:21:01 -0800 Subject: [PATCH 1/3] Done refactor first pass --- .../internal/snapshot/SnapshotManager.java | 198 +++++++++--------- .../internal/SnapshotManagerSuite.scala | 44 ++-- 2 files changed, 122 insertions(+), 120 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 857568d82e4..acf8f8548a9 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -20,7 +20,6 @@ import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED; import static io.delta.kernel.internal.TableConfig.LOG_RETENTION; import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable; -import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore; import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable; import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs; import static io.delta.kernel.internal.util.Preconditions.checkArgument; @@ -60,7 +59,7 @@ public class SnapshotManager { * The latest {@link SnapshotHint} for this table. The initial value inside the AtomicReference is * `null`. */ - private AtomicReference latestSnapshotHint; + private final AtomicReference latestSnapshotHint; private final Path logPath; private final Path tablePath; @@ -81,12 +80,18 @@ public SnapshotManager(Path logPath, Path tablePath) { * Construct the latest snapshot for given table. * * @param engine Instance of {@link Engine} to use. 
- * @return - * @throws TableNotFoundException + * @return the latest {@link Snapshot} of the table + * @throws TableNotFoundException if the table does not exist + * @throws InvalidTableException if the table is in an invalid state */ public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshotContext) throws TableNotFoundException { - return getSnapshotAtInit(engine, snapshotContext); + final LogSegment logSegment = + getLogSegmentForVersion(engine, Optional.empty() /* versionToLoad */); + + snapshotContext.setVersion(logSegment.version); + + return createSnapshot(logSegment, engine, snapshotContext); } /** @@ -95,20 +100,15 @@ public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshot * @param engine Instance of {@link Engine} to use. * @param version The snapshot version to construct * @return a {@link Snapshot} of the table at version {@code version} - * @throws TableNotFoundException + * @throws TableNotFoundException if the table does not exist + * @throws InvalidTableException if the table is in an invalid state */ public Snapshot getSnapshotAt(Engine engine, long version, SnapshotQueryContext snapshotContext) throws TableNotFoundException { + final LogSegment logSegment = + getLogSegmentForVersion(engine, Optional.of(version) /* versionToLoadOpt */); - Optional logSegmentOpt = - getLogSegmentForVersion( - engine, - Optional.empty(), /* startCheckpointOpt */ - Optional.of(version) /* versionToLoadOpt */); - - return logSegmentOpt - .map(logSegment -> createSnapshot(logSegment, engine, snapshotContext)) - .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); + return createSnapshot(logSegment, engine, snapshotContext); } /** @@ -265,30 +265,6 @@ private void registerHint(SnapshotHint newHint) { }); } - /** - * Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint` - * file as a hint on where to start listing the transaction log directory. 
- */ - private SnapshotImpl getSnapshotAtInit(Engine engine, SnapshotQueryContext snapshotContext) - throws TableNotFoundException { - Checkpointer checkpointer = new Checkpointer(logPath); - Optional lastCheckpointOpt = checkpointer.readLastCheckpointFile(engine); - if (!lastCheckpointOpt.isPresent()) { - logger.warn( - "{}: Last checkpoint file is missing or corrupted. " - + "Will search for the checkpoint files directly.", - tablePath); - } - Optional logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt); - // We update the query context version with the resolved version from the log segment listing - // if it exists - logSegmentOpt.ifPresent(logSegment -> snapshotContext.setVersion(logSegment.version)); - - return logSegmentOpt - .map(logSegment -> createSnapshot(logSegment, engine, snapshotContext)) - .orElseThrow(() -> new TableNotFoundException(tablePath.toString())); - } - private SnapshotImpl createSnapshot( LogSegment initSegment, Engine engine, SnapshotQueryContext snapshotContext) { final String startingFromStr = @@ -341,78 +317,108 @@ private SnapshotImpl createSnapshot( } /** - * Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog - * initialization, or None if the directory was empty/missing. + * Generates a {@link LogSegment} for the given `versionToLoadOpt`. If no `versionToLoadOpt` is + * provided, generates a {@code LogSegment} for the latest version of the table. * - * @param startingCheckpoint A checkpoint that we can start our listing from + *

<p>This primarily consists of three steps: + *
<ol> + *
<li>First, determine the starting checkpoint version that is at or before `versionToLoadOpt`. + * If no `versionToLoadOpt` is provided, will use the checkpoint pointed to by the + * _last_checkpoint file. + *
<li>Second, LIST the _delta_log for all delta and checkpoint files newer than the starting + * checkpoint version. + *
<li>Third, process and validate this list of _delta_log files to yield a {@code LogSegment}. + * </ol>
*/ - private Optional getLogSegmentFrom( - Engine engine, Optional startingCheckpoint) { - return getLogSegmentForVersion( - engine, startingCheckpoint.map(x -> x.version), Optional.empty()); + private LogSegment getLogSegmentForVersion(Engine engine, Optional versionToLoadOpt) { + final String versionToLoadStr = versionToLoadOpt.map(String::valueOf).orElse("latest"); + logger.info("Loading log segment for version {}", versionToLoadStr); + + //////////////////////////////////////////////////////////////////////////////////////////////// + // Step 1: Find the latest checkpoint version. If $versionToLoadOpt is empty, use the version // + // referenced by the _LAST_CHECKPOINT file. If $versionToLoad is present, search for // + // the previous latest complete checkpoint at or before $versionToLoad. // + //////////////////////////////////////////////////////////////////////////////////////////////// + + final Optional startCheckpointVersionOpt = + versionToLoadOpt + .map( + versionToLoad -> { + logger.info( + "Finding last complete checkpoint at or before version {}", versionToLoad); + final long startTimeMillis = System.currentTimeMillis(); + return Checkpointer.findLastCompleteCheckpointBefore( + engine, logPath, versionToLoad + 1) + .map(checkpointInstance -> checkpointInstance.version) + .map( + checkpointVersion -> { + checkArgument( + checkpointVersion <= versionToLoad, + "Last complete checkpoint version %s was not <= targetVersion %s", + checkpointVersion, + versionToLoad); + + logger.info( + "{}: Took {}ms to find last complete checkpoint at or before " + + "targetVersion {}", + tablePath, + System.currentTimeMillis() - startTimeMillis, + versionToLoad); + + return checkpointVersion; + }); + }) + .orElseGet( + () -> { + logger.info("Loading last checkpoint from the _last_checkpoint file"); + return new Checkpointer(logPath) + .readLastCheckpointFile(engine) + .map(x -> x.version); + }); + + return getLogSegmentForVersion(engine, startCheckpointVersionOpt, 
versionToLoadOpt); } /** - * Get a list of files that can be used to compute a Snapshot at version `versionToLoad`, if - * `versionToLoad` is not provided, will generate the list of files that are needed to load the - * latest version of the Delta table. This method also performs checks to ensure that the delta - * files are contiguous. - * - * @param startCheckpoint A potential start version to perform the listing of the DeltaLog, - * typically that of a known checkpoint. If this version's not provided, we will start listing - * from version 0. - * @param versionToLoad A specific version to load. Typically used with time travel and the Delta - * streaming source. If not provided, we will try to load the latest version of the table. - * @return Some LogSegment to build a Snapshot if files do exist after the given startCheckpoint. - * None, if the delta log directory was missing or empty. + * Helper function for the {@link #getLogSegmentForVersion(Engine, Optional)} above. Exposes the + * startCheckpoint param for testing. 
*/ - public Optional getLogSegmentForVersion( - Engine engine, Optional startCheckpoint, Optional versionToLoad) { - // Only use startCheckpoint if it is <= versionToLoad - Optional startCheckpointToUse = - startCheckpoint.filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get()); - - // if we are loading a specific version and there is no usable starting checkpoint - // try to load a checkpoint that is <= version to load - if (!startCheckpointToUse.isPresent() && versionToLoad.isPresent()) { - long beforeVersion = versionToLoad.get() + 1; - long startTimeMillis = System.currentTimeMillis(); - startCheckpointToUse = - findLastCompleteCheckpointBefore(engine, logPath, beforeVersion).map(x -> x.version); - - logger.info( - "{}: Took {}ms to load last checkpoint before version {}", - tablePath, - System.currentTimeMillis() - startTimeMillis, - beforeVersion); - } - - long startVersion = - startCheckpointToUse.orElseGet( + @VisibleForTesting + public LogSegment getLogSegmentForVersion( + Engine engine, Optional startCheckpointVersionOpt, Optional versionToLoadOpt) { + ///////////////////////////////////////////////////////////////// + // Step 2: Determine the actual version to start listing from. // + ///////////////////////////////////////////////////////////////// + + final long listFromStartVersion = + startCheckpointVersionOpt.orElseGet( () -> { logger.warn( "{}: Starting checkpoint is missing. 
Listing from version as 0", tablePath); return 0L; }); - long startTimeMillis = System.currentTimeMillis(); + ///////////////////////////////////////////////////////////////// + // Step 3: List the files from $startVersion to $versionToLoad // + ///////////////////////////////////////////////////////////////// + + final long startTimeMillis = System.currentTimeMillis(); final List newFiles = DeltaLogActionUtils.listDeltaLogFiles( engine, new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)), tablePath, - startVersion, - versionToLoad, + listFromStartVersion, + versionToLoadOpt, true /* mustBeRecreatable */); - logger.info( "{}: Took {}ms to list the files after starting checkpoint", tablePath, System.currentTimeMillis() - startTimeMillis); - startTimeMillis = System.currentTimeMillis(); try { - return getLogSegmentForVersion(engine, startCheckpointToUse, versionToLoad, newFiles); + return getLogSegmentForVersionHelper(startCheckpointVersionOpt, versionToLoadOpt, newFiles); } finally { logger.info( "{}: Took {}ms to construct a log segment", @@ -425,8 +431,7 @@ public Optional getLogSegmentForVersion( * Helper function for the getLogSegmentForVersion above. Called with a provided files list, and * will then try to construct a new LogSegment using that. 
*/ - protected Optional getLogSegmentForVersion( - Engine engine, + private LogSegment getLogSegmentForVersionHelper( Optional startCheckpointOpt, Optional versionToLoadOpt, List newFiles) { @@ -625,14 +630,13 @@ protected Optional getLogSegmentForVersion( }) .orElse(Collections.emptyList()); - return Optional.of( - new LogSegment( - logPath, - newVersion, - deltasAfterCheckpoint, - newCheckpointFiles, - newCheckpointOpt.map(x -> x.version), - lastCommitTimestamp)); + return new LogSegment( + logPath, + newVersion, + deltasAfterCheckpoint, + newCheckpointFiles, + newCheckpointOpt.map(x -> x.version), + lastCommitTimestamp); } /** diff --git a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala index c9a05385822..5fa2cd09106 100644 --- a/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala +++ b/kernel/kernel-api/src/test/scala/io/delta/kernel/internal/SnapshotManagerSuite.scala @@ -215,14 +215,13 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } }.getOrElse((Seq.empty, Seq.empty)) - val logSegmentOpt = snapshotManager.getLogSegmentForVersion( + val logSegment = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(listFromProvider(deltas ++ checkpointFiles)("/"), new MockSidecarParquetHandler(expectedSidecars), new MockSidecarJsonHandler(expectedSidecars)), Optional.empty(), versionToLoad ) - assert(logSegmentOpt.isPresent()) val expectedDeltas = deltaFileStatuses( deltaVersions.filter { v => @@ -241,7 +240,7 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { }.getOrElse(Seq.empty) checkLogSegment( - logSegmentOpt.get(), + logSegment, expectedVersion = versionToLoad.orElse(deltaVersions.max), expectedDeltas = expectedDeltas, expectedCheckpoints = expectedCheckpoints, @@ -490,14 +489,13 @@ class SnapshotManagerSuite extends AnyFunSuite 
with MockFileSystemClientUtils { listFromProvider(files)(filePath) } for (checkpointV <- Seq(10, 20)) { - val logSegmentOpt = snapshotManager.getLogSegmentForVersion( + val logSegment = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(listFrom(checkpointV)(_)), Optional.of(checkpointV), Optional.empty() ) - assert(logSegmentOpt.isPresent()) checkLogSegment( - logSegmentOpt.get(), + logSegment, expectedVersion = 24, expectedDeltas = deltaFileStatuses(21L until 25L), expectedCheckpoints = singularCheckpointFileStatuses(Seq(20L)), @@ -524,15 +522,8 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { } test("getLogSegmentForVersion: versionToLoad not constructable from history") { - val files = deltaFileStatuses(20L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)) testExpectedError[RuntimeException]( - files, - versionToLoad = Optional.of(15), - expectedErrorMessageContains = "Cannot load table version 15" - ) - testExpectedError[RuntimeException]( - files, - startCheckpoint = Optional.of(20), + deltaFileStatuses(20L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)), versionToLoad = Optional.of(15), expectedErrorMessageContains = "Cannot load table version 15" ) @@ -744,12 +735,6 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { versionToLoad = Optional.of(17), expectedErrorMessageContains = "missing log file for version 0" ) - testExpectedError[InvalidTableException]( - deltaFileStatuses(15L until 25L) ++ singularCheckpointFileStatuses(Seq(20L)), - startCheckpoint = Optional.of(20), - versionToLoad = Optional.of(17), - expectedErrorMessageContains = "missing log file for version 0" - ) testExpectedError[InvalidTableException]( deltaFileStatuses((0L until 5L) ++ (6L until 9L)), expectedErrorMessageContains = "are not contiguous" @@ -830,15 +815,14 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { .take(4) val checkpoints = 
singularCheckpointFileStatuses(validVersions) val deltas = deltaFileStatuses(deltaVersions) - val logSegmentOpt = snapshotManager.getLogSegmentForVersion( + val logSegment = snapshotManager.getLogSegmentForVersion( createMockFSListFromEngine(deltas ++ corruptedCheckpoint ++ checkpoints), Optional.empty(), Optional.empty() ) val checkpointVersion = validVersions.sorted.lastOption - assert(logSegmentOpt.isPresent()) checkLogSegment( - logSegment = logSegmentOpt.get(), + logSegment, expectedVersion = deltaVersions.max, expectedDeltas = deltaFileStatuses( deltaVersions.filter(_ > checkpointVersion.getOrElse(-1L))), @@ -860,6 +844,20 @@ class SnapshotManagerSuite extends AnyFunSuite with MockFileSystemClientUtils { assert(exMsg.contains("Missing checkpoint at version 1")) } + + test("getLogSegmentForVersion: startCheckpoint is greater than versionToLoad") { + val exMsg = intercept[IllegalArgumentException] { + snapshotManager.getLogSegmentForVersion( + createMockFSListFromEngine( + singularCheckpointFileStatuses(Seq(10)) ++ deltaFileStatuses(10L until 15L) + ), + Optional.of(10), // startCheckpoint + Optional.of(7) // versionToLoad + ) + }.getMessage + + assert(exMsg.contains("endVersion=7 provided is less than startVersion=10")) + } } trait SidecarIteratorProvider extends VectorTestUtils { From a28751c9deaee9a0570146cc5782cd3c996914fc Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 24 Jan 2025 15:19:40 -0800 Subject: [PATCH 2/3] Refactor getStartCheckpointVersion logic to helper --- .../internal/snapshot/SnapshotManager.java | 84 +++++++++++-------- 1 file changed, 48 insertions(+), 36 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index acf8f8548a9..64f28ac6d3e 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ 
b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -341,43 +341,12 @@ private LogSegment getLogSegmentForVersion(Engine engine, Optional version // the previous latest complete checkpoint at or before $versionToLoad. // //////////////////////////////////////////////////////////////////////////////////////////////// - final Optional startCheckpointVersionOpt = - versionToLoadOpt - .map( - versionToLoad -> { - logger.info( - "Finding last complete checkpoint at or before version {}", versionToLoad); - final long startTimeMillis = System.currentTimeMillis(); - return Checkpointer.findLastCompleteCheckpointBefore( - engine, logPath, versionToLoad + 1) - .map(checkpointInstance -> checkpointInstance.version) - .map( - checkpointVersion -> { - checkArgument( - checkpointVersion <= versionToLoad, - "Last complete checkpoint version %s was not <= targetVersion %s", - checkpointVersion, - versionToLoad); - - logger.info( - "{}: Took {}ms to find last complete checkpoint at or before " - + "targetVersion {}", - tablePath, - System.currentTimeMillis() - startTimeMillis, - versionToLoad); - - return checkpointVersion; - }); - }) - .orElseGet( - () -> { - logger.info("Loading last checkpoint from the _last_checkpoint file"); - return new Checkpointer(logPath) - .readLastCheckpointFile(engine) - .map(x -> x.version); - }); + final Optional getStartCheckpointVersionOpt = + getStartCheckpointVersion(engine, versionToLoadOpt); + + // TODO: make this method *deep*. Consolidate all of the getLogSegment methods to one. 
- return getLogSegmentForVersion(engine, startCheckpointVersionOpt, versionToLoadOpt); + return getLogSegmentForVersion(engine, getStartCheckpointVersionOpt, versionToLoadOpt); } /** @@ -639,6 +608,49 @@ private LogSegment getLogSegmentForVersionHelper( lastCommitTimestamp); } + ///////////////////////// + // getLogSegment utils // + ///////////////////////// + + /** + * Determine the starting checkpoint version that is at or before `versionToLoadOpt`. If no + * `versionToLoadOpt` is provided, will use the checkpoint pointed to by the _last_checkpoint + * file. + */ + private Optional getStartCheckpointVersion(Engine engine, Optional versionToLoadOpt) { + return versionToLoadOpt + .map( + versionToLoad -> { + logger.info( + "Finding last complete checkpoint at or before version {}", versionToLoad); + final long startTimeMillis = System.currentTimeMillis(); + return Checkpointer.findLastCompleteCheckpointBefore( + engine, logPath, versionToLoad + 1) + .map(checkpointInstance -> checkpointInstance.version) + .map( + checkpointVersion -> { + checkArgument( + checkpointVersion <= versionToLoad, + "Last complete checkpoint version %s was not <= targetVersion %s", + checkpointVersion, + versionToLoad); + + logger.info( + "{}: Took {}ms to find last complete checkpoint <= targetVersion {}", + tablePath, + System.currentTimeMillis() - startTimeMillis, + versionToLoad); + + return checkpointVersion; + }); + }) + .orElseGet( + () -> { + logger.info("Loading last checkpoint from the _last_checkpoint file"); + return new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version); + }); + } + /** * Returns a [[LogSegment]] for reading `snapshotVersion` such that the segment's checkpoint * version (if checkpoint present) is LESS THAN `maxExclusiveCheckpointVersion`. 
This is useful From 377940e9053a13d00e6d77c5842cf1bcfc63201c Mon Sep 17 00:00:00 2001 From: Scott Sandre Date: Fri, 24 Jan 2025 15:20:25 -0800 Subject: [PATCH 3/3] rename private helper --- .../io/delta/kernel/internal/snapshot/SnapshotManager.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java index 64f28ac6d3e..3aaa4d2dd71 100644 --- a/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java +++ b/kernel/kernel-api/src/main/java/io/delta/kernel/internal/snapshot/SnapshotManager.java @@ -387,7 +387,7 @@ public LogSegment getLogSegmentForVersion( System.currentTimeMillis() - startTimeMillis); try { - return getLogSegmentForVersionHelper(startCheckpointVersionOpt, versionToLoadOpt, newFiles); + return constructLogSegmentFromFileList(startCheckpointVersionOpt, versionToLoadOpt, newFiles); } finally { logger.info( "{}: Took {}ms to construct a log segment", @@ -400,7 +400,7 @@ public LogSegment getLogSegmentForVersion( * Helper function for the getLogSegmentForVersion above. Called with a provided files list, and * will then try to construct a new LogSegment using that. */ - private LogSegment getLogSegmentForVersionHelper( + private LogSegment constructLogSegmentFromFileList( Optional startCheckpointOpt, Optional versionToLoadOpt, List newFiles) {