Made Predicate Pushdown dynamic based on the Hive Version
HotSushi authored and shardulm94 committed Sep 25, 2020
1 parent 46f2a4d commit 82414ea
Showing 2 changed files with 41 additions and 14 deletions.
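The idea behind the change: the Hive classes used for predicate pushdown moved or changed signatures between Hive 1.x and 2.x, so compile-time references tie the build to a single Hive version. The commit replaces those static references with runtime lookups through Iceberg's reflection helpers (DynClasses, DynFields, DynMethods), which try a list of candidate implementations and bind to whichever one the classpath provides. A minimal sketch of the pattern, assuming the org.apache.iceberg.common helpers are on the classpath; the two Hive class names are taken from the diff below, and the version annotations are inferred from the diff's own HIVE_1/HIVE_2 naming convention:

import org.apache.iceberg.common.DynMethods;

public class VersionDispatchSketch {
  // The first impl(...) that can be loaded and linked wins; resolution happens
  // once, when this constant is initialized.
  private static final DynMethods.StaticMethod DESERIALIZE_OBJECT =
      DynMethods.builder("deserializeObject")
          .impl("org.apache.hadoop.hive.ql.exec.SerializationUtilities", String.class, Class.class)  // Hive 2+
          .impl("org.apache.hadoop.hive.ql.exec.Utilities", String.class, Class.class)               // Hive 1
          .buildStatic();
}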
HiveIcebergFilterFactory.java:
@@ -27,8 +27,8 @@
 import org.apache.hadoop.hive.ql.io.sarg.ExpressionTree;
 import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
-import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl;
 import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable;
+import org.apache.iceberg.common.DynClasses;
 import org.apache.iceberg.common.DynFields;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
@@ -115,8 +115,16 @@ private static Expression translateLeaf(PredicateLeaf leaf) {
 
   // PredicateLeafImpl has a work-around for Kryo serialization with java.util.Date objects where it converts values to
   // Timestamp using Date#getTime. This conversion discards microseconds, so this is necessary to avoid it.
+  private static final Class PREDICATE_LEAF_IMPL_CLASS = DynClasses.builder()
+      .impl("org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl")
+      .build();
+
   private static final DynFields.UnboundField<?> LITERAL_FIELD = DynFields.builder()
-      .hiddenImpl(SearchArgumentImpl.PredicateLeafImpl.class, "literal")
+      .hiddenImpl(PREDICATE_LEAF_IMPL_CLASS, "literal")
       .build();
 
+  private static final DynFields.UnboundField<List<Object>> LITERAL_LIST_FIELD = DynFields.builder()
+      .hiddenImpl(PREDICATE_LEAF_IMPL_CLASS, "literalList")
+      .build();
+
   private static Object leafToLiteral(PredicateLeaf leaf) {
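Why reflective field access instead of leaf.getLiteral(): per the comment above, PredicateLeafImpl's getter routes java.util.Date literals through Date#getTime, which truncates to milliseconds; reading the private literal field directly preserves the full value. A minimal sketch of the DynClasses + DynFields lookup, assuming Iceberg's common module is available; someLeaf stands in for a real PredicateLeaf instance:

import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf;
import org.apache.iceberg.common.DynClasses;
import org.apache.iceberg.common.DynFields;

public class HiddenFieldSketch {
  // Resolve the implementation class by name so nothing here links against it
  // at compile time; $ separates the inner class from SearchArgumentImpl.
  private static final Class<?> LEAF_IMPL = DynClasses.builder()
      .impl("org.apache.hadoop.hive.ql.io.sarg.SearchArgumentImpl$PredicateLeafImpl")
      .build();

  // hiddenImpl(...) locates the private field and makes it accessible.
  private static final DynFields.UnboundField<Object> LITERAL = DynFields.builder()
      .hiddenImpl(LEAF_IMPL, "literal")
      .build();

  static Object rawLiteral(PredicateLeaf someLeaf) {
    return LITERAL.get(someLeaf);  // the stored literal, before any conversion
  }
}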
@@ -125,13 +133,13 @@ private static Object leafToLiteral(PredicateLeaf leaf) {
       case BOOLEAN:
       case STRING:
       case FLOAT:
-        return leaf.getLiteral();
+        return LITERAL_FIELD.get(leaf);
       case DATE:
-        return daysFromTimestamp((Timestamp) leaf.getLiteral());
+        return daysFromTimestamp(new Timestamp(((java.util.Date) LITERAL_FIELD.get(leaf)).getTime()));
       case TIMESTAMP:
         return microsFromTimestamp((Timestamp) LITERAL_FIELD.get(leaf));
       case DECIMAL:
-        return hiveDecimalToBigDecimal((HiveDecimalWritable) leaf.getLiteral());
+        return hiveDecimalToBigDecimal((HiveDecimalWritable) LITERAL_FIELD.get(leaf));
 
       default:
         throw new UnsupportedOperationException("Unknown type: " + leaf.getType());
@@ -144,16 +152,16 @@ private static List<Object> leafToLiteralList(PredicateLeaf leaf) {
       case BOOLEAN:
       case FLOAT:
       case STRING:
-        return leaf.getLiteralList();
+        return LITERAL_LIST_FIELD.get(leaf);
       case DATE:
-        return leaf.getLiteralList().stream().map(value -> daysFromDate((Date) value))
+        return LITERAL_LIST_FIELD.get(leaf).stream().map(value -> daysFromDate((Date) value))
            .collect(Collectors.toList());
       case DECIMAL:
-        return leaf.getLiteralList().stream()
+        return LITERAL_LIST_FIELD.get(leaf).stream()
            .map(value -> hiveDecimalToBigDecimal((HiveDecimalWritable) value))
            .collect(Collectors.toList());
       case TIMESTAMP:
-        return leaf.getLiteralList().stream()
+        return LITERAL_LIST_FIELD.get(leaf).stream()
            .map(value -> microsFromTimestamp((Timestamp) value))
            .collect(Collectors.toList());
       default:
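The DATE and TIMESTAMP branches show what is at stake: java.sql.Timestamp carries nanosecond precision, but anything funneled through java.util.Date#getTime comes back with millisecond precision only. A self-contained illustration; the microsecond arithmetic is one plausible reading of what a helper like microsFromTimestamp does, not a copy of it:

import java.sql.Timestamp;

public class TimestampPrecisionDemo {
  public static void main(String[] args) {
    Timestamp ts = Timestamp.valueOf("2020-09-25 12:34:56.123456789");

    // The Kryo work-around path: Date#getTime truncates to milliseconds.
    Timestamp viaDate = new Timestamp(ts.getTime());
    System.out.println(viaDate.getNanos());   // 123000000 -- micros and nanos are gone

    // Reading the Timestamp directly keeps the sub-millisecond digits.
    long micros = Math.floorDiv(ts.getTime(), 1000) * 1_000_000L + ts.getNanos() / 1000;
    System.out.println(micros % 1_000_000L);  // 123456 -- microseconds survive
  }
}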
HiveIcebergInputFormat.java:
@@ -23,14 +23,13 @@
 import java.util.Arrays;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hive.ql.exec.SerializationUtilities;
 import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat;
-import org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg;
 import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.ExprNodeGenericFuncDesc;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.iceberg.common.DynMethods;
 import org.apache.iceberg.data.Record;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.mr.InputFormatConfig;
@@ -44,15 +43,27 @@ public class HiveIcebergInputFormat extends MapredIcebergInputFormat<Record>
     implements CombineHiveInputFormat.AvoidSplitCombination {
 
   private static final Logger LOG = LoggerFactory.getLogger(HiveIcebergInputFormat.class);
+  private static final DynMethods.StaticMethod DESERIALIZE_OBJECT = DynMethods.builder("deserializeObject")
+      .impl("org.apache.hadoop.hive.ql.exec.SerializationUtilities", String.class, Class.class)
+      .impl("org.apache.hadoop.hive.ql.exec.Utilities", String.class, Class.class)
+      .buildStatic();
+  private static final DynMethods.StaticMethod CONSTRUCT_SARG_HIVE_1 = DynMethods.builder("create")
+      .impl("org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory", ExprNodeGenericFuncDesc.class)
+      .orNoop()
+      .buildStatic();
+  private static final DynMethods.StaticMethod CONSTRUCT_SARG_HIVE_2 = DynMethods.builder("create")
+      .impl("org.apache.hadoop.hive.ql.io.sarg.ConvertAstToSearchArg",
+          Configuration.class, ExprNodeGenericFuncDesc.class)
+      .orNoop()
+      .buildStatic();
 
   @Override
   public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     // Convert Hive filter to Iceberg filter
     String hiveFilter = job.get(TableScanDesc.FILTER_EXPR_CONF_STR);
     if (hiveFilter != null) {
-      ExprNodeGenericFuncDesc exprNodeDesc = SerializationUtilities
-          .deserializeObject(hiveFilter, ExprNodeGenericFuncDesc.class);
-      SearchArgument sarg = ConvertAstToSearchArg.create(job, exprNodeDesc);
+      ExprNodeGenericFuncDesc exprNodeDesc = DESERIALIZE_OBJECT.invoke(hiveFilter, ExprNodeGenericFuncDesc.class);
+      SearchArgument sarg = constructSearchArgument(job, exprNodeDesc);
       try {
         Expression filter = HiveIcebergFilterFactory.generateFilterExpression(sarg);
         job.set(InputFormatConfig.FILTER_EXPRESSION, SerializationUtil.serializeToBase64(filter));
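Once the SearchArgument is translated, the resulting Iceberg Expression is handed to the underlying input format by serializing it into the JobConf as base64 (the job.set(...) call above). A small sketch of that hand-off, assuming org.apache.iceberg.util.SerializationUtil's serializeToBase64/deserializeFromBase64 pair and a hypothetical column "id":

import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.util.SerializationUtil;

public class FilterHandOffSketch {
  public static void main(String[] args) {
    Expression filter = Expressions.equal("id", 5L);  // hypothetical predicate

    // Writer side: stash the expression as a string-safe config value.
    String encoded = SerializationUtil.serializeToBase64(filter);

    // Reader side: the input format later decodes the same config key.
    Expression roundTripped = SerializationUtil.deserializeFromBase64(encoded);
    System.out.println(roundTripped);  // prints the reconstructed predicate
  }
}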
@@ -71,4 +82,12 @@ public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
   public boolean shouldSkipCombine(Path path, Configuration conf) {
     return true;
   }
+
+  private SearchArgument constructSearchArgument(JobConf job, ExprNodeGenericFuncDesc exprNodeDesc) {
+    SearchArgument searchArgument = CONSTRUCT_SARG_HIVE_2.invoke(job, exprNodeDesc);
+    if (searchArgument == null) {
+      searchArgument = CONSTRUCT_SARG_HIVE_1.invoke(exprNodeDesc);
+    }
+    return searchArgument;
+  }
 }
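constructSearchArgument leans on a detail of DynMethods: because both constants are built with orNoop(), a candidate missing from the classpath does not fail at class-load time; instead, invoking the resulting no-op yields null. That is why the Hive 2 path is tried first and a null result falls through to the Hive 1 factory. A hedged sketch of that behavior; the class name below is deliberately nonexistent:

import org.apache.iceberg.common.DynMethods;

public class NoopFallbackSketch {
  public static void main(String[] args) {
    DynMethods.StaticMethod missing = DynMethods.builder("create")
        .impl("com.example.DoesNotExist", String.class)  // hypothetical, never resolves
        .orNoop()
        .buildStatic();

    // With orNoop(), an unresolvable method degrades to a no-op whose
    // invoke(...) returns null rather than throwing.
    Object result = missing.invoke("anything");
    System.out.println(result);  // null
  }
}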
