Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Kernel] Minor SnapshotManager refactor followup #4089

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import static io.delta.kernel.internal.TableConfig.EXPIRED_LOG_CLEANUP_ENABLED;
import static io.delta.kernel.internal.TableConfig.LOG_RETENTION;
import static io.delta.kernel.internal.TableFeatures.validateWriteSupportedTable;
import static io.delta.kernel.internal.checkpoints.Checkpointer.findLastCompleteCheckpointBefore;
import static io.delta.kernel.internal.replay.LogReplayUtils.assertLogFilesBelongToTable;
import static io.delta.kernel.internal.snapshot.MetadataCleanup.cleanupExpiredLogs;
import static io.delta.kernel.internal.util.Preconditions.checkArgument;
Expand Down Expand Up @@ -60,7 +59,7 @@ public class SnapshotManager {
* The latest {@link SnapshotHint} for this table. The initial value inside the AtomicReference is
* `null`.
*/
private AtomicReference<SnapshotHint> latestSnapshotHint;
private final AtomicReference<SnapshotHint> latestSnapshotHint;

private final Path logPath;
private final Path tablePath;
Expand All @@ -81,12 +80,18 @@ public SnapshotManager(Path logPath, Path tablePath) {
* Construct the latest snapshot for given table.
*
* @param engine Instance of {@link Engine} to use.
* @return
* @throws TableNotFoundException
* @return the latest {@link Snapshot} of the table
* @throws TableNotFoundException if the table does not exist
* @throws InvalidTableException if the table is in an invalid state
*/
public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
return getSnapshotAtInit(engine, snapshotContext);
final LogSegment logSegment =
getLogSegmentForVersion(engine, Optional.empty() /* versionToLoad */);

snapshotContext.setVersion(logSegment.version);

return createSnapshot(logSegment, engine, snapshotContext);
}

/**
Expand All @@ -95,20 +100,15 @@ public Snapshot buildLatestSnapshot(Engine engine, SnapshotQueryContext snapshot
* @param engine Instance of {@link Engine} to use.
* @param version The snapshot version to construct
* @return a {@link Snapshot} of the table at version {@code version}
* @throws TableNotFoundException
* @throws TableNotFoundException if the table does not exist
* @throws InvalidTableException if the table is in an invalid state
*/
public Snapshot getSnapshotAt(Engine engine, long version, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
final LogSegment logSegment =
getLogSegmentForVersion(engine, Optional.of(version) /* versionToLoadOpt */);

Optional<LogSegment> logSegmentOpt =
getLogSegmentForVersion(
engine,
Optional.empty(), /* startCheckpointOpt */
Optional.of(version) /* versionToLoadOpt */);

return logSegmentOpt
.map(logSegment -> createSnapshot(logSegment, engine, snapshotContext))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
return createSnapshot(logSegment, engine, snapshotContext);
}

/**
Expand Down Expand Up @@ -265,30 +265,6 @@ private void registerHint(SnapshotHint newHint) {
});
}

/**
* Load the Snapshot for this Delta table at initialization. This method uses the `lastCheckpoint`
* file as a hint on where to start listing the transaction log directory.
*/
private SnapshotImpl getSnapshotAtInit(Engine engine, SnapshotQueryContext snapshotContext)
throws TableNotFoundException {
Checkpointer checkpointer = new Checkpointer(logPath);
Optional<CheckpointMetaData> lastCheckpointOpt = checkpointer.readLastCheckpointFile(engine);
if (!lastCheckpointOpt.isPresent()) {
logger.warn(
"{}: Last checkpoint file is missing or corrupted. "
+ "Will search for the checkpoint files directly.",
tablePath);
}
Optional<LogSegment> logSegmentOpt = getLogSegmentFrom(engine, lastCheckpointOpt);
// We update the query context version with the resolved version from the log segment listing
// if it exists
logSegmentOpt.ifPresent(logSegment -> snapshotContext.setVersion(logSegment.version));

return logSegmentOpt
.map(logSegment -> createSnapshot(logSegment, engine, snapshotContext))
.orElseThrow(() -> new TableNotFoundException(tablePath.toString()));
}

private SnapshotImpl createSnapshot(
LogSegment initSegment, Engine engine, SnapshotQueryContext snapshotContext) {
final String startingFromStr =
Expand Down Expand Up @@ -341,78 +317,77 @@ private SnapshotImpl createSnapshot(
}

/**
* Get the LogSegment that will help in computing the Snapshot of the table at DeltaLog
* initialization, or None if the directory was empty/missing.
* Generates a {@link LogSegment} for the given `versionToLoadOpt`. If no `versionToLoadOpt` is
* provided, generates a {@code LogSegment} for the latest version of the table.
*
* <p>This primarily consists of three steps:
*
* @param startingCheckpoint A checkpoint that we can start our listing from
* <ol>
* <li>First, determine the starting checkpoint version that is at or before `versionToLoadOpt`.
* If no `versionToLoadOpt` is provided, will use the checkpoint pointed to by the
* _last_checkpoint file.
* <li>Second, LIST the _delta_log for all delta and checkpoint files newer than the starting
* checkpoint version.
* <li>Third, process and validate this list of _delta_log files to yield a {@code LogSegment}.
* </ol>
*/
private Optional<LogSegment> getLogSegmentFrom(
Engine engine, Optional<CheckpointMetaData> startingCheckpoint) {
return getLogSegmentForVersion(
engine, startingCheckpoint.map(x -> x.version), Optional.empty());
private LogSegment getLogSegmentForVersion(Engine engine, Optional<Long> versionToLoadOpt) {
final String versionToLoadStr = versionToLoadOpt.map(String::valueOf).orElse("latest");
logger.info("Loading log segment for version {}", versionToLoadStr);

////////////////////////////////////////////////////////////////////////////////////////////////
// Step 1: Find the starting checkpoint version. If $versionToLoadOpt is empty, use the version //
// referenced by the _last_checkpoint file. If $versionToLoadOpt is present, search for //
// the latest complete checkpoint at or before that version. //
////////////////////////////////////////////////////////////////////////////////////////////////

final Optional<Long> getStartCheckpointVersionOpt =
getStartCheckpointVersion(engine, versionToLoadOpt);

// TODO: make this method *deep*. Consolidate all of the getLogSegment methods into one.

return getLogSegmentForVersion(engine, getStartCheckpointVersionOpt, versionToLoadOpt);
}

/**
* Get a list of files that can be used to compute a Snapshot at version `versionToLoad`, if
* `versionToLoad` is not provided, will generate the list of files that are needed to load the
* latest version of the Delta table. This method also performs checks to ensure that the delta
* files are contiguous.
*
* @param startCheckpoint A potential start version to perform the listing of the DeltaLog,
* typically that of a known checkpoint. If this version's not provided, we will start listing
* from version 0.
* @param versionToLoad A specific version to load. Typically used with time travel and the Delta
* streaming source. If not provided, we will try to load the latest version of the table.
* @return Some LogSegment to build a Snapshot if files do exist after the given startCheckpoint.
* None, if the delta log directory was missing or empty.
* Helper function for the {@link #getLogSegmentForVersion(Engine, Optional)} above. Exposes the
* startCheckpoint param for testing.
*/
public Optional<LogSegment> getLogSegmentForVersion(
Engine engine, Optional<Long> startCheckpoint, Optional<Long> versionToLoad) {
// Only use startCheckpoint if it is <= versionToLoad
Optional<Long> startCheckpointToUse =
startCheckpoint.filter(v -> !versionToLoad.isPresent() || v <= versionToLoad.get());

// if we are loading a specific version and there is no usable starting checkpoint
// try to load a checkpoint that is <= version to load
if (!startCheckpointToUse.isPresent() && versionToLoad.isPresent()) {
long beforeVersion = versionToLoad.get() + 1;
long startTimeMillis = System.currentTimeMillis();
startCheckpointToUse =
findLastCompleteCheckpointBefore(engine, logPath, beforeVersion).map(x -> x.version);

logger.info(
"{}: Took {}ms to load last checkpoint before version {}",
tablePath,
System.currentTimeMillis() - startTimeMillis,
beforeVersion);
}

long startVersion =
startCheckpointToUse.orElseGet(
@VisibleForTesting
public LogSegment getLogSegmentForVersion(
Engine engine, Optional<Long> startCheckpointVersionOpt, Optional<Long> versionToLoadOpt) {
/////////////////////////////////////////////////////////////////
// Step 2: Determine the actual version to start listing from. //
/////////////////////////////////////////////////////////////////

final long listFromStartVersion =
startCheckpointVersionOpt.orElseGet(
() -> {
logger.warn(
"{}: Starting checkpoint is missing. Listing from version as 0", tablePath);
return 0L;
});

long startTimeMillis = System.currentTimeMillis();
/////////////////////////////////////////////////////////////////
// Step 3: List the files from $listFromStartVersion to $versionToLoadOpt //
/////////////////////////////////////////////////////////////////

final long startTimeMillis = System.currentTimeMillis();
final List<FileStatus> newFiles =
DeltaLogActionUtils.listDeltaLogFiles(
engine,
new HashSet<>(Arrays.asList(DeltaLogFileType.COMMIT, DeltaLogFileType.CHECKPOINT)),
tablePath,
startVersion,
versionToLoad,
listFromStartVersion,
versionToLoadOpt,
true /* mustBeRecreatable */);

logger.info(
"{}: Took {}ms to list the files after starting checkpoint",
tablePath,
System.currentTimeMillis() - startTimeMillis);

startTimeMillis = System.currentTimeMillis();
try {
return getLogSegmentForVersion(engine, startCheckpointToUse, versionToLoad, newFiles);
return constructLogSegmentFromFileList(startCheckpointVersionOpt, versionToLoadOpt, newFiles);
} finally {
logger.info(
"{}: Took {}ms to construct a log segment",
Expand All @@ -425,8 +400,7 @@ public Optional<LogSegment> getLogSegmentForVersion(
* Helper function for the getLogSegmentForVersion above. Called with a provided files list, and
* will then try to construct a new LogSegment using that.
*/
protected Optional<LogSegment> getLogSegmentForVersion(
Engine engine,
private LogSegment constructLogSegmentFromFileList(
Optional<Long> startCheckpointOpt,
Optional<Long> versionToLoadOpt,
List<FileStatus> newFiles) {
Expand Down Expand Up @@ -625,14 +599,56 @@ protected Optional<LogSegment> getLogSegmentForVersion(
})
.orElse(Collections.emptyList());

return Optional.of(
new LogSegment(
logPath,
newVersion,
deltasAfterCheckpoint,
newCheckpointFiles,
newCheckpointOpt.map(x -> x.version),
lastCommitTimestamp));
return new LogSegment(
logPath,
newVersion,
deltasAfterCheckpoint,
newCheckpointFiles,
newCheckpointOpt.map(x -> x.version),
lastCommitTimestamp);
}

/////////////////////////
// getLogSegment utils //
/////////////////////////

/**
* Determine the starting checkpoint version that is at or before `versionToLoadOpt`. If no
* `versionToLoadOpt` is provided, will use the checkpoint pointed to by the _last_checkpoint
* file.
*/
private Optional<Long> getStartCheckpointVersion(Engine engine, Optional<Long> versionToLoadOpt) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the future we can also add nice tests for each of these individual getLogSegment util helper methods.

I envision a deep getLogSegment method (as opposed to, basically, getLogSegmentA calls getLogSegmentB which calls getLogSegmentC).

Instead, getLogSegment is 1 large method, and it has little utilities that poke and help out.

return versionToLoadOpt
.map(
versionToLoad -> {
logger.info(
"Finding last complete checkpoint at or before version {}", versionToLoad);
final long startTimeMillis = System.currentTimeMillis();
return Checkpointer.findLastCompleteCheckpointBefore(
engine, logPath, versionToLoad + 1)
.map(checkpointInstance -> checkpointInstance.version)
.map(
checkpointVersion -> {
checkArgument(
checkpointVersion <= versionToLoad,
"Last complete checkpoint version %s was not <= targetVersion %s",
checkpointVersion,
versionToLoad);

logger.info(
"{}: Took {}ms to find last complete checkpoint <= targetVersion {}",
tablePath,
System.currentTimeMillis() - startTimeMillis,
versionToLoad);

return checkpointVersion;
});
})
.orElseGet(
() -> {
logger.info("Loading last checkpoint from the _last_checkpoint file");
return new Checkpointer(logPath).readLastCheckpointFile(engine).map(x -> x.version);
});
}

/**
Expand Down
Loading
Loading