[FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime #26157

Open: wants to merge 5 commits into base: master

snuyanzin
Contributor

What is the purpose of the change

Queries with invalid window descriptors (for example, non-positive intervals) should fail during planning rather than at runtime, for example:

SELECT *
FROM TABLE(
 CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0' MINUTE, INTERVAL '0' HOUR));

SELECT *
FROM TABLE(
 HOP(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0' MINUTE, INTERVAL '0' MINUTE));

SELECT *
FROM TABLE(TUMBLE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0' MINUTE));

SELECT *
FROM TABLE(SESSION(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '0' MINUTE));

Brief change log

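WindowUtil: validate at planning time that the interval arguments of TUMBLE, HOP, CUMULATE, and SESSION are positive.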

Verifying this change

WindowTableFunctionTest.scala

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@snuyanzin snuyanzin changed the title [FLINK37321][table] Make window functions with descriptors fail during planning rather than runtime [FLINK-37321][table] Make window functions with descriptors fail during planning rather than runtime Feb 13, 2025
@snuyanzin snuyanzin changed the title [FLINK-37321][table] Make window functions with descriptors fail during planning rather than runtime [FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime Feb 13, 2025
@@ -217,6 +217,10 @@ object WindowUtil {
null
}
val interval = getOperandAsLong(windowCall.operands(2))
if (interval <= 0) {
throw new TableException(
Contributor Author

TableException is used here because getOperandAsLong already throws TableException:

case _: RexLiteral =>
throw new TableException(
"Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " +
"MONTH and YEAR time unit are not supported yet.")
case _ => throw new TableException("Only constant window descriptors are supported.")

@@ -217,6 +217,10 @@ object WindowUtil {
null
}
val interval = getOperandAsLong(windowCall.operands(2))
if (interval <= 0) {
Contributor Author

@snuyanzin snuyanzin Feb 13, 2025

Otherwise this fails at runtime at:

if (size <= 0) {
throw new IllegalArgumentException(
"TumblingWindowAssigner parameters must satisfy size > 0");
}

@@ -227,6 +231,10 @@ object WindowUtil {
}
val slide = getOperandAsLong(windowCall.operands(2))
val size = getOperandAsLong(windowCall.operands(3))
if (slide <= 0 || size <= 0) {
Contributor Author

Otherwise this fails at runtime at:

if (size <= 0 || slide <= 0) {
throw new IllegalArgumentException(
"SlidingWindowAssigner parameters must satisfy slide > 0 and size > 0");
}

@@ -237,13 +245,25 @@ object WindowUtil {
}
val step = getOperandAsLong(windowCall.operands(2))
val maxSize = getOperandAsLong(windowCall.operands(3))
if (step <= 0 || maxSize <= 0) {
Contributor Author

Otherwise this fails at runtime at:

if (maxSize <= 0 || step <= 0) {
throw new IllegalArgumentException(
"CumulativeWindowAssigner parameters must satisfy step > 0 and size > 0");
}
if (maxSize % step != 0) {
throw new IllegalArgumentException(
"CumulativeWindowAssigner requires size must be an integral multiple of step.");
}

new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset)
case FlinkSqlOperatorTable.SESSION =>
val tableArgCall = windowCall.operands(0).asInstanceOf[RexTableArgCall]
if (!tableArgCall.getOrderKeys.isEmpty) {
throw new ValidationException("Session window TVF doesn't support order by clause.")
}
val gap = getOperandAsLong(windowCall.operands(2))
if (gap <= 0) {
throw new TableException(
Contributor Author

Otherwise this fails at runtime at:

if (sessionGap <= 0) {
throw new IllegalArgumentException(
"SessionWindowAssigner parameters must satisfy 0 < size");
}
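
Taken together, the four runtime preconditions quoted in this thread could be mirrored by a single planning-time validator. The following is only a minimal stand-alone sketch under stated assumptions: the class `WindowDescriptorChecks`, its method names, and the message texts are hypothetical, and it throws `IllegalArgumentException` so that it compiles without Flink on the classpath (the PR itself throws `TableException` from `WindowUtil`).

```java
import java.time.Duration;

/** Hypothetical stand-alone validator; these names are not part of Flink. */
final class WindowDescriptorChecks {
    private WindowDescriptorChecks() {}

    private static boolean isPositive(Duration d) {
        return d.toMillis() > 0;
    }

    /** TUMBLE: the window size must be positive. */
    static void checkTumble(Duration size) {
        if (!isPositive(size)) {
            throw new IllegalArgumentException(
                    "TUMBLE requires size to be positive, but got " + size.toMillis() + " ms.");
        }
    }

    /** HOP: both slide and size must be positive (one combined check, as at runtime). */
    static void checkHop(Duration slide, Duration size) {
        if (!isPositive(slide) || !isPositive(size)) {
            throw new IllegalArgumentException(
                    "HOP requires slide and size to be positive, but got slide "
                            + slide.toMillis() + " ms and size " + size.toMillis() + " ms.");
        }
    }

    /** CUMULATE: step and max size must be positive, and max size a multiple of step. */
    static void checkCumulate(Duration step, Duration maxSize) {
        if (!isPositive(step) || !isPositive(maxSize)) {
            throw new IllegalArgumentException(
                    "CUMULATE requires step and max size to be positive, but got step "
                            + step.toMillis() + " ms and max size " + maxSize.toMillis() + " ms.");
        }
        if (maxSize.toMillis() % step.toMillis() != 0) {
            throw new IllegalArgumentException(
                    "CUMULATE requires max size to be an integral multiple of step.");
        }
    }

    /** SESSION: the gap must be positive. */
    static void checkSession(Duration gap) {
        if (!isPositive(gap)) {
            throw new IllegalArgumentException(
                    "SESSION requires gap to be positive, but got " + gap.toMillis() + " ms.");
        }
    }
}
```

With this sketch, `WindowDescriptorChecks.checkTumble(Duration.ZERO)` is rejected before any window assigner would be constructed, which is the behavior the PR aims for at planning time.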


@@ -1120,7 +1120,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl
| b,
| count(distinct c) AS uv
|FROM TABLE(
-| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR))
+| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '20' MINUTE, INTERVAL '1' HOUR))
Contributor Author

It looks like this test was incorrect (1 hour is not an integral multiple of 25 minutes); it did not fail before because it only exercised planning.
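
The arithmetic behind this test change can be checked in isolation. This is a small sketch, not code from the PR: the class `CumulateStepCheck` and its method are hypothetical names, and the rule it encodes is the "size must be an integral multiple of step" condition quoted from the runtime assigner earlier in this conversation.

```java
import java.time.Duration;

/** Hypothetical helper illustrating the CUMULATE step/size rule; not part of Flink. */
final class CumulateStepCheck {
    private CumulateStepCheck() {}

    /** Returns true when maxSize is a whole number of steps. Assumes step > 0. */
    static boolean isIntegralMultiple(Duration step, Duration maxSize) {
        if (step.toMillis() <= 0) {
            throw new IllegalArgumentException("step must be positive");
        }
        return maxSize.toMillis() % step.toMillis() == 0;
    }
}
```

One hour is 60 minutes, and 60 is not divisible by 25 (remainder 10) but is divisible by 20, so `isIntegralMultiple(Duration.ofMinutes(25), Duration.ofHours(1))` is false while the same call with `Duration.ofMinutes(20)` is true.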

@@ -227,6 +232,11 @@ object WindowUtil {
}
val slide = getOperandAsLong(windowCall.operands(2))
val size = getOperandAsLong(windowCall.operands(3))
if (slide <= 0 || size <= 0) {
throw new TableException(
s"HOP table function based aggregate requires slide and size being positive," +
Contributor

nit: being -> to be.
nit2: Just a thought we could have a separate message for each invalid value.

Contributor Author

@snuyanzin snuyanzin Feb 14, 2025

> nit: being -> to be.

Thanks, fixed.

> nit2: Just a thought we could have a separate message for each invalid value.

I have a different opinion on that:

  1. With separate messages, once the user fixes one value they would hit the error for the next one.
  2. I would prefer the same approach as in the runtime checks, to avoid missing cases.
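
To make the trade-off in this exchange concrete, here is a rough sketch of the two message styles for HOP. Everything in it is hypothetical (the class `HopMessageStyles`, both method names, and the message wording); it only illustrates the difference in what a user sees and is not the code in `WindowUtil`.

```java
import java.util.Optional;

/** Hypothetical comparison of the two error-reporting styles; not part of Flink. */
final class HopMessageStyles {
    private HopMessageStyles() {}

    /** Combined check: a single message states the whole constraint and both values. */
    static Optional<String> combined(long slideMs, long sizeMs) {
        if (slideMs <= 0 || sizeMs <= 0) {
            return Optional.of(
                    "HOP requires slide and size to be positive, but got slide "
                            + slideMs + " ms and size " + sizeMs + " ms.");
        }
        return Optional.empty();
    }

    /** Separate checks: reporting stops at the first invalid value. */
    static Optional<String> separate(long slideMs, long sizeMs) {
        if (slideMs <= 0) {
            return Optional.of("HOP requires slide to be positive, but got " + slideMs + " ms.");
        }
        if (sizeMs <= 0) {
            return Optional.of("HOP requires size to be positive, but got " + sizeMs + " ms.");
        }
        return Optional.empty();
    }
}
```

For `slide = 0` and `size = 0`, `separate` reports only the slide, so the size problem surfaces on the next attempt, whereas `combined` names both values in one message.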

@snuyanzin snuyanzin requested a review from davidradl February 14, 2025 16:46