Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GH-37021 [Java][arrow-jdbc] Pluggable getConsumer #37085

Merged
merged 8 commits into from
Aug 10, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ private void initialize(VectorSchemaRoot root) throws SQLException {
for (int i = 1; i <= consumers.length; i++) {
final JdbcFieldInfo columnFieldInfo = JdbcToArrowUtils.getJdbcFieldInfoForColumn(rsmd, i, config);
ArrowType arrowType = config.getJdbcToArrowTypeConverter().apply(columnFieldInfo);
consumers[i - 1] = JdbcToArrowUtils.getConsumer(
consumers[i - 1] = config.getJdbcConsumerGetter().apply(
arrowType, i, isColumnNullable(resultSet.getMetaData(), i, columnFieldInfo), root.getVector(i - 1), config);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import java.util.Map;
import java.util.function.Function;

import org.apache.arrow.adapter.jdbc.consumer.JdbcConsumer;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
Expand Down Expand Up @@ -76,6 +78,7 @@ public final class JdbcToArrowConfig {
private final int targetBatchSize;

private final Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter;
private final JdbcConsumerFactory jdbcConsumerGetter;

/**
* Constructs a new configuration from the provided allocator and calendar. The <code>allocator</code>
Expand Down Expand Up @@ -195,6 +198,38 @@ public final class JdbcToArrowConfig {
Map<String, String> schemaMetadata,
Map<Integer, Map<String, String>> columnMetadataByColumnIndex,
RoundingMode bigDecimalRoundingMode) {
this(
allocator,
calendar,
includeMetadata,
reuseVectorSchemaRoot,
arraySubTypesByColumnIndex,
arraySubTypesByColumnName,
targetBatchSize,
jdbcToArrowTypeConverter,
null,
explicitTypesByColumnIndex,
explicitTypesByColumnName,
schemaMetadata,
columnMetadataByColumnIndex,
bigDecimalRoundingMode);
}

JdbcToArrowConfig(
BufferAllocator allocator,
Calendar calendar,
boolean includeMetadata,
boolean reuseVectorSchemaRoot,
Map<Integer, JdbcFieldInfo> arraySubTypesByColumnIndex,
Map<String, JdbcFieldInfo> arraySubTypesByColumnName,
int targetBatchSize,
Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter,
JdbcConsumerFactory jdbcConsumerGetter,
Map<Integer, JdbcFieldInfo> explicitTypesByColumnIndex,
Map<String, JdbcFieldInfo> explicitTypesByColumnName,
Map<String, String> schemaMetadata,
Map<Integer, Map<String, String>> columnMetadataByColumnIndex,
RoundingMode bigDecimalRoundingMode) {
Preconditions.checkNotNull(allocator, "Memory allocator cannot be null");
this.allocator = allocator;
this.calendar = calendar;
Expand All @@ -212,6 +247,8 @@ public final class JdbcToArrowConfig {
// set up type converter
this.jdbcToArrowTypeConverter = jdbcToArrowTypeConverter != null ? jdbcToArrowTypeConverter :
(jdbcFieldInfo) -> JdbcToArrowUtils.getArrowTypeFromJdbcType(jdbcFieldInfo, calendar);

this.jdbcConsumerGetter = jdbcConsumerGetter != null ? jdbcConsumerGetter : JdbcToArrowUtils::getConsumer;
}

/**
Expand Down Expand Up @@ -264,6 +301,13 @@ public Function<JdbcFieldInfo, ArrowType> getJdbcToArrowTypeConverter() {
return jdbcToArrowTypeConverter;
}

/**
* Gets the JDBC consumer getter.
*/
public JdbcConsumerFactory getJdbcConsumerGetter() {
return jdbcConsumerGetter;
}

/**
* Returns the array sub-type {@link JdbcFieldInfo} defined for the provided column index.
*
Expand Down Expand Up @@ -338,4 +382,13 @@ public Map<Integer, Map<String, String>> getColumnMetadataByColumnIndex() {
public RoundingMode getBigDecimalRoundingMode() {
return bigDecimalRoundingMode;
}

/**
* Interface for a function that gets a JDBC consumer for the given values.
*/
@FunctionalInterface
public interface JdbcConsumerFactory {
JdbcConsumer apply(ArrowType arrowType, int columnIndex, boolean nullable, FieldVector vector,
JdbcToArrowConfig config);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.util.Preconditions;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.types.pojo.ArrowType;

/**
Expand All @@ -44,6 +45,7 @@ public class JdbcToArrowConfigBuilder {
private Map<Integer, Map<String, String>> columnMetadataByColumnIndex;
private int targetBatchSize;
private Function<JdbcFieldInfo, ArrowType> jdbcToArrowTypeConverter;
private JdbcToArrowConfig.JdbcConsumerFactory jdbcConsumerGetter;
private RoundingMode bigDecimalRoundingMode;

/**
Expand Down Expand Up @@ -221,6 +223,18 @@ public JdbcToArrowConfigBuilder setJdbcToArrowTypeConverter(
return this;
}

/**
* Set the function used to get a JDBC consumer for a given type.
* <p>
* Defaults to wrapping {@link
* JdbcToArrowUtils#getConsumer(ArrowType, Integer, Boolean, FieldVector, JdbcToArrowConfig)}.
*/
public JdbcToArrowConfigBuilder setJdbcConsumerGetter(
JdbcToArrowConfig.JdbcConsumerFactory jdbcConsumerGetter) {
this.jdbcConsumerGetter = jdbcConsumerGetter;
return this;
}

/**
* Set whether to use the same {@link org.apache.arrow.vector.VectorSchemaRoot} instance on each iteration,
* or to allocate a new one.
Expand Down Expand Up @@ -274,6 +288,7 @@ public JdbcToArrowConfig build() {
arraySubTypesByColumnName,
targetBatchSize,
jdbcToArrowTypeConverter,
jdbcConsumerGetter,
explicitTypesByColumnIndex,
explicitTypesByColumnName,
schemaMetadata,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ public static ArrowType getArrowTypeFromJdbcType(final JdbcFieldInfo fieldInfo,
return new ArrowType.Struct();
default:
// no-op, shouldn't get here
return null;
throw new UnsupportedOperationException("Unmapped JDBC type: " + fieldInfo.getJdbcType());
}
}

Expand Down Expand Up @@ -489,7 +489,7 @@ static JdbcConsumer getConsumer(ArrowType arrowType, int columnIndex, boolean nu
return new NullConsumer((NullVector) vector);
default:
// no-op, shouldn't get here
throw new UnsupportedOperationException();
throw new UnsupportedOperationException("No consumer for Arrow type: " + arrowType);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -299,9 +299,12 @@ private static boolean populateDerbyDatabase() {
}

private static ArrowType getArrowTypeFromJdbcType(final int jdbcDataType, final int precision, final int scale) {
final ArrowType type =
JdbcToArrowUtils.getArrowTypeFromJdbcType(new JdbcFieldInfo(jdbcDataType, precision, scale), DEFAULT_CALENDAR);
return isNull(type) ? ArrowType.Utf8.INSTANCE : type;
try {
return JdbcToArrowUtils.getArrowTypeFromJdbcType(new JdbcFieldInfo(jdbcDataType, precision, scale),
DEFAULT_CALENDAR);
} catch (UnsupportedOperationException ignored) {
return ArrowType.Utf8.INSTANCE;
}
}

private static void saveToVector(final Byte data, final UInt1Vector vector, final int index) {
Expand Down
Loading