Skip to content

Commit

Permalink
update
Browse files Browse the repository at this point in the history
  • Loading branch information
zml1206 committed Sep 6, 2024
1 parent 93e14cc commit 5015496
Showing 1 changed file with 63 additions and 61 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ import org.apache.spark.sql.catalyst.rules.Rule
import org.apache.spark.sql.execution.{DeserializeToObjectExec, LeafExecNode, ProjectExec, SerializeFromObjectExec, SparkPlan, UnionExec}

import scala.collection.mutable
import scala.collection.mutable.Map

/**
* The Spark implementations of input_file_name/input_file_block_start/input_file_block_length uses
Expand All @@ -45,72 +44,75 @@ object PushDownInputFileExpression {
case _ => expr.children.exists(containsInputFileRelatedExpr)
}
}
}

object PushDownInputFileExpressionBeforeLeaf extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case ProjectExec(projectList, child)
if projectList.exists(PushDownInputFileExpression.containsInputFileRelatedExpr) =>
val replacedExprs = mutable.Map[String, Alias]()
val newProjectList = projectList.map {
expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
}
val newChild = addMetadataCol(child, replacedExprs)
ProjectExec(newProjectList, newChild)
}

private def rewriteExpr(expr: Expression, replacedExprs: Map[String, Alias]): Expression = {
expr match {
case _: InputFileName =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileName(), expr.prettyName)())
.toAttribute
case _: InputFileBlockStart =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileBlockStart(), expr.prettyName)())
.toAttribute
case _: InputFileBlockLength =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileBlockLength(), expr.prettyName)())
.toAttribute
case other =>
other.withNewChildren(other.children.map(child => rewriteExpr(child, replacedExprs)))
object PreOffload extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case ProjectExec(projectList, child) if projectList.exists(containsInputFileRelatedExpr) =>
val replacedExprs = mutable.Map[String, Alias]()
val newProjectList = projectList.map {
expr => rewriteExpr(expr, replacedExprs).asInstanceOf[NamedExpression]
}
val newChild = addMetadataCol(child, replacedExprs)
ProjectExec(newProjectList, newChild)
}
}

def addMetadataCol(plan: SparkPlan, replacedExprs: Map[String, Alias]): SparkPlan = plan match {
case p: LeafExecNode =>
ProjectExec(p.output ++ replacedExprs.values, p)
// Output of SerializeFromObjectExec's child and output of DeserializeToObjectExec must be a
// single-field row.
case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
ProjectExec(p.output ++ replacedExprs.values, p)
case p: ProjectExec =>
p.copy(
projectList = p.projectList ++ replacedExprs.values.toSeq.map(_.toAttribute),
child = addMetadataCol(p.child, replacedExprs))
case u @ UnionExec(children) =>
val newFirstChild = addMetadataCol(children.head, replacedExprs)
val newOtherChildren = children.tail.map {
child =>
// Make sure exprId is unique in each child of Union.
val newReplacedExprs = replacedExprs.map {
expr => (expr._1, Alias(expr._2.child, expr._2.name)())
private def rewriteExpr(
expr: Expression,
replacedExprs: mutable.Map[String, Alias]): Expression =
expr match {
case _: InputFileName =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileName(), expr.prettyName)())
.toAttribute
case _: InputFileBlockStart =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileBlockStart(), expr.prettyName)())
.toAttribute
case _: InputFileBlockLength =>
replacedExprs
.getOrElseUpdate(expr.prettyName, Alias(InputFileBlockLength(), expr.prettyName)())
.toAttribute
case other =>
other.withNewChildren(other.children.map(child => rewriteExpr(child, replacedExprs)))
}

private def addMetadataCol(
plan: SparkPlan,
replacedExprs: mutable.Map[String, Alias]): SparkPlan =
plan match {
case p: LeafExecNode =>
ProjectExec(p.output ++ replacedExprs.values, p)
// Output of SerializeFromObjectExec's child and output of DeserializeToObjectExec must be
// a single-field row.
case p @ (_: SerializeFromObjectExec | _: DeserializeToObjectExec) =>
ProjectExec(p.output ++ replacedExprs.values, p)
case p: ProjectExec =>
p.copy(
projectList = p.projectList ++ replacedExprs.values.toSeq.map(_.toAttribute),
child = addMetadataCol(p.child, replacedExprs))
case u @ UnionExec(children) =>
val newFirstChild = addMetadataCol(children.head, replacedExprs)
val newOtherChildren = children.tail.map {
child =>
// Make sure exprId is unique in each child of Union.
val newReplacedExprs = replacedExprs.map {
expr => (expr._1, Alias(expr._2.child, expr._2.name)())
}
addMetadataCol(child, newReplacedExprs)
}
addMetadataCol(child, newReplacedExprs)
u.copy(children = newFirstChild +: newOtherChildren)
case p => p.withNewChildren(p.children.map(child => addMetadataCol(child, replacedExprs)))
}
u.copy(children = newFirstChild +: newOtherChildren)
case p => p.withNewChildren(p.children.map(child => addMetadataCol(child, replacedExprs)))
}
}

object PushDownInputFileExpressionToScan extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case p @ ProjectExec(projectList, child: FileSourceScanExecTransformer)
if projectList.exists(PushDownInputFileExpression.containsInputFileRelatedExpr) =>
child.copy(output = p.output)
case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
if projectList.exists(PushDownInputFileExpression.containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
object PostOffload extends Rule[SparkPlan] {
override def apply(plan: SparkPlan): SparkPlan = plan.transformUp {
case p @ ProjectExec(projectList, child: FileSourceScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output)
case p @ ProjectExec(projectList, child: BatchScanExecTransformer)
if projectList.exists(containsInputFileRelatedExpr) =>
child.copy(output = p.output.asInstanceOf[Seq[AttributeReference]])
}
}
}

0 comments on commit 5015496

Please sign in to comment.