Skip to content

Commit

Permalink
[GOBBLIN-1869] Create instrumented orc writer (apache#3732)
Browse files Browse the repository at this point in the history
* Creates an instrumented ORC Writer that emits events on commit and close

* Adds ORC writer metrics to track high record conversion metrics

* Fix add final to config strings

* Add namespace and use event submitter to send events
  • Loading branch information
Will-Lo authored and phet committed Aug 15, 2023
1 parent ae11e3c commit 320d8a2
Show file tree
Hide file tree
Showing 4 changed files with 114 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
Expand All @@ -50,6 +49,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;


/**
Expand Down Expand Up @@ -435,4 +435,8 @@ private Converter[] buildConverters(TypeDescription schema, Schema avroSchema) {
}
return result;
}

public int getResizeCount() {
return resizeCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.Properties;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
Expand All @@ -30,6 +29,7 @@
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -41,22 +41,22 @@
*/
@Slf4j
public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;

private final OrcValueWriter<D> valueWriter;
protected final OrcValueWriter<D> valueWriter;
@VisibleForTesting
VectorizedRowBatch rowBatch;
private final TypeDescription typeDescription;
private final Writer orcFileWriter;
protected final Writer orcFileWriter;
private final RowBatchPool rowBatchPool;
private final boolean enableRowBatchPool;

// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private volatile boolean closed = false;
protected volatile boolean closed = false;

private final int batchSize;
protected final int batchSize;
protected final S inputSchema;


Expand Down Expand Up @@ -116,6 +116,11 @@ public long recordsWritten() {
return this.orcFileWriter.getNumberOfRows();
}

@Override
public long bytesWritten() {
return this.orcFileWriter.getRawDataSize();
}

@Override
public State getFinalState() {
/**
Expand All @@ -141,15 +146,19 @@ public void flush()
}
}

private synchronized void closeInternal()
protected void recycleRowBatchPool() {
if (enableRowBatchPool) {
rowBatchPool.recycle(typeDescription, rowBatch);
}
}

protected synchronized void closeInternal()
throws IOException {
if (!closed) {
this.flush();
this.orcFileWriter.close();
this.closed = true;
if (enableRowBatchPool) {
rowBatchPool.recycle(typeDescription, rowBatch);
}
this.recycleRowBatchPool();
} else {
// Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
if (rowBatch.size > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* The WriterBuilder extension to create {@link GobblinOrcWriter} on top of {@link FsDataWriterBuilder}
*/
public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {
public static final String ORC_WRITER_INSTRUMENTED = GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented";
public GobblinOrcWriterBuilder() {
}

Expand All @@ -42,6 +43,9 @@ public DataWriter<GenericRecord> build()

switch (this.destination.getType()) {
case HDFS:
if (this.destination.getProperties().getPropAsBoolean(ORC_WRITER_INSTRUMENTED, false)) {
return new InstrumentedGobblinOrcWriter(this, this.destination.getProperties());
}
return new GobblinOrcWriter(this, this.destination.getProperties());
default:
throw new RuntimeException("Unknown destination type: " + this.destination.getType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;


/***
* A class for an event emitting GobblinOrcWriter metrics, such as internal memory resizing and flushing
*/
@Slf4j
public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
MetricContext metricContext;
public static final String METRICS_SCHEMA_NAME = "schemaName";
public static final String METRICS_BYTES_WRITTEN = "bytesWritten";
public static final String METRICS_RECORDS_WRITTEN = "recordsWritten";
public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
public static final String METRICS_BUFFER_SIZE = "bufferSize";
public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";

public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
super(builder, properties);
metricContext = Instrumented.getMetricContext(new State(properties), this.getClass());
}

@Override
protected synchronized void closeInternal() throws IOException {
// close() can be called multiple times by super.commit() and super.close(), but we only want to emit metrics once
if (!this.closed) {
this.flush();
this.orcFileWriter.close();
this.closed = true;
log.info("Emitting ORC event metrics");
this.sendOrcWriterMetadataEvent();
this.recycleRowBatchPool();
} else {
// Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
if (rowBatch.size > 0) {
throw new CloseBeforeFlushException(this.inputSchema.toString());
}
}
}

private void sendOrcWriterMetadataEvent() {
GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME, ORC_WRITER_NAMESPACE);
Map<String, String> eventMetadataMap = Maps.newHashMap();
eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName());
eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten()));
eventMetadataMap.put(METRICS_RECORDS_WRITTEN, String.valueOf(this.recordsWritten()));
eventMetadataMap.put(METRICS_BUFFER_RESIZES, String.valueOf(((GenericRecordToOrcValueWriter) this.valueWriter).getResizeCount()));
eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(this.batchSize));

builder.addAdditionalMetadata(eventMetadataMap);
EventSubmitter.submit(metricContext, builder);
}
}

0 comments on commit 320d8a2

Please sign in to comment.