[GOBBLIN-1891] Create selftuning buffered ORC writer #3751

Merged · 19 commits · Sep 1, 2023
PartitionedDataWriter.java
@@ -87,7 +87,7 @@ public class PartitionedDataWriter<S, D> extends WriterWrapper<D> implements Fin
public static final Long DEFAULT_PARTITIONED_WRITER_CACHE_TTL_SECONDS = Long.MAX_VALUE;
public static final String PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = "partitionedDataWriter.write.timeout.seconds";
public static final Long DEFAULT_PARTITIONED_WRITER_WRITE_TIMEOUT_SECONDS = Long.MAX_VALUE;

public static final String CURRENT_PARTITIONED_WRITERS_COUNTER = "partitionedDataWriter.counter";
private static final GenericRecord NON_PARTITIONED_WRITER_KEY =
new GenericData.Record(SchemaBuilder.record("Dummy").fields().endRecord());

@@ -176,6 +176,7 @@ public DataWriter<D> get() {
log.info(String.format("Adding one more writer to loading cache of existing writer "
+ "with size = %d", partitionWriters.size()));
Future<DataWriter<D>> future = createWriterPool.submit(() -> createPartitionWriter(key));
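// Record the new writer count (cache size plus the writer being created) in the task
// state, presumably so each buffered ORC writer can budget its share of memory.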
state.setProp(CURRENT_PARTITIONED_WRITERS_COUNTER, partitionWriters.size() + 1);
return future.get(writeTimeoutInterval, TimeUnit.SECONDS);
} catch (ExecutionException | InterruptedException e) {
throw new RuntimeException("Error creating writer", e);

GenericRecordToOrcValueWriter.java
@@ -46,6 +46,7 @@

import com.google.common.annotations.VisibleForTesting;

import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
@@ -54,6 +55,8 @@

/**
* The converter for buffering rows and forming a columnar batch.
* Additionally, records the estimated size, in bytes, of the data converted.
* TODO: consider using the record size provided by the extractor instead of the converter's estimate, as it may be more readily available and accurate.
*/
@Slf4j
public class GenericRecordToOrcValueWriter implements OrcValueWriter<GenericRecord> {
@@ -69,6 +72,10 @@ public class GenericRecordToOrcValueWriter implements OrcValueWriter<GenericReco
@VisibleForTesting
public int resizeCount = 0;

@Getter
long totalBytesConverted = 0;
@Getter
long totalRecordsConverted = 0;
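// The two counters above are exposed via Lombok @Getter so that a size-aware
// (self-tuning) writer can poll them.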
/**
* The interface for the conversion from GenericRecord to ORC's ColumnVectors.
*/
@@ -79,8 +86,9 @@ interface Converter {
* @param column either the column number or element number
* @param data Object which contains the data
* @param output the ColumnVector to put the value into
* @return the estimated number of bytes converted, for size tracking and estimation purposes
*/
void addValue(int rowId, int column, Object data, ColumnVector output);
long addValue(int rowId, int column, Object data, ColumnVector output);
}

private final Converter[] converters;
@@ -98,67 +106,90 @@ public GenericRecordToOrcValueWriter(TypeDescription typeDescription, Schema avr
log.info("enabledSmartSizing: {}, enlargeFactor: {}", enabledSmartSizing, enlargeFactor);
}

/** Converts a GenericRecord into the ORC ColumnVectors of the output batch.
* Additionally, accumulates the estimated bytes converted and the record count.
* @param value the data value to write.
* @param output the VectorizedRowBatch to which the output will be written.
* @throws IOException
*/
@Override
public void write(GenericRecord value, VectorizedRowBatch output)
throws IOException {

int row = output.size++;
long bytesConverted = 0;
for (int c = 0; c < converters.length; ++c) {
ColumnVector col = output.cols[c];
if (value.get(c) == null) {
col.noNulls = false;
col.isNull[row] = true;
} else {
col.isNull[row] = false;
converters[c].addValue(row, c, value.get(c), col);
bytesConverted += converters[c].addValue(row, c, value.get(c), col);
}
}
this.totalBytesConverted += bytesConverted;
this.totalRecordsConverted += 1;
}
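
// A minimal sketch (not part of this PR) of how the running totals above could drive
// a self-tuning buffer: estimate the average record size, then derive how many records
// fit within a target batch memory budget. Method names here are hypothetical.
//
//   long avgRecordSizeBytes() {
//     return totalRecordsConverted == 0 ? 0 : totalBytesConverted / totalRecordsConverted;
//   }
//
//   int tunedBatchSize(long targetBatchMemoryBytes) {
//     long avg = avgRecordSizeBytes();
//     return avg == 0 ? VectorizedRowBatch.DEFAULT_SIZE
//         : (int) Math.max(1, targetBatchMemoryBytes / avg);
//   }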


static class BooleanConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 1;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (boolean) data ? 1 : 0;
return MEMORY_SIZE_BYTES;
}
}

static class ByteConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 1;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (byte) data;
return MEMORY_SIZE_BYTES;
}
}

static class ShortConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
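// (A Java short is 2 bytes; 4 here appears to be a rough overestimate.)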
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (short) data;
return MEMORY_SIZE_BYTES;
}
}

static class IntConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (int) data;
return MEMORY_SIZE_BYTES;
}
}

static class LongConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 8;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((LongColumnVector) output).vector[rowId] = (long) data;
return MEMORY_SIZE_BYTES;
}
}

static class FloatConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 4;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = (float) data;
return MEMORY_SIZE_BYTES;
}
}

static class DoubleConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
private static final int MEMORY_SIZE_BYTES = 8;
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DoubleColumnVector) output).vector[rowId] = (double) data;
return MEMORY_SIZE_BYTES;
}
}

static class StringConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
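// For variable-length types the estimate is simply the payload's byte length.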
final byte[] value;
if (data instanceof GenericEnumSymbol) {
value = data.toString().getBytes(StandardCharsets.UTF_8);
@@ -170,11 +201,12 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
value = ((String) data).getBytes(StandardCharsets.UTF_8);
}
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
return value.length;
}
}

static class BytesConverter implements Converter {
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
final byte[] value;
if (data instanceof GenericFixed) {
value = ((GenericFixed) data).bytes();
@@ -184,18 +216,22 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
value = (byte[]) data;
}
((BytesColumnVector) output).setRef(rowId, value, 0, value.length);
return value.length;
}
}

static class DecimalConverter implements Converter {
// This is a naive estimate of a HiveDecimal's in-memory size
private static final int MEMORY_SIZE_BYTES = 17;
private final int scale;

public DecimalConverter(int scale) {
this.scale = scale;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
((DecimalColumnVector) output).vector[rowId].set(getHiveDecimalFromByteBuffer((ByteBuffer) data));
return MEMORY_SIZE_BYTES;
}

/**
@@ -228,19 +264,22 @@ class StructConverter implements Converter {
}
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
GenericRecord value = (GenericRecord) data;
StructColumnVector cv = (StructColumnVector) output;
long estimatedBytes = 0;
for (int c = 0; c < children.length; ++c) {
ColumnVector field = cv.fields[c];
if (value.get(c) == null) {
field.noNulls = false;
field.isNull[rowId] = true;
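// Count a null field as one byte (roughly its null flag) so sparse records still register size.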
estimatedBytes += 1;
} else {
field.isNull[rowId] = false;
children[c].addValue(rowId, c, value.get(c), field);
estimatedBytes += children[c].addValue(rowId, c, value.get(c), field);
}
}
return estimatedBytes;
}
}

@@ -261,22 +300,24 @@ class UnionConverter implements Converter {
* original data type without union wrapper.
*/
@Override
public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
UnionColumnVector cv = (UnionColumnVector) output;
int tag = (data != null) ? GenericData.get().resolveUnion(unionSchema, data) : children.length;
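// A null datum resolves to an out-of-range tag (children.length), so no branch below
// matches and every field is marked null.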

long estimatedBytes = 0;
for (int c = 0; c < children.length; ++c) {
ColumnVector field = cv.fields[c];
// c == tag indicates the data is non-null and belongs to this branch of the union
if (c == tag) {
field.isNull[rowId] = false;
cv.tags[rowId] = c;
children[c].addValue(rowId, c, data, field);
estimatedBytes += children[c].addValue(rowId, c, data, field);
} else {
field.noNulls = false;
field.isNull[rowId] = true;
estimatedBytes += 1;
}
}
return estimatedBytes;
}
}

@@ -290,7 +331,7 @@ class ListConverter implements Converter {
rowsAdded = 0;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
rowsAdded += 1;
List value = (List) data;
ListColumnVector cv = (ListColumnVector) output;
@@ -299,25 +340,29 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
cv.lengths[rowId] = value.size();
cv.offsets[rowId] = cv.childCount;
cv.childCount += cv.lengths[rowId];

// Make sure the child vector is big enough.
// If the child array is saturated, expand it by a reasonable amount.
if (cv.childCount > cv.child.isNull.length) {
int resizedLength = resize(rowsAdded, cv.isNull.length, cv.childCount);
log.info("Column vector: {}, resizing to: {}, child count: {}", cv.child, resizedLength, cv.childCount);
cv.child.ensureSize(resizedLength, true);
}

long estimatedBytes = 0;
// Add each element, accumulating its estimated size
for (int e = 0; e < cv.lengths[rowId]; ++e) {
int offset = (int) (e + cv.offsets[rowId]);
if (value.get(e) == null) {
cv.child.noNulls = false;
cv.child.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.child.isNull[offset] = false;
children.addValue(offset, e, value.get(e), cv.child);
estimatedBytes += children.addValue(offset, e, value.get(e), cv.child);
}
}
return estimatedBytes;
}
}

@@ -333,7 +378,7 @@ class MapConverter implements Converter {
rowsAdded = 0;
}

public void addValue(int rowId, int column, Object data, ColumnVector output) {
public long addValue(int rowId, int column, Object data, ColumnVector output) {
rowsAdded += 1;
Map<Object, Object> map = (Map<Object, Object>) data;
Set<Map.Entry<Object, Object>> entries = map.entrySet();
@@ -353,24 +398,28 @@ public void addValue(int rowId, int column, Object data, ColumnVector output) {
}
// Add each element
int e = 0;
long estimatedBytes = 0;
for (Map.Entry entry : entries) {
int offset = (int) (e + cv.offsets[rowId]);
if (entry.getKey() == null) {
cv.keys.noNulls = false;
cv.keys.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.keys.isNull[offset] = false;
keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
estimatedBytes += keyConverter.addValue(offset, e, entry.getKey(), cv.keys);
}
if (entry.getValue() == null) {
cv.values.noNulls = false;
cv.values.isNull[offset] = true;
estimatedBytes += 1;
} else {
cv.values.isNull[offset] = false;
valueConverter.addValue(offset, e, entry.getValue(), cv.values);
estimatedBytes += valueConverter.addValue(offset, e, entry.getValue(), cv.values);
}
e++;
}
return estimatedBytes;
}
}

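
Taken together, the two changes sketch a feedback loop: PartitionedDataWriter publishes how many
partition writers are active, and each GenericRecordToOrcValueWriter reports how many bytes it has
converted. A minimal sketch of how a self-tuning ORC writer might combine the two to decide when to
flush follows; shouldFlush, totalMemoryBudgetBytes, and bytesFlushedSoFar are hypothetical names for
illustration, not APIs introduced by this PR.

// Hypothetical flush policy: split an overall memory budget evenly across the
// concurrent partition writers, and flush once this writer's estimated buffered
// bytes exceed its share.
static boolean shouldFlush(State state, GenericRecordToOrcValueWriter valueWriter,
    long totalMemoryBudgetBytes, long bytesFlushedSoFar) {
  int concurrentWriters = state.getPropAsInt(
      PartitionedDataWriter.CURRENT_PARTITIONED_WRITERS_COUNTER, 1);
  long perWriterBudgetBytes = totalMemoryBudgetBytes / Math.max(1, concurrentWriters);
  long bufferedBytes = valueWriter.getTotalBytesConverted() - bytesFlushedSoFar;
  return bufferedBytes > perWriterBudgetBytes;
}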