From 63e6668d710099b8c38c0a062edc11b5d35d277e Mon Sep 17 00:00:00 2001 From: William Lo Date: Wed, 26 Jul 2023 15:44:22 -0400 Subject: [PATCH 1/4] Creates an instrumented ORC Writer that emits events on commit and close --- .../writer/GenericRecordToOrcValueWriter.java | 6 +- .../gobblin/writer/GobblinBaseOrcWriter.java | 12 ++-- .../writer/GobblinOrcWriterBuilder.java | 4 ++ .../writer/InstrumentedGobblinOrcWriter.java | 69 +++++++++++++++++++ 4 files changed, 84 insertions(+), 7 deletions(-) create mode 100644 gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java index e764a14db9f..24e203b64dd 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GenericRecordToOrcValueWriter.java @@ -31,7 +31,6 @@ import org.apache.avro.generic.GenericFixed; import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; -import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; import org.apache.orc.TypeDescription; import org.apache.orc.storage.common.type.HiveDecimal; import org.apache.orc.storage.ql.exec.vector.BytesColumnVector; @@ -50,6 +49,7 @@ import lombok.extern.slf4j.Slf4j; import org.apache.gobblin.configuration.State; +import org.apache.gobblin.util.orc.AvroOrcSchemaConverter; /** @@ -435,4 +435,8 @@ private Converter[] buildConverters(TypeDescription schema, Schema avroSchema) { } return result; } + + public int getResizeCount() { + return resizeCount; + } } \ No newline at end of file diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 528ceaaf660..59cfe146ffd 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -20,7 +20,6 @@ import java.io.IOException; import java.util.Properties; -import com.google.common.base.Preconditions; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.SerDeException; import org.apache.orc.OrcConf; @@ -30,6 +29,7 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import lombok.extern.slf4j.Slf4j; @@ -41,22 +41,22 @@ */ @Slf4j public abstract class GobblinBaseOrcWriter extends FsDataWriter { - static final String ORC_WRITER_PREFIX = "orcWriter."; + public static final String ORC_WRITER_PREFIX = "orcWriter."; public static final String ORC_WRITER_BATCH_SIZE = ORC_WRITER_PREFIX + "batchSize"; public static final int DEFAULT_ORC_WRITER_BATCH_SIZE = 1000; - private final OrcValueWriter valueWriter; + protected final OrcValueWriter valueWriter; @VisibleForTesting VectorizedRowBatch rowBatch; private final TypeDescription typeDescription; - private final Writer orcFileWriter; + protected final Writer orcFileWriter; private final RowBatchPool rowBatchPool; private final boolean enableRowBatchPool; // the close method may be invoked multiple times, but the underlying writer only supports close being called once private volatile boolean closed = false; - private final int batchSize; + protected final int batchSize; protected final S inputSchema; @@ -141,7 +141,7 @@ public void flush() } } - private synchronized void closeInternal() + protected synchronized void closeInternal() throws IOException { if (!closed) { this.flush(); diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java index 3097397a8eb..47f5c8ca3d4 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java @@ -30,6 +30,7 @@ * The WriterBuilder extension to create {@link GobblinOrcWriter} on top of {@link FsDataWriterBuilder} */ public class GobblinOrcWriterBuilder extends FsDataWriterBuilder { + public static String ORC_WRITER_INSTRUMENTED = GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented"; public GobblinOrcWriterBuilder() { } @@ -42,6 +43,9 @@ public DataWriter build() switch (this.destination.getType()) { case HDFS: + if (this.destination.getProperties().getPropAsBoolean(ORC_WRITER_INSTRUMENTED, false)) { + return new InstrumentedGobblinOrcWriter(this, this.destination.getProperties()); + } return new GobblinOrcWriter(this, this.destination.getProperties()); default: throw new RuntimeException("Unknown destination type: " + this.destination.getType()); diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java new file mode 100644 index 00000000000..7d4a806ca2d --- /dev/null +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.writer; + +import java.io.IOException; +import java.util.Map; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericRecord; + +import com.google.common.collect.Maps; + +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.metrics.GobblinTrackingEvent; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.GobblinEventBuilder; + + +/*** + * A class for an event emitting GobblinOrcWriter metrics, such as internal memory resizing and flushing + */ +public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { + MetricContext metricContext; + public static String METRICS_SCHEMA_NAME = "schemaName"; + public static String METRICS_BYTES_WRITTEN = "bytesWritten"; + public static String METRICS_RECORDS_WRITTEN = "recordsWritten"; + public static String METRICS_BUFFER_RESIZES = "bufferResizes"; + public static String METRICS_BUFFER_SIZE = "bufferSize"; + + public InstrumentedGobblinOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { + super(builder, properties); + metricContext = Instrumented.getMetricContext(new State(), this.getClass()); + } + + @Override + protected synchronized void closeInternal() throws IOException { + // Flushes the ORC file writer + super.closeInternal(); + this.metricContext.submitEvent(this.createOrcWriterMetadataEvent()); + } + + GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException { + GobblinEventBuilder builder = new GobblinEventBuilder("OrcWriterMetrics"); + Map eventMetadataMap = Maps.newHashMap(); + eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName()); + eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten())); + eventMetadataMap.put(METRICS_RECORDS_WRITTEN, String.valueOf(this.recordsWritten())); + eventMetadataMap.put(METRICS_BUFFER_RESIZES, String.valueOf(((GenericRecordToOrcValueWriter) this.valueWriter).getResizeCount())); + eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(rowBatch.size)); + builder.addAdditionalMetadata(eventMetadataMap); + return builder.build(); + } +} From 22a7e1c77b0a19cf50f5f61c7de1798b9208042a Mon Sep 17 00:00:00 2001 From: William Lo Date: Thu, 3 Aug 2023 22:49:53 -0400 Subject: [PATCH 2/4] Adds ORC writer metrics to track high record conversion metrics --- .../gobblin/writer/GobblinBaseOrcWriter.java | 17 ++++++++--- .../writer/InstrumentedGobblinOrcWriter.java | 28 +++++++++++++++---- 2 files changed, 35 insertions(+), 10 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java index 59cfe146ffd..a7de6a56885 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinBaseOrcWriter.java @@ -54,7 +54,7 @@ public abstract class GobblinBaseOrcWriter extends FsDataWriter { private final boolean enableRowBatchPool; // the close method may be invoked multiple times, but the underlying writer only supports close being called once - private volatile boolean closed = false; + protected volatile boolean closed = false; protected final int batchSize; protected final S inputSchema; @@ -116,6 +116,11 @@ public long recordsWritten() { return this.orcFileWriter.getNumberOfRows(); } + @Override + public long bytesWritten() { + return this.orcFileWriter.getRawDataSize(); + } + @Override public State getFinalState() { /** @@ -141,15 +146,19 @@ public void flush() } } + protected void recycleRowBatchPool() { + if (enableRowBatchPool) { + rowBatchPool.recycle(typeDescription, rowBatch); + } + } + protected synchronized void closeInternal() throws IOException { if (!closed) { this.flush(); this.orcFileWriter.close(); this.closed = true; - if (enableRowBatchPool) { - rowBatchPool.recycle(typeDescription, rowBatch); - } + this.recycleRowBatchPool(); } else { // Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds. if (rowBatch.size > 0) { diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java index 7d4a806ca2d..96c2eb0a597 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java @@ -25,6 +25,8 @@ import com.google.common.collect.Maps; +import lombok.extern.slf4j.Slf4j; + import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; import org.apache.gobblin.metrics.GobblinTrackingEvent; @@ -35,6 +37,7 @@ /*** * A class for an event emitting GobblinOrcWriter metrics, such as internal memory resizing and flushing */ +@Slf4j public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { MetricContext metricContext; public static String METRICS_SCHEMA_NAME = "schemaName"; @@ -42,27 +45,40 @@ public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { public static String METRICS_RECORDS_WRITTEN = "recordsWritten"; public static String METRICS_BUFFER_RESIZES = "bufferResizes"; public static String METRICS_BUFFER_SIZE = "bufferSize"; + public static String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics"; public InstrumentedGobblinOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { super(builder, properties); - metricContext = Instrumented.getMetricContext(new State(), this.getClass()); + metricContext = Instrumented.getMetricContext(new State(properties), this.getClass()); } @Override protected synchronized void closeInternal() throws IOException { - // Flushes the ORC file writer - super.closeInternal(); - this.metricContext.submitEvent(this.createOrcWriterMetadataEvent()); + // close() can be called multiple times by super.commit() and super.close(), but we only want to emit metrics once + if (!this.closed) { + this.flush(); + this.orcFileWriter.close(); + this.closed = true; + log.info("Emitting ORC event metrics"); + this.metricContext.submitEvent(this.createOrcWriterMetadataEvent()); + this.recycleRowBatchPool(); + } else { + // Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds. + if (rowBatch.size > 0) { + throw new CloseBeforeFlushException(this.inputSchema.toString()); + } + } } GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException { - GobblinEventBuilder builder = new GobblinEventBuilder("OrcWriterMetrics"); + GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME); Map eventMetadataMap = Maps.newHashMap(); eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName()); eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten())); eventMetadataMap.put(METRICS_RECORDS_WRITTEN, String.valueOf(this.recordsWritten())); eventMetadataMap.put(METRICS_BUFFER_RESIZES, String.valueOf(((GenericRecordToOrcValueWriter) this.valueWriter).getResizeCount())); - eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(rowBatch.size)); + eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(this.batchSize)); + builder.addAdditionalMetadata(eventMetadataMap); return builder.build(); } From b26419d2e05c6ebbf048d9c7ba2e13f596012e5f Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 4 Aug 2023 19:31:40 -0400 Subject: [PATCH 3/4] Fix add final to config strings --- .../gobblin/writer/GobblinOrcWriterBuilder.java | 2 +- .../gobblin/writer/InstrumentedGobblinOrcWriter.java | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java index 47f5c8ca3d4..d626891b26a 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/GobblinOrcWriterBuilder.java @@ -30,7 +30,7 @@ * The WriterBuilder extension to create {@link GobblinOrcWriter} on top of {@link FsDataWriterBuilder} */ public class GobblinOrcWriterBuilder extends FsDataWriterBuilder { - public static String ORC_WRITER_INSTRUMENTED = GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented"; + public static final String ORC_WRITER_INSTRUMENTED = GobblinBaseOrcWriter.ORC_WRITER_PREFIX + "instrumented"; public GobblinOrcWriterBuilder() { } diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java index 96c2eb0a597..4afe77d103b 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java @@ -40,12 +40,12 @@ @Slf4j public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { MetricContext metricContext; - public static String METRICS_SCHEMA_NAME = "schemaName"; - public static String METRICS_BYTES_WRITTEN = "bytesWritten"; - public static String METRICS_RECORDS_WRITTEN = "recordsWritten"; - public static String METRICS_BUFFER_RESIZES = "bufferResizes"; - public static String METRICS_BUFFER_SIZE = "bufferSize"; - public static String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics"; + public static final String METRICS_SCHEMA_NAME = "schemaName"; + public static final String METRICS_BYTES_WRITTEN = "bytesWritten"; + public static final String METRICS_RECORDS_WRITTEN = "recordsWritten"; + public static final String METRICS_BUFFER_RESIZES = "bufferResizes"; + public static final String METRICS_BUFFER_SIZE = "bufferSize"; + public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics"; public InstrumentedGobblinOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { super(builder, properties); From 0f7ad199cdf5baeea0c77b6196b3ed00f8884722 Mon Sep 17 00:00:00 2001 From: William Lo Date: Fri, 11 Aug 2023 01:05:37 -0400 Subject: [PATCH 4/4] Add namespace and use event submitter to send events --- .../gobblin/writer/InstrumentedGobblinOrcWriter.java | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java index 4afe77d103b..c32f1e4e38a 100644 --- a/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java +++ b/gobblin-modules/gobblin-orc/src/main/java/org/apache/gobblin/writer/InstrumentedGobblinOrcWriter.java @@ -29,8 +29,8 @@ import org.apache.gobblin.configuration.State; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.GobblinTrackingEvent; import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.event.EventSubmitter; import org.apache.gobblin.metrics.event.GobblinEventBuilder; @@ -46,6 +46,7 @@ public class InstrumentedGobblinOrcWriter extends GobblinOrcWriter { public static final String METRICS_BUFFER_RESIZES = "bufferResizes"; public static final String METRICS_BUFFER_SIZE = "bufferSize"; public static final String ORC_WRITER_METRICS_NAME = "OrcWriterMetrics"; + private static final String ORC_WRITER_NAMESPACE = "gobblin.orc.writer"; public InstrumentedGobblinOrcWriter(FsDataWriterBuilder builder, State properties) throws IOException { super(builder, properties); @@ -60,7 +61,7 @@ protected synchronized void closeInternal() throws IOException { this.orcFileWriter.close(); this.closed = true; log.info("Emitting ORC event metrics"); - this.metricContext.submitEvent(this.createOrcWriterMetadataEvent()); + this.sendOrcWriterMetadataEvent(); this.recycleRowBatchPool(); } else { // Throw fatal exception if there's outstanding buffered data since there's risk losing data if proceeds. @@ -70,8 +71,8 @@ protected synchronized void closeInternal() throws IOException { } } - GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException { - GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME); + private void sendOrcWriterMetadataEvent() { + GobblinEventBuilder builder = new GobblinEventBuilder(ORC_WRITER_METRICS_NAME, ORC_WRITER_NAMESPACE); Map eventMetadataMap = Maps.newHashMap(); eventMetadataMap.put(METRICS_SCHEMA_NAME, this.inputSchema.getName()); eventMetadataMap.put(METRICS_BYTES_WRITTEN, String.valueOf(this.bytesWritten())); @@ -80,6 +81,6 @@ GobblinTrackingEvent createOrcWriterMetadataEvent() throws IOException { eventMetadataMap.put(METRICS_BUFFER_SIZE, String.valueOf(this.batchSize)); builder.addAdditionalMetadata(eventMetadataMap); - return builder.build(); + EventSubmitter.submit(metricContext, builder); } }