Poc executor service #156

Open · wants to merge 4 commits into base: main

1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -1,5 +1,6 @@
## Unreleased

* Update parquet prefetching logic to track adjacent columns
* Change default of sequential prefetch base to 2.0
* Added CHANGELOG

software/amazon/s3/dataaccelerator/S3SeekableInputStreamFactory.java
@@ -16,6 +16,9 @@
package software.amazon.s3.dataaccelerator;

import java.io.IOException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.Getter;
import lombok.NonNull;
import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry;
@@ -51,6 +54,8 @@ public class S3SeekableInputStreamFactory implements AutoCloseable {
private final Telemetry telemetry;
private final ObjectFormatSelector objectFormatSelector;

private final ExecutorService executorService = Executors.newFixedThreadPool(400);

/**
* Creates a new instance of {@link S3SeekableInputStreamFactory}. This factory should be used to
* create instances of the input stream to allow for sharing resources such as the object client
@@ -62,6 +67,7 @@ public class S3SeekableInputStreamFactory implements AutoCloseable {
public S3SeekableInputStreamFactory(
@NonNull ObjectClient objectClient,
@NonNull S3SeekableInputStreamConfiguration configuration) {

this.objectClient = objectClient;
this.configuration = configuration;
this.telemetry = Telemetry.createTelemetry(configuration.getTelemetryConfiguration());
@@ -96,7 +102,7 @@ LogicalIO createLogicalIO(S3URI s3URI) {
new PhysicalIOImpl(s3URI, objectMetadataStore, objectBlobStore, telemetry),
telemetry,
configuration.getLogicalIOConfiguration(),
parquetColumnPrefetchStore);
parquetColumnPrefetchStore, executorService);

default:
return new DefaultLogicalIOImpl(
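The change above gives the factory one shared ExecutorService (a fixed pool of 400 threads in this PoC) and passes it through createLogicalIO into ParquetLogicalIOImpl. The following is a minimal, self-contained sketch of that ownership pattern, not the library's actual code: the class and method names are hypothetical, and the shutdown in close() is an assumption the hunks above do not show.

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical sketch: one factory-owned pool shared by every stream the factory creates.
public final class PoolBackedFactory implements AutoCloseable {

  // Shared pool for all streams (the PoC above hardcodes 400 threads).
  private final ExecutorService executorService = Executors.newFixedThreadPool(400);

  public PrefetchingStream createStream(String objectKey) {
    // Each stream reuses the shared pool instead of spawning its own threads.
    return new PrefetchingStream(objectKey, executorService);
  }

  @Override
  public void close() {
    // Assumption: the pool is shut down with the factory; the PR hunks do not show this.
    executorService.shutdown();
  }

  public static final class PrefetchingStream {
    private final String objectKey;
    private final ExecutorService executorService;

    PrefetchingStream(String objectKey, ExecutorService executorService) {
      this.objectKey = objectKey;
      this.executorService = executorService;
    }
  }
}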
software/amazon/s3/dataaccelerator/io/logical/LogicalIOConfiguration.java
@@ -35,7 +35,7 @@ public class LogicalIOConfiguration {
private static final int DEFAULT_PARQUET_METADATA_STORE_SIZE = 45;
private static final int DEFAULT_MAX_COLUMN_ACCESS_STORE_SIZE = 15;
private static final String DEFAULT_PARQUET_FORMAT_SELECTOR_REGEX = "^.*.(parquet|par)$";
private static final PrefetchMode DEFAULT_PREFETCHING_MODE = PrefetchMode.ROW_GROUP;
private static final PrefetchMode DEFAULT_PREFETCHING_MODE = PrefetchMode.ALL;

@Builder.Default private boolean footerCachingEnabled = DEFAULT_FOOTER_CACHING_ENABLED;

software/amazon/s3/dataaccelerator/io/logical/impl/ParquetLogicalIOImpl.java
@@ -16,6 +16,8 @@
package software.amazon.s3.dataaccelerator.io.logical.impl;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

import lombok.NonNull;
import software.amazon.s3.dataaccelerator.common.telemetry.Telemetry;
import software.amazon.s3.dataaccelerator.io.logical.LogicalIOConfiguration;
@@ -44,13 +46,14 @@ public ParquetLogicalIOImpl(
@NonNull PhysicalIO physicalIO,
@NonNull Telemetry telemetry,
@NonNull LogicalIOConfiguration logicalIOConfiguration,
@NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore) {
@NonNull ParquetColumnPrefetchStore parquetColumnPrefetchStore,
ExecutorService executorService) {
super(physicalIO);

// Initialise prefetcher and start prefetching
this.parquetPrefetcher =
new ParquetPrefetcher(
s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore);
s3Uri, physicalIO, telemetry, logicalIOConfiguration, parquetColumnPrefetchStore, executorService);
this.parquetPrefetcher.prefetchFooterAndBuildMetadata();
}

@@ -68,7 +71,7 @@ public ParquetLogicalIOImpl(
public int read(byte[] buf, int off, int len, long position) throws IOException {
// Perform async prefetching before doing the blocking read
this.parquetPrefetcher.prefetchRemainingColumnChunk(position, len);
this.parquetPrefetcher.addToRecentColumnList(position);
this.parquetPrefetcher.addToRecentColumnList(position, len);

return super.read(buf, off, len, position);
}
software/amazon/s3/dataaccelerator/io/logical/impl/ParquetPrefetcher.java
@@ -16,6 +16,9 @@
package software.amazon.s3.dataaccelerator.io.logical.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import lombok.AccessLevel;
import lombok.AllArgsConstructor;
import lombok.NonNull;
@@ -52,11 +55,15 @@ public class ParquetPrefetcher {
@NonNull private final ParquetPrefetchRemainingColumnTask parquetPrefetchRemainingColumnTask;
@NonNull private final ParquetPredictivePrefetchingTask parquetPredictivePrefetchingTask;

@NonNull private ExecutorService executorService;

private static final String OPERATION_PARQUET_PREFETCH_COLUMN_CHUNK =
"parquet.prefetcher.prefetch.column.chunk.async";
private static final String OPERATION_PARQUET_PREFETCH_FOOTER_AND_METADATA =
"parquet.prefetcher.prefetch.footer.and.metadata.async";



/**
* Constructs a ParquetPrefetcher.
*
@@ -72,7 +79,8 @@ public ParquetPrefetcher(
PhysicalIO physicalIO,
Telemetry telemetry,
LogicalIOConfiguration logicalIOConfiguration,
ParquetColumnPrefetchStore parquetColumnPrefetchStore) {
ParquetColumnPrefetchStore parquetColumnPrefetchStore,
ExecutorService executorService) {
this(
s3Uri,
logicalIOConfiguration,
@@ -84,7 +92,8 @@ public ParquetPrefetcher(
new ParquetPrefetchRemainingColumnTask(
s3Uri, telemetry, physicalIO, parquetColumnPrefetchStore),
new ParquetPredictivePrefetchingTask(
s3Uri, telemetry, logicalIOConfiguration, physicalIO, parquetColumnPrefetchStore));
s3Uri, telemetry, logicalIOConfiguration, physicalIO, parquetColumnPrefetchStore),
executorService);
}

/**
@@ -156,8 +165,8 @@ private CompletableFuture<IOPlanExecution> prefetchFooterAndBuildMetadataImpl()
if (shouldPrefetch()) {
// TODO: https://github.com/awslabs/s3-connector-framework/issues/88
CompletableFuture<ColumnMappers> columnMappersCompletableFuture =
CompletableFuture.supplyAsync(parquetReadTailTask::readFileTail)
.thenApply(parquetMetadataParsingTask::storeColumnMappers);
CompletableFuture.supplyAsync(parquetReadTailTask::readFileTail, executorService)
.thenApplyAsync(parquetMetadataParsingTask::storeColumnMappers, executorService);

return prefetchPredictedColumns(columnMappersCompletableFuture);
}
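Both stages above now run on the injected executorService instead of ForkJoinPool.commonPool(), which CompletableFuture uses by default. Below is a small, runnable sketch of that supplyAsync/thenApplyAsync hand-off; readFileTail and parseFooter are placeholders standing in for the real tail-read and metadata-parsing tasks, not the library's API.

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public final class AsyncStageDemo {
  public static void main(String[] args) {
    ExecutorService executorService = Executors.newFixedThreadPool(4);

    // Passing the executor as the second argument keeps both stages off the common pool.
    CompletableFuture<Integer> footerSize =
        CompletableFuture.supplyAsync(AsyncStageDemo::readFileTail, executorService)
            .thenApplyAsync(AsyncStageDemo::parseFooter, executorService);

    System.out.println(footerSize.join());
    executorService.shutdown();
  }

  // Placeholder for the real tail-read task.
  private static byte[] readFileTail() {
    return new byte[8];
  }

  // Placeholder for the real footer-parsing task.
  private static int parseFooter(byte[] tail) {
    return tail.length;
  }
}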
@@ -170,10 +179,10 @@ private CompletableFuture<IOPlanExecution> prefetchPredictedColumns(
CompletableFuture<ColumnMappers> columnMappersCompletableFuture) {

if (logicalIOConfiguration.getPrefetchingMode() == PrefetchMode.ALL) {
return columnMappersCompletableFuture.thenApply(
return columnMappersCompletableFuture.thenApplyAsync(
(ColumnMappers columnMappers) ->
parquetPredictivePrefetchingTask.prefetchRecentColumns(
columnMappers, ParquetUtils.constructRowGroupsToPrefetch()));
columnMappers, ParquetUtils.constructRowGroupsToPrefetch()), executorService);
}

return CompletableFuture.completedFuture(
@@ -184,10 +193,11 @@ private CompletableFuture<IOPlanExecution> prefetchPredictedColumns(
* Record this position in the recent column list
*
* @param position the position to record
* @param len The length of the current read
*/
public void addToRecentColumnList(long position) {
public void addToRecentColumnList(long position, int len) {
if (logicalIOConfiguration.getPrefetchingMode() != PrefetchMode.OFF) {
this.parquetPredictivePrefetchingTask.addToRecentColumnList(position);
this.parquetPredictivePrefetchingTask.addToRecentColumnList(position, len);
}
}

software/amazon/s3/dataaccelerator/io/logical/parquet/ParquetPredictivePrefetchingTask.java
@@ -15,12 +15,13 @@
*/
package software.amazon.s3.dataaccelerator.io.logical.parquet;

import static software.amazon.s3.dataaccelerator.util.Constants.DEFAULT_MIN_ADJACENT_COLUMN_LENGTH;

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletionException;
import lombok.NonNull;
@@ -110,24 +111,38 @@ public ParquetPredictivePrefetchingTask(
* columns list.
*
* @param position current read position
* @param len the length of the current read
* @return list of columns added to the recent columns list
*/
public Optional<String> addToRecentColumnList(long position) {
public List<ColumnMetadata> addToRecentColumnList(long position, int len) {
if (parquetColumnPrefetchStore.getColumnMappers(s3Uri) != null) {
ColumnMappers columnMappers = parquetColumnPrefetchStore.getColumnMappers(s3Uri);
if (columnMappers.getOffsetIndexToColumnMap().containsKey(position)) {
ColumnMetadata columnMetadata = columnMappers.getOffsetIndexToColumnMap().get(position);
parquetColumnPrefetchStore.addRecentColumn(columnMetadata);

// Maybe prefetch all recent columns for the current row group, if they have not been
// prefetched already.
prefetchCurrentRowGroup(columnMappers, columnMetadata);

return Optional.of(columnMetadata.getColumnName());
List<ColumnMetadata> addedColumns =
addAdjacentColumnsInLength(columnMetadata, columnMappers, position, len);
addedColumns.add(columnMetadata);

return addedColumns;
} else if (len > DEFAULT_MIN_ADJACENT_COLUMN_LENGTH) {
// If the read does not align to a column boundary, check whether it lies within the
// boundary of a column. This can happen when reading adjacent columns, where reads don't
// always start at the column's file_offset. The len > DEFAULT_MIN_ADJACENT_COLUMN_LENGTH
// check prevents doing this work repeatedly; this matters because when reading
// dictionaries/column indexes, parquet-mr issues thousands of 1-byte reads
// (read(0, pos, 1)), and without this guard we would end up in this else clause far more
// often than intended.
return addCurrentColumnAtPosition(position);
}
}

return Optional.empty();
return Collections.emptyList();
}

/**
@@ -202,6 +217,106 @@ public IOPlanExecution prefetchRecentColumns(
});
}

/**
* When reading adjacent columns in a schema, reads may not fully align to the parquet schema. If
* the schema is like:
*
* <p>ColumnChunk name = ss_a, file_offset = 0, total_compressed_size = 7MB
*
* <p>ColumnChunk name = ss_b, file_offset = 7.5MB, total_compressed_size = 2MB
*
* <p>ColumnChunk name = ss_c, file_offset = 9.5MB, total_compressed_size = 5MB
*
* <p>And we want to read columns ss_a, ss_b, ss_c, then the reads from the parquet reader can
* look like:
*
* <p>read(0, 8MB) // Read at pos 0, with len 8MB, for ss_a and a part of ss_b
*
* <p>read(8MB, 5MB) // Read the remainder of ss_b and part of ss_c
*
* <p>Since the reads do not align to column boundaries, that is, they do not start at the file
* offset of the column, additional logic is required to track columns for prefetching. Here, we
* loop through the column file offsets and find the column that this read belongs to. For
* example, the read(8MB, 5MB) means we are reading column ss_b, since the position 8MB lies
* within the boundary of ss_b: 8MB > file_offset of ss_b and 8MB < file_offset of ss_c.
*
* @param position The current position in the read
* @return List<ColumnMetadata> the column added to the recently read list
*/
private List<ColumnMetadata> addCurrentColumnAtPosition(long position) {
ArrayList<Long> columnPositions =
new ArrayList<>(
parquetColumnPrefetchStore
.getColumnMappers(s3Uri)
.getOffsetIndexToColumnMap()
.keySet());
Collections.sort(columnPositions);

// For the last index, also add its end position so we can track reads that lie within
// that column boundary
long lastColumnStartPos = columnPositions.get(columnPositions.size() - 1);
ColumnMetadata lastColumnMetadata =
parquetColumnPrefetchStore
.getColumnMappers(s3Uri)
.getOffsetIndexToColumnMap()
.get(lastColumnStartPos);
columnPositions.add(lastColumnStartPos + lastColumnMetadata.getCompressedSize());

for (int i = 0; i < columnPositions.size() - 1; i++) {
if (position > columnPositions.get(i) && position < columnPositions.get(i + 1)) {
ColumnMetadata currentColumnMetadata =
parquetColumnPrefetchStore
.getColumnMappers(s3Uri)
.getOffsetIndexToColumnMap()
.get(columnPositions.get(i));
parquetColumnPrefetchStore.addRecentColumn(currentColumnMetadata);
List<ColumnMetadata> addedColumns = new ArrayList<>();
addedColumns.add(currentColumnMetadata);
return addedColumns;
}
}

return Collections.emptyList();
}
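A tiny runnable illustration of the boundary lookup implemented above, using the ss_a/ss_b/ss_c layout from the javadoc; the offsets are illustrative only and the class is hypothetical.

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public final class ColumnBoundaryLookupDemo {
  public static void main(String[] args) {
    long oneMb = 1024 * 1024;

    // file_offsets of ss_a, ss_b and ss_c from the example in the javadoc above.
    List<Long> columnPositions = new ArrayList<>();
    columnPositions.add(0L);             // ss_a
    columnPositions.add(oneMb * 15 / 2); // ss_b at 7.5MB
    columnPositions.add(oneMb * 19 / 2); // ss_c at 9.5MB
    Collections.sort(columnPositions);
    columnPositions.add(oneMb * 29 / 2); // end of ss_c at 14.5MB

    String[] names = {"ss_a", "ss_b", "ss_c"};
    long position = 8 * oneMb; // read(8MB, 5MB) starts inside ss_b, not at its file_offset

    for (int i = 0; i < columnPositions.size() - 1; i++) {
      if (position > columnPositions.get(i) && position < columnPositions.get(i + 1)) {
        System.out.println("position " + position + " maps to " + names[i]); // prints ss_b
      }
    }
  }
}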

/**
* Adds adjacent columns to the recently read list. For more details on why this is required, see
* the documentation for addCurrentColumnAtPosition(). Here, we track all columns that are
* contained in the read. For example, the read(0, 8MB) contains both ss_a and ss_b, so both are
* added to the recently tracked list. DEFAULT_MIN_ADJACENT_COLUMN_LENGTH controls the minimum
* read length for which this is done, so if the schema has many small columns, this value can be
* adjusted to prevent too many columns being added to the tracked list.
*
* @param columnMetadata Column metadata of the current column being read
* @param columnMappers Parquet file column mappings
* @param position The current position in the stream
* @param len The length of the current read
* @return List<ColumnMetadata> List of column metadata read
*/
private List<ColumnMetadata> addAdjacentColumnsInLength(
ColumnMetadata columnMetadata, ColumnMappers columnMappers, long position, int len) {
List<ColumnMetadata> addedColumns = new ArrayList<>();

if (len > columnMetadata.getCompressedSize() && len > DEFAULT_MIN_ADJACENT_COLUMN_LENGTH) {

long remainingLen = len - columnMetadata.getCompressedSize();
long currentPos = position + columnMetadata.getCompressedSize();

while (remainingLen > 0) {
ColumnMetadata currentColumnMetadata =
columnMappers.getOffsetIndexToColumnMap().get(currentPos);

// Stop if there is no contiguous column at this offset, or if it cannot advance the walk.
if (currentColumnMetadata == null || currentColumnMetadata.getCompressedSize() == 0) {
break;
}

parquetColumnPrefetchStore.addRecentColumn(currentColumnMetadata);
remainingLen = remainingLen - currentColumnMetadata.getCompressedSize();
currentPos = currentPos + currentColumnMetadata.getCompressedSize();
addedColumns.add(currentColumnMetadata);
}
}

return addedColumns;
}
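A compact illustration of the length walk above: one read that is longer than the first column also marks the columns that follow it. The offsets and sizes are made up, assumed contiguous, and in arbitrary units; the class is hypothetical.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public final class AdjacentColumnWalkDemo {
  public static void main(String[] args) {
    Map<Long, String> nameByOffset = new HashMap<>();
    Map<Long, Long> sizeByOffset = new HashMap<>();
    nameByOffset.put(0L, "ss_a"); sizeByOffset.put(0L, 7L);
    nameByOffset.put(7L, "ss_b"); sizeByOffset.put(7L, 2L);
    nameByOffset.put(9L, "ss_c"); sizeByOffset.put(9L, 5L);

    long position = 0;
    long len = 8; // read(0, 8): all of ss_a plus part of ss_b

    List<String> tracked = new ArrayList<>();
    tracked.add(nameByOffset.get(position)); // the column the read starts at

    // Walk forward through adjacent columns until the read length is used up.
    long remainingLen = len - sizeByOffset.get(position);
    long currentPos = position + sizeByOffset.get(position);
    while (remainingLen > 0 && nameByOffset.containsKey(currentPos)) {
      tracked.add(nameByOffset.get(currentPos));
      remainingLen -= sizeByOffset.get(currentPos);
      currentPos += sizeByOffset.get(currentPos);
    }

    System.out.println(tracked); // [ss_a, ss_b]
  }
}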

private Set<String> getRecentColumns(HashMap<Long, ColumnMetadata> offsetIndexToColumnMap) {
if (!offsetIndexToColumnMap.isEmpty()) {
Map.Entry<Long, ColumnMetadata> firstColumnData =
software/amazon/s3/dataaccelerator/util/Constants.java
@@ -24,4 +24,5 @@ private Constants() {}
public static final int ONE_MB = 1024 * 1024;
public static final int PARQUET_MAGIC_STR_LENGTH = 4;
public static final int PARQUET_FOOTER_LENGTH_SIZE = 4;
public static final long DEFAULT_MIN_ADJACENT_COLUMN_LENGTH = 500 * ONE_KB;
}
@@ -428,10 +428,10 @@ public void testAddToRecentColumnListProxiesCallsToDependency() {
parquetPredictivePrefetchingTask);

// When: a column is added to recent list
parquetPrefetcher.addToRecentColumnList(100);
parquetPrefetcher.addToRecentColumnList(100, 0);

// Then: it is also added within the task
verify(parquetPredictivePrefetchingTask, times(1)).addToRecentColumnList(100);
verify(parquetPredictivePrefetchingTask, times(1)).addToRecentColumnList(100, 0);
}

private ParquetReadTailTask getTestParquetReadTailTask() {