Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[GOBBLIN-1869] Create instrumented orc writer #3732

Merged
merged 4 commits into from
Aug 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@
import org.apache.avro.generic.GenericFixed;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.util.Utf8;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;
import org.apache.orc.TypeDescription;
import org.apache.orc.storage.common.type.HiveDecimal;
import org.apache.orc.storage.ql.exec.vector.BytesColumnVector;
Expand All @@ -50,6 +49,7 @@
import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.util.orc.AvroOrcSchemaConverter;


/**
Expand Down Expand Up @@ -435,4 +435,8 @@ private Converter[] buildConverters(TypeDescription schema, Schema avroSchema) {
}
return result;
}

/**
 * Exposes the number of internal buffer resizes performed by this value writer,
 * so that instrumented writers can report it as a metric.
 *
 * @return the current value of {@code resizeCount}
 */
public int getResizeCount() {
  return this.resizeCount;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.Properties;

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.orc.OrcConf;
Expand All @@ -30,6 +29,7 @@
import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

import lombok.extern.slf4j.Slf4j;

Expand All @@ -41,22 +41,22 @@
*/
@Slf4j
public abstract class GobblinBaseOrcWriter<S, D> extends FsDataWriter<D> {
static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_PREFIX = "orcWriter.";
public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize";
public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000;

private final OrcValueWriter<D> valueWriter;
protected final OrcValueWriter<D> valueWriter;
@VisibleForTesting
VectorizedRowBatch rowBatch;
private final TypeDescription typeDescription;
private final Writer orcFileWriter;
protected final Writer orcFileWriter;
private final RowBatchPool rowBatchPool;
private final boolean enableRowBatchPool;

// the close method may be invoked multiple times, but the underlying writer only supports close being called once
private volatile boolean closed = false;
protected volatile boolean closed = false;

private final int batchSize;
protected final int batchSize;
protected final S inputSchema;


Expand Down Expand Up @@ -116,6 +116,11 @@ public long recordsWritten() {
return this.orcFileWriter.getNumberOfRows();
}

@Override
public long bytesWritten() {
  // NOTE(review): ORC's Writer#getRawDataSize() returns the writer's estimate of the
  // raw (uncompressed, in-memory) data size, not the compressed on-disk file size —
  // confirm this is the intended semantics for bytesWritten() before relying on it
  // for storage accounting.
  return this.orcFileWriter.getRawDataSize();
}

@Override
public State getFinalState() {
/**
Expand All @@ -141,15 +146,19 @@ public void flush()
}
}

private synchronized void closeInternal()
/**
 * Returns this writer's {@code rowBatch} to the shared {@code rowBatchPool} for reuse.
 * Does nothing when row-batch pooling is disabled.
 */
protected void recycleRowBatchPool() {
  if (!enableRowBatchPool) {
    return;
  }
  rowBatchPool.recycle(typeDescription, rowBatch);
}

protected synchronized void closeInternal()
throws IOException {
if (!closed) {
this.flush();
this.orcFileWriter.close();
this.closed = true;
if (enableRowBatchPool) {
rowBatchPool.recycle(typeDescription, rowBatch);
}
this.recycleRowBatchPool();
} else {
// Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds.
if (rowBatch.size > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
* The WriterBuilder extension to create {@link GobblinOrcWriter} on top of {@link FsDataWriterBuilder}
*/
public class GobblinOrcWriterBuilder extends FsDataWriterBuilder<Schema, GenericRecord> {
public static final String ORC_WRITER_INSTRUMENTED = GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented";
/** No-arg constructor; writer configuration is read later from the destination properties in {@code build()}. */
public GobblinOrcWriterBuilder() {
}

Expand All @@ -42,6 +43,9 @@ public DataWriter<GenericRecord> build()

switch (this.destination.getType()) {
case HDFS:
if (this.destination.getProperties().getPropAsBoolean(ORC_WRITER_INSTRUMENTED, false)) {
return new InstrumentedGobblinOrcWriter(this, this.destination.getProperties());
}
return new GobblinOrcWriter(this, this.destination.getProperties());
default:
throw new RuntimeException("Unknown destination type: " + this.destination.getType());
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.gobblin.writer;

import java.io.IOException;
import java.util.Map;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;

import com.google.common.collect.Maps;

import lombok.extern.slf4j.Slf4j;

import org.apache.gobblin.configuration.State;
import org.apache.gobblin.instrumented.Instrumented;
import org.apache.gobblin.metrics.MetricContext;
import org.apache.gobblin.metrics.event.EventSubmitter;
import org.apache.gobblin.metrics.event.GobblinEventBuilder;


/***
 * A {@link GobblinOrcWriter} that emits a single metadata event on close with writer metrics
 * such as bytes/records written, internal buffer resizes, and the configured batch size.
 */
@Slf4j
public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter {
  public static final String METRICS_SCHEMA_NAME = "schemaName";
  public static final String METRICS_BYTES_WRITTEN = "bytesWritten";
  public static final String METRICS_RECORDS_WRITTEN = "recordsWritten";
  public static final String METRICS_BUFFER_RESIZES = "bufferResizes";
  public static final String METRICS_BUFFER_SIZE = "bufferSize";
  public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics";
  private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer";

  // Context used to emit the ORC writer metadata event; built from the job state.
  private final MetricContext metricContext;

  public InstrumentedGobblinOrcWriter(FsDataWriterBuilder<Schema, GenericRecord> builder, State properties) throws IOException {
    super(builder, properties);
    this.metricContext = Instrumented.getMetricContext(new State(properties), this.getClass());
  }

  /**
   * Closes the underlying writer via {@code super.closeInternal()} (which handles flushing,
   * closing the ORC file, marking this writer closed, recycling the row batch, and the
   * buffered-data safety check) and emits the metrics event exactly once.
   *
   * closeInternal() can be invoked multiple times by super.commit() and super.close(); the
   * {@code closed} flag captured before delegating guarantees a single event emission.
   */
  @Override
  protected synchronized void closeInternal() throws IOException {
    boolean alreadyClosed = this.closed;
    super.closeInternal();
    if (!alreadyClosed) {
      log.info("Emitting ORC event metrics");
      this.sendOrcWriterMetadataEvent();
    }
  }

  /** Builds and submits the one-shot ORC writer metrics event. */
  private void sendOrcWriterMetadataEvent() {
    GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME, ORC_WRITER_NAMESPACE);
    Map<String, String> eventMetadataMap = Maps.newHashMap();
    eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName());
    eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten()));
    eventMetadataMap.put(METRICS_RECORDS_WRITTEN, String.valueOf(this.recordsWritten()));
    // NOTE(review): assumes the parent always constructs a GenericRecordToOrcValueWriter for
    // avro GenericRecord input — the cast throws ClassCastException otherwise; confirm.
    eventMetadataMap.put(METRICS_BUFFER_RESIZES, String.valueOf(((GenericRecordToOrcValueWriter) this.valueWriter).getResizeCount()));
    eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(this.batchSize));

    builder.addAdditionalMetadata(eventMetadataMap);
    EventSubmitter.submit(metricContext, builder);
  }
}
Loading