Skip to content

Commit

Permalink
Merge branch 'apache:main' into GH-36760-Go]-Adding-avro-ocf-reader--…
Browse files Browse the repository at this point in the history
…-reader
  • Loading branch information
loicalleyne authored Aug 10, 2023
2 parents 5865fef + 0e4fa8a commit f74957d
Show file tree
Hide file tree
Showing 5 changed files with 77 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void initialize(VectorSchemaRoot root) throws SQLException {
for (int i = 1; i <= consumers.length; i++) {
final JdbcFieldInfo columnFieldInfo = JdbcToArrowUtils.getJdbcFieldInfoForColumn(rsmd, i, config);
ArrowType arrowType = config.getJdbcToArrowTypeConverter().apply(columnFieldInfo);
consumers[i - 1] = JdbcToArrowUtils.getConsumer(
consumers[i - 1] = config.getJdbcConsumerGetter().apply(
arrowType, i, isColumnNullable(resultSet.getMetaData(), i, columnFieldInfo), root.getVector(i - 1), config);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Map;
import java.util.function.Function;

import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
Expand Down Expand Up @@ -76,6 +78,7 @@ public final class JdbcToArrowConfig {
private final int targetBatchSize;

private final Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter;
private final JdbcConsumerFactory jdbcConsumerGetter;

/**
* Constructs a new configuration from the provided allocator and calendar. The <code>allocator</code>
Expand Down Expand Up @@ -195,6 +198,38 @@ public final class JdbcToArrowConfig {
Map<String, String> schemaMetadata,
Map<Integer, Map<String, String>> columnMetadataByColumnIndex,
RoundingMode bigDecimalRoundingMode) {
this(
allocator,
calendar,
includeMetadata,
reuseVectorSchemaRoot,
arraySubTypesByColumnIndex,
arraySubTypesByColumnName,
targetBatchSize,
jdbcToArrowTypeConverter,
null,
explicitTypesByColumnIndex,
explicitTypesByColumnName,
schemaMetadata,
columnMetadataByColumnIndex,
bigDecimalRoundingMode);
}

JdbcToArrowConfig(
BufferAllocator allocator,
Calendar calendar,
boolean includeMetadata,
boolean reuseVectorSchemaRoot,
Map<Integer, JdbcFieldInfo> arraySubTypesByColumnIndex,
Map<String, JdbcFieldInfo> arraySubTypesByColumnName,
int targetBatchSize,
Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter,
JdbcConsumerFactory jdbcConsumerGetter,
Map<Integer, JdbcFieldInfo> explicitTypesByColumnIndex,
Map<String, JdbcFieldInfo> explicitTypesByColumnName,
Map<String, String> schemaMetadata,
Map<Integer, Map<String, String>> columnMetadataByColumnIndex,
RoundingMode bigDecimalRoundingMode) {
Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
this.allocator = allocator;
this.calendar = calendar;
Expand All @@ -212,6 +247,8 @@ public final class JdbcToArrowConfig {
// set up type converter
this.jdbcToArrowTypeConverter = jdbcToArrowTypeConverter != null ? jdbcToArrowTypeConverter :
(jdbcFieldInfo) -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);

this.jdbcConsumerGetter = jdbcConsumerGetter != null ? jdbcConsumerGetter : JdbcToArrowUtils::getConsumer;
}

/**
Expand Down Expand Up @@ -264,6 +301,13 @@ public Function<JdbcFieldInfo, ArrowType> getJdbcToArrowTypeConverter() {
return jdbcToArrowTypeConverter;
}

/**
 * Returns the factory used to create the {@link JdbcConsumer} for a column.
 *
 * <p>The returned value is never {@code null}: when no factory is supplied at construction
 * time, the constructor substitutes {@code JdbcToArrowUtils::getConsumer}.
 *
 * @return the configured consumer factory
 */
public JdbcConsumerFactory getJdbcConsumerGetter() {
return jdbcConsumerGetter;
}

/**
* Returns the array sub-type {@link JdbcFieldInfo} defined for the provided column index.
*
Expand Down Expand Up @@ -338,4 +382,13 @@ public Map<Integer, Map<String, String>> getColumnMetadataByColumnIndex() {
public RoundingMode getBigDecimalRoundingMode() {
return bigDecimalRoundingMode;
}

/**
 * Factory that produces the {@link JdbcConsumer} for a single column.
 *
 * <p>It declares exactly one abstract method, so it can be supplied as a lambda or a method
 * reference. When no custom factory is configured, {@link JdbcToArrowConfig} uses
 * {@code JdbcToArrowUtils::getConsumer}.
 */
@FunctionalInterface
public interface JdbcConsumerFactory {
/**
 * Creates the consumer for one column.
 *
 * @param arrowType the Arrow type chosen for the column
 * @param columnIndex the index of the column; the vector iterator passes the 1-based JDBC
 *     column index here (NOTE(review): confirm other callers follow the same convention)
 * @param nullable whether the column is considered nullable
 * @param vector the vector associated with the column
 * @param config the configuration in effect for the conversion
 * @return the consumer to use for the column
 */
JdbcConsumer apply(ArrowType arrowType, int columnIndex, boolean nullable, FieldVector vector,
JdbcToArrowConfig config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
Expand All @@ -44,6 +45,7 @@ public class JdbcToArrowConfigBuilder {
private Map<Integer, Map<String, String>> columnMetadataByColumnIndex;
private int targetBatchSize;
private Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter;
private JdbcToArrowConfig.JdbcConsumerFactory jdbcConsumerGetter;
private RoundingMode bigDecimalRoundingMode;

/**
Expand Down Expand Up @@ -221,6 +223,18 @@ public JdbcToArrowConfigBuilder setJdbcToArrowTypeConverter(
return this;
}

/**
 * Set the function used to get a JDBC consumer for a given type.
 *
 * <p>Defaults to wrapping {@link
 * JdbcToArrowUtils#getConsumer(ArrowType, int, boolean, FieldVector, JdbcToArrowConfig)}.
 * Passing {@code null} (or never calling this method) keeps that default, because the
 * {@link JdbcToArrowConfig} constructor replaces a {@code null} factory with it.
 *
 * @param jdbcConsumerGetter the factory to use, or {@code null} for the default
 * @return this builder, to allow call chaining
 */
public JdbcToArrowConfigBuilder setJdbcConsumerGetter(
JdbcToArrowConfig.JdbcConsumerFactory jdbcConsumerGetter) {
this.jdbcConsumerGetter = jdbcConsumerGetter;
return this;
}

/**
* Set whether to use the same {@link org.apache.arrow.vector.VectorSchemaRoot} instance on each iteration,
* or to allocate a new one.
Expand Down Expand Up @@ -274,6 +288,7 @@ public JdbcToArrowConfig build() {
arraySubTypesByColumnName,
targetBatchSize,
jdbcToArrowTypeConverter,
jdbcConsumerGetter,
explicitTypesByColumnIndex,
explicitTypesByColumnName,
schemaMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static ArrowType getArrowTypeFromJdbcType(final JdbcFieldInfo fieldInfo,
return new ArrowType.Struct();
default:
// no-op, shouldn't get here
return null;
throw new UnsupportedOperationException("Unmapped JDBC type: " + fieldInfo.getJdbcType());
}
}

Expand Down Expand Up @@ -489,7 +489,7 @@ static JdbcConsumer getConsumer(ArrowType arrowType, int columnIndex, boolean nu
return new NullConsumer((NullVector) vector);
default:
// no-op, shouldn't get here
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("No consumer for Arrow type: " + arrowType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,12 @@ private static boolean populateDerbyDatabase() {
}

private static ArrowType getArrowTypeFromJdbcType(final int jdbcDataType, final int precision, final int scale) {
final ArrowType type =
JdbcToArrowUtils.getArrowTypeFromJdbcType(new JdbcFieldInfo(jdbcDataType, precision, scale), DEFAULT_CALENDAR);
return isNull(type) ? ArrowType.Utf8.INSTANCE : type;
try {
return JdbcToArrowUtils.getArrowTypeFromJdbcType(new JdbcFieldInfo(jdbcDataType, precision, scale),
DEFAULT_CALENDAR);
} catch (UnsupportedOperationException ignored) {
return ArrowType.Utf8.INSTANCE;
}
}

private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) {
Expand Down

0 comments on commit f74957d

Please sign in to comment.