[GOBBLIN-1891] Create selftuning buffered ORC writer (#3751)
* Self-tuning ORC writer with configurations

* Cleanup

* Address reviews and clean up

* More cleanup, add comments, fix a bug where tuning was not happening at a regular interval

* Cleanup

* Fix checkstyle

* Renames and log improvements

* Add configs to handle multiple tasks, read the correct ORC stripe size properties, and fix some more bugs

* Fix a bug in the ORC converter memory manager where it miscalculated the buffer size

* Algorithm improvement/bug fix, log improvements, flush only when the batch size decreases (sketched below)

* Fix FindBugs and address review

* Comment cleanup

* Add a log for the startup batch size

* Rename batch size memory factor -> row batch memory factor to better reflect its purpose

* Add memory manager tests and change the algorithm to account for children byte size

* Fix test to not rely on an implied large batch size

* Add basic test cases for selftuning

* Decrease the size of tests, as GitHub Actions cannot handle a large array

* Address last review
Will-Lo authored Sep 1, 2023
1 parent 10b6eb2 commit a0787aa
Showing 10 changed files with 680 additions and 48 deletions.
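The bullets in the commit message outline the self-tuning loop: at a regular interval, estimate the average record size from the bytes converted so far, derive a batch size from the memory budget, and flush before shrinking the batch. A minimal sketch of that idea follows; fields such as availableMemory, concurrentWriters, rowBatchMemoryFactor, tuneIntervalRecords, and MAX_BATCH_SIZE are illustrative assumptions, not names from this diff.

// Illustrative self-tuning step; a sketch of the idea, not the committed implementation.
// Intended to be invoked once per written record.
void maybeTuneBatchSize() throws IOException {
  if (++recordsSinceLastTune < tuneIntervalRecords) {
    return; // only tune at a regular interval
  }
  recordsSinceLastTune = 0;
  // Average record size observed by the value writer so far.
  long avgRecordBytes = Math.max(1, valueWriter.getTotalBytesConverted()
      / Math.max(1, valueWriter.getTotalRecordsConverted()));
  // Split the memory budget across concurrent writers, with headroom
  // reserved via the row batch memory factor.
  long budget = (long) (availableMemory / (concurrentWriters * rowBatchMemoryFactor));
  int newBatchSize = (int) Math.min(MAX_BATCH_SIZE, Math.max(1, budget / avgRecordBytes));
  if (newBatchSize < batchSize) {
    flush(); // flush only when the batch size decreases
  }
  batchSize = newBatchSize;
}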
@@ -87,7 +87,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS = Long.MAX_VALUE;
public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = "partitionedDataWriter.write.timeout.seconds";
public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = Long.MAX_VALUE;

public static final String CURRENT_PARTITIONED_WRITERS_COUNTER = "partitionedDataWriter.counter";
private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());

@@ -176,6 +176,7 @@ public DataWriter<D> get() {
log.info(String.format("Adding one more writer to loading cache of existing writer "
+ "with size = %d", partitionWriters.size()));
Future<DataWriter<D>> future = createWriterPool.submit(() -> createPartitionWriter(key));
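// Publish the number of open partition writers to task state so concurrent writers can size their buffers accordingly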
state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1);
return future.get(writeTimeoutInterval, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error creating writer", e);
@@ -46,6 +46,7 @@

import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
@@ -54,6 +55,8 @@

/**
* The converter for buffering rows and forming columnar batches.
* Additionally, records the estimated size, in bytes, of the data converted.
* TODO: consider using the record size provided by the extractor instead of the converter, as it may be more available and accurate
*/
@Slf4j
public class GenericRecordToOrcValueWriter implements OrcValueWriter<GenericRecord> {
@@ -69,6 +72,10 @@ public class GenericRecordToOrcValueWriter implements OrcValueWriter<GenericReco
@VisibleForTesting
public int resizeCount = 0;

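// Running totals of converted data; used to estimate the average record size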
@Getter
long totalBytesConverted = 0;
@Getter
long totalRecordsConverted = 0;
/**
* The interface for the conversion from GenericRecord to ORC's ColumnVectors.
*/
@@ -79,8 +86,9 @@ interface Converter {
* @param column either the column number or element number
* @param data Object which contains the data
* @param output the ColumnVector to put the value into
* @return the estimated number of bytes converted, for tracking and size estimation
*/
void addValue(int rowId, int column, Object data, ColumnVector output);
long addValue(int rowId, int column, Object data, ColumnVector output);
}

private final Converter[] converters;
@@ -98,67 +106,90 @@ public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema avr
log.info("enabledSmartSizing: {}, enlargeFactor: {}", enabledSmartSizing, enlargeFactor);
}

/** Converts a GenericRecord into the ORC ColumnVectors of the output batch.
* Additionally, records the number of bytes and the number of records converted.
* @param value the data value to write.
* @param output the VectorizedRowBatch to which the output will be written.
* @throws IOException
*/
@Override
public void write(GenericRecord value, VectorizedRowBatch output)
throws IOException {

int row = output.size++;
long bytesConverted = 0;
for (int c = 0; c < converters.length; ++c) {
ColumnVector col = output.cols[c];
if (value.get(c) == null) {
col.noNulls = false;
col.isNull[row] = true;
} else {
col.isNull[row] = false;
converters[c].addValue(row, c, value.get(c), col);
bytesConverted += converters[c].addValue(row, c, value.get(c), col);
}
}
this.totalBytesConverted += bytesConverted;
this.totalRecordsConverted += 1;
}
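Since @Getter generates accessors for the totals above, a caller can derive the average observed record size, e.g. (sketch):

// Average bytes per record, guarding against division by zero.
long avgBytes = valueWriter.getTotalBytesConverted()
    / Math.max(1, valueWriter.getTotalRecordsConverted());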


static class BooleanConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 1;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (boolean) data ? 1 : 0;
return MEMORY_SIZE_BYTES;
}
}

static class ByteConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 1;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (byte) data;
return MEMORY_SIZE_BYTES;
}
}

static class ShortConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (short) data;
return MEMORY_SIZE_BYTES;
}
}

static class IntConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (int) data;
return MEMORY_SIZE_BYTES;
}
}

static class LongConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 8;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (long) data;
return MEMORY_SIZE_BYTES;
}
}

static class FloatConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = (float) data;
return MEMORY_SIZE_BYTES;
}
}

static class DoubleConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 8;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = (double) data;
return MEMORY_SIZE_BYTES;
}
}

static class StringConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
final byte[] value;
if (data instanceof GenericEnumSymbol) {
value = data.toString().getBytes(StandardCharsets.UTF_8);
@@ -170,11 +201,12 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
value = ((String) data).getBytes(StandardCharsets.UTF_8);
}
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
return value.length;
}
}

static class BytesConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
final byte[] value;
if (data instanceof GenericFixed) {
value = ((GenericFixed) data).bytes();
@@ -184,18 +216,22 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
value = (byte[]) data;
}
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
return value.length;
}
}

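To make the byte accounting concrete, here is a hypothetical walk-through (not a test from this commit) using the two-argument constructor shown in the hunk header above; write() throws IOException, so assume a method that declares it:

Schema avro = SchemaBuilder.record("R").fields()
    .requiredInt("i").requiredString("s").endRecord();
TypeDescription orc = TypeDescription.fromString("struct<i:int,s:string>");
GenericRecordToOrcValueWriter writer = new GenericRecordToOrcValueWriter(orc, avro);
VectorizedRowBatch batch = orc.createRowBatch();
GenericRecord record = new GenericData.Record(avro);
record.put("i", 42);      // IntConverter contributes MEMORY_SIZE_BYTES = 4
record.put("s", "hello"); // StringConverter contributes the UTF-8 length = 5
writer.write(record, batch);
assert writer.getTotalBytesConverted() == 9;
assert writer.getTotalRecordsConverted() == 1;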
static class DecimalConverter implements Converter {
// This is a naive estimation
private static final int MEMORY_SIZE_BYTES = 17;
private final int scale;

public DecimalConverter(int scale) {
this.scale = scale;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].set(getHiveDecimalFromByteBuffer((ByteBuffer) data));
return MEMORY_SIZE_BYTES;
}

/**
@@ -228,19 +264,22 @@ class StructConverter implements Converter {
}
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
GenericRecord value = (GenericRecord) data;
StructColumnVector cv = (StructColumnVector) output;
long estimatedBytes = 0;
for (int c = 0; c < children.length; ++c) {
ColumnVector field = cv.fields[c];
if (value.get(c) == null) {
field.noNulls = false;
field.isNull[rowId] = true;
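// a null field still contributes one byte to the size estimate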
estimatedBytes += 1;
} else {
field.isNull[rowId] = false;
children[c].addValue(rowId, c, value.get(c), field);
estimatedBytes += children[c].addValue(rowId, c, value.get(c), field);
}
}
return estimatedBytes;
}
}

@@ -261,22 +300,24 @@ class UnionConverter implements Converter {
* original data type without union wrapper.
*/
@Override
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
UnionColumnVector cv = (UnionColumnVector) output;
int tag = (data != null) ? GenericData.get().resolveUnion(unionSchema, data) : children.length;

long estimatedBytes = 0;
for (int c = 0; c < children.length; ++c) {
ColumnVector field = cv.fields[c];
// c == tag indicates the data is non-null and belongs to this child
if (c == tag) {
field.isNull[rowId] = false;
cv.tags[rowId] = c;
children[c].addValue(rowId, c, data, field);
estimatedBytes += children[c].addValue(rowId, c, data, field);
} else {
field.noNulls = false;
field.isNull[rowId] = true;
estimatedBytes += 1;
}
}
return estimatedBytes;
}
}

@@ -290,7 +331,7 @@ class ListConverter implements Converter {
rowsAdded = 0;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
rowsAdded += 1;
List value = (List) data;
ListColumnVector cv = (ListColumnVector) output;
@@ -299,25 +340,29 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
cv.lengths[rowId] = value.size();
cv.offsets[rowId] = cv.childCount;
cv.childCount += cv.lengths[rowId];

// Make sure the child vector is big enough.
// If the child array is saturated, expand it by a reasonable amount.
if (cv.childCount > cv.child.isNull.length) {
int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
log.info("Column vector: {}, resizing to: {}, child count: {}", cv.child, resizedLength, cv.childCount);
cv.child.ensureSize(resizedLength, true);
}

long estimatedBytes = 0;
// Add each element
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int offset = (int) (e + cv.offsets[rowId]);
if (value.get(e) == null) {
cv.child.noNulls = false;
cv.child.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.child.isNull[offset] = false;
children.addValue(offset, e, value.get(e), cv.child);
estimatedBytes += children.addValue(offset, e, value.get(e), cv.child);
}
}
return estimatedBytes;
}
}
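Under this scheme a list's estimate is the sum over its elements, counting one byte per null element; a small illustration mirroring ListConverter's math (hypothetical values):

// ["ab", "c", null] => 2 + 1 + 1 = 4 estimated bytes
List<String> value = Arrays.asList("ab", "c", null);
long estimate = value.stream()
    .mapToLong(s -> s == null ? 1 : s.getBytes(StandardCharsets.UTF_8).length)
    .sum();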

@@ -333,7 +378,7 @@ class MapConverter implements Converter {
rowsAdded = 0;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
rowsAdded += 1;
Map<Object, Object> map = (Map<Object, Object>) data;
Set<Map.Entry<Object, Object>> entries = map.entrySet();
@@ -353,24 +398,28 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
}
// Add each element
int e = 0;
long estimatedBytes = 0;
for (Map.Entry entry : entries) {
int offset = (int) (e + cv.offsets[rowId]);
if (entry.getKey() == null) {
cv.keys.noNulls = false;
cv.keys.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.keys.isNull[offset] = false;
keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
estimatedBytes += keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
}
if (entry.getValue() == null) {
cv.values.noNulls = false;
cv.values.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.values.isNull[offset] = false;
valueConverter.addValue(offset, e, entry.getValue(), cv.values);
estimatedBytes += valueConverter.addValue(offset, e, entry.getValue(), cv.values);
}
e++;
}
return estimatedBytes;
}
}
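Maps follow the same pattern, summing the key and value estimates per entry; for a single string entry (hypothetical):

// {"a": "xy"} => key 1 byte + value 2 bytes = 3 estimated bytes, mirroring MapConverter
long mapEstimate = "a".getBytes(StandardCharsets.UTF_8).length
    + "xy".getBytes(StandardCharsets.UTF_8).length;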

