Skip to content

Commit

Permalink
Kyligence#6 Configurable record counts for block size checks
Browse files Browse the repository at this point in the history
  • Loading branch information
7mming7 committed Aug 27, 2019
1 parent dc3d6b2 commit f904561
Show file tree
Hide file tree
Showing 36 changed files with 157 additions and 67 deletions.
2 changes: 1 addition & 1 deletion parquet-arrow/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-avro/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-benchmarks/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-cascading/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-cascading3/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-cli/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-column/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ public class ParquetProperties {
public static final int DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK = 10000;
public static final int DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH = 64;
public static final int DEFAULT_PAGE_ROW_COUNT_LIMIT = 20_000;
public static final boolean DEFAULT_PAGE_ROW_COUNT_CHECK_ENABLE = false;
public static final long DEFAULT_SINGLE_ROW_SIZE_LIMIT = 512 * 1024;

public static final ValuesWriterFactory DEFAULT_VALUES_WRITER_FACTORY = new DefaultValuesWriterFactory();

Expand Down Expand Up @@ -87,10 +89,13 @@ public static WriterVersion fromString(String name) {
private final ValuesWriterFactory valuesWriterFactory;
private final int columnIndexTruncateLength;
private final int pageRowCountLimit;
private final boolean pageSizeCheckEnable;
private final long singleRowSizeLimit;

private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPageSize, boolean enableDict, int minRowCountForPageSizeCheck,
int maxRowCountForPageSizeCheck, boolean estimateNextSizeCheck, ByteBufferAllocator allocator,
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit) {
ValuesWriterFactory writerFactory, int columnIndexMinMaxTruncateLength, int pageRowCountLimit,
boolean pageSizeCheckEnable, long singleRowSizeLimit) {
this.pageSizeThreshold = pageSize;
this.initialSlabSize = CapacityByteArrayOutputStream
.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
Expand All @@ -105,6 +110,8 @@ private ParquetProperties(WriterVersion writerVersion, int pageSize, int dictPag
this.valuesWriterFactory = writerFactory;
this.columnIndexTruncateLength = columnIndexMinMaxTruncateLength;
this.pageRowCountLimit = pageRowCountLimit;
this.pageSizeCheckEnable = pageSizeCheckEnable;
this.singleRowSizeLimit = singleRowSizeLimit;
}

public ValuesWriter newRepetitionLevelWriter(ColumnDescriptor path) {
Expand Down Expand Up @@ -145,6 +152,14 @@ public int getPageSizeThreshold() {
return pageSizeThreshold;
}

/**
 * Returns the built-in default for the page-row-count check switch,
 * i.e. {@code DEFAULT_PAGE_ROW_COUNT_CHECK_ENABLE} (disabled by default).
 *
 * @return the default enable flag for the page-row-count check
 */
public static boolean isDefaultPageRowCountCheckEnable() {
return DEFAULT_PAGE_ROW_COUNT_CHECK_ENABLE;
}

/**
 * Returns the built-in default single-row size limit,
 * i.e. {@code DEFAULT_SINGLE_ROW_SIZE_LIMIT} (512 KiB).
 *
 * @return the default single-row size limit in bytes
 */
public static long getDefaultSingleRowSizeLimit() {
return DEFAULT_SINGLE_ROW_SIZE_LIMIT;
}

public int getInitialSlabSize() {
return initialSlabSize;
}
Expand Down Expand Up @@ -181,6 +196,10 @@ public int getMinRowCountForPageSizeCheck() {
return minRowCountForPageSizeCheck;
}

/**
 * Whether the page size check is enabled for this writer configuration.
 * NOTE(review): the corresponding builder method is named
 * {@code withPageRowCheckEnable} while this field/getter says "page size" —
 * confirm which name reflects the intended semantics.
 *
 * @return {@code true} if the page size check is enabled
 */
public boolean isPageSizeCheckEnable() {
return pageSizeCheckEnable;
}

public int getMaxRowCountForPageSizeCheck() {
return maxRowCountForPageSizeCheck;
}
Expand All @@ -201,6 +220,10 @@ public int getPageRowCountLimit() {
return pageRowCountLimit;
}

/**
 * Returns the configured single-row size limit. Records at or above this
 * size trigger the exact (non-sampling) block size check in the writer.
 *
 * @return the single-row size limit in bytes
 */
public long getSingleRowSizeLimit() {
return singleRowSizeLimit;
}

/**
 * Creates a new {@code Builder} initialized with all default values.
 *
 * @return a fresh properties builder
 */
public static Builder builder() {
return new Builder();
}
Expand All @@ -221,6 +244,8 @@ public static class Builder {
private ValuesWriterFactory valuesWriterFactory = DEFAULT_VALUES_WRITER_FACTORY;
private int columnIndexTruncateLength = DEFAULT_COLUMN_INDEX_TRUNCATE_LENGTH;
private int pageRowCountLimit = DEFAULT_PAGE_ROW_COUNT_LIMIT;
private boolean pageSizeCheckEnable = DEFAULT_PAGE_ROW_COUNT_CHECK_ENABLE;
private long singleRowSizeLimit = DEFAULT_SINGLE_ROW_SIZE_LIMIT;

private Builder() {
}
Expand All @@ -236,6 +261,7 @@ private Builder(ParquetProperties toCopy) {
this.valuesWriterFactory = toCopy.valuesWriterFactory;
this.allocator = toCopy.allocator;
this.pageRowCountLimit = toCopy.pageRowCountLimit;
this.singleRowSizeLimit = toCopy.singleRowSizeLimit;
}

/**
Expand Down Expand Up @@ -330,11 +356,22 @@ public Builder withPageRowCountLimit(int rowCount) {
return this;
}

/**
 * Enables or disables the writer's page size check.
 * NOTE(review): despite the "PageRow" name, this toggles the
 * {@code pageSizeCheckEnable} flag — confirm the intended naming.
 *
 * @param enable {@code true} to enable the check
 * @return this builder, for call chaining
 */
public Builder withPageRowCheckEnable(boolean enable) {
  this.pageSizeCheckEnable = enable;
  return this;
}

/**
 * Sets the single-row size limit; records at or above this size make the
 * writer use the exact block size check instead of the sampled one.
 *
 * @param size limit in bytes
 * @return this builder, for call chaining
 */
public Builder withSingleRowSizeLimit(long size) {
  this.singleRowSizeLimit = size;
  return this;
}

public ParquetProperties build() {
ParquetProperties properties =
new ParquetProperties(writerVersion, pageSize, dictPageSize,
enableDict, minRowCountForPageSizeCheck, maxRowCountForPageSizeCheck,
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength, pageRowCountLimit);
estimateNextSizeCheck, allocator, valuesWriterFactory, columnIndexTruncateLength,
pageRowCountLimit, pageSizeCheckEnable, singleRowSizeLimit);
// we pass a constructed but uninitialized factory to ParquetProperties above as currently
// creation of ValuesWriters is invoked from within ParquetProperties. In the future
// we'd like to decouple that and won't need to pass an object to properties and then pass the
Expand Down
2 changes: 1 addition & 1 deletion parquet-common/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-encoding/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-format-structures/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<artifactId>parquet-format-structures</artifactId>
Expand Down
2 changes: 1 addition & 1 deletion parquet-generator/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-hadoop-bundle/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
2 changes: 1 addition & 1 deletion parquet-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
<groupId>org.apache.parquet</groupId>
<artifactId>parquet</artifactId>
<relativePath>../pom.xml</relativePath>
<version>1.12.0-kylin-r2</version>
<version>1.12.0-kylin-r3</version>
</parent>

<modelVersion>4.0.0</modelVersion>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,14 +18,6 @@
*/
package org.apache.parquet.hadoop;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.parquet.Preconditions.checkNotNull;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import org.apache.parquet.column.ColumnWriteStore;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.CodecFactory.BytesCompressor;
Expand All @@ -39,6 +31,14 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

import static java.lang.Math.max;
import static java.lang.Math.min;
import static org.apache.parquet.Preconditions.checkNotNull;

class InternalParquetRecordWriter<T> {
private static final Logger LOG = LoggerFactory.getLogger(InternalParquetRecordWriter.class);

Expand Down Expand Up @@ -66,23 +66,25 @@ class InternalParquetRecordWriter<T> {
private ColumnChunkPageWriteStore pageStore;
private RecordConsumer recordConsumer;

private boolean pageSizeCheckEnable;

/**
* @param parquetFileWriter the file to write to
* @param writeSupport the class to convert incoming records
* @param schema the schema of the records
* @param extraMetaData extra meta data to write in the footer of the file
* @param rowGroupSize the size of a block in the file (this will be approximate)
* @param compressor the codec used to compress
* @param writeSupport the class to convert incoming records
* @param schema the schema of the records
* @param extraMetaData extra meta data to write in the footer of the file
* @param rowGroupSize the size of a block in the file (this will be approximate)
* @param compressor the codec used to compress
*/
public InternalParquetRecordWriter(
ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long rowGroupSize,
BytesCompressor compressor,
boolean validating,
ParquetProperties props) {
ParquetFileWriter parquetFileWriter,
WriteSupport<T> writeSupport,
MessageType schema,
Map<String, String> extraMetaData,
long rowGroupSize,
BytesCompressor compressor,
boolean validating,
ParquetProperties props) {
this.parquetFileWriter = parquetFileWriter;
this.writeSupport = checkNotNull(writeSupport, "writeSupport");
this.schema = schema;
Expand All @@ -92,6 +94,7 @@ public InternalParquetRecordWriter(
this.nextRowGroupSize = rowGroupSizeThreshold;
this.compressor = compressor;
this.validating = validating;
this.pageSizeCheckEnable = props.isPageSizeCheckEnable();
this.props = props;
initStore();
}
Expand All @@ -102,7 +105,7 @@ public ParquetMetadata getFooter() {

private void initStore() {
pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(),
props.getColumnIndexTruncateLength());
props.getColumnIndexTruncateLength());
columnStore = props.newColumnWriteStore(schema, pageStore);
MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema);
this.recordConsumer = columnIO.getRecordWriter(columnStore);
Expand All @@ -125,9 +128,14 @@ public void close() throws IOException, InterruptedException {
}

/**
 * Writes a single record, then runs the appropriate row-group size check:
 * records at or above the configured single-row size limit use the exact
 * buffered-size check ({@code checkBlockSizeReachedManual}), all others use
 * the cheaper sampling-based check ({@code checkBlockSizeReached}).
 *
 * @param value the record to write
 * @throws IOException if flushing a completed row group fails
 * @throws InterruptedException if the write is interrupted
 */
public void write(T value) throws IOException, InterruptedException {
  // Use primitives here: the original boxed both values as Long on every
  // write, allocating on the hot path; the comparison unboxed them anyway.
  long singleRowLimit = props.getSingleRowSizeLimit();
  // Presumably the number of bytes this record occupied — TODO confirm
  // writeAndFetchRecordSize's contract (and that it never returns null).
  long recordSize = writeSupport.writeAndFetchRecordSize(value);
  ++recordCount;
  if (recordSize >= singleRowLimit) {
    // Oversized record: the sampled estimate could overshoot the row-group
    // target badly, so check the actual buffered size right away.
    checkBlockSizeReachedManual();
  } else {
    checkBlockSizeReached();
  }
}

/**
Expand All @@ -144,25 +152,38 @@ private void checkBlockSizeReached() throws IOException {
// flush the row group if it is within ~2 records of the limit
// it is much better to be slightly under size than to be over at all
if (memSize > (nextRowGroupSize - 2 * recordSize)) {
LOG.debug("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
flushRowGroupToStore();
initStore();
recordCountForNextMemCheck = min(max(MINIMUM_RECORD_COUNT_FOR_CHECK, recordCount / 2), MAXIMUM_RECORD_COUNT_FOR_CHECK);
this.lastRowGroupEndPos = parquetFileWriter.getPos();
} else {
recordCountForNextMemCheck = min(
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck);
max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway
recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead
);
LOG.info("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck);
}
}
}

/**
 * Row-group size check that inspects the actual buffered size on every call.
 * Invoked from {@code write} for oversized records, where the sampling-based
 * {@code checkBlockSizeReached} could overshoot the row-group target.
 *
 * @throws IOException if flushing the row group to the file fails
 */
private void checkBlockSizeReachedManual() throws IOException {
  if (recordCount == 0) {
    // Nothing buffered yet; also guards the average-size division below
    // against ArithmeticException if this is ever called before a write.
    return;
  }
  long memSize = columnStore.getBufferedSize();
  long avgRecordSize = memSize / recordCount;
  // Flush when within ~2 average records of the limit: a slightly undersized
  // row group is preferable to an oversized one (mirrors checkBlockSizeReached).
  if (memSize > (nextRowGroupSize - 2 * avgRecordSize)) {
    LOG.info("mem size {} > {}: flushing {} records to disk.", memSize, nextRowGroupSize, recordCount);
    flushRowGroupToStore();
    initStore();
    this.lastRowGroupEndPos = parquetFileWriter.getPos();
  } else {
    // This branch fires once per oversized record; keep it at debug so it
    // does not flood the log at info level.
    LOG.debug("Checked mem at {}", recordCount);
  }
}

private void flushRowGroupToStore()
throws IOException {
throws IOException {
recordConsumer.flush();
LOG.debug("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
LOG.info("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize());
if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) {
LOG.warn("Too much memory used: {}", columnStore.memUsageString());
}
Expand All @@ -174,8 +195,8 @@ private void flushRowGroupToStore()
recordCount = 0;
parquetFileWriter.endBlock();
this.nextRowGroupSize = Math.min(
parquetFileWriter.getNextRowGroupSize(),
rowGroupSizeThreshold);
parquetFileWriter.getNextRowGroupSize(),
rowGroupSizeThreshold);
}

columnStore = null;
Expand Down
Loading

0 comments on commit f904561

Please sign in to comment.