From 1a7b746a0dede8992d9f64c45333ed4ba9280e22 Mon Sep 17 00:00:00 2001 From: "mingming.ge@kyligence.io" Date: Tue, 20 Aug 2019 18:16:36 +0800 Subject: [PATCH] #6 add single record size check --- parquet-arrow/pom.xml | 2 +- parquet-avro/pom.xml | 2 +- parquet-benchmarks/pom.xml | 2 +- parquet-cascading/pom.xml | 2 +- parquet-cascading3/pom.xml | 2 +- parquet-cli/pom.xml | 2 +- parquet-column/pom.xml | 2 +- parquet-common/pom.xml | 2 +- parquet-encoding/pom.xml | 2 +- parquet-format-structures/pom.xml | 2 +- parquet-generator/pom.xml | 2 +- parquet-hadoop-bundle/pom.xml | 2 +- parquet-hadoop/pom.xml | 2 +- .../hadoop/InternalParquetRecordWriter.java | 33 ++++++++++--------- .../parquet/hadoop/api/WriteSupport.java | 2 ++ parquet-hive-bundle/pom.xml | 2 +- .../parquet-hive-0.10-binding/pom.xml | 2 +- .../parquet-hive-0.12-binding/pom.xml | 2 +- .../parquet-hive-binding-bundle/pom.xml | 2 +- .../parquet-hive-binding-factory/pom.xml | 2 +- .../parquet-hive-binding-interface/pom.xml | 2 +- parquet-hive/parquet-hive-binding/pom.xml | 2 +- .../parquet-hive-storage-handler/pom.xml | 2 +- parquet-hive/pom.xml | 2 +- parquet-jackson/pom.xml | 2 +- parquet-pig-bundle/pom.xml | 2 +- parquet-pig/pom.xml | 2 +- parquet-protobuf/pom.xml | 2 +- parquet-scala/pom.xml | 2 +- parquet-scrooge/pom.xml | 2 +- parquet-thrift/pom.xml | 2 +- parquet-tools/pom.xml | 2 +- pom.xml | 2 +- 33 files changed, 50 insertions(+), 47 deletions(-) diff --git a/parquet-arrow/pom.xml b/parquet-arrow/pom.xml index 09867eed8e..d34c4fc587 100644 --- a/parquet-arrow/pom.xml +++ b/parquet-arrow/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-avro/pom.xml b/parquet-avro/pom.xml index c18abb8453..752081892d 100644 --- a/parquet-avro/pom.xml +++ b/parquet-avro/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-benchmarks/pom.xml b/parquet-benchmarks/pom.xml index f06e5d8229..de03614611 100644 --- a/parquet-benchmarks/pom.xml +++ b/parquet-benchmarks/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-cascading/pom.xml b/parquet-cascading/pom.xml index 17c3ac9cea..47536c27bb 100644 --- a/parquet-cascading/pom.xml +++ b/parquet-cascading/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-cascading3/pom.xml b/parquet-cascading3/pom.xml index bddb478268..b76d9499aa 100644 --- a/parquet-cascading3/pom.xml +++ b/parquet-cascading3/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-cli/pom.xml b/parquet-cli/pom.xml index adf676c2a5..8d95abafe3 100644 --- a/parquet-cli/pom.xml +++ b/parquet-cli/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-column/pom.xml b/parquet-column/pom.xml index f1326902cc..bf3c3fcc69 100644 --- a/parquet-column/pom.xml +++ b/parquet-column/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-common/pom.xml b/parquet-common/pom.xml index af5b5263bd..6a9b08f0f5 100644 --- a/parquet-common/pom.xml +++ b/parquet-common/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-encoding/pom.xml b/parquet-encoding/pom.xml index 7b50bca485..1e4ae29db8 100644 --- a/parquet-encoding/pom.xml +++ b/parquet-encoding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-format-structures/pom.xml b/parquet-format-structures/pom.xml index 4f951dbcf4..562cd68dc9 100644 --- a/parquet-format-structures/pom.xml +++ b/parquet-format-structures/pom.xml @@ -24,7 +24,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 parquet-format-structures diff --git a/parquet-generator/pom.xml b/parquet-generator/pom.xml index bf010e0e97..6eafbc0f99 100644 --- a/parquet-generator/pom.xml +++ b/parquet-generator/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hadoop-bundle/pom.xml b/parquet-hadoop-bundle/pom.xml index 3b231a8f89..8315e5f781 100644 --- a/parquet-hadoop-bundle/pom.xml +++ b/parquet-hadoop-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml index eec80f1a8a..2cf44268c4 100644 --- a/parquet-hadoop/pom.xml +++ b/parquet-hadoop/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java index d8af379d13..50ba6092bd 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/InternalParquetRecordWriter.java @@ -75,14 +75,14 @@ class InternalParquetRecordWriter { * @param compressor the codec used to compress */ public InternalParquetRecordWriter( - ParquetFileWriter parquetFileWriter, - WriteSupport writeSupport, - MessageType schema, - Map extraMetaData, - long rowGroupSize, - BytesCompressor compressor, - boolean validating, - ParquetProperties props) { + ParquetFileWriter parquetFileWriter, + WriteSupport writeSupport, + MessageType schema, + Map extraMetaData, + long rowGroupSize, + BytesCompressor compressor, + boolean validating, + ParquetProperties props) { this.parquetFileWriter = parquetFileWriter; this.writeSupport = checkNotNull(writeSupport, "writeSupport"); this.schema = schema; @@ -102,7 +102,7 @@ public ParquetMetadata getFooter() { private void initStore() { pageStore = new ColumnChunkPageWriteStore(compressor, schema, props.getAllocator(), - props.getColumnIndexTruncateLength()); + props.getColumnIndexTruncateLength()); columnStore = props.newColumnWriteStore(schema, pageStore); MessageColumnIO columnIO = new ColumnIOFactory(validating).getColumnIO(schema); this.recordConsumer = columnIO.getRecordWriter(columnStore); @@ -138,7 +138,8 @@ public long getDataSize() { } private void checkBlockSizeReached() throws IOException { - if (recordCount >= recordCountForNextMemCheck) { // checking the memory size is relatively expensive, so let's not do it for every record. + if (recordCount >= recordCountForNextMemCheck || writeSupport.needCheckRowSize) { // checking the memory size is relatively expensive, so let's not do it for every record. + writeSupport.needCheckRowSize = false; long memSize = columnStore.getBufferedSize(); long recordSize = memSize / recordCount; // flush the row group if it is within ~2 records of the limit @@ -151,16 +152,16 @@ private void checkBlockSizeReached() throws IOException { this.lastRowGroupEndPos = parquetFileWriter.getPos(); } else { recordCountForNextMemCheck = min( - max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway - recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead - ); + max(MINIMUM_RECORD_COUNT_FOR_CHECK, (recordCount + (long)(nextRowGroupSize / ((float)recordSize))) / 2), // will check halfway + recordCount + MAXIMUM_RECORD_COUNT_FOR_CHECK // will not look more than max records ahead + ); LOG.debug("Checked mem at {} will check again at: {}", recordCount, recordCountForNextMemCheck); } } } private void flushRowGroupToStore() - throws IOException { + throws IOException { recordConsumer.flush(); LOG.debug("Flushing mem columnStore to file. allocated memory: {}", columnStore.getAllocatedSize()); if (columnStore.getAllocatedSize() > (3 * rowGroupSizeThreshold)) { @@ -174,8 +175,8 @@ private void flushRowGroupToStore() recordCount = 0; parquetFileWriter.endBlock(); this.nextRowGroupSize = Math.min( - parquetFileWriter.getNextRowGroupSize(), - rowGroupSizeThreshold); + parquetFileWriter.getNextRowGroupSize(), + rowGroupSizeThreshold); } columnStore = null; diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java index c08882f8ca..4db63b1d88 100644 --- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java +++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/api/WriteSupport.java @@ -37,6 +37,8 @@ */ abstract public class WriteSupport { + public boolean needCheckRowSize = false; + /** * information to be persisted in the file */ diff --git a/parquet-hive-bundle/pom.xml b/parquet-hive-bundle/pom.xml index d11863567e..c9ccdfdfa8 100644 --- a/parquet-hive-bundle/pom.xml +++ b/parquet-hive-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml index bb20ea55b4..302b6d5227 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive-binding ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml index 93bb709ff8..2c958be476 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive-binding ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml index 693481f364..784cd127c3 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-bundle/pom.xml @@ -23,7 +23,7 @@ org.apache.parquet parquet-hive-binding ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml index 99d7cb0771..60b78ebc76 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive-binding ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml index 35725e2f92..1b02e635a3 100644 --- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml +++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive-binding ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-binding/pom.xml b/parquet-hive/parquet-hive-binding/pom.xml index c90d1098b1..eb04d1b1a6 100644 --- a/parquet-hive/parquet-hive-binding/pom.xml +++ b/parquet-hive/parquet-hive-binding/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/parquet-hive-storage-handler/pom.xml b/parquet-hive/parquet-hive-storage-handler/pom.xml index 66b77e1b22..254cb5daf0 100644 --- a/parquet-hive/parquet-hive-storage-handler/pom.xml +++ b/parquet-hive/parquet-hive-storage-handler/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet-hive ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-hive/pom.xml b/parquet-hive/pom.xml index 6f6d381a44..f7ffbeaed8 100644 --- a/parquet-hive/pom.xml +++ b/parquet-hive/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-jackson/pom.xml b/parquet-jackson/pom.xml index 44ede63723..b02bdaf10a 100644 --- a/parquet-jackson/pom.xml +++ b/parquet-jackson/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-pig-bundle/pom.xml b/parquet-pig-bundle/pom.xml index 48bffe8ca2..55db953fa7 100644 --- a/parquet-pig-bundle/pom.xml +++ b/parquet-pig-bundle/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml index 897527ed83..f6859994d7 100644 --- a/parquet-pig/pom.xml +++ b/parquet-pig/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-protobuf/pom.xml b/parquet-protobuf/pom.xml index 82a2f0fc70..4d4b26dd7b 100644 --- a/parquet-protobuf/pom.xml +++ b/parquet-protobuf/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-scala/pom.xml b/parquet-scala/pom.xml index 344bf475a2..418e19ee2b 100644 --- a/parquet-scala/pom.xml +++ b/parquet-scala/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-scrooge/pom.xml b/parquet-scrooge/pom.xml index 887be5c424..66665775e9 100644 --- a/parquet-scrooge/pom.xml +++ b/parquet-scrooge/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml index ae78bedba1..85b036c968 100644 --- a/parquet-thrift/pom.xml +++ b/parquet-thrift/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/parquet-tools/pom.xml b/parquet-tools/pom.xml index 542c4aaa37..2762beb235 100644 --- a/parquet-tools/pom.xml +++ b/parquet-tools/pom.xml @@ -21,7 +21,7 @@ org.apache.parquet parquet ../pom.xml - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 4.0.0 diff --git a/pom.xml b/pom.xml index 24ed4b5a0c..7c061dcb2f 100644 --- a/pom.xml +++ b/pom.xml @@ -9,7 +9,7 @@ org.apache.parquet parquet - 1.12.0-kylin-r2 + 1.12.0-kylin-r3 pom Apache Parquet MR