Skip to content
This repository was archived by the owner on Mar 30, 2021. It is now read-only.

Commit

Permalink
Support ISNULL, add constant folding
Browse files Browse the repository at this point in the history
  • Loading branch information
jpullokkaran authored and Harish Butani committed Feb 19, 2017
1 parent 2b677f2 commit 0d8f746
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

package org.apache.spark.sql.sources.druid

import org.apache.spark.sql.{DataFrame, SQLContext}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.execution.datasources.LogicalRelation
Expand Down Expand Up @@ -96,9 +95,10 @@ trait ProjectFilterTransfom {
case (_, PhysicalOperation(projectList, filters,
l@LogicalRelation(d@DruidRelation(info, None), _, _))) => {
val dqb: Option[DruidQueryBuilder] = Some(DruidQueryBuilder(info))
translateProjectFilter(dqb,
val sfe = ExprUtil.simplifyConjPred(dqb.get, filters)
translateProjectFilter(Some(sfe._2),
projectList,
ExprUtil.simplifyPreds(dqb.get, filters))
sfe._1)
}
case _ => Seq()
}
Expand Down
100 changes: 88 additions & 12 deletions src/main/scala/org/apache/spark/sql/util/ExprUtil.scala
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,13 @@

package org.apache.spark.sql.util

import org.apache.spark.sql.catalyst.expressions.{CaseKeyWhen, _}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.apache.spark.sql.catalyst.trees.CurrentOrigin
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.types._
import org.apache.spark.sql.catalyst.plans.logical.{LogicalPlan, Project}
import org.joda.time.DateTime
import org.sparklinedata.druid.DruidQueryBuilder
import org.sparklinedata.druid.metadata.DruidDimension
import org.sparklinedata.druid.{DruidQueryBuilder, IntervalCondition, IntervalConditionType}


object ExprUtil {
Expand Down Expand Up @@ -90,6 +89,47 @@ object ExprUtil {
}
}

object ExpressionVal {

sealed trait EnumVal

case object ALWAYSFALSE extends EnumVal

case object ALWAYSTRUE extends EnumVal

case object UNKNOWN extends EnumVal

}

val alwaysFalseExpr = EqualTo(Literal(1), Literal(2))

def foldExpr(e: Expression): (Expression, ExpressionVal.EnumVal) = {
val fe = e transformUp {
// Skip redundant folding of literals. This rule is technically not necessary. Placing this
// here avoids running the next rule for Literal values, which would create a new Literal
// object and running eval unnecessarily.
case l: Literal => l
case Or(Literal(false, BooleanType), Literal(false, BooleanType)) => Literal(false)
case Or(le, Literal(false, BooleanType)) => le
case Or(Literal(false, BooleanType), re) => re
// Fold expressions that are foldable.
case e if e.foldable => Literal.create(e.eval(EmptyRow), e.dataType)
}
var eVal: ExpressionVal.EnumVal = ExpressionVal.UNKNOWN
if (fe.foldable) {
val fev = Literal.create(fe.eval(EmptyRow), fe.dataType)
if (fev.dataType == BooleanType) {
if (fev.value.asInstanceOf[Boolean]) {
eVal = ExpressionVal.ALWAYSTRUE
} else {
eVal = ExpressionVal.ALWAYSFALSE
}
}
}

(fe, eVal)
}

/**
* Simplify Cast expression by removing inner most cast if reduendant
*
Expand All @@ -105,20 +145,43 @@ object ExprUtil {
}

/**
* Simplify given predicates
* Simplify given predicate elements of a conjunction.
* If any conjuctive element can be folded to false then NULLSCAN is assumed
* and NULSCAN is set on DruidQueryBuilder by adding an invalid index interval.
*
* @param dqb DruidQueryBuilder
* @param fils Predicates
* @param dqb DruidQueryBuilder
* @param pl Predicates elements of a conjunction
* @return
*/
def simplifyPreds(dqb: DruidQueryBuilder, fils: Seq[Expression]): Seq[Expression] =
fils.foldLeft[Seq[Expression]](List[Expression]()) { (l, f) =>
def simplifyConjPred(dqb: DruidQueryBuilder, pl: Seq[Expression]):
(Seq[Expression], DruidQueryBuilder) = {
var nullScan = false
val spl = pl.foldLeft[Seq[Expression]](List[Expression]()) { (l, pe) =>
var tpl = l
for (tp <- simplifyPred(dqb, f))
tpl = l :+ tp
for (tpe <- simplifyPred(dqb, pe)) {
val fpe = foldExpr(tpe)
if (fpe._2 == ExpressionVal.ALWAYSFALSE) nullScan = true
tpl = l :+ fpe._1
}
tpl
}

var splToRet = spl
if (nullScan) {
// TODO: remove this nonsense once NULLSCAN is properly supported
// Currently we don't push filters that are just literals
splToRet = pl.flatMap(e => e.references).map(e => IsNull(e))
}

// if needed set in NULLSCAN by setting invalid index interval
(splToRet, if (nullScan) {
dqb.interval(IntervalCondition(IntervalConditionType.LT,
dqb.drInfo.druidDS.intervals.head.getStart)).get
} else {
dqb
})
}

/**
* Simplify given Predicate. Does rewrites for cast, Conj/Disj, not null expressions.
*
Expand Down Expand Up @@ -153,6 +216,19 @@ object ExprUtil {
nullFil = None
}
nullFil
case fe@IsNull(ce) =>
var nullFil: Option[Expression] = Some(fe)
if (ce.nullable) {
if (ExprUtil.nullPreserving(ce)) {
val v1 = nullableAttributes(dqb, ce.references)
if (v1._2.isEmpty && v1._1.isEmpty) {
nullFil = Some(alwaysFalseExpr)
}
}
} else {
nullFil = Some(alwaysFalseExpr)
}
nullFil
case _ => Some(e)
}
}
Expand Down Expand Up @@ -372,4 +448,4 @@ object ExprUtil {
case _ => Some(exprs.tail.fold(exprs.head)(And(_,_)))
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -981,5 +981,37 @@ class CodeGenTest extends BaseTest with BeforeAndAfterAll with SPLLogging {
|group by c_name, o_orderdate
""".stripMargin
, 1, true, true)

test("isNull1",
"""
|select sum(c_acctbal), c_name from orderLineItemPartSupplier
|where l_shipdate is null
|group by c_name
""".stripMargin
, 1, true, true)

test("isNull2",
"""
|select sum(c_acctbal), c_name from orderLineItemPartSupplier
|where l_shipdate is null and (cast(l_shipdate as bigint) + 10) is not null
|group by c_name
""".stripMargin
, 1, true, true)

test("isNull3",
"""
|select sum(c_acctbal), c_name from orderLineItemPartSupplier
|where o_orderdate is not null and l_commitdate is null and (cast(l_shipdate as bigint) + 10) is not null
|group by c_name
""".stripMargin
, 1, true, true)

test("isNull4",
"""
|select sum(c_acctbal), c_name from orderLineItemPartSupplier
|where (o_orderdate is not null and l_commitdate is null) or ((cast(l_shipdate as bigint) + 10) is null)
|group by c_name
""".stripMargin
, 1, true, true)
}

0 comments on commit 0d8f746

Please sign in to comment.