Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-37274] Add examples for DataStream V2 #26118

Merged
merged 3 commits into from
Feb 18, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,56 +46,113 @@ public interface StateManager {
*/
<K> K getCurrentKey() throws UnsupportedOperationException;

/**
* Get the optional of the specific list state.
*
* @param stateDeclaration of this state.
* @return the list state corresponds to the state declaration, this may be empty.
*/
<T> Optional<ListState<T>> getStateOptional(ListStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Get the specific list state.
*
* @param stateDeclaration of this state.
* @return the list state corresponds to the state declaration.
* @return the list state corresponds to the state declaration
* @throws RuntimeException if the state is not available.
*/
<T> Optional<ListState<T>> getState(ListStateDeclaration<T> stateDeclaration) throws Exception;
<T> ListState<T> getState(ListStateDeclaration<T> stateDeclaration) throws Exception;

/**
* Get the optional of the specific value state.
*
* @param stateDeclaration of this state.
* @return the value state corresponds to the state declaration, this may be empty.
*/
<T> Optional<ValueState<T>> getStateOptional(ValueStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Get the specific value state.
*
* @param stateDeclaration of this state.
* @return the value state corresponds to the state declaration.
* @throws RuntimeException if the state is not available.
*/
<T> ValueState<T> getState(ValueStateDeclaration<T> stateDeclaration) throws Exception;

/**
* Get the optional of the specific map state.
*
* @param stateDeclaration of this state.
* @return the map state corresponds to the state declaration, this may be empty.
*/
<T> Optional<ValueState<T>> getState(ValueStateDeclaration<T> stateDeclaration)
<K, V> Optional<MapState<K, V>> getStateOptional(MapStateDeclaration<K, V> stateDeclaration)
throws Exception;

/**
* Get the specific map state.
*
* @param stateDeclaration of this state.
* @return the map state corresponds to the state declaration.
* @throws RuntimeException if the state is not available.
*/
<K, V> Optional<MapState<K, V>> getState(MapStateDeclaration<K, V> stateDeclaration)
<K, V> MapState<K, V> getState(MapStateDeclaration<K, V> stateDeclaration) throws Exception;

/**
* Get the optional of the specific reducing state.
*
* @param stateDeclaration of this state.
* @return the reducing state corresponds to the state declaration, this may be empty.
*/
<T> Optional<ReducingState<T>> getStateOptional(ReducingStateDeclaration<T> stateDeclaration)
throws Exception;

/**
* Get the specific reducing state.
*
* @param stateDeclaration of this state.
* @return the reducing state corresponds to the state declaration.
* @throws RuntimeException if the state is not available.
*/
<T> Optional<ReducingState<T>> getState(ReducingStateDeclaration<T> stateDeclaration)
throws Exception;
<T> ReducingState<T> getState(ReducingStateDeclaration<T> stateDeclaration) throws Exception;

/**
* Get the optional of the specific aggregating state.
*
* @param stateDeclaration of this state.
* @return the aggregating state corresponds to the state declaration, this may be empty.
*/
<IN, ACC, OUT> Optional<AggregatingState<IN, OUT>> getStateOptional(
AggregatingStateDeclaration<IN, ACC, OUT> stateDeclaration) throws Exception;

/**
* Get the specific aggregating state.
*
* @param stateDeclaration of this state.
* @return the aggregating state corresponds to the state declaration.
* @throws RuntimeException if the state is not available.
*/
<IN, ACC, OUT> Optional<AggregatingState<IN, OUT>> getState(
<IN, ACC, OUT> AggregatingState<IN, OUT> getState(
AggregatingStateDeclaration<IN, ACC, OUT> stateDeclaration) throws Exception;

/**
* Get the optional of the specific broadcast state.
*
* @param stateDeclaration of this state.
* @return the broadcast state corresponds to the state declaration, this may be empty.
*/
<K, V> Optional<BroadcastState<K, V>> getStateOptional(
BroadcastStateDeclaration<K, V> stateDeclaration) throws Exception;

/**
* Get the specific broadcast state.
*
* @param stateDeclaration of this state.
* @return the broadcast state corresponds to the state declaration.
* @throws RuntimeException if the state is not available.
*/
<K, V> Optional<BroadcastState<K, V>> getState(BroadcastStateDeclaration<K, V> stateDeclaration)
<K, V> BroadcastState<K, V> getState(BroadcastStateDeclaration<K, V> stateDeclaration)
throws Exception;
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ default void onEventTimeWatermark(
* Invoked when an event-time timer fires. Note that it is only used in {@link
* KeyedPartitionStream}.
*/
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx)
throws Exception {}
codenohup marked this conversation as resolved.
Show resolved Hide resolved
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ default void onEventTimeWatermark(
* Invoked when an event-time timer fires. Note that it is only used in {@link
* KeyedPartitionStream}.
*/
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx)
throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,5 +43,6 @@ default void onEventTimeWatermark(
* Invoked when an event-time timer fires. Note that it is only used in {@link
* KeyedPartitionStream}.
*/
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
default void onEventTimer(long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx)
throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ default void onEventTimer(
long timestamp,
Collector<OUT1> output1,
Collector<OUT2> output2,
TwoOutputPartitionedContext<OUT1, OUT2> ctx) {}
TwoOutputPartitionedContext<OUT1, OUT2> ctx)
throws Exception {}
}
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ void processRecord(IN record, Collector<OUT> output, PartitionedContext<OUT> ctx
*
* @param ctx the context in which this function is executed.
*/
default void endInput(NonPartitionedContext<OUT> ctx) {}
default void endInput(NonPartitionedContext<OUT> ctx) throws Exception {}

/**
* Callback for processing timer.
Expand All @@ -66,11 +66,12 @@ default void endInput(NonPartitionedContext<OUT> ctx) {}
* @param ctx runtime context in which this function is executed.
*/
default void onProcessingTimer(
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {}

/** Callback function when receive watermark. */
default WatermarkHandlingResult onWatermark(
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) {
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,15 +70,15 @@ void processRecordFromBroadcastInput(IN2 record, NonPartitionedContext<OUT> ctx)
*
* @param ctx the context in which this function is executed.
*/
default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) {}
default void endNonBroadcastInput(NonPartitionedContext<OUT> ctx) throws Exception {}

/**
* This is a life-cycle method indicates that this function will no longer receive any data from
* the broadcast input.
*
* @param ctx the context in which this function is executed.
*/
default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
default void endBroadcastInput(NonPartitionedContext<OUT> ctx) throws Exception {}

/**
* Callback for processing timer.
Expand All @@ -88,7 +88,7 @@ default void endBroadcastInput(NonPartitionedContext<OUT> ctx) {}
* @param ctx runtime context in which this function is executed.
*/
default void onProcessingTimer(
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {}

/**
* Callback function when receive the watermark from broadcast input.
Expand All @@ -98,7 +98,8 @@ default void onProcessingTimer(
* @param ctx runtime context in which this function is executed.
*/
default WatermarkHandlingResult onWatermarkFromBroadcastInput(
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) {
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}

Expand All @@ -110,7 +111,8 @@ default WatermarkHandlingResult onWatermarkFromBroadcastInput(
* @param ctx runtime context in which this function is executed.
*/
default WatermarkHandlingResult onWatermarkFromNonBroadcastInput(
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) {
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -66,15 +66,15 @@ void processRecordFromSecondInput(
*
* @param ctx the context in which this function is executed.
*/
default void endFirstInput(NonPartitionedContext<OUT> ctx) {}
default void endFirstInput(NonPartitionedContext<OUT> ctx) throws Exception {}

/**
* This is a life-cycle method indicates that this function will no longer receive any data from
* the second input.
*
* @param ctx the context in which this function is executed.
*/
default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
default void endSecondInput(NonPartitionedContext<OUT> ctx) throws Exception {}

/**
* Callback for processing timer.
Expand All @@ -84,7 +84,7 @@ default void endSecondInput(NonPartitionedContext<OUT> ctx) {}
* @param ctx runtime context in which this function is executed.
*/
default void onProcessingTimer(
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) {}
long timestamp, Collector<OUT> output, PartitionedContext<OUT> ctx) throws Exception {}

/**
* Callback function when receive the watermark from the first input.
Expand All @@ -94,7 +94,8 @@ default void onProcessingTimer(
* @param ctx runtime context in which this function is executed.
*/
default WatermarkHandlingResult onWatermarkFromFirstInput(
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) {
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}

Expand All @@ -106,7 +107,8 @@ default WatermarkHandlingResult onWatermarkFromFirstInput(
* @param ctx runtime context in which this function is executed.
*/
default WatermarkHandlingResult onWatermarkFromSecondInput(
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx) {
Watermark watermark, Collector<OUT> output, NonPartitionedContext<OUT> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ void processRecord(
*
* @param ctx the context in which this function is executed.
*/
default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {}
default void endInput(TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) throws Exception {}

/**
* Callback for processing timer.
Expand All @@ -75,7 +75,8 @@ default void onProcessingTimer(
long timestamp,
Collector<OUT1> output1,
Collector<OUT2> output2,
TwoOutputPartitionedContext<OUT1, OUT2> ctx) {}
TwoOutputPartitionedContext<OUT1, OUT2> ctx)
throws Exception {}

/**
* Callback function when receive the watermark from the input.
Expand All @@ -89,7 +90,8 @@ default WatermarkHandlingResult onWatermark(
Watermark watermark,
Collector<OUT1> output1,
Collector<OUT2> output2,
TwoOutputNonPartitionedContext<OUT1, OUT2> ctx) {
TwoOutputNonPartitionedContext<OUT1, OUT2> ctx)
throws Exception {
return WatermarkHandlingResult.PEEK;
}
}
Loading