-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime #26157
base: master
Are you sure you want to change the base?
Conversation
…ng planning rather than runtime
5193a9b
to
9637446
Compare
@@ -217,6 +217,10 @@ object WindowUtil { | |||
null | |||
} | |||
val interval = getOperandAsLong(windowCall.operands(2)) | |||
if (interval <= 0) { | |||
throw new TableException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
since TableException
is already used for getOperandAsLong
Lines 390 to 394 in cd0cfe4
case _: RexLiteral => | |
throw new TableException( | |
"Window aggregate only support SECOND, MINUTE, HOUR, DAY as the time unit. " + | |
"MONTH and YEAR time unit are not supported yet.") | |
case _ => throw new TableException("Only constant window descriptors are supported.") |
@@ -217,6 +217,10 @@ object WindowUtil { | |||
null | |||
} | |||
val interval = getOperandAsLong(windowCall.operands(2)) | |||
if (interval <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise will fail at runtime at
Lines 50 to 53 in cd0cfe4
if (size <= 0) { | |
throw new IllegalArgumentException( | |
"TumblingWindowAssigner parameters must satisfy size > 0"); | |
} |
@@ -227,6 +231,10 @@ object WindowUtil { | |||
} | |||
val slide = getOperandAsLong(windowCall.operands(2)) | |||
val size = getOperandAsLong(windowCall.operands(3)) | |||
if (slide <= 0 || size <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise
Lines 58 to 61 in cd0cfe4
if (size <= 0 || slide <= 0) { | |
throw new IllegalArgumentException( | |
"SlidingWindowAssigner parameters must satisfy slide > 0 and size > 0"); | |
} |
@@ -237,13 +245,25 @@ object WindowUtil { | |||
} | |||
val step = getOperandAsLong(windowCall.operands(2)) | |||
val maxSize = getOperandAsLong(windowCall.operands(3)) | |||
if (step <= 0 || maxSize <= 0) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise
Lines 53 to 60 in cd0cfe4
if (maxSize <= 0 || step <= 0) { | |
throw new IllegalArgumentException( | |
"CumulativeWindowAssigner parameters must satisfy step > 0 and size > 0"); | |
} | |
if (maxSize % step != 0) { | |
throw new IllegalArgumentException( | |
"CumulativeWindowAssigner requires size must be an integral multiple of step."); | |
} |
new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset) | ||
case FlinkSqlOperatorTable.SESSION => | ||
val tableArgCall = windowCall.operands(0).asInstanceOf[RexTableArgCall] | ||
if (!tableArgCall.getOrderKeys.isEmpty) { | ||
throw new ValidationException("Session window TVF doesn't support order by clause.") | ||
} | ||
val gap = getOperandAsLong(windowCall.operands(2)) | ||
if (gap <= 0) { | ||
throw new TableException( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Otherwise
Lines 47 to 50 in cd0cfe4
if (sessionGap <= 0) { | |
throw new IllegalArgumentException( | |
"SessionWindowAssigner parameters must satisfy 0 < size"); | |
} |
@@ -1120,7 +1120,7 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl | |||
| b, | |||
| count(distinct c) AS uv | |||
|FROM TABLE( | |||
| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '25' MINUTE, INTERVAL '1' HOUR)) | |||
| CUMULATE(TABLE MyTable, DESCRIPTOR(rowtime), INTERVAL '20' MINUTE, INTERVAL '1' HOUR)) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
looks like was incorrect test which didn't fail because it tested only planning
@@ -227,6 +232,11 @@ object WindowUtil { | |||
} | |||
val slide = getOperandAsLong(windowCall.operands(2)) | |||
val size = getOperandAsLong(windowCall.operands(3)) | |||
if (slide <= 0 || size <= 0) { | |||
throw new TableException( | |||
s"HOP table function based aggregate requires slide and size being positive," + |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: being -> to be.
nit2: Just a thought we could have a separate message for each invalid value.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: being -> to be.
thanks, fixed
nit2: Just a thought we could have a separate message for each invalid value.
I have a different opinion about that
- since it will lead to the fact that once one value is fixed there will be another issue.
- i would prefer to have similar approach as in runtime to avoid some missed cases
What is the purpose of the change
Queries with wrongly defined descriptors should fail during planning, like
Brief change log
WindowUtil
Verifying this change
WindowTableFunctionTest.scala
Does this pull request potentially affect one of the following parts:
@Public(Evolving)
: ( no)Documentation