Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime #26157

Merged
merged 5 commits into from
Feb 19, 2025
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,10 @@ object WindowUtil {
null
}
val interval = getOperandAsLong(windowCall.operands(2))
if (interval <= 0) {
Copy link
Contributor Author

@snuyanzin snuyanzin Feb 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

otherwise will fail at runtime at

if (size <= 0) {
throw new IllegalArgumentException(
"TumblingWindowAssigner parameters must satisfy size > 0");
}

throw new TableException(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since TableException is already used for getOperandAsLong

case _: RexLiteral =>
throw new TableException(
"Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " +
"MONTH and YEAR time unit are not supported yet.")
case _ => throw new TableException("Only constant window descriptors are supported.")

"Only positive interval constant for TUMBLE window descriptors is supported.")
}
new TumblingWindowSpec(Duration.ofMillis(interval), offset)

case FlinkSqlOperatorTable.HOP =>
Expand All @@ -227,6 +231,10 @@ object WindowUtil {
}
val slide = getOperandAsLong(windowCall.operands(2))
val size = getOperandAsLong(windowCall.operands(3))
if (slide <= 0 || size <= 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise

if (size <= 0 || slide <= 0) {
throw new IllegalArgumentException(
"SlidingWindowAssigner parameters must satisfy slide > 0 and size > 0");
}

throw new TableException(
"Only positive slide and size constant for HOP window descriptors are supported.")
}
new HoppingWindowSpec(Duration.ofMillis(size), Duration.ofMillis(slide), offset)

case FlinkSqlOperatorTable.CUMULATE =>
Expand All @@ -237,13 +245,21 @@ object WindowUtil {
}
val step = getOperandAsLong(windowCall.operands(2))
val maxSize = getOperandAsLong(windowCall.operands(3))
if (step <= 0 || maxSize <= 0) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise

if (maxSize <= 0 || step <= 0) {
throw new IllegalArgumentException(
"CumulativeWindowAssigner parameters must satisfy step > 0 and size > 0");
}
if (maxSize % step != 0) {
throw new IllegalArgumentException(
"CumulativeWindowAssigner requires size must be an integral multiple of step.");
}

throw new TableException(
"Only positive step and size constant for CUMULATE window descriptors are supported.")
}
new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset)
case FlinkSqlOperatorTable.SESSION =>
val tableArgCall = windowCall.operands(0).asInstanceOf[RexTableArgCall]
if (!tableArgCall.getOrderKeys.isEmpty) {
throw new ValidationException("Session window TVF doesn't support order by clause.")
}
val gap = getOperandAsLong(windowCall.operands(2))
if (gap <= 0) {
throw new TableException(
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Otherwise

if (sessionGap <= 0) {
throw new IllegalArgumentException(
"SessionWindowAssigner parameters must satisfy 0 < size");
}

"Only positive gap constant for SESSION window descriptors is supported.")
}
new SessionWindowSpec(Duration.ofMillis(gap), tableArgCall.getPartitionKeys)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,14 @@
*/
package org.apache.flink.table.planner.plan.stream.sql

import org.apache.flink.table.api.ValidationException
import org.apache.flink.table.api.{TableException, ValidationException}
import org.apache.flink.table.api.config.ExecutionConfigOptions
import org.apache.flink.table.planner.utils.TableTestBase

import org.assertj.core.api.Assertions.assertThatThrownBy
import org.junit.jupiter.api.Test
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.{CsvSource, ValueSource, ValueSources}

import java.time.Duration

Expand Down Expand Up @@ -347,6 +349,68 @@ class WindowTableFunctionTest extends TableTestBase {
util.verifyRelPlan(sql)
}

@ParameterizedTest(name = "{index}: {0}")
@ValueSource(ints = Array[Int](-1, 0))
def testTumbleWindowWithWrongInterval(interval: Int): Unit = {
val sql =
s"""
|SELECT *
|FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$interval' MINUTE))
|""".stripMargin

assertThatThrownBy(() => util.verifyRelPlan(sql))
.hasCause(
new TableException(
"Only positive interval constant for TUMBLE window descriptors is supported."))
}

@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("-1, 1", "0, 2", "3, 0", "4, -3"))
def testCumulateWindowWithWrongStepAndSize(step: Int, size: Int): Unit = {
val sql =
s"""
|SELECT *
|FROM TABLE(
| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$step' MINUTE, INTERVAL '$size' HOUR))
|""".stripMargin

assertThatThrownBy(() => util.verifyRelPlan(sql))
.hasCause(
new TableException(
"Only positive step and size constant for CUMULATE window descriptors are supported."))
}

@ParameterizedTest(name = "{index}: {0}, {1}")
@CsvSource(Array[String]("-1, 1", "0, 2", "3, 0", "4, -3"))
def testHopWindowWithWrongSlideAndSize(slide: Int, size: Int): Unit = {
val sql =
s"""
|SELECT *
|FROM TABLE(
| HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$slide' MINUTE, INTERVAL '$size' MINUTE))
|""".stripMargin

assertThatThrownBy(() => util.verifyRelPlan(sql))
.hasCause(
new TableException(
"Only positive slide and size constant for HOP window descriptors are supported."))
}

@ParameterizedTest(name = "{index}: {0}")
@ValueSource(ints = Array[Int](-1, 0))
def testSessionWindowWithWrongGap(gap: Int): Unit = {
val sql =
s"""
|SELECT *
|FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '$gap' MINUTE))
|""".stripMargin

assertThatThrownBy(() => util.verifyRelPlan(sql))
.hasCause(
new TableException(
"Only positive gap constant for SESSION window descriptors is supported."))
}

private def enableMiniBatch(): Unit = {
util.tableConfig.set(
ExecutionConfigOptions.TABLE_EXEC_MINIBATCH_ENABLED,
Expand Down