Don't bind params in nested EXECUTE IMMEDIATE
MaxGekk committed Jan 13, 2025
1 parent 22076ca commit 451bd04
Showing 3 changed files with 30 additions and 14 deletions.
@@ -63,6 +63,7 @@ class SubstituteExecuteImmediate(val catalogManager: CatalogManager)
         getVariableReference(u, u.nameParts)
       case a: Alias =>
         Alias(resolveVariable(a.child), a.name)()
+      case p: Parameter => p
       case other =>
         throw QueryCompilationErrors.unsupportedParameterExpression(other)
     }
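The new `case p: Parameter => p` lets `resolveVariable` pass parameter markers through untouched instead of raising `unsupportedParameterExpression`: the arguments of an EXECUTE IMMEDIATE USING clause may themselves be markers of the outer query, and binding them is now left entirely to `BindParameters`. Two calls from the updated `ParametersSuite` below show the shape of such queries (a sketch; the expected rows are the ones asserted in the tests):

```scala
// Markers in the USING clause are bound from the outer spark.sql arguments;
// markers inside the quoted sql text are bound only by USING.
spark.sql("execute immediate 'select ?' using ?", Array(1))             // Row(1)
spark.sql("execute immediate 'select :param1' using :param2 as param1",
  Map("param2" -> 42))                                                   // Row(42)
```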
@@ -162,10 +162,15 @@ object BindParameters extends Rule[LogicalPlan] with QueryErrorsBase {
       }
     }
 
-  private def bind(p: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
-    p.resolveExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
-      case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
-    })
+  private def bind(p0: LogicalPlan)(f: PartialFunction[Expression, Expression]): LogicalPlan = {
+    var stop = false
+    p0.resolveOperatorsDownWithPruning(_.containsPattern(PARAMETER) && !stop) {
+      case p1 =>
+        stop = p1.isInstanceOf[ParameterizedQuery]
+        p1.transformExpressionsWithPruning(_.containsPattern(PARAMETER)) (f orElse {
+          case sub: SubqueryExpression => sub.withNewPlan(bind(sub.plan)(f))
+        })
+    }
   }
 
   override def apply(plan: LogicalPlan): LogicalPlan = {
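The reworked `bind` still rewrites parameter expressions with `f` and recurses into subquery expressions, but it now walks operators top-down and flips a `stop` flag once it reaches a `ParameterizedQuery`, so the plan parsed from a nested EXECUTE IMMEDIATE sql string keeps its markers for its own USING clause to bind; the node's own expressions (the USING arguments) are still rewritten before the walk stops. A minimal, self-contained sketch of that boundary rule, using hypothetical toy node types rather than Catalyst's API:

```scala
// Toy model of "bind top-down, but do not descend below a nested parameterized query".
// Param, Lit and Query are made-up stand-ins, not Spark classes.
sealed trait Node
case class Param(name: String) extends Node
case class Lit(value: Int) extends Node
// `args` plays the role of a node's own expressions (e.g. the USING clause),
// `child` the sub-plan underneath it (e.g. the parsed sql text).
case class Query(args: Seq[Node], child: Option[Node], nested: Boolean = false) extends Node

def bind(n: Node)(f: PartialFunction[Node, Node]): Node = n match {
  case q: Query =>
    val boundArgs = q.args.map(a => f.lift(a).getOrElse(a))  // own expressions are rewritten
    val boundChild =
      if (q.nested) q.child                  // stop: leave the nested query's plan untouched
      else q.child.map(c => bind(c)(f))      // otherwise keep descending
    q.copy(args = boundArgs, child = boundChild)
  case other => f.lift(other).getOrElse(other)
}

// The outer argument reaches the USING clause but not the inner sql text:
val plan = Query(
  args = Seq(Param("outer")),
  child = Some(Query(
    args = Seq(Param("outer")),              // USING clause of the nested query
    child = Some(Param("inner")),            // parsed 'select ...' text
    nested = true)))
val bound = bind(plan) { case Param("outer") => Lit(1) }
// Both `args` lists become Seq(Lit(1)); Param("inner") survives for the nested binding.
```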
sql/core/src/test/scala/org/apache/spark/sql/ParametersSuite.scala: 20 additions & 10 deletions
@@ -771,21 +771,31 @@ class ParametersSuite extends QueryTest with SharedSparkSession with PlanTest {
   }
 
   test("SPARK-50403: parameterized execute immediate") {
-    checkAnswer(spark.sql("execute immediate 'select ?, ?' using 1", Array(2)), Row(2, 1))
-    checkAnswer(spark.sql("execute immediate 'select :param1, :param2' using 1 as param1",
-      Map("param2" -> 2)), Row(1, 2))
-
     checkAnswer(spark.sql("execute immediate 'select ?' using ?", Array(1)), Row(1))
     checkAnswer(spark.sql("execute immediate 'select ?, ?' using ?, 2", Array(1)), Row(1, 2))
     checkError(
       exception = intercept[AnalysisException] {
-        checkAnswer(spark.sql("execute immediate 'select ?, :param' using 1", Map("param" -> 2)),
-          Row(2, 1))
+        spark.sql("execute immediate 'select ?, ?' using 1", Array(2))
       },
-      condition = "INVALID_QUERY_MIXED_QUERY_PARAMETERS")
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "_10"),
+      context = ExpectedContext("?", 10, 10))
 
+    checkAnswer(spark.sql("execute immediate 'select :param1' using :param2 as param1",
+      Map("param2" -> 42)), Row(42))
+    checkAnswer(spark.sql(
+      "execute immediate 'select :param1, :param2' using :param2 as param1, 43 as param2",
+      Map("param2" -> 42)), Row(42, 43))
     checkError(
       exception = intercept[AnalysisException] {
-        checkAnswer(spark.sql("execute immediate 'select ?, :param' using 1 as param", Array(2)),
-          Row(2, 1))
+        spark.sql("execute immediate 'select :param1, :param2' using 1 as param1",
+          Map("param2" -> 2))
       },
-      condition = "INVALID_QUERY_MIXED_QUERY_PARAMETERS")
+      condition = "UNBOUND_SQL_PARAMETER",
+      parameters = Map("name" -> "param2"),
+      context = ExpectedContext(":param2", 16, 22))
 
     checkAnswer(spark.sql("execute immediate 'select ?' using :param", Map("param" -> 2)), Row(2))
     checkAnswer(spark.sql("execute immediate 'select :param' using ? as param", Array(3)), Row(3))
   }
 }
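Taken together, the updated assertions pin down the new contract: arguments passed to the outer `spark.sql` call can feed the USING clause, markers inside the quoted sql text are bound only by USING, and anything left unbound now fails with `UNBOUND_SQL_PARAMETER` rather than being silently picked up from the outer call. A condensed view of the cases above (results and error conditions as asserted in the suite):

```scala
// Bound via USING; the outer arguments may supply the USING values themselves.
spark.sql("execute immediate 'select ?, ?' using ?, 2", Array(1))           // Row(1, 2)
spark.sql("execute immediate 'select :param' using ? as param", Array(3))   // Row(3)

// No longer bound from the outer call: the second ? has no USING binding.
spark.sql("execute immediate 'select ?, ?' using 1", Array(2))              // UNBOUND_SQL_PARAMETER ("_10")
```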
