-
Notifications
You must be signed in to change notification settings - Fork 13.5k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[FLINK-37321][table] Make window functions with wrong descriptors fail during planning rather than runtime #26157
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change | ||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -217,6 +217,10 @@ object WindowUtil { | |||||||||||||||||
null | ||||||||||||||||||
} | ||||||||||||||||||
val interval = getOperandAsLong(windowCall.operands(2)) | ||||||||||||||||||
if (interval <= 0) { | ||||||||||||||||||
throw new TableException( | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. since Lines 390 to 394 in cd0cfe4
|
||||||||||||||||||
"Only positive interval constant for TUMBLE window descriptors is supported.") | ||||||||||||||||||
} | ||||||||||||||||||
new TumblingWindowSpec(Duration.ofMillis(interval), offset) | ||||||||||||||||||
|
||||||||||||||||||
case FlinkSqlOperatorTable.HOP => | ||||||||||||||||||
|
@@ -227,6 +231,10 @@ object WindowUtil { | |||||||||||||||||
} | ||||||||||||||||||
val slide = getOperandAsLong(windowCall.operands(2)) | ||||||||||||||||||
val size = getOperandAsLong(windowCall.operands(3)) | ||||||||||||||||||
if (slide <= 0 || size <= 0) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise Lines 58 to 61 in cd0cfe4
|
||||||||||||||||||
throw new TableException( | ||||||||||||||||||
"Only positive slide and size constant for HOP window descriptors are supported.") | ||||||||||||||||||
} | ||||||||||||||||||
new HoppingWindowSpec(Duration.ofMillis(size), Duration.ofMillis(slide), offset) | ||||||||||||||||||
|
||||||||||||||||||
case FlinkSqlOperatorTable.CUMULATE => | ||||||||||||||||||
|
@@ -237,13 +245,21 @@ object WindowUtil { | |||||||||||||||||
} | ||||||||||||||||||
val step = getOperandAsLong(windowCall.operands(2)) | ||||||||||||||||||
val maxSize = getOperandAsLong(windowCall.operands(3)) | ||||||||||||||||||
if (step <= 0 || maxSize <= 0) { | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise Lines 53 to 60 in cd0cfe4
|
||||||||||||||||||
throw new TableException( | ||||||||||||||||||
"Only positive step and size constant for CUMULATE window descriptors are supported.") | ||||||||||||||||||
} | ||||||||||||||||||
new CumulativeWindowSpec(Duration.ofMillis(maxSize), Duration.ofMillis(step), offset) | ||||||||||||||||||
case FlinkSqlOperatorTable.SESSION => | ||||||||||||||||||
val tableArgCall = windowCall.operands(0).asInstanceOf[RexTableArgCall] | ||||||||||||||||||
if (!tableArgCall.getOrderKeys.isEmpty) { | ||||||||||||||||||
throw new ValidationException("Session window TVF doesn't support order by clause.") | ||||||||||||||||||
} | ||||||||||||||||||
val gap = getOperandAsLong(windowCall.operands(2)) | ||||||||||||||||||
if (gap <= 0) { | ||||||||||||||||||
throw new TableException( | ||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Otherwise Lines 47 to 50 in cd0cfe4
|
||||||||||||||||||
"Only positive gap constant for SESSION window descriptors is supported.") | ||||||||||||||||||
} | ||||||||||||||||||
new SessionWindowSpec(Duration.ofMillis(gap), tableArgCall.getPartitionKeys) | ||||||||||||||||||
} | ||||||||||||||||||
|
||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
otherwise will fail at runtime at
flink/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/window/groupwindow/assigners/TumblingWindowAssigner.java
Lines 50 to 53 in cd0cfe4