diff --git a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java index d5f0e817700..cc0e1d4e7d5 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/writer/PartitionedDataWriter.java @@ -87,7 +87,7 @@ public class PartitionedDataWriter extends WriterWrapper implements Fin public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS = Long.MAX_VALUE; public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = "partitionedDataWriter.write.timeout.seconds"; public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = Long.MAX_VALUE; - + public static final String CURRENT_PARTITIONED_WRITERS_COUNTER = "partitionedDataWriter.counter"; private static final GenericRecord NON_PARTITIONED_WRITER_KEY = new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord()); @@ -176,6 +176,7 @@ public DataWriter get() { log.info(String.format("Adding one more writer to loading cache of existing writer " + "with size = %d", partitionWriters.size())); Future> future = createWriterPool.submit(() -> createPartitionWriter(key)); + state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1); return future.get(writeTimeoutInterval, TimeUnit.SECONDS); } catch (ExecutionException | InterruptedException e) { throw new RuntimeException("Error creating writer", e); diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java index 24e203b64dd..27068258b59 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java @@ -46,6 +46,7 @@ import com.google.common.annotations.VisibleForTesting; +import lombok.Getter; import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; @@ -54,6 +55,8 @@ /** * The converter for buffering rows and forming columnar batch. 
+ * Additionally, records the estimated size of the data converted in bytes + * TODO: consider using the record size provided by the extractor instead of the converter as it may be more available and accurate */ @Slf4j public class GenericRecordToOrcValueWriter implements OrcValueWriter { @@ -69,6 +72,10 @@ public class GenericRecordToOrcValueWriter implements OrcValueWriter cv.child.isNull.length) { @@ -306,18 +348,21 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) { log.info("Column vector: {}, resizing to: {}, child count: {}", cv.child, resizedLength, cv.childCount); cv.child.ensureSize(resizedLength, true); } - + // Add the size of the empty space of the list + long estimatedBytes = 0; // Add each element for (int e = 0; e < cv.lengths[rowId]; ++e) { int offset = (int) (e + cv.offsets[rowId]); if (value.get(e) == null) { cv.child.noNulls = false; cv.child.isNull[offset] = true; + estimatedBytes += 1; } else { cv.child.isNull[offset] = false; - children.addValue(offset, e, value.get(e), cv.child); + estimatedBytes += children.addValue(offset, e, value.get(e), cv.child); } } + return estimatedBytes; } } @@ -333,7 +378,7 @@ class MapConverter implements Converter { rowsAdded = 0; } - public void addValue(int rowId, int column, Object data, ColumnVector output) { + public long addValue(int rowId, int column, Object data, ColumnVector output) { rowsAdded += 1; Map map = (Map) data; Set> entries = map.entrySet(); @@ -353,24 +398,28 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) { } // Add each element int e = 0; + long estimatedBytes = 0; for (Map.Entry entry : entries) { int offset = (int) (e + cv.offsets[rowId]); if (entry.getKey() == null) { cv.keys.noNulls = false; cv.keys.isNull[offset] = true; + estimatedBytes += 1; } else { cv.keys.isNull[offset] = false; - keyConverter.addValue(offset, e, entry.getKey(), cv.keys); + estimatedBytes += keyConverter.addValue(offset, e, entry.getKey(), cv.keys); } if (entry.getValue() == null) { cv.values.noNulls = false; cv.values.isNull[offset] = true; + estimatedBytes += 1; } else { cv.values.isNull[offset] = false; - valueConverter.addValue(offset, e, entry.getValue(), cv.values); + estimatedBytes += valueConverter.addValue(offset, e, entry.getValue(), cv.values); } e++; } + return estimatedBytes; } } diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index a7de6a56885..6e7a00bed7e 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -18,7 +18,11 @@ package org.apache.gobblin.writer; import java.io.IOException; +import java.util.Arrays; +import java.util.LinkedList; import java.util.Properties; +import java.util.Queue; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; @@ -44,21 +48,57 @@ public abstract class GobblinBaseOrcWriter extends FsDataWriter { public static final String ORC_WRITER_PREFIX = "orcWriter."; public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize"; public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000; + public static final String ORC_WRITER_AUTO_SELFTUNE_ENABLED = ORC_WRITER_PREFIX + "auto.selfTune.enabled"; + public static final 
String ORC_WRITER_ESTIMATED_RECORD_SIZE = ORC_WRITER_PREFIX + "estimated.recordSize"; + public static final String ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = ORC_WRITER_PREFIX + "auto.selfTune.rowsBetweenCheck"; + public static final String ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR = ORC_WRITER_PREFIX + "auto.selfTune.memory.usage.factor"; + public static final int DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK = 500; + public static final String ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY = ORC_WRITER_PREFIX + "estimated.bytes.allocated.converter.memory"; + public static final String ORC_WRITER_CONCURRENT_TASKS = ORC_WRITER_PREFIX + "auto.selfTune.concurrent.tasks"; + + // This value gives an estimation on how many writers are buffering records at the same time in a container. + // Since time-based partition scheme is a commonly used practice, plus the chances for late-arrival data, + // usually there would be 2-3 writers running during the hourly boundary. 3 is chosen here for being conservative. + private static final int CONCURRENT_WRITERS_DEFAULT = 3; + public static final double DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR = 0.5; + public static final int DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR = 5; + // Tune iff the new batch size is 10% different from the current batch size + public static final double DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY = 0.1; + public static final int DEFAULT_MIN_ORCWRITER_ROWCHECK = 150; + public static final int DEFAULT_MAX_ORCWRITER_ROWCHECK = 5000; protected final OrcValueWriter valueWriter; @VisibleForTesting VectorizedRowBatch rowBatch; private final TypeDescription typeDescription; - protected final Writer orcFileWriter; + protected Writer orcFileWriter; private final RowBatchPool rowBatchPool; private final boolean enableRowBatchPool; + protected long estimatedRecordSizeBytes = -1; // the close method may be invoked multiple times, but the underlying writer only supports close being called once protected volatile boolean closed = false; - protected final int batchSize; + protected int batchSize; protected final S inputSchema; + private final boolean selfTuningWriter; + private int selfTuneRowsBetweenCheck; + private double rowBatchMemoryUsageFactor; + private int nextSelfTune; + private boolean initialEstimatingRecordSizePhase = false; + private Queue initialSelfTuneCheckpoints = new LinkedList<>(Arrays.asList(10, 100, 500)); + private AtomicInteger recordCounter = new AtomicInteger(0); + @VisibleForTesting + long availableMemory = -1; + private long orcWriterStripeSizeBytes; + private int concurrentWriterTasks; + private int orcFileWriterRowsBetweenCheck; + // Holds the maximum size of the previous run's maximum buffer or the max of the current run's maximum buffer + private long estimatedBytesAllocatedConverterMemory = -1; + private OrcConverterMemoryManager converterMemoryManager; + + Configuration writerConfig; public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { @@ -68,29 +108,56 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) this.inputSchema = builder.getSchema(); this.typeDescription = getOrcSchema(); this.valueWriter = getOrcValueWriter(typeDescription, this.inputSchema, properties); - this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE); + this.selfTuningWriter = properties.getPropAsBoolean(ORC_WRITER_AUTO_SELFTUNE_ENABLED, false); + this.batchSize = this.selfTuningWriter ? 
DEFAULT_ORC_WRITER_BATCH_SIZE : properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE); this.rowBatchPool = RowBatchPool.instance(properties); this.enableRowBatchPool = properties.getPropAsBoolean(RowBatchPool.ENABLE_ROW_BATCH_POOL, false); + this.selfTuneRowsBetweenCheck = properties.getPropAsInt(ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, DEFAULT_ORC_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK); + this.rowBatchMemoryUsageFactor = properties.getPropAsDouble(ORCWRITER_ROWBATCH_MEMORY_USAGE_FACTOR, DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR); this.rowBatch = enableRowBatchPool ? rowBatchPool.getRowBatch(typeDescription, batchSize) : typeDescription.createRowBatch(batchSize); - log.info("Created ORC writer, batch size: {}, {}: {}", - batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), - properties.getProp( - OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), - OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString())); - + this.converterMemoryManager = new OrcConverterMemoryManager(this.rowBatch); + this.orcWriterStripeSizeBytes = properties.getPropAsLong(OrcConf.STRIPE_SIZE.getAttribute(), (long) OrcConf.STRIPE_SIZE.getDefaultValue()); + // Track the number of other writer tasks from different datasets ingesting on the same container + this.concurrentWriterTasks = properties.getPropAsInt(ORC_WRITER_CONCURRENT_TASKS, 1); // Create file-writer - Configuration conf = new Configuration(); + this.writerConfig = new Configuration(); // Populate job Configurations into Conf as well so that configurations related to ORC writer can be tuned easily. for (Object key : properties.getProperties().keySet()) { - conf.set((String) key, properties.getProp((String) key)); + this.writerConfig.set((String) key, properties.getProp((String) key)); } - - OrcFile.WriterOptions options = OrcFile.writerOptions(properties.getProperties(), conf); + OrcFile.WriterOptions options = OrcFile.writerOptions(properties.getProperties(), this.writerConfig); options.setSchema(typeDescription); - // For buffer-writer, flush has to be executed before close so it is better we maintain the life-cycle of fileWriter - // instead of delegating it to closer object in FsDataWriter. - this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options); + // Get the amount of allocated and future space available + this.availableMemory = (Runtime.getRuntime().maxMemory() - (Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory()))/this.concurrentWriterTasks; + log.info("Available memory for ORC writer: {}", this.availableMemory); + + if (this.selfTuningWriter) { + if (properties.contains(ORC_WRITER_ESTIMATED_RECORD_SIZE) && properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE) != -1) { + this.estimatedRecordSizeBytes = properties.getPropAsLong(ORC_WRITER_ESTIMATED_RECORD_SIZE); + this.estimatedBytesAllocatedConverterMemory = properties.getPropAsLong(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, -1); + this.orcFileWriterRowsBetweenCheck = properties.getPropAsInt(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), (int) OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue()); + // Use the last run's rows between check value for the underlying file size writer, if it exists. 
Otherwise it will default to 5000 + log.info("Using previously stored properties to calculate new batch size, ORC Estimated Record size is : {}," + + "estimated bytes converter allocated is : {}, ORC rows between check is {}", + this.estimatedRecordSizeBytes, this.estimatedBytesAllocatedConverterMemory, this.orcFileWriterRowsBetweenCheck); + this.tuneBatchSize(estimatedRecordSizeBytes); + log.info("Initialized batch size at {}", this.batchSize); + this.nextSelfTune = this.selfTuneRowsBetweenCheck; + } else { + // We will need to incrementally tune the writer based on the first few records + this.nextSelfTune = 5; + this.initialEstimatingRecordSizePhase = true; + } + } else { + this.batchSize = properties.getPropAsInt(ORC_WRITER_BATCH_SIZE, DEFAULT_ORC_WRITER_BATCH_SIZE); + log.info("Created ORC writer, batch size: {}, {}: {}", + this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), + this.writerConfig.get( + OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), + OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString())); + this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options); + } } /** @@ -113,12 +180,12 @@ public GobblinBaseOrcWriter(FsDataWriterBuilder builder, State properties) @Override public long recordsWritten() { - return this.orcFileWriter.getNumberOfRows(); + return this.orcFileWriter != null ? this.orcFileWriter.getNumberOfRows(): 0; } @Override public long bytesWritten() { - return this.orcFileWriter.getRawDataSize(); + return this.orcFileWriter != null ? this.orcFileWriter.getRawDataSize() : 0; } @Override @@ -141,6 +208,11 @@ public State getFinalState() { public void flush() throws IOException { if (rowBatch.size > 0) { + // We only initialize the native ORC file writer once to avoid creating too many small files, as reconfiguring rows between memory check + // requires one to close the writer and start a new file + if (this.orcFileWriter == null) { + initializeOrcFileWriter(); + } orcFileWriter.addRowBatch(rowBatch); rowBatch.reset(); } @@ -183,9 +255,66 @@ public void commit() throws IOException { closeInternal(); super.commit(); + if (this.selfTuningWriter) { + properties.setProp(ORC_WRITER_ESTIMATED_RECORD_SIZE, String.valueOf(estimatedRecordSizeBytes)); + properties.setProp(ORC_WRITER_ESTIMATED_BYTES_ALLOCATED_CONVERTER_MEMORY, String.valueOf(this.converterMemoryManager.getConverterBufferTotalSize())); + properties.setProp(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.orcFileWriterRowsBetweenCheck)); + } } /** + * Modifies the size of the writer buffer based on the average size of the records written so far, the amount of available memory during initialization, and the number of concurrent writers. + * The new batch size is calculated as follows: + * 1. Memory available = (available memory during startup)/(concurrent writers) - (memory used by ORCFile writer) + * 2. Average file size, estimated during Avro -> ORC conversion + * 3. Estimate of memory used by the converter lists, as during resize the internal buffer size can grow large + * 4. New batch size = (Memory available - Estimated memory used by converter lists) / Average file size + * Generally in this writer, the memory the converter uses for large arrays is the leading cause of OOM in streaming, along with the records stored in the rowBatch + * Another potential approach is to also check the memory available before resizing the converter lists, and to flush the batch whenever a resize is needed. 
+ */ + void tuneBatchSize(long averageSizePerRecord) throws IOException { + this.estimatedBytesAllocatedConverterMemory = Math.max(this.estimatedBytesAllocatedConverterMemory, this.converterMemoryManager.getConverterBufferTotalSize()); + int currentPartitionedWriters = this.properties.getPropAsInt(PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER, CONCURRENT_WRITERS_DEFAULT); + // In the native ORC writer implementation, it will flush the writer if the internal memory exceeds the size of a stripe after rows between check + // So worst case the most memory the writer can hold is the size of a stripe plus size of records * number of records between checks + // Note that this is an overestimate as the native ORC file writer should have some compression ratio + long maxMemoryInFileWriter = averageSizePerRecord * this.orcFileWriterRowsBetweenCheck + this.orcWriterStripeSizeBytes; + + int newBatchSize = (int) ((this.availableMemory*1.0 / currentPartitionedWriters * this.rowBatchMemoryUsageFactor - maxMemoryInFileWriter + - this.estimatedBytesAllocatedConverterMemory) / averageSizePerRecord); + // Handle scenarios where new batch size can be 0 or less due to overestimating memory used by other components + newBatchSize = Math.min(Math.max(1, newBatchSize), DEFAULT_ORC_WRITER_BATCH_SIZE); + if (Math.abs(newBatchSize - this.batchSize) > DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY * this.batchSize) { + log.info("Tuning ORC writer batch size from {} to {} based on average byte size per record: {} with available memory {} and {} bytes " + + "of allocated memory in converter buffers, with {} partitioned writers", + batchSize, newBatchSize, averageSizePerRecord, availableMemory, + estimatedBytesAllocatedConverterMemory, currentPartitionedWriters); + // We need to always flush because ORC VectorizedRowBatch.ensureSize() does not provide an option to preserve data, refer to + // https://orc.apache.org/api/hive-storage-api/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.html + this.flush(); + this.batchSize = newBatchSize; + this.rowBatch.ensureSize(this.batchSize); + } + } + + void initializeOrcFileWriter() { + try { + this.orcFileWriterRowsBetweenCheck = Math.max(Math.min(this.batchSize * DEFAULT_ORCWRITER_BATCHSIZE_ROWCHECK_FACTOR, DEFAULT_MAX_ORCWRITER_ROWCHECK), DEFAULT_MIN_ORCWRITER_ROWCHECK); + this.writerConfig.set(OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), String.valueOf(this.orcFileWriterRowsBetweenCheck)); + log.info("Created ORC writer, batch size: {}, {}: {}", + this.batchSize, OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), + this.writerConfig.get( + OrcConf.ROWS_BETWEEN_CHECKS.getAttribute(), + OrcConf.ROWS_BETWEEN_CHECKS.getDefaultValue().toString())); + OrcFile.WriterOptions options = OrcFile.writerOptions(properties.getProperties(), this.writerConfig); + options.setSchema(typeDescription); + this.orcFileWriter = OrcFile.createWriter(this.stagingFile, options); + } catch (IOException e) { + log.error("Failed to flush the current batch", e); + } + } + + /* * Note: orc.rows.between.memory.checks is the configuration available to tune memory-check sensitivity in ORC-Core * library. By default it is set to 5000. If the user-application is dealing with large-row Kafka topics for example, * one should consider lower this value to make memory-check more active. 
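To make the tuning formula above concrete, here is a worked example that mirrors the arithmetic in tuneBatchSize() with invented inputs (the class name and every number below are hypothetical and are not taken from this patch; only the expression and the clamp to [1, DEFAULT_ORC_WRITER_BATCH_SIZE] follow the code):

public class OrcBatchSizeTuningExample {
  public static void main(String[] args) {
    // Hypothetical inputs; a real run derives them from the JVM, the job state, and the converter's running totals.
    long availableMemory = 1_000_000_000L;            // free heap sampled at writer start-up
    int currentPartitionedWriters = 3;                // fallback when PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER is absent
    double rowBatchMemoryUsageFactor = 0.5;           // DEFAULT_ORCWRITER_BATCHSIZE_MEMORY_USAGE_FACTOR
    long averageSizePerRecord = 100_000L;             // ~100 KB per record, i.e. total bytes converted / total records converted
    int orcFileWriterRowsBetweenCheck = 1_000;        // orc.rows.between.memory.checks carried over from the previous run
    long orcWriterStripeSizeBytes = 16L << 20;        // 16 MB orc.stripe.size
    long estimatedBytesAllocatedConverterMemory = 30_000_000L;

    // Worst case, the native writer holds a full stripe plus everything buffered since the last memory check.
    long maxMemoryInFileWriter = averageSizePerRecord * orcFileWriterRowsBetweenCheck + orcWriterStripeSizeBytes;

    // Per-writer memory budget, minus file-writer and converter overhead, divided by the average record size.
    int newBatchSize = (int) ((availableMemory * 1.0 / currentPartitionedWriters * rowBatchMemoryUsageFactor
        - maxMemoryInFileWriter - estimatedBytesAllocatedConverterMemory) / averageSizePerRecord);
    newBatchSize = Math.min(Math.max(1, newBatchSize), 1000);   // clamp to [1, DEFAULT_ORC_WRITER_BATCH_SIZE]

    System.out.println(newBatchSize);                 // 198 for these inputs
  }
}

Per the patch, the writer only applies such a result when it differs from the current batch size by more than 10% (DEFAULT_ORCWRITER_TUNE_BATCHSIZE_SENSITIVITY), and it flushes the rowBatch before resizing because VectorizedRowBatch.ensureSize() does not preserve data.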
@@ -194,10 +323,21 @@ public void commit() public void write(D record) throws IOException { Preconditions.checkState(!closed, "Writer already closed"); - valueWriter.write(record, rowBatch); + this.valueWriter.write(record, this.rowBatch); + int recordCount = this.recordCounter.incrementAndGet(); + if (this.selfTuningWriter && recordCount == this.nextSelfTune) { + long totalBytes = ((GenericRecordToOrcValueWriter) valueWriter).getTotalBytesConverted(); + long totalRecords = ((GenericRecordToOrcValueWriter) valueWriter).getTotalRecordsConverted(); + this.estimatedRecordSizeBytes = totalRecords == 0 ? 0 : totalBytes / totalRecords; + this.tuneBatchSize(this.estimatedRecordSizeBytes); + if (this.initialEstimatingRecordSizePhase && !initialSelfTuneCheckpoints.isEmpty()) { + this.nextSelfTune = initialSelfTuneCheckpoints.poll(); + } else { + this.nextSelfTune += this.selfTuneRowsBetweenCheck; + } + } if (rowBatch.size == this.batchSize) { - orcFileWriter.addRowBatch(rowBatch); - rowBatch.reset(); + this.flush(); } } } diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java new file mode 100644 index 00000000000..dcd6250dcde --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/OrcConverterMemoryManager.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer; + +import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; +import org.apache.orc.storage.ql.exec.vector.ColumnVector; +import org.apache.orc.storage.ql.exec.vector.DecimalColumnVector; +import org.apache.orc.storage.ql.exec.vector.DoubleColumnVector; +import org.apache.orc.storage.ql.exec.vector.ListColumnVector; +import org.apache.orc.storage.ql.exec.vector.LongColumnVector; +import org.apache.orc.storage.ql.exec.vector.MapColumnVector; +import org.apache.orc.storage.ql.exec.vector.StructColumnVector; +import org.apache.orc.storage.ql.exec.vector.UnionColumnVector; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; + + +/** + * A helper class to calculate the size of array buffers in a {@link VectorizedRowBatch}. 
+ * This estimate is mainly based on the maximum size of each variable length column, which can be resized. + * Since the resizing algorithm for each column can balloon the buffers, it directly affects the likelihood of OOM. + */ +public class OrcConverterMemoryManager { + + private VectorizedRowBatch rowBatch; + + // TODO: Consider moving the resize algorithm from the converter to this class + OrcConverterMemoryManager(VectorizedRowBatch rowBatch) { + this.rowBatch = rowBatch; + } + + /** + * Estimates the approximate size in bytes of the elements in a column. + * This takes into account the default null values of different ORC ColumnVectors and approximates their sizes. + * Although it is a rough calculation, it accurately depicts the weight of resizes in a column for very large arrays and maps, + * which tend to dominate the overall size of the buffer. + * TODO: Clean this method up considerably and refactor resize logic here + * @param col + * @return + */ + public long calculateSizeOfColHelper(ColumnVector col) { + long converterBufferColSize = 0; + if (col instanceof ListColumnVector) { + ListColumnVector listColumnVector = (ListColumnVector) col; + converterBufferColSize += calculateSizeOfColHelper(listColumnVector.child); + } else if (col instanceof MapColumnVector) { + MapColumnVector mapColumnVector = (MapColumnVector) col; + converterBufferColSize += calculateSizeOfColHelper(mapColumnVector.keys); + converterBufferColSize += calculateSizeOfColHelper(mapColumnVector.values); + } else if (col instanceof StructColumnVector) { + StructColumnVector structColumnVector = (StructColumnVector) col; + for (int j = 0; j < structColumnVector.fields.length; j++) { + converterBufferColSize += calculateSizeOfColHelper(structColumnVector.fields[j]); + } + } else if (col instanceof UnionColumnVector) { + UnionColumnVector unionColumnVector = (UnionColumnVector) col; + for (int j = 0; j < unionColumnVector.fields.length; j++) { + converterBufferColSize += calculateSizeOfColHelper(unionColumnVector.fields[j]); + } + } else if (col instanceof LongColumnVector || col instanceof DoubleColumnVector || col instanceof DecimalColumnVector) { + // Memory space in bytes of the native type + converterBufferColSize += col.isNull.length * 8; + } else if (col instanceof BytesColumnVector) { + // A BytesColumnVector tracks each element with two int arrays (start and length) sized to the vector, roughly 8 bytes per element + converterBufferColSize += ((BytesColumnVector) col).vector.length * 8; + } + // Add the overhead of the column's own isNull array + converterBufferColSize += col.isNull.length; + return converterBufferColSize; + } + + /** + * Returns the total estimated size of all columns in a {@link VectorizedRowBatch} + * TODO: Consider calculating this value on the fly every time a resize is called + * @return + */ + public long getConverterBufferTotalSize() { + long converterBufferTotalSize = 0; + ColumnVector[] cols = this.rowBatch.cols; + for (int i = 0; i < cols.length; i++) { + converterBufferTotalSize += calculateSizeOfColHelper(cols[i]); + } + return converterBufferTotalSize; + } + +} diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java index 0f10316afc8..c22c62827ff 100644 --- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java +++ 
b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriterTest.java @@ -27,7 +27,6 @@ import org.apache.avro.Schema; import org.apache.avro.generic.GenericRecord; -import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -48,6 +47,8 @@ import lombok.extern.slf4j.Slf4j; +import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; + import static org.apache.orc.mapred.OrcMapredRecordReader.nextValue; @@ -172,6 +173,33 @@ public void testListResize() Assert.assertEquals(valueWriter.resizeCount, 2); } + @Test + public void testConvertedBytesCalculation() + throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc")); + + TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema); + GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema); + // Make the batch size very small so that the enlarge behavior would easily be triggered. + // But it has to be more than the number of records that we deserialize from data.json, as here we don't reset the batch. + VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10); + + List recordList = GobblinOrcWriterTest + .deserializeAvroRecords(this.getClass(), schema, "list_map_test/data.json"); + Assert.assertEquals(recordList.size(), 6); + for (GenericRecord record : recordList) { + valueWriter.write(record, rowBatch); + } + // We want to add the sum of the sizes of the elements in the list and map, as well as any isNull values created by resizing the array + long byteSumOfIdList = 4 * 3 * 6; + // Sum of keys + values + long byteSumOfMaps = 1 * 2 * 6 + 4 * 2 * 6; + long expectedBytesConverted = byteSumOfIdList + byteSumOfMaps; + Assert.assertEquals(valueWriter.getTotalBytesConverted(), expectedBytesConverted); + Assert.assertEquals(valueWriter.getTotalRecordsConverted(), 6); + } + /** * Accessing "fields" using reflection to work-around access modifiers. 
*/ diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java index 0b0912cf7a7..dfee836448a 100644 --- a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java +++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/GobblinOrcWriterTest.java @@ -31,6 +31,7 @@ import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Writable; +import org.apache.orc.OrcConf; import org.mockito.Mockito; import org.testng.Assert; import org.testng.annotations.Test; @@ -43,7 +44,7 @@ import org.apache.gobblin.source.workunit.WorkUnit; import static org.apache.gobblin.writer.GenericRecordToOrcValueWriterTest.deserializeOrcRecords; -import static org.mockito.Mockito.*; +import static org.mockito.Mockito.when; /** @@ -139,4 +140,146 @@ public void testWrite() throws Exception { Assert.fail(); } } + + @Test + public void testSelfTuneRowBatchSizeIncrease() throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc")); + List recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json"); + + // Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder + FsDataWriterBuilder mockBuilder = + (FsDataWriterBuilder) Mockito.mock(FsDataWriterBuilder.class); + when(mockBuilder.getSchema()).thenReturn(schema); + + State dummyState = new WorkUnit(); + String stagingDir = Files.createTempDir().getAbsolutePath(); + String outputDir = Files.createTempDir().getAbsolutePath(); + dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir); + dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune"); + dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir); + dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true"); + when(mockBuilder.getFileName(dummyState)).thenReturn("file"); + Path outputFilePath = new Path(outputDir, "selfTune/file"); + + // Having a closer to manage the life-cycle of the writer object. + Closer closer = Closer.create(); + GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState)); + // Initialize the rowBatch such that it should store all records + orcWriter.rowBatch.ensureSize(5); + orcWriter.batchSize=5; + + for (GenericRecord record : recordList) { + orcWriter.write(record); + } + // Force the batchSize to increase, lets ensure that the records are not lost in the rowBatch + orcWriter.tuneBatchSize(1); + Assert.assertFalse(orcWriter.batchSize == 5); + Assert.assertTrue(orcWriter.rowBatch.size == 0, "Expected the row batch to be flushed to preserve data"); + + // Not yet flushed in ORC + Assert.assertEquals(orcWriter.recordsWritten(), 0); + + orcWriter.commit(); + Assert.assertEquals(orcWriter.recordsWritten(), 4); + + // Verify ORC file contains correct records. 
+ FileSystem fs = FileSystem.getLocal(new Configuration()); + Assert.assertTrue(fs.exists(outputFilePath)); + List orcRecords = deserializeOrcRecords(outputFilePath, fs); + Assert.assertEquals(orcRecords.size(), 4); + } + + @Test + public void testSelfTuneRowBatchSizeDecrease() throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc")); + List recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json"); + + // Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder + FsDataWriterBuilder mockBuilder = + (FsDataWriterBuilder) Mockito.mock(FsDataWriterBuilder.class); + when(mockBuilder.getSchema()).thenReturn(schema); + + State dummyState = new WorkUnit(); + String stagingDir = Files.createTempDir().getAbsolutePath(); + String outputDir = Files.createTempDir().getAbsolutePath(); + dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir); + dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune"); + dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir); + dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true"); + dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ROWS_BETWEEN_CHECK, "1"); + when(mockBuilder.getFileName(dummyState)).thenReturn("file"); + Path outputFilePath = new Path(outputDir, "selfTune/file"); + + // Having a closer to manage the life-cycle of the writer object. + Closer closer = Closer.create(); + GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState)); + // Force a larger initial batchSize that can be tuned down + orcWriter.batchSize = 10; + orcWriter.rowBatch.ensureSize(10); + + for (GenericRecord record : recordList) { + orcWriter.write(record); + } + // Force the batchSize to decrease + orcWriter.tuneBatchSize(1000000000); + Assert.assertTrue(orcWriter.batchSize == 1); + Assert.assertTrue(orcWriter.rowBatch.size == 0, "Expected the row batch to be flushed to preserve data"); + + // Not yet flushed in ORC + Assert.assertEquals(orcWriter.recordsWritten(), 0); + + orcWriter.commit(); + Assert.assertEquals(orcWriter.recordsWritten(), 4); + + // Verify ORC file contains correct records. 
+ FileSystem fs = FileSystem.getLocal(new Configuration()); + Assert.assertTrue(fs.exists(outputFilePath)); + List orcRecords = deserializeOrcRecords(outputFilePath, fs); + Assert.assertEquals(orcRecords.size(), 4); + } + + + @Test + public void testSelfTuneRowBatchCalculation() throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("orc_writer_test/schema.avsc")); + List recordList = deserializeAvroRecords(this.getClass(), schema, "orc_writer_test/data_multi.json"); + + // Mock WriterBuilder, bunch of mocking behaviors to work-around precondition checks in writer builder + FsDataWriterBuilder mockBuilder = + (FsDataWriterBuilder) Mockito.mock(FsDataWriterBuilder.class); + when(mockBuilder.getSchema()).thenReturn(schema); + + State dummyState = new WorkUnit(); + String stagingDir = Files.createTempDir().getAbsolutePath(); + String outputDir = Files.createTempDir().getAbsolutePath(); + dummyState.setProp(ConfigurationKeys.WRITER_STAGING_DIR, stagingDir); + dummyState.setProp(ConfigurationKeys.WRITER_FILE_PATH, "selfTune"); + dummyState.setProp(ConfigurationKeys.WRITER_OUTPUT_DIR, outputDir); + dummyState.setProp(GobblinBaseOrcWriter.ORC_WRITER_AUTO_SELFTUNE_ENABLED, "true"); + dummyState.setProp(OrcConf.STRIPE_SIZE.getAttribute(), "100"); + when(mockBuilder.getFileName(dummyState)).thenReturn("file"); + + // Having a closer to manage the life-cycle of the writer object. + Closer closer = Closer.create(); + GobblinOrcWriter orcWriter = closer.register(new GobblinOrcWriter(mockBuilder, dummyState)); + // Force a larger initial batchSize that can be tuned down + orcWriter.batchSize = 10; + orcWriter.rowBatch.ensureSize(10); + orcWriter.availableMemory = 100000000; + // Given the amount of available memory and a low stripe size, and estimated rowBatchSize, the resulting batchsize should be maxed out + orcWriter.tuneBatchSize(10); + Assert.assertTrue(orcWriter.batchSize == GobblinOrcWriter.DEFAULT_ORC_WRITER_BATCH_SIZE); + orcWriter.availableMemory = 100; + orcWriter.tuneBatchSize(10); + // Given that the amount of available memory is low, the resulting batchsize should be 1 + Assert.assertTrue(orcWriter.batchSize == 1); + orcWriter.availableMemory = 10000; + orcWriter.rowBatch.ensureSize(10000); + // Since the rowBatch is large, the resulting batchsize should still be 1 even with more memory + orcWriter.tuneBatchSize(10); + Assert.assertTrue(orcWriter.batchSize == 1); + } } \ No newline at end of file diff --git a/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java new file mode 100644 index 00000000000..c0477818704 --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/test/java/org/apache/gobblin/writer/OrcConverterMemoryManagerTest.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer; + +import java.util.List; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; +import org.apache.orc.TypeDescription; +import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; +import org.testng.Assert; +import org.testng.annotations.Test; + +import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; + + +public class OrcConverterMemoryManagerTest { + + @Test + public void testBufferSizeCalculationResize() + throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("list_map_test/schema.avsc")); + TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema); + // Make batch size small so that the enlarge behavior would easily be triggered. + VectorizedRowBatch rowBatch = orcSchema.createRowBatch(10); + OrcConverterMemoryManager memoryManager = new OrcConverterMemoryManager(rowBatch); + GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema); + + List recordList = GobblinOrcWriterTest + .deserializeAvroRecords(this.getClass(), schema, "list_map_test/data.json"); + Assert.assertEquals(recordList.size(), 6); + for (GenericRecord record : recordList) { + valueWriter.write(record, rowBatch); + } + // Expected size is the size of the lists, map keys and map vals after resize. Since there are 6 records, and each array/map have at least 2 elements, then + // One resize is performed when the respective list/maps exceed the initial size of 10, in this case 12. + // So the resized total length would be 12*3 for the list, map keys and map vals, with 8 bytes per value . 
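+ // Per calculateSizeOfColHelper(), each element of a resized child vector costs 8 bytes plus 1 byte for its isNull entry (9 bytes in total),
+ // while the two top-level list and map vectors only contribute their own 10-entry isNull arrays, hence 36 * 9 * 3 + 10 * 2 below.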
+ int expectedSize = 36 * 9 + 36 * 9 + 36 * 9 + 10*2; + Assert.assertEquals(memoryManager.getConverterBufferTotalSize(), expectedSize); + } + + @Test + public void testBufferSizeCalculatedDeepNestedList() throws Exception { + Schema schema = + new Schema.Parser().parse(this.getClass().getClassLoader().getResourceAsStream("converter_memory_manager_nested_test/schema.avsc")); + TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(schema); + // Make batch such that only deeply nested list is resized + VectorizedRowBatch rowBatch = orcSchema.createRowBatch(15); + OrcConverterMemoryManager memoryManager = new OrcConverterMemoryManager(rowBatch); + GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, schema); + + List recordList = GobblinOrcWriterTest + .deserializeAvroRecords(this.getClass(), schema, "converter_memory_manager_nested_test/data.json"); + Assert.assertEquals(recordList.size(), 1); + for (GenericRecord record : recordList) { + valueWriter.write(record, rowBatch); + } + // Deeply nested list should be resized once, since it resizes at 30 elements (5+10+15) to 90 + // Other fields should not be resized, (map keys and vals, and top level arrays) + // Account for size of top level arrays that should be small + int expectedSize = 30*3*9 + 30*9 + 15*4; // Deeply nested list + maps + other structure overhead + Assert.assertEquals(memoryManager.getConverterBufferTotalSize(), expectedSize); + } +} diff --git a/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json new file mode 100644 index 00000000000..37105d9fb7b --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/data.json @@ -0,0 +1,38 @@ +{ + "parentList": [ + [ + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5] + }, + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10] + }, + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] + } + ], + [ + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5] + }, + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10] + }, + { + "nestedField1": "A", + "nestedField2": "B", + "deeplyNestedList": [1,2,3,4,5,6,7,8,9,10,11,12,13,14,15] + } + ] + ] +} diff --git a/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc new file mode 100644 index 00000000000..36a63660379 --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/test/resources/converter_memory_manager_nested_test/schema.avsc @@ -0,0 +1,37 @@ +{ + "type" : "record", + "name" : "parentRecordName", + "fields" : [ + { + "name": "parentList", + "type": { + "type": "array", + "items": { + "type": "array", + "name": "nestedList", + "items": { + "type": "record", + "name": "nestedRecordName", + "fields": [ + { + "name": "nestedField1", + "type": "string" + }, + { + "name": "nestedField2", + "type": "string" + }, + { + "name": "deeplyNestedList", + "type": { + "type": "array", + "items": "int" + } + } + ] + } + } + } + } + ] +} \ No newline at end of file diff --git a/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json 
b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json new file mode 100644 index 00000000000..45e4931e01d --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/test/resources/orc_writer_test/data_multi.json @@ -0,0 +1,16 @@ +{ + "id": 1, + "name": "Alyssa" +} +{ + "id": 2, + "name": "Bob" +} +{ + "id": 3, + "name": "Charlie" +} +{ + "id": 4, + "name": "Diane" +} \ No newline at end of file
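For reference, the two quantities that drive the self-tuning above, the average converted record size and the converter's buffer footprint, can be reproduced in isolation with the classes this patch adds. Below is a minimal sketch, not part of the patch, written from the org.apache.gobblin.writer package (the OrcConverterMemoryManager constructor is package-private); avroSchema (an Avro Schema) and records (an Iterable of GenericRecord) are placeholders supplied by the caller, and the fragment is meant to sit inside a method that declares throws IOException:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;

// Convert the Avro schema and buffer records, the same way the new tests do.
TypeDescription orcSchema = AvroOrcSchemaConverter.getOrcSchema(avroSchema);
VectorizedRowBatch rowBatch = orcSchema.createRowBatch(1000);
GenericRecordToOrcValueWriter valueWriter = new GenericRecordToOrcValueWriter(orcSchema, avroSchema);
OrcConverterMemoryManager memoryManager = new OrcConverterMemoryManager(rowBatch);

for (GenericRecord record : records) {
  valueWriter.write(record, rowBatch);
}

// These are the inputs GobblinBaseOrcWriter.write() feeds into tuneBatchSize():
long averageRecordSizeBytes = valueWriter.getTotalRecordsConverted() == 0
    ? 0 : valueWriter.getTotalBytesConverted() / valueWriter.getTotalRecordsConverted();
long converterBufferBytes = memoryManager.getConverterBufferTotalSize();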