From 82414eaf0bc8ee0dccf9c7a4cb2ae6288ea02236 Mon Sep 17 00:00:00 2001
From: Sushant Raikar
Date: Thu, 17 Sep 2020 12:07:34 -0700
Subject: [PATCH] Made Predicate Pushdown dynamic based on the Hive Version

---
 .../mr/hive/HiveIcebergFilterFactory.java | 26 +++++++++++------
 .../mr/hive/HiveIcebergInputFormat.java   | 29 +++++++++++++++----
 2 files changed, 41 insertions(+), 14 deletions(-)

diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
index 63e823c65..07a7980aa 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergFilterFactory.java
@@ -27,8 +27,8 @@
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynFields;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -115,8 +115,16 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
 
   // PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
   // Timestamp using Date#getTime. This conversion discards microseconds, so this is a necessary to avoid it.
+  private static final Class<?> PREDICATE_LEAF_IMPL_CLASS = DynClasses.builder()
+      .impl("org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl")
+      .build();
+
   private static final DynFields.UnboundField<Object> LITERAL_FIELD = DynFields.builder()
-      .hiddenImpl(SearchArgumentImpl.PredicateLeafImpl.class, "literal")
+      .hiddenImpl(PREDICATE_LEAF_IMPL_CLASS, "literal")
+      .build();
+
+  private static final DynFields.UnboundField<List<Object>> LITERAL_LIST_FIELD = DynFields.builder()
+      .hiddenImpl(PREDICATE_LEAF_IMPL_CLASS, "literalList")
       .build();
 
   private static Object leafToLiteral(PredicateLeaf leaf) {
@@ -125,13 +133,13 @@ private static Object leafToLiteral(PredicateLeaf leaf) {
       case BOOLEAN:
       case STRING:
       case FLOAT:
-        return leaf.getLiteral();
+        return LITERAL_FIELD.get(leaf);
       case DATE:
-        return daysFromTimestamp((Timestamp) leaf.getLiteral());
+        return daysFromTimestamp(new Timestamp(((java.util.Date) LITERAL_FIELD.get(leaf)).getTime()));
       case TIMESTAMP:
         return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf));
       case DECIMAL:
-        return hiveDecimalToBigDecimal((HiveDecimalWritable) leaf.getLiteral());
+        return hiveDecimalToBigDecimal((HiveDecimalWritable) LITERAL_FIELD.get(leaf));
 
       default:
         throw new UnsupportedOperationException("Unknown type: " + leaf.getType());
@@ -144,16 +152,16 @@ private static List<Object> leafToLiteralList(PredicateLeaf leaf) {
       case BOOLEAN:
       case FLOAT:
       case STRING:
-        return leaf.getLiteralList();
+        return LITERAL_LIST_FIELD.get(leaf);
       case DATE:
-        return leaf.getLiteralList().stream().map(value -> daysFromDate((Date) value))
+        return LITERAL_LIST_FIELD.get(leaf).stream().map(value -> daysFromDate((Date) value))
             .collect(Collectors.toList());
       case DECIMAL:
-        return leaf.getLiteralList().stream()
+        return LITERAL_LIST_FIELD.get(leaf).stream()
             .map(value -> hiveDecimalToBigDecimal((HiveDecimalWritable) value))
             .collect(Collectors.toList());
       case TIMESTAMP:
-        return leaf.getLiteralList().stream()
+        return LITERAL_LIST_FIELD.get(leaf).stream()
             .map(value -> microsFromTimestamp((Timestamp) value))
             .collect(Collectors.toList());
       default:
diff --git a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
index a8a31d2bc..26307b1e8 100644
--- a/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
+++ b/mr/src/main/java/org/apache/iceberg/mr/hive/HiveIcebergInputFormat.java
@@ -23,14 +23,13 @@
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -44,15 +43,27 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     implements CombineHiveInputFormat.AvoidSplitCombination {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
+  private static final DynMethods.StaticMethod DESERIALIZE_OBJECT = DynMethods.builder("deserializeObject")
+      .impl("org.apache.hadoop.hive.ql.exec.SerializationUtilities", String.class, Class.class)
+      .impl("org.apache.hadoop.hive.ql.exec.Utilities", String.class, Class.class)
+      .buildStatic();
+  private static final DynMethods.StaticMethod CONSTRUCT_SARG_HIVE_1 = DynMethods.builder("create")
+      .impl("org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory", ExprNodeGenericFuncDesc.class)
+      .orNoop()
+      .buildStatic();
+  private static final DynMethods.StaticMethod CONSTRUCT_SARG_HIVE_2 = DynMethods.builder("create")
+      .impl("org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg",
+          Configuration.class, ExprNodeGenericFuncDesc.class)
+      .orNoop()
+      .buildStatic();
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     // Convert Hive filter to Iceberg filter
     String hiveFilter = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
     if (hiveFilter != null) {
-      ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
-          .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
-      SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
+      ExprNodeGenericFuncDesc exprNodeDesc = DESERIALIZE_OBJECT.invoke(hiveFilter, ExprNodeGenericFuncDesc.class);
+      SearchArgument sarg = constructSearchArgument(job, exprNodeDesc);
       try {
         Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
         job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
@@ -71,4 +82,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   public boolean shouldSkipCombine(Path path, Configuration conf) {
     return true;
   }
+
+  private SearchArgument constructSearchArgument(JobConf job, ExprNodeGenericFuncDesc exprNodeDesc) {
+    SearchArgument searchArgument = CONSTRUCT_SARG_HIVE_2.invoke(job, exprNodeDesc);
+    if (searchArgument == null) {
+      searchArgument = CONSTRUCT_SARG_HIVE_1.invoke(exprNodeDesc);
+    }
+    return searchArgument;
+  }
 }
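
Note on the reflective field access in HiveIcebergFilterFactory: DynClasses and DynFields come from Iceberg's iceberg-common module, and this pattern is what lets the filter factory read PredicateLeafImpl's private "literal" and "literalList" fields without compiling against a particular Hive version. The following is a minimal, self-contained sketch of that mechanism; DynFieldsSketch and Holder are hypothetical stand-ins for PredicateLeafImpl and are not part of the patch.

import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynFields;

public class DynFieldsSketch {

  // Hypothetical stand-in for SearchArgumentImpl$PredicateLeafImpl.
  public static class Holder {
    private final Object literal = "untruncated value";
  }

  public static void main(String[] args) {
    // Resolve the target class by name at runtime; the patch passes the
    // hard-coded binary name "...SearchArgumentImpl$PredicateLeafImpl" so the
    // Hive implementation class is not a compile-time dependency.
    Class<?> holderClass = DynClasses.builder()
        .impl(Holder.class.getName())
        .build();

    // hiddenImpl() makes the private field accessible, which is how the patch
    // reads the raw value instead of going through PredicateLeaf#getLiteral().
    DynFields.UnboundField<Object> literalField = DynFields.builder()
        .hiddenImpl(holderClass, "literal")
        .build();

    System.out.println(literalField.get(new Holder()));  // prints: untruncated value
  }
}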
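Similarly, the DESERIALIZE_OBJECT and CONSTRUCT_SARG_HIVE_* fields rely on DynMethods binding to the first impl() whose class and method are present, and on orNoop() turning an unresolved lookup into a method whose invoke() returns null; constructSearchArgument() uses that null to fall back from the Hive 2 entry point (ConvertAstToSearchArg) to the Hive 1 one (SearchArgumentFactory). Below is a minimal sketch of that behaviour using JDK classes so it runs without Hive on the classpath; DynMethodsSketch and the deliberately missing class name "com.example.NoSuchClass" are illustrative only.

import org.apache.iceberg.common.DynMethods;

public class DynMethodsSketch {
  public static void main(String[] args) {
    // The first impl() whose class and method exist wins; the first class here
    // is missing, so the lookup binds to Integer.valueOf(String).
    DynMethods.StaticMethod valueOf = DynMethods.builder("valueOf")
        .impl("com.example.NoSuchClass", String.class)
        .impl("java.lang.Integer", String.class)
        .buildStatic();
    Integer parsed = valueOf.invoke("42");
    System.out.println(parsed);  // prints: 42

    // With no matching impl, orNoop() keeps buildStatic() from failing and the
    // resulting method's invoke() returns null, which is the same signal
    // constructSearchArgument() checks before trying the Hive 1 factory.
    DynMethods.StaticMethod missing = DynMethods.builder("create")
        .impl("com.example.NoSuchClass", String.class)
        .orNoop()
        .buildStatic();
    Object result = missing.invoke("ignored");
    System.out.println(result);  // prints: null
  }
}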