From cf71206265a2e388166e1229a93d78559c82f533 Mon Sep 17 00:00:00 2001 From: Xu Huang Date: Fri, 24 Jan 2025 10:29:39 +0800 Subject: [PATCH] [FLINK-37274][datastream] Add examples for DataStream V2 --- .../flink-examples-streaming/pom.xml | 4 + .../eventtime/StatisticNewsClickNumber.java | 278 ++++++++ .../util/StatisticNewsClickNumberData.java | 631 +++++++++++++++++ .../streaming/examples/dsv2/join/Join.java | 158 +++++ .../examples/dsv2/join/util/JoinData.java | 54 ++ .../dsv2/watermark/SyncOffsetByWatermark.java | 193 ++++++ .../windowing/CountProductSalesWindowing.java | 235 +++++++ .../util/CountProductSalesWindowingData.java | 648 ++++++++++++++++++ .../examples/dsv2/wordcount/WordCount.java | 335 +++++++++ .../test/examples/DSv2ExamplesITCase.java | 108 +++ 10 files changed, 2644 insertions(+) create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/Join.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/util/JoinData.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java create mode 100644 flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/util/CountProductSalesWindowingData.java create mode 100644 
flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java create mode 100644 flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java diff --git a/flink-examples/flink-examples-streaming/pom.xml b/flink-examples/flink-examples-streaming/pom.xml index c2c9990442562a..022a0e6a5c82f6 100644 --- a/flink-examples/flink-examples-streaming/pom.xml +++ b/flink-examples/flink-examples-streaming/pom.xml @@ -36,6 +36,10 @@ under the License. 10.0.0 + --add-opens=java.base/java.util=ALL-UNNAMED + diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java new file mode 100644 index 00000000000000..38d0f03499db15 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/StatisticNewsClickNumber.java @@ -0,0 +1,278 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.dsv2.eventtime; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction; +import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData; +import org.apache.flink.util.ParameterTool; + +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +/** + * This example illustrates how to count the number of clicks on each news at 1 minute, 5 minutes, + * 10 minutes, 30 minutes, and 1 hour after news publication. + * + *

The input consists of a series of {@link NewsEvent}s, which fall into two categories: news + * releases and news clicks. Each {@link NewsEvent} contains three components: the event type, the + * news ID and the timestamp. Notably, there is only one event of type {@link NewsEventType#RELEASE} + * for each news. + * + *

Usage: + * + *

+ * + *

This example shows how to: + * + *

+ * + *

Please note that if you intend to run this example in an IDE, you must first add the following + * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module + * system in JDK 17+ restricts some reflection operations. + * + *

Please note that the DataStream API V2 is a new set of APIs, to gradually replace the original + * DataStream API. It is currently in the experimental stage and is not fully available for + * production. + */ +public class StatisticNewsClickNumber { + + // we will count the number of clicks within 1min/5min/10min/30min/1hour after the news release + private static final Duration[] TIMES_AFTER_NEWS_RELEASE = + new Duration[] { + Duration.ofMinutes(1), + Duration.ofMinutes(5), + Duration.ofMinutes(10), + Duration.ofMinutes(30), + Duration.ofHours(1) + }; + + /** + * The type of {@link NewsEvent}. Note that there is only one event of type {@link + * NewsEventType#RELEASE} for each news. + */ + public enum NewsEventType { + RELEASE, + CLICK + } + + /** + * The {@link NewsEvent} represents an event on news, containing the event type, news id and the + * timestamp. + */ + public static class NewsEvent { + public NewsEventType type; + public long newsId; + public long timestamp; + + public NewsEvent(NewsEventType type, long newsId, long timestamp) { + this.type = type; + this.newsId = newsId; + this.timestamp = timestamp; + } + } + + /** + * The {@link NewsClickNumber} represents the number of clicks on news within a specified + * duration following its release. For example, NewsClickNumber{newsId="12345678", + * timeAfterRelease=60000, clickNumber=132} indicates that the news "12345678" has been clicked + * 132 times within 60,000 milliseconds after its release. 
+ */ + public static class NewsClickNumber { + public long newsId; + public long timeAfterRelease; + public long clickNumber; + + public NewsClickNumber(long newsId, long timeAfterRelease, long clickCount) { + this.newsId = newsId; + this.timeAfterRelease = timeAfterRelease; + this.clickNumber = clickCount; + } + + public String toString() { + return String.format( + "(%d,%d,%d)", this.newsId, this.timeAfterRelease, this.clickNumber); + } + } + + public static void main(String[] args) throws Exception { + // parse the parameters + final ParameterTool params = ParameterTool.fromArgs(args); + final boolean fileOutput = params.has("output"); + + // obtain execution environment + ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + + // the input consists of a series of {@code NewsEvent}s, which include two types: news + // release event and news click event. + NonKeyedPartitionStream source = + env.fromSource( + DataStreamV2SourceUtils.fromData( + Arrays.asList(StatisticNewsClickNumberData.NEWS_EVENTS)), + "news event source"); + + NonKeyedPartitionStream clickNumberStream = + // extract event time and generate the event time watermark + source.process( + // the timestamp field of the input is considered to be the + // event time + EventTimeExtension.newWatermarkGeneratorBuilder( + event -> event.timestamp) + // generate event time watermarks every 200ms + .periodicWatermark(Duration.ofMillis(200)) + // if the input is idle for more than 30 seconds, it + // is ignored during the event time watermark + // combination process + .withIdleness(Duration.ofSeconds(30)) + // set the maximum out-of-order time of the event to + // 10 seconds, meaning that if an event is received + // at 12:00:00, then no further events should be + // received earlier than 11:59:50 + .withMaxOutOfOrderTime(Duration.ofSeconds(10)) + // build the event time watermark generator as + // ProcessFunction + .buildAsProcessFunction()) + // key by the news id + .keyBy(event -> event.newsId) + 
// count the click number of each news + .process( + EventTimeExtension.wrapProcessFunction( + new CountNewsClickNumberProcessFunction())); + + if (fileOutput) { + // write the click number results to file + clickNumberStream + .toSink( + new WrappedSink<>( + FileSink.forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize( + MemorySize.ofMebiBytes(1)) + .withRolloverInterval( + Duration.ofSeconds(10)) + .build()) + .build())) + .withName("output"); + } else { + // Print the results to the STDOUT. + clickNumberStream.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink"); + } + + env.execute("StatisticNewsClickNumberExample"); + } + + /** + * This process function will consume {@link NewsEvent} and count the number of clicks within 1 + * minute, 5 minutes, 10 minutes, 30 minutes and 1 hour of the news release and send the + * results {@link NewsClickNumber} to the output. + * + *

To achieve the goal, we will register a series of timers for the news, which will be + * triggered at the time of the news's release time + 1 minute/5 minutes/10 minutes/30 minutes/1 + * hour, and record a list of the click times of each news. In the timer callback {@code + * onEventTimer}, we will count the recorded clicks whose click time is not later than the timer + * trigger time and send the result to the output. + */ + public static class CountNewsClickNumberProcessFunction + implements OneInputEventTimeStreamProcessFunction { + + private EventTimeManager eventTimeManager; + + // news id to release time + private final Map releaseTimeOfNews = new HashMap<>(); + + // news id to click time list + private final Map> clickTimeListOfNews = new HashMap<>(); + + @Override + public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) { + this.eventTimeManager = eventTimeManager; + } + + @Override + public void processRecord( + NewsEvent record, + Collector output, + PartitionedContext ctx) + throws Exception { + if (record.type == NewsEventType.RELEASE) { + // for the news release event, record the release time and register timers + long releaseTime = record.timestamp; + releaseTimeOfNews.put(record.newsId, releaseTime); + for (Duration targetTime : TIMES_AFTER_NEWS_RELEASE) { + eventTimeManager.registerTimer(releaseTime + targetTime.toMillis()); + } + } else { + // for the news click event, record the click time + clickTimeListOfNews + .computeIfAbsent(record.newsId, k -> new ArrayList<>()) + .add(record.timestamp); + } + } + + @Override + public void onEventTimer( + long timestamp, + Collector output, + PartitionedContext ctx) { + // get the news that the current event timer belongs to + long newsId = ctx.getStateManager().getCurrentKey(); + + // calculate the difference between the current time and the news release time + Duration diffTime = Duration.ofMillis(timestamp - releaseTimeOfNews.get(newsId)); + + // calculate the number of clicks on the
news at the current time (0 if none). + List clickTimeList = clickTimeListOfNews.getOrDefault(newsId, new ArrayList<>()); + long clickCount = 0; + for (Long clickTime : clickTimeList) { + if (clickTime <= timestamp) { + clickCount++; + } + } + + // send the result to output + output.collect(new NewsClickNumber(newsId, diffTime.toMillis(), clickCount)); + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java new file mode 100644 index 00000000000000..76e72e783b2b9a --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/eventtime/util/StatisticNewsClickNumberData.java @@ -0,0 +1,631 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.dsv2.eventtime.util; + +import org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEvent; +import org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.NewsEventType; + +public class StatisticNewsClickNumberData { + + public static final NewsEvent[] NEWS_EVENTS = + new NewsEvent[] { + new NewsEvent(NewsEventType.RELEASE, 19, 1738832642387L), + new NewsEvent(NewsEventType.RELEASE, 1, 1738832388479L), + new NewsEvent(NewsEventType.RELEASE, 18, 1738832782110L), + new NewsEvent(NewsEventType.RELEASE, 5, 1738832592184L), + new NewsEvent(NewsEventType.RELEASE, 9, 1738828021221L), + new NewsEvent(NewsEventType.CLICK, 5, 1738828389597L), + new NewsEvent(NewsEventType.RELEASE, 17, 1738827955216L), + new NewsEvent(NewsEventType.RELEASE, 13, 1738828052442L), + new NewsEvent(NewsEventType.RELEASE, 20, 1738830001745L), + new NewsEvent(NewsEventType.RELEASE, 3, 1738832650438L), + new NewsEvent(NewsEventType.RELEASE, 15, 1738832947087L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832001616L), + new NewsEvent(NewsEventType.CLICK, 15, 1738827680244L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830937364L), + new NewsEvent(NewsEventType.CLICK, 19, 1738829532910L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832388911L), + new NewsEvent(NewsEventType.RELEASE, 10, 1738831928718L), + new NewsEvent(NewsEventType.RELEASE, 16, 1738832613236L), + new NewsEvent(NewsEventType.RELEASE, 2, 1738831575775L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832140516L), + new NewsEvent(NewsEventType.RELEASE, 14, 1738829659362L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831046190L), + new NewsEvent(NewsEventType.CLICK, 9, 1738830223146L), + new NewsEvent(NewsEventType.CLICK, 9, 1738830700848L), + new NewsEvent(NewsEventType.CLICK, 17, 1738832953995L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829616796L), + new NewsEvent(NewsEventType.CLICK, 20, 1738831192045L), + new 
NewsEvent(NewsEventType.CLICK, 18, 1738829229830L), + new NewsEvent(NewsEventType.CLICK, 1, 1738827670493L), + new NewsEvent(NewsEventType.CLICK, 14, 1738830136393L), + new NewsEvent(NewsEventType.RELEASE, 12, 1738831935199L), + new NewsEvent(NewsEventType.CLICK, 2, 1738830633003L), + new NewsEvent(NewsEventType.CLICK, 15, 1738832584362L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831827987L), + new NewsEvent(NewsEventType.CLICK, 1, 1738830306933L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832531618L), + new NewsEvent(NewsEventType.CLICK, 3, 1738831754282L), + new NewsEvent(NewsEventType.CLICK, 20, 1738832800871L), + new NewsEvent(NewsEventType.CLICK, 5, 1738828842293L), + new NewsEvent(NewsEventType.RELEASE, 11, 1738832256995L), + new NewsEvent(NewsEventType.CLICK, 9, 1738831485230L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832546671L), + new NewsEvent(NewsEventType.CLICK, 2, 1738829641821L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831507823L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831948622L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832636843L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830490820L), + new NewsEvent(NewsEventType.RELEASE, 4, 1738830918703L), + new NewsEvent(NewsEventType.CLICK, 18, 1738830359206L), + new NewsEvent(NewsEventType.RELEASE, 8, 1738828281847L), + new NewsEvent(NewsEventType.CLICK, 18, 1738830548196L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830674312L), + new NewsEvent(NewsEventType.CLICK, 3, 1738827884370L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829517538L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829539622L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829707330L), + new NewsEvent(NewsEventType.CLICK, 20, 1738828393327L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829959396L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832514032L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832691283L), + new NewsEvent(NewsEventType.CLICK, 5, 1738831614616L), + new NewsEvent(NewsEventType.CLICK, 
4, 1738831810554L), + new NewsEvent(NewsEventType.CLICK, 12, 1738828790146L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830764427L), + new NewsEvent(NewsEventType.RELEASE, 7, 1738831203224L), + new NewsEvent(NewsEventType.CLICK, 17, 1738832760928L), + new NewsEvent(NewsEventType.CLICK, 1, 1738828184754L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830519284L), + new NewsEvent(NewsEventType.RELEASE, 6, 1738831189317L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828616162L), + new NewsEvent(NewsEventType.CLICK, 12, 1738831691291L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832026350L), + new NewsEvent(NewsEventType.CLICK, 5, 1738830078588L), + new NewsEvent(NewsEventType.CLICK, 20, 1738829221413L), + new NewsEvent(NewsEventType.CLICK, 12, 1738829719204L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830425374L), + new NewsEvent(NewsEventType.CLICK, 12, 1738832166848L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828448279L), + new NewsEvent(NewsEventType.CLICK, 18, 1738829701504L), + new NewsEvent(NewsEventType.CLICK, 1, 1738827928895L), + new NewsEvent(NewsEventType.CLICK, 15, 1738830292624L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828002115L), + new NewsEvent(NewsEventType.CLICK, 8, 1738828266565L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829855256L), + new NewsEvent(NewsEventType.CLICK, 16, 1738831339947L), + new NewsEvent(NewsEventType.CLICK, 3, 1738828688202L), + new NewsEvent(NewsEventType.CLICK, 6, 1738832712915L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831987576L), + new NewsEvent(NewsEventType.CLICK, 20, 1738829428940L), + new NewsEvent(NewsEventType.CLICK, 12, 1738827634453L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832892122L), + new NewsEvent(NewsEventType.CLICK, 6, 1738830843308L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832846056L), + new NewsEvent(NewsEventType.CLICK, 4, 1738828877841L), + new NewsEvent(NewsEventType.CLICK, 13, 1738832812928L), + new NewsEvent(NewsEventType.CLICK, 15, 1738829339894L), + new 
NewsEvent(NewsEventType.CLICK, 16, 1738829956198L), + new NewsEvent(NewsEventType.CLICK, 20, 1738828187884L), + new NewsEvent(NewsEventType.CLICK, 18, 1738833000696L), + new NewsEvent(NewsEventType.CLICK, 15, 1738827922580L), + new NewsEvent(NewsEventType.CLICK, 18, 1738829029065L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829660871L), + new NewsEvent(NewsEventType.CLICK, 5, 1738829047609L), + new NewsEvent(NewsEventType.CLICK, 7, 1738831348515L), + new NewsEvent(NewsEventType.CLICK, 9, 1738828186776L), + new NewsEvent(NewsEventType.CLICK, 4, 1738831946701L), + new NewsEvent(NewsEventType.CLICK, 19, 1738827909051L), + new NewsEvent(NewsEventType.CLICK, 4, 1738831863995L), + new NewsEvent(NewsEventType.CLICK, 12, 1738831891103L), + new NewsEvent(NewsEventType.CLICK, 11, 1738831321338L), + new NewsEvent(NewsEventType.CLICK, 16, 1738832665860L), + new NewsEvent(NewsEventType.CLICK, 5, 1738829420039L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832714172L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828912747L), + new NewsEvent(NewsEventType.CLICK, 10, 1738830106427L), + new NewsEvent(NewsEventType.CLICK, 20, 1738832373136L), + new NewsEvent(NewsEventType.CLICK, 18, 1738829253964L), + new NewsEvent(NewsEventType.CLICK, 9, 1738830552647L), + new NewsEvent(NewsEventType.CLICK, 13, 1738832299840L), + new NewsEvent(NewsEventType.CLICK, 9, 1738827634716L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832907113L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832677051L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830928184L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831429620L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829004170L), + new NewsEvent(NewsEventType.CLICK, 13, 1738830618322L), + new NewsEvent(NewsEventType.CLICK, 15, 1738827898910L), + new NewsEvent(NewsEventType.CLICK, 18, 1738827792507L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830905040L), + new NewsEvent(NewsEventType.CLICK, 16, 1738827955140L), + new NewsEvent(NewsEventType.CLICK, 1, 
1738827805924L), + new NewsEvent(NewsEventType.CLICK, 10, 1738827993730L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828186869L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828794107L), + new NewsEvent(NewsEventType.CLICK, 16, 1738832830432L), + new NewsEvent(NewsEventType.CLICK, 6, 1738829217310L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829574048L), + new NewsEvent(NewsEventType.CLICK, 12, 1738832823596L), + new NewsEvent(NewsEventType.CLICK, 9, 1738829685195L), + new NewsEvent(NewsEventType.CLICK, 11, 1738828392708L), + new NewsEvent(NewsEventType.CLICK, 18, 1738828486277L), + new NewsEvent(NewsEventType.CLICK, 13, 1738832978288L), + new NewsEvent(NewsEventType.CLICK, 20, 1738829576453L), + new NewsEvent(NewsEventType.CLICK, 7, 1738830754624L), + new NewsEvent(NewsEventType.CLICK, 2, 1738830160101L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832381219L), + new NewsEvent(NewsEventType.CLICK, 1, 1738828311862L), + new NewsEvent(NewsEventType.CLICK, 6, 1738832261114L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830300210L), + new NewsEvent(NewsEventType.CLICK, 17, 1738832514430L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831550734L), + new NewsEvent(NewsEventType.CLICK, 12, 1738831406902L), + new NewsEvent(NewsEventType.CLICK, 9, 1738828825295L), + new NewsEvent(NewsEventType.CLICK, 10, 1738831256243L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831409115L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828260690L), + new NewsEvent(NewsEventType.CLICK, 7, 1738830241857L), + new NewsEvent(NewsEventType.CLICK, 3, 1738832280431L), + new NewsEvent(NewsEventType.CLICK, 4, 1738828477318L), + new NewsEvent(NewsEventType.CLICK, 5, 1738829353630L), + new NewsEvent(NewsEventType.CLICK, 17, 1738830446872L), + new NewsEvent(NewsEventType.CLICK, 11, 1738829265485L), + new NewsEvent(NewsEventType.CLICK, 11, 1738829025200L), + new NewsEvent(NewsEventType.CLICK, 9, 1738830254576L), + new NewsEvent(NewsEventType.CLICK, 15, 1738831329199L), + new 
NewsEvent(NewsEventType.CLICK, 16, 1738828835405L), + new NewsEvent(NewsEventType.CLICK, 17, 1738829733968L), + new NewsEvent(NewsEventType.CLICK, 9, 1738831970976L), + new NewsEvent(NewsEventType.CLICK, 14, 1738830422178L), + new NewsEvent(NewsEventType.CLICK, 12, 1738831156825L), + new NewsEvent(NewsEventType.CLICK, 6, 1738831834199L), + new NewsEvent(NewsEventType.CLICK, 12, 1738829517787L), + new NewsEvent(NewsEventType.CLICK, 3, 1738828591418L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830573341L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831671604L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832453323L), + new NewsEvent(NewsEventType.CLICK, 6, 1738829482139L), + new NewsEvent(NewsEventType.CLICK, 3, 1738831578752L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829603256L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832994492L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831737002L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830056920L), + new NewsEvent(NewsEventType.CLICK, 20, 1738828478759L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830018281L), + new NewsEvent(NewsEventType.CLICK, 8, 1738828723160L), + new NewsEvent(NewsEventType.CLICK, 12, 1738832726133L), + new NewsEvent(NewsEventType.CLICK, 7, 1738828285112L), + new NewsEvent(NewsEventType.CLICK, 15, 1738829809344L), + new NewsEvent(NewsEventType.CLICK, 7, 1738828100979L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830117562L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830966114L), + new NewsEvent(NewsEventType.CLICK, 6, 1738827898586L), + new NewsEvent(NewsEventType.CLICK, 8, 1738831825214L), + new NewsEvent(NewsEventType.CLICK, 12, 1738828981447L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830034310L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830200456L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830351507L), + new NewsEvent(NewsEventType.CLICK, 19, 1738827983135L), + new NewsEvent(NewsEventType.CLICK, 7, 1738832046819L), + new NewsEvent(NewsEventType.CLICK, 6, 
1738828243569L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832821989L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832980347L), + new NewsEvent(NewsEventType.CLICK, 9, 1738830787227L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832461503L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830471418L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829415950L), + new NewsEvent(NewsEventType.CLICK, 16, 1738831096958L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832822986L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829865798L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831006854L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830991505L), + new NewsEvent(NewsEventType.CLICK, 3, 1738831548249L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829453117L), + new NewsEvent(NewsEventType.CLICK, 7, 1738831973517L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828920716L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832784800L), + new NewsEvent(NewsEventType.CLICK, 8, 1738830559426L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830291939L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830890761L), + new NewsEvent(NewsEventType.CLICK, 9, 1738831464109L), + new NewsEvent(NewsEventType.CLICK, 5, 1738827857012L), + new NewsEvent(NewsEventType.CLICK, 8, 1738830650905L), + new NewsEvent(NewsEventType.CLICK, 5, 1738830816183L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830860691L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831116446L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831772208L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830446496L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832669074L), + new NewsEvent(NewsEventType.CLICK, 16, 1738827799010L), + new NewsEvent(NewsEventType.CLICK, 8, 1738831905264L), + new NewsEvent(NewsEventType.CLICK, 2, 1738831547596L), + new NewsEvent(NewsEventType.CLICK, 17, 1738827858314L), + new NewsEvent(NewsEventType.CLICK, 4, 1738829888598L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829684826L), + new 
NewsEvent(NewsEventType.CLICK, 3, 1738830326895L), + new NewsEvent(NewsEventType.CLICK, 9, 1738829074067L), + new NewsEvent(NewsEventType.CLICK, 16, 1738832473117L), + new NewsEvent(NewsEventType.CLICK, 7, 1738831067256L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828179993L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831523109L), + new NewsEvent(NewsEventType.CLICK, 2, 1738829445840L), + new NewsEvent(NewsEventType.CLICK, 9, 1738828605396L), + new NewsEvent(NewsEventType.CLICK, 7, 1738831190997L), + new NewsEvent(NewsEventType.CLICK, 14, 1738831089246L), + new NewsEvent(NewsEventType.CLICK, 4, 1738832838512L), + new NewsEvent(NewsEventType.CLICK, 9, 1738829133472L), + new NewsEvent(NewsEventType.CLICK, 16, 1738832877139L), + new NewsEvent(NewsEventType.CLICK, 5, 1738829152260L), + new NewsEvent(NewsEventType.CLICK, 19, 1738827886907L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830429743L), + new NewsEvent(NewsEventType.CLICK, 10, 1738830687479L), + new NewsEvent(NewsEventType.CLICK, 19, 1738828588461L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831508680L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828092694L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832998421L), + new NewsEvent(NewsEventType.CLICK, 9, 1738828248584L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830137695L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832215855L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828516778L), + new NewsEvent(NewsEventType.CLICK, 9, 1738831881157L), + new NewsEvent(NewsEventType.CLICK, 6, 1738830875841L), + new NewsEvent(NewsEventType.CLICK, 8, 1738828791333L), + new NewsEvent(NewsEventType.CLICK, 15, 1738830811468L), + new NewsEvent(NewsEventType.CLICK, 3, 1738831907994L), + new NewsEvent(NewsEventType.CLICK, 15, 1738829439441L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830424652L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829648738L), + new NewsEvent(NewsEventType.CLICK, 10, 1738827843313L), + new NewsEvent(NewsEventType.CLICK, 14, 
1738829564459L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830707964L), + new NewsEvent(NewsEventType.CLICK, 19, 1738828974306L), + new NewsEvent(NewsEventType.CLICK, 14, 1738831518211L), + new NewsEvent(NewsEventType.CLICK, 14, 1738829416752L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830438102L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830595721L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830268769L), + new NewsEvent(NewsEventType.CLICK, 5, 1738828566041L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831895753L), + new NewsEvent(NewsEventType.CLICK, 8, 1738831702685L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832845125L), + new NewsEvent(NewsEventType.CLICK, 17, 1738828328850L), + new NewsEvent(NewsEventType.CLICK, 4, 1738829091848L), + new NewsEvent(NewsEventType.CLICK, 16, 1738831396956L), + new NewsEvent(NewsEventType.CLICK, 19, 1738829698288L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828469727L), + new NewsEvent(NewsEventType.CLICK, 19, 1738831903652L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829921650L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829821195L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828093852L), + new NewsEvent(NewsEventType.CLICK, 10, 1738831134650L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832587337L), + new NewsEvent(NewsEventType.CLICK, 10, 1738828794834L), + new NewsEvent(NewsEventType.CLICK, 2, 1738827883318L), + new NewsEvent(NewsEventType.CLICK, 2, 1738830076008L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830153243L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828363567L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829712097L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832933123L), + new NewsEvent(NewsEventType.CLICK, 2, 1738830021677L), + new NewsEvent(NewsEventType.CLICK, 4, 1738832862650L), + new NewsEvent(NewsEventType.CLICK, 4, 1738827820500L), + new NewsEvent(NewsEventType.CLICK, 6, 1738829885940L), + new NewsEvent(NewsEventType.CLICK, 13, 1738831714682L), + new 
NewsEvent(NewsEventType.CLICK, 10, 1738831314406L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829143198L), + new NewsEvent(NewsEventType.CLICK, 19, 1738829246756L), + new NewsEvent(NewsEventType.CLICK, 5, 1738828180281L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829858555L), + new NewsEvent(NewsEventType.CLICK, 4, 1738828480493L), + new NewsEvent(NewsEventType.CLICK, 18, 1738827646491L), + new NewsEvent(NewsEventType.CLICK, 10, 1738831197168L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832560750L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829501518L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829710560L), + new NewsEvent(NewsEventType.CLICK, 9, 1738828762008L), + new NewsEvent(NewsEventType.CLICK, 8, 1738827787388L), + new NewsEvent(NewsEventType.CLICK, 2, 1738831818002L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830791234L), + new NewsEvent(NewsEventType.CLICK, 17, 1738830224413L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832526210L), + new NewsEvent(NewsEventType.CLICK, 12, 1738829980902L), + new NewsEvent(NewsEventType.CLICK, 11, 1738829481182L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832706892L), + new NewsEvent(NewsEventType.CLICK, 10, 1738830656095L), + new NewsEvent(NewsEventType.CLICK, 17, 1738829842011L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829668493L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830811284L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830486469L), + new NewsEvent(NewsEventType.CLICK, 18, 1738830495709L), + new NewsEvent(NewsEventType.CLICK, 8, 1738831251101L), + new NewsEvent(NewsEventType.CLICK, 4, 1738828039164L), + new NewsEvent(NewsEventType.CLICK, 12, 1738830152130L), + new NewsEvent(NewsEventType.CLICK, 14, 1738831203096L), + new NewsEvent(NewsEventType.CLICK, 4, 1738832652827L), + new NewsEvent(NewsEventType.CLICK, 20, 1738830498857L), + new NewsEvent(NewsEventType.CLICK, 1, 1738828470448L), + new NewsEvent(NewsEventType.CLICK, 7, 1738829473305L), + new NewsEvent(NewsEventType.CLICK, 17, 
1738829612730L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832616017L), + new NewsEvent(NewsEventType.CLICK, 11, 1738828278320L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829367128L), + new NewsEvent(NewsEventType.CLICK, 10, 1738827815010L), + new NewsEvent(NewsEventType.CLICK, 12, 1738827639910L), + new NewsEvent(NewsEventType.CLICK, 6, 1738831554688L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832603669L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830623786L), + new NewsEvent(NewsEventType.CLICK, 6, 1738829679259L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832365420L), + new NewsEvent(NewsEventType.CLICK, 10, 1738831138708L), + new NewsEvent(NewsEventType.CLICK, 5, 1738829143018L), + new NewsEvent(NewsEventType.CLICK, 20, 1738828108275L), + new NewsEvent(NewsEventType.CLICK, 4, 1738832275483L), + new NewsEvent(NewsEventType.CLICK, 2, 1738827948639L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832804971L), + new NewsEvent(NewsEventType.CLICK, 11, 1738831979195L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829801117L), + new NewsEvent(NewsEventType.CLICK, 3, 1738827625716L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832204177L), + new NewsEvent(NewsEventType.CLICK, 15, 1738830671854L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829217115L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831200846L), + new NewsEvent(NewsEventType.CLICK, 15, 1738827802023L), + new NewsEvent(NewsEventType.CLICK, 11, 1738827903126L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830192031L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829775876L), + new NewsEvent(NewsEventType.CLICK, 15, 1738830921309L), + new NewsEvent(NewsEventType.CLICK, 10, 1738830143991L), + new NewsEvent(NewsEventType.CLICK, 17, 1738829085228L), + new NewsEvent(NewsEventType.CLICK, 14, 1738831430632L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832469470L), + new NewsEvent(NewsEventType.CLICK, 2, 1738830066247L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828218777L), + new 
NewsEvent(NewsEventType.CLICK, 7, 1738828391392L), + new NewsEvent(NewsEventType.CLICK, 7, 1738832825021L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829982251L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832459868L), + new NewsEvent(NewsEventType.CLICK, 18, 1738828650450L), + new NewsEvent(NewsEventType.CLICK, 2, 1738829654768L), + new NewsEvent(NewsEventType.CLICK, 3, 1738830004856L), + new NewsEvent(NewsEventType.CLICK, 17, 1738828702929L), + new NewsEvent(NewsEventType.CLICK, 7, 1738828208190L), + new NewsEvent(NewsEventType.CLICK, 8, 1738830378061L), + new NewsEvent(NewsEventType.CLICK, 6, 1738830755944L), + new NewsEvent(NewsEventType.CLICK, 6, 1738831488369L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830106165L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832187972L), + new NewsEvent(NewsEventType.CLICK, 12, 1738829930706L), + new NewsEvent(NewsEventType.CLICK, 8, 1738832777512L), + new NewsEvent(NewsEventType.CLICK, 1, 1738829210681L), + new NewsEvent(NewsEventType.CLICK, 12, 1738831568546L), + new NewsEvent(NewsEventType.CLICK, 20, 1738829537889L), + new NewsEvent(NewsEventType.CLICK, 7, 1738831387499L), + new NewsEvent(NewsEventType.CLICK, 10, 1738829283992L), + new NewsEvent(NewsEventType.CLICK, 4, 1738829505420L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830275501L), + new NewsEvent(NewsEventType.CLICK, 18, 1738827772329L), + new NewsEvent(NewsEventType.CLICK, 3, 1738830805888L), + new NewsEvent(NewsEventType.CLICK, 2, 1738831261431L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829642545L), + new NewsEvent(NewsEventType.CLICK, 11, 1738828651524L), + new NewsEvent(NewsEventType.CLICK, 11, 1738831932649L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829825615L), + new NewsEvent(NewsEventType.CLICK, 8, 1738832720078L), + new NewsEvent(NewsEventType.CLICK, 8, 1738831349636L), + new NewsEvent(NewsEventType.CLICK, 12, 1738829728684L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829650682L), + new NewsEvent(NewsEventType.CLICK, 13, 
1738830973720L), + new NewsEvent(NewsEventType.CLICK, 8, 1738832143838L), + new NewsEvent(NewsEventType.CLICK, 5, 1738830937323L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830931836L), + new NewsEvent(NewsEventType.CLICK, 7, 1738828937269L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832404713L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829377359L), + new NewsEvent(NewsEventType.CLICK, 6, 1738828334782L), + new NewsEvent(NewsEventType.CLICK, 2, 1738832592892L), + new NewsEvent(NewsEventType.CLICK, 1, 1738830828678L), + new NewsEvent(NewsEventType.CLICK, 16, 1738829385223L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830403679L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831122765L), + new NewsEvent(NewsEventType.CLICK, 19, 1738830588059L), + new NewsEvent(NewsEventType.CLICK, 20, 1738829013292L), + new NewsEvent(NewsEventType.CLICK, 14, 1738828370810L), + new NewsEvent(NewsEventType.CLICK, 10, 1738828961322L), + new NewsEvent(NewsEventType.CLICK, 14, 1738830263720L), + new NewsEvent(NewsEventType.CLICK, 1, 1738831348216L), + new NewsEvent(NewsEventType.CLICK, 20, 1738827718386L), + new NewsEvent(NewsEventType.CLICK, 18, 1738830694709L), + new NewsEvent(NewsEventType.CLICK, 6, 1738829253866L), + new NewsEvent(NewsEventType.CLICK, 8, 1738828845274L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832891486L), + new NewsEvent(NewsEventType.CLICK, 7, 1738830519717L), + new NewsEvent(NewsEventType.CLICK, 7, 1738827899615L), + new NewsEvent(NewsEventType.CLICK, 20, 1738832634142L), + new NewsEvent(NewsEventType.CLICK, 20, 1738831375235L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832642865L), + new NewsEvent(NewsEventType.CLICK, 3, 1738832629162L), + new NewsEvent(NewsEventType.CLICK, 3, 1738828241862L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828523286L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830595301L), + new NewsEvent(NewsEventType.CLICK, 18, 1738831708578L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829838628L), + new 
NewsEvent(NewsEventType.CLICK, 5, 1738830805512L), + new NewsEvent(NewsEventType.CLICK, 20, 1738832231942L), + new NewsEvent(NewsEventType.CLICK, 10, 1738832465329L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832653782L), + new NewsEvent(NewsEventType.CLICK, 13, 1738830125816L), + new NewsEvent(NewsEventType.CLICK, 13, 1738832240156L), + new NewsEvent(NewsEventType.CLICK, 10, 1738828086165L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829020873L), + new NewsEvent(NewsEventType.CLICK, 18, 1738830320418L), + new NewsEvent(NewsEventType.CLICK, 12, 1738828960518L), + new NewsEvent(NewsEventType.CLICK, 5, 1738831346734L), + new NewsEvent(NewsEventType.CLICK, 11, 1738832630566L), + new NewsEvent(NewsEventType.CLICK, 18, 1738828114521L), + new NewsEvent(NewsEventType.CLICK, 16, 1738830466874L), + new NewsEvent(NewsEventType.CLICK, 13, 1738827845827L), + new NewsEvent(NewsEventType.CLICK, 8, 1738832012765L), + new NewsEvent(NewsEventType.CLICK, 8, 1738829989992L), + new NewsEvent(NewsEventType.CLICK, 2, 1738832057675L), + new NewsEvent(NewsEventType.CLICK, 12, 1738832435399L), + new NewsEvent(NewsEventType.CLICK, 1, 1738831871383L), + new NewsEvent(NewsEventType.CLICK, 1, 1738831949698L), + new NewsEvent(NewsEventType.CLICK, 19, 1738829676943L), + new NewsEvent(NewsEventType.CLICK, 13, 1738829775898L), + new NewsEvent(NewsEventType.CLICK, 8, 1738832113785L), + new NewsEvent(NewsEventType.CLICK, 13, 1738830875157L), + new NewsEvent(NewsEventType.CLICK, 4, 1738830303909L), + new NewsEvent(NewsEventType.CLICK, 14, 1738832464360L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829842036L), + new NewsEvent(NewsEventType.CLICK, 4, 1738832401808L), + new NewsEvent(NewsEventType.CLICK, 2, 1738827851490L), + new NewsEvent(NewsEventType.CLICK, 1, 1738830198332L), + new NewsEvent(NewsEventType.CLICK, 17, 1738832649321L), + new NewsEvent(NewsEventType.CLICK, 5, 1738830991616L), + new NewsEvent(NewsEventType.CLICK, 3, 1738830243874L), + new NewsEvent(NewsEventType.CLICK, 3, 
1738832327027L), + new NewsEvent(NewsEventType.CLICK, 10, 1738830203895L), + new NewsEvent(NewsEventType.CLICK, 18, 1738832699554L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828537192L), + new NewsEvent(NewsEventType.CLICK, 19, 1738829326939L), + new NewsEvent(NewsEventType.CLICK, 3, 1738829226081L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828964677L), + new NewsEvent(NewsEventType.CLICK, 3, 1738830066378L), + new NewsEvent(NewsEventType.CLICK, 15, 1738828277533L), + new NewsEvent(NewsEventType.CLICK, 9, 1738832768071L), + new NewsEvent(NewsEventType.CLICK, 12, 1738828944345L), + new NewsEvent(NewsEventType.CLICK, 19, 1738832293712L), + new NewsEvent(NewsEventType.CLICK, 15, 1738830766654L), + new NewsEvent(NewsEventType.CLICK, 5, 1738832307494L), + new NewsEvent(NewsEventType.CLICK, 6, 1738832672088L), + new NewsEvent(NewsEventType.CLICK, 3, 1738832826971L), + new NewsEvent(NewsEventType.CLICK, 13, 1738828228762L), + new NewsEvent(NewsEventType.CLICK, 17, 1738831469200L), + new NewsEvent(NewsEventType.CLICK, 16, 1738828668440L), + new NewsEvent(NewsEventType.CLICK, 1, 1738832264382L), + new NewsEvent(NewsEventType.CLICK, 2, 1738827742471L), + new NewsEvent(NewsEventType.CLICK, 17, 1738827625930L), + new NewsEvent(NewsEventType.CLICK, 12, 1738832168647L), + new NewsEvent(NewsEventType.CLICK, 11, 1738830004701L), + new NewsEvent(NewsEventType.CLICK, 1, 1738832207731L), + }; + + public static final String STATISTIC_RESULT = + "(19,60000,26)\n" + + "(19,300000,28)\n" + + "(19,600000,28)\n" + + "(19,1800000,28)\n" + + "(19,3600000,28)\n" + + "(1,60000,15)\n" + + "(1,300000,15)\n" + + "(1,600000,15)\n" + + "(1,1800000,15)\n" + + "(1,3600000,15)\n" + + "(18,60000,31)\n" + + "(18,300000,33)\n" + + "(18,600000,33)\n" + + "(18,1800000,33)\n" + + "(18,3600000,33)\n" + + "(5,60000,23)\n" + + "(5,300000,24)\n" + + "(5,600000,25)\n" + + "(5,1800000,25)\n" + + "(5,3600000,25)\n" + + "(9,60000,1)\n" + + "(9,300000,3)\n" + + "(9,600000,4)\n" + + "(9,1800000,9)\n" + + 
"(9,3600000,16)\n" + + "(17,60000,2)\n" + + "(17,300000,2)\n" + + "(17,600000,3)\n" + + "(17,1800000,7)\n" + + "(17,3600000,11)\n" + + "(13,60000,2)\n" + + "(13,300000,4)\n" + + "(13,600000,6)\n" + + "(13,1800000,14)\n" + + "(13,3600000,20)\n" + + "(20,60000,10)\n" + + "(20,300000,12)\n" + + "(20,600000,14)\n" + + "(20,1800000,18)\n" + + "(20,3600000,22)\n" + + "(3,60000,23)\n" + + "(3,300000,24)\n" + + "(3,600000,24)\n" + + "(3,1800000,24)\n" + + "(3,3600000,24)\n" + + "(15,60000,22)\n" + + "(15,300000,22)\n" + + "(15,600000,22)\n" + + "(15,1800000,22)\n" + + "(15,3600000,22)\n" + + "(10,60000,24)\n" + + "(10,300000,26)\n" + + "(10,600000,28)\n" + + "(10,1800000,31)\n" + + "(10,3600000,31)\n" + + "(16,60000,31)\n" + + "(16,300000,33)\n" + + "(16,600000,33)\n" + + "(16,1800000,33)\n" + + "(16,3600000,33)\n" + + "(2,60000,14)\n" + + "(2,300000,15)\n" + + "(2,600000,16)\n" + + "(2,1800000,17)\n" + + "(2,3600000,17)\n" + + "(14,60000,3)\n" + + "(14,300000,3)\n" + + "(14,600000,4)\n" + + "(14,1800000,9)\n" + + "(14,3600000,18)\n" + + "(12,60000,25)\n" + + "(12,300000,27)\n" + + "(12,600000,28)\n" + + "(12,1800000,30)\n" + + "(12,3600000,30)\n" + + "(11,60000,21)\n" + + "(11,300000,23)\n" + + "(11,600000,27)\n" + + "(11,1800000,27)\n" + + "(11,3600000,27)\n" + + "(4,60000,15)\n" + + "(4,300000,15)\n" + + "(4,600000,15)\n" + + "(4,1800000,21)\n" + + "(4,3600000,23)\n" + + "(8,60000,2)\n" + + "(8,300000,2)\n" + + "(8,600000,5)\n" + + "(8,1800000,11)\n" + + "(8,3600000,18)\n" + + "(7,60000,18)\n" + + "(7,300000,20)\n" + + "(7,600000,20)\n" + + "(7,1800000,23)\n" + + "(7,3600000,23)\n" + + "(6,60000,11)\n" + + "(6,300000,12)\n" + + "(6,600000,13)\n" + + "(6,1800000,17)\n" + + "(6,3600000,17)\n"; +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/Join.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/Join.java new file mode 100644 index 
00000000000000..4a9c25f9388da8 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/Join.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.dsv2.join; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.builtin.BuiltinFuncs; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.RuntimeContext; +import org.apache.flink.datastream.api.extension.join.JoinFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.streaming.api.functions.sink.PrintSink; 
+import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.examples.dsv2.join.util.JoinData; +import org.apache.flink.util.ParameterTool; + +import java.time.Duration; +import java.util.Arrays; + +/** + * Example illustrating a join between two data streams. The example works on two input streams with + * pairs (name, grade) and (name, salary) respectively. It joins the streams based on "name". + * + *

The example uses the built-in sample data in {@link JoinData} as input. + * + *

Usage: + * + *

+ * + *

This example shows how to: + * + *

+ * + *

Please note that if you intend to run this example in an IDE, you must first add the following + * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module + * system in JDK 17+ restricts some reflection operations. + * + *

Please note that the DataStream API V2 is a new set of APIs intended to gradually replace the original + * DataStream API. It is currently experimental and is not yet fully available for production + * use. + */ +public class Join { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + /** + * Builds and runs the join job: reads the sample grades and salaries from {@link JoinData}, + * joins them via {@code BuiltinFuncs.join}, and sends the joined records to a file sink when + * the {@code output} parameter is present, otherwise to a print sink. + * + * @param args command-line arguments, parsed with {@link ParameterTool}; only the optional + * {@code output} key is read + * @throws Exception anything thrown while building or executing the job is propagated + */ + public static void main(String[] args) throws Exception { + // parse the parameters; the presence of "output" selects the file sink + final ParameterTool params = ParameterTool.fromArgs(args); + final boolean fileOutput = params.has("output"); + + // obtain execution environment + ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + + // create the data sources for both grades and salaries from the in-memory sample arrays + NonKeyedPartitionStream> grades = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(JoinData.GRADE_DATAS)), + "grades source"); + NonKeyedPartitionStream> salaries = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(JoinData.SALARY_DATAS)), + "salaries source"); + + // join the two streams: both key selectors return field f0 (the name), and + // JoinGradeAndSalaryFunction builds the output record for each joined pair + NonKeyedPartitionStream> joinedStream = + BuiltinFuncs.join( + grades, + new GradeKeySelector(), + salaries, + new SalaryKeySelector(), + new JoinGradeAndSalaryFunction()); + + if (fileOutput) { + // write joined results as text rows under the "output" path; the rolling policy is + // configured with a 1 MiB max part size and a 10 second rollover interval + joinedStream + .toSink( + new WrappedSink<>( + FileSink.>forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize( + MemorySize.ofMebiBytes(1)) + .withRolloverInterval( + Duration.ofSeconds(10)) + .build()) + .build())) + .withName("output"); + } else { + // no output path given: send the results to a PrintSink instead
+ joinedStream.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink"); + } + + // execute program + env.execute("Join Example"); + } + + /** Key selector for the grades stream: returns field {@code f0} of the record as the key. */ + private static class GradeKeySelector implements KeySelector, String> { + @Override + public String getKey(Tuple2 value) { + return value.f0; + } + } + + /** Key selector for the salaries stream: returns field {@code f0} of the record as the key. */ + private static class SalaryKeySelector implements KeySelector, String> { + @Override + public String getKey(Tuple2 value) { + return value.f0; + } + } + + /** + * Join function that, for each pair of joined records, emits a {@code Tuple3} made of the left + * record's {@code f0} and {@code f1} followed by the right record's {@code f1}. With the inputs + * wired up in {@code main} (grades on the left, salaries on the right) this is (name, grade, + * salary). + */ + private static class JoinGradeAndSalaryFunction + implements JoinFunction< + Tuple2, Tuple2, Tuple3> { + + @Override + public void processRecord( + Tuple2 leftRecord, + Tuple2 rightRecord, + Collector> output, + RuntimeContext ctx) + throws Exception { + // the right record's f0 is not emitted; only the left record's f0 is used + output.collect(Tuple3.of(leftRecord.f0, leftRecord.f1, rightRecord.f1)); + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/util/JoinData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/util/JoinData.java new file mode 100644 index 00000000000000..b22250973814ef --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/join/util/JoinData.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.dsv2.join.util; + +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.streaming.examples.dsv2.join.Join; + +/** + * Sample data for the {@link Join} example: one array of (name, grade) records, one array of + * (name, salary) records, and the joined result rendered as text. + */ +public class JoinData { + + /** Sample (name, grade) records; names are strings and grades are int literals. */ + public static final Tuple2[] GRADE_DATAS = + new Tuple2[] { + Tuple2.of("tom", 1), + Tuple2.of("jerry", 2), + Tuple2.of("alice", 3), + Tuple2.of("bob", 4), + Tuple2.of("john", 5), + Tuple2.of("grace", 3) + }; + + /** Sample (name, salary) records; salaries are long literals. Same six names as the grades. */ + public static final Tuple2[] SALARY_DATAS = + new Tuple2[] { + Tuple2.of("tom", 2000L), + Tuple2.of("jerry", 4000L), + Tuple2.of("alice", 6000L), + Tuple2.of("bob", 8000L), + Tuple2.of("john", 10000L), + Tuple2.of("grace", 6500L) + }; + + /** + * The two arrays above joined on name, one {@code (name,grade,salary)} tuple per line, each + * line terminated by a newline. Presumably used to check the example's output -- the consumer + * is not visible here. + */ + public static final String JOINED_DATAS_AS_TUPLES = + "(tom,1,2000)\n" + + "(jerry,2,4000)\n" + + "(alice,3,6000)\n" + + "(bob,4,8000)\n" + + "(john,5,10000)\n" + + "(grace,3,6500)\n"; +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java new file mode 100644 index 00000000000000..24168b465c9bbb --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/watermark/SyncOffsetByWatermark.java @@ -0,0 +1,193 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.dsv2.watermark; + +import org.apache.flink.api.common.typeinfo.TypeInformation; +import org.apache.flink.api.common.watermark.LongWatermark; +import org.apache.flink.api.common.watermark.LongWatermarkDeclaration; +import org.apache.flink.api.common.watermark.Watermark; +import org.apache.flink.api.common.watermark.WatermarkDeclaration; +import org.apache.flink.api.common.watermark.WatermarkDeclarations; +import org.apache.flink.api.common.watermark.WatermarkHandlingResult; +import org.apache.flink.api.connector.dsv2.WrappedSource; +import org.apache.flink.api.connector.source.SourceReaderContext; +import org.apache.flink.connector.datagen.source.DataGeneratorSource; +import org.apache.flink.connector.datagen.source.GeneratorFunction; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.NonPartitionedContext; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; + +import java.util.Set; + +/** + * The example simulates Flink reading a message queue. Each partition of the topic has its own + * consumption offset, and the example shows how to synchronise these offsets. + * + *

The example uses a {@link DataGeneratorSource} to simulate a message queue and a custom {@link + * Watermark} to propagate the offset in the stream. + * + *

This example shows how to: + * + *

+ * + *

Please note that if you intend to run this example in an IDE, you must first add the following + * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module + * system in JDK 17+ restricts some reflection operations. + * + *

Please note that the DataStream API V2 is a new set of APIs intended to gradually replace the original + * DataStream API. It is currently experimental and is not yet fully available for production + * use. + */ +public class SyncOffsetByWatermark { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + /** + * A message in the simulated message queue. Each message has a topic, a partition, an offset + * and associated data, all exposed as public mutable fields. + */ + public static class Message { + public String topic; + public long partition; + public long offset; + public String data; + + public Message(String topic, long partition, long offset, String data) { + this.topic = topic; + this.partition = partition; + this.offset = offset; + this.data = data; + } + } + + /** + * Declaration of the offset watermark, which represents the currently consumed offset of a + * partition. Since the watermark needs to convey the offset, its data type is long. To + * determine the minimum offset across all partitions, the watermarks are combined with + * combineFunctionMin(). combineWaitForAllChannels(true) is also set -- presumably so that + * combining waits until every input channel has delivered a watermark; confirm against the + * WatermarkDeclarations documentation. The default handling strategy is forward, meaning that + * the watermark is typically forwarded to downstream operators. The resulting declaration is + * used both to declare the watermark on the source and to create the watermark instances it + * emits.
+ */ + public static final LongWatermarkDeclaration OFFSET_WATERMARK_DECLARATION = + WatermarkDeclarations.newBuilder("OFFSET_WATERMARK") + .typeLong() + .combineFunctionMin() + .combineWaitForAllChannels(true) + .defaultHandlingStrategyForward() + .build(); + + /** + * Builds and runs the job: a simulated message queue source with parallelism 5 feeds a process + * function that ignores the records and prints the value of every offset watermark it + * receives. + * + * @param args command-line arguments; not read by this example + * @throws Exception anything thrown while building or executing the job is propagated + */ + public static void main(String[] args) throws Exception { + // obtain execution environment + ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + + // Create a source that simulates a message queue with a parallelism set to 5, indicating + // that the consumed topic has 5 partitions. + // The source will declare and generate offset watermarks. + NonKeyedPartitionStream source = + env.fromSource( + new WrappedSource<>(new MessageQueueSource()), + "message queue source") + .withParallelism(5); + + source.process( + // handle offset watermark in downstream + new OneInputStreamProcessFunction<>() { + @Override + public void processRecord( + Message record, + Collector output, + PartitionedContext ctx) + throws Exception { + // records are intentionally ignored; a real job would process them here + } + + @Override + public WatermarkHandlingResult onWatermark( + Watermark watermark, + Collector output, + NonPartitionedContext ctx) { + // only react to watermarks whose identifier matches our declaration + if (watermark + .getIdentifier() + .equals(OFFSET_WATERMARK_DECLARATION.getIdentifier())) { + // For our custom offset watermark, we can obtain the combined offset + // here and take further action with it. + long syncedOffset = ((LongWatermark) watermark).getValue(); + System.out.println( + "All partitions have been consumed up to the " + + syncedOffset + + " offset."); + return WatermarkHandlingResult.PEEK; + } + + // NOTE(review): both branches return PEEK, so the early return above is + // redundant; kept as-is. + return WatermarkHandlingResult.PEEK; + } + }); + + // execute program + env.execute("SyncOffsetByWatermark Example"); + } + + /** + * Source simulating the message queue: a {@link DataGeneratorSource} that produces {@link + * Message} records through {@link CustomGeneratorFunction} and declares the offset watermark. + */ + private static class MessageQueueSource extends DataGeneratorSource { + + public MessageQueueSource() { + // 1_000_000L is passed as the second constructor argument -- presumably the number of + // records to generate; confirm against the DataGeneratorSource constructor + super(new CustomGeneratorFunction(), 1_000_000L, TypeInformation.of(Message.class)); + } + + @Override + public Set declareWatermarks() { + // Declare the offset watermark: the returned set contains only that declaration.
+ return Set.of(OFFSET_WATERMARK_DECLARATION); + } + } + + /** + * Generator that turns each input value into a {@link Message} and emits an offset watermark + * every 1000 generated messages. The input value itself is not used. + */ + private static class CustomGeneratorFunction implements GeneratorFunction { + + // each subtask represents one partition of the topic + private int indexOfSubtask; + + // kept from open() so that map() can emit watermarks + private SourceReaderContext readerContext; + + // The offset of the current partition; incremented before each message is built. + private long offset = 0; + + @Override + public void open(SourceReaderContext readerContext) throws Exception { + this.indexOfSubtask = readerContext.getIndexOfSubtask(); + this.readerContext = readerContext; + } + + @Override + public Message map(Long value) throws Exception { + // increment first, so the first message produced carries offset 1 + offset++; + if (offset % 1000 == 0) { + // Send an offset watermark every 1000 records. + readerContext.emitWatermark(OFFSET_WATERMARK_DECLARATION.newWatermark(offset)); + } + + // the subtask index doubles as the partition id; the payload is an empty string + return new Message("default-topic", indexOfSubtask, offset, ""); + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java new file mode 100644 index 00000000000000..77bfbb558bc1c6 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/CountProductSalesWindowing.java @@ -0,0 +1,235 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.examples.dsv2.windowing; + +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.core.fs.Path; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.builtin.BuiltinFuncs; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension; +import org.apache.flink.datastream.api.extension.window.context.OneInputWindowContext; +import org.apache.flink.datastream.api.extension.window.function.OneInputWindowStreamProcessFunction; +import org.apache.flink.datastream.api.extension.window.strategy.WindowStrategy; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import 
org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData; +import org.apache.flink.util.ParameterTool; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Set; + +/** + * Example illustrating how to use Window to count the sales of each product in each hour by + * DataStream API V2. + * + *

The input is a list of orders; each order is a tuple of productId and orderTime. + * + *

Usage: + * + *

    + *
  • --output <path>: The output directory where the Job will write the + * results. If no output path is provided, the Job will print the results to stdout. + *
+ * + *

This example shows how to: + * + *

    + *
  • Use the Window extension in DataStream API V2 + *
+ * + *

Please note that if you intend to run this example in an IDE, you must first add the following + * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module + * system in JDK 17+ restricts some reflection operations. + * + *

Please note that the DataStream API V2 is a new set of APIs, to gradually replace the original + * DataStream API. It is currently in the experimental stage and is not fully available for + * production. + */ +public class CountProductSalesWindowing { + + public static void main(String[] args) throws Exception { + // parse the parameters + final ParameterTool params = ParameterTool.fromArgs(args); + final boolean fileOutput = params.has("output"); + + // Get the execution environment instance. This is the main entrypoint + // to building a Flink application. + final ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + + // create order source stream + NonKeyedPartitionStream> orders = + env.fromSource( + DataStreamV2SourceUtils.fromData( + Arrays.asList(CountProductSalesWindowingData.ORDERS)), + "order source"); + + // extract and propagate event time from order + NonKeyedPartitionStream> orderStream = + orders.process( + EventTimeExtension.>newWatermarkGeneratorBuilder( + order -> order.f1) + .periodicWatermark(Duration.ofMillis(200)) + .buildAsProcessFunction()); + + NonKeyedPartitionStream> productSalesQuantityStream = + orderStream + // key by productId + .keyBy(order -> order.f0) + .process( + BuiltinFuncs.window( + // declare tumbling window with window size 1 hour + WindowStrategy.tumbling( + Duration.ofHours(1), WindowStrategy.EVENT_TIME), + // define window process function to calculate total sales + // quantity per product per window. + // you can choose to use either CountSalesQuantity or + // CountSalesQuantityWithPreAggregation; both + // implementations are equivalent, but the latter is more + // efficient. 
+ new CountSalesQuantity())); + + if (fileOutput) { + productSalesQuantityStream + .toSink( + new WrappedSink<>( + FileSink.>forRowFormat( + new Path(params.get("output")), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize( + MemorySize.ofMebiBytes(1)) + .withRolloverInterval( + Duration.ofSeconds(10)) + .build()) + .build())) + .withName("file-sink"); + } else { + // Print the results to the STDOUT. + productSalesQuantityStream + .toSink(new WrappedSink<>(new PrintSink<>())) + .withName("print-sink"); + } + + env.execute("CountProductSalesWindowing example"); + } + + /** + * Count sales quantity per product. + * + *

We will obtain all orders and calculate the total sales quantity for each product when + * window trigger. + */ + public static class CountSalesQuantity + implements OneInputWindowStreamProcessFunction< + Tuple2, Tuple3> { + + @Override + public void onTrigger( + Collector> output, + PartitionedContext> ctx, + OneInputWindowContext> windowContext) + throws Exception { + // get current productId + int productId = ctx.getStateManager().getCurrentKey(); + // calculate total sales quantity + long totalSalesQuantity = 0; + for (Tuple2 ignored : windowContext.getAllRecords()) { + totalSalesQuantity += 1; + } + // emit result + output.collect(Tuple3.of(productId, windowContext.getStartTime(), totalSalesQuantity)); + } + } + + /** + * Count sales quantity per product. + * + *

Firstly, we declare a state to store the sales quantity for each product. + * + *

When a record is received, we update the sales quantity in the state. + * + *

When window trigger, we emit the result. + */ + public static class CountSalesQuantityWithPreAggregation + implements OneInputWindowStreamProcessFunction< + Tuple2, Tuple3> { + + private final ValueStateDeclaration salesQuantityStateDeclaration = + StateDeclarations.valueState("totalSalesQuantity", TypeDescriptors.LONG); + + @Override + public Set useWindowStates() { + return Set.of(salesQuantityStateDeclaration); + } + + @Override + public void onRecord( + Tuple2 record, + Collector> output, + PartitionedContext> ctx, + OneInputWindowContext> windowContext) + throws Exception { + // get sales quantity from state + ValueState salesQuantityState = + windowContext.getWindowState(salesQuantityStateDeclaration).get(); + long salesQuantity = 0; + if (salesQuantityState.value() != null) { + salesQuantity = salesQuantityState.value(); + } + + // update sales quantity in state + salesQuantity += 1; + salesQuantityState.update(salesQuantity); + } + + @Override + public void onTrigger( + Collector> output, + PartitionedContext> ctx, + OneInputWindowContext> windowContext) + throws Exception { + // get current productId + int productId = ctx.getStateManager().getCurrentKey(); + // get sales quantity from state + ValueState salesQuantityState = + windowContext.getWindowState(salesQuantityStateDeclaration).get(); + long salesQuantity = + salesQuantityState.value() == null ? 
0 : salesQuantityState.value(); + // emit result + output.collect(Tuple3.of(productId, windowContext.getStartTime(), salesQuantity)); + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/util/CountProductSalesWindowingData.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/util/CountProductSalesWindowingData.java new file mode 100644 index 00000000000000..2311bf508e8a52 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/windowing/util/CountProductSalesWindowingData.java @@ -0,0 +1,648 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.dsv2.windowing.util; + +import org.apache.flink.api.java.tuple.Tuple2; + +public class CountProductSalesWindowingData { + public static final Tuple2[] ORDERS = + new Tuple2[] { + Tuple2.of(12, 1738827605463L), + Tuple2.of(18, 1738827619964L), + Tuple2.of(15, 1738827629151L), + Tuple2.of(17, 1738827655996L), + Tuple2.of(8, 1738827656879L), + Tuple2.of(1, 1738827728728L), + Tuple2.of(14, 1738827742940L), + Tuple2.of(19, 1738827790415L), + Tuple2.of(2, 1738827806317L), + Tuple2.of(13, 1738827850379L), + Tuple2.of(20, 1738827893642L), + Tuple2.of(13, 1738827902605L), + Tuple2.of(17, 1738827904012L), + Tuple2.of(5, 1738827914195L), + Tuple2.of(3, 1738827963661L), + Tuple2.of(15, 1738827994974L), + Tuple2.of(15, 1738827995081L), + Tuple2.of(13, 1738828009144L), + Tuple2.of(15, 1738828018218L), + Tuple2.of(15, 1738828024151L), + Tuple2.of(11, 1738828098126L), + Tuple2.of(5, 1738828121406L), + Tuple2.of(13, 1738828240169L), + Tuple2.of(6, 1738828243354L), + Tuple2.of(7, 1738828259861L), + Tuple2.of(13, 1738828260083L), + Tuple2.of(18, 1738828331216L), + Tuple2.of(8, 1738828334313L), + Tuple2.of(10, 1738828348473L), + Tuple2.of(10, 1738828355788L), + Tuple2.of(10, 1738828387079L), + Tuple2.of(16, 1738828410810L), + Tuple2.of(3, 1738828474032L), + Tuple2.of(5, 1738828474798L), + Tuple2.of(16, 1738828535663L), + Tuple2.of(20, 1738828539171L), + Tuple2.of(4, 1738828553591L), + Tuple2.of(16, 1738828554039L), + Tuple2.of(12, 1738828867800L), + Tuple2.of(10, 1738828940472L), + Tuple2.of(8, 1738828948981L), + Tuple2.of(9, 1738828951477L), + Tuple2.of(9, 1738828978236L), + Tuple2.of(16, 1738828999196L), + Tuple2.of(8, 1738829014630L), + Tuple2.of(7, 1738829084652L), + Tuple2.of(3, 1738829122574L), + Tuple2.of(2, 1738829137059L), + Tuple2.of(15, 1738829142778L), + Tuple2.of(10, 1738829166478L), + Tuple2.of(12, 1738829177134L), + Tuple2.of(1, 1738829182585L), + Tuple2.of(4, 1738829220279L), + Tuple2.of(12, 1738829271713L), + 
Tuple2.of(2, 1738829272979L), + Tuple2.of(19, 1738829320698L), + Tuple2.of(5, 1738829372080L), + Tuple2.of(20, 1738829441404L), + Tuple2.of(16, 1738829531731L), + Tuple2.of(11, 1738829605210L), + Tuple2.of(7, 1738829703933L), + Tuple2.of(14, 1738829738509L), + Tuple2.of(4, 1738829785403L), + Tuple2.of(7, 1738829852807L), + Tuple2.of(4, 1738829857070L), + Tuple2.of(1, 1738829914971L), + Tuple2.of(17, 1738829928438L), + Tuple2.of(6, 1738830021055L), + Tuple2.of(13, 1738830051802L), + Tuple2.of(15, 1738830057909L), + Tuple2.of(17, 1738830092463L), + Tuple2.of(4, 1738830093188L), + Tuple2.of(6, 1738830102955L), + Tuple2.of(4, 1738830131280L), + Tuple2.of(8, 1738830145083L), + Tuple2.of(5, 1738830149836L), + Tuple2.of(15, 1738830204075L), + Tuple2.of(12, 1738830205097L), + Tuple2.of(12, 1738830253262L), + Tuple2.of(2, 1738830256349L), + Tuple2.of(4, 1738830293184L), + Tuple2.of(3, 1738830333088L), + Tuple2.of(17, 1738830344726L), + Tuple2.of(14, 1738830370613L), + Tuple2.of(4, 1738830379395L), + Tuple2.of(2, 1738830380105L), + Tuple2.of(18, 1738830401967L), + Tuple2.of(20, 1738830407920L), + Tuple2.of(17, 1738830435672L), + Tuple2.of(10, 1738830488803L), + Tuple2.of(12, 1738830498444L), + Tuple2.of(6, 1738830626147L), + Tuple2.of(8, 1738830662841L), + Tuple2.of(10, 1738830678251L), + Tuple2.of(5, 1738830695956L), + Tuple2.of(10, 1738830730239L), + Tuple2.of(10, 1738830783642L), + Tuple2.of(11, 1738830792183L), + Tuple2.of(4, 1738830821401L), + Tuple2.of(18, 1738830850832L), + Tuple2.of(9, 1738830870289L), + Tuple2.of(5, 1738830883529L), + Tuple2.of(5, 1738830904292L), + Tuple2.of(18, 1738830905564L), + Tuple2.of(15, 1738830969595L), + Tuple2.of(14, 1738831061005L), + Tuple2.of(2, 1738831069231L), + Tuple2.of(13, 1738831082539L), + Tuple2.of(2, 1738831112545L), + Tuple2.of(19, 1738831124899L), + Tuple2.of(4, 1738831160915L), + Tuple2.of(10, 1738831163250L), + Tuple2.of(14, 1738831173040L), + Tuple2.of(7, 1738831192352L), + Tuple2.of(6, 1738831232780L), + Tuple2.of(10, 
1738831340850L), + Tuple2.of(9, 1738831365592L), + Tuple2.of(16, 1738831381108L), + Tuple2.of(5, 1738831385271L), + Tuple2.of(8, 1738831417880L), + Tuple2.of(8, 1738831464820L), + Tuple2.of(3, 1738831466905L), + Tuple2.of(17, 1738831526944L), + Tuple2.of(1, 1738831613742L), + Tuple2.of(14, 1738831676922L), + Tuple2.of(2, 1738831800043L), + Tuple2.of(1, 1738831801935L), + Tuple2.of(12, 1738831822940L), + Tuple2.of(9, 1738831866029L), + Tuple2.of(2, 1738831873145L), + Tuple2.of(9, 1738831891996L), + Tuple2.of(18, 1738831944500L), + Tuple2.of(9, 1738831988124L), + Tuple2.of(16, 1738832034355L), + Tuple2.of(15, 1738832066337L), + Tuple2.of(18, 1738832085720L), + Tuple2.of(14, 1738832102460L), + Tuple2.of(5, 1738832114015L), + Tuple2.of(7, 1738832183928L), + Tuple2.of(11, 1738832204948L), + Tuple2.of(15, 1738832215290L), + Tuple2.of(16, 1738832222286L), + Tuple2.of(14, 1738832232803L), + Tuple2.of(2, 1738832271807L), + Tuple2.of(1, 1738832313456L), + Tuple2.of(4, 1738832380081L), + Tuple2.of(14, 1738832400968L), + Tuple2.of(12, 1738832406097L), + Tuple2.of(11, 1738832444919L), + Tuple2.of(9, 1738832452056L), + Tuple2.of(12, 1738832460809L), + Tuple2.of(14, 1738832478208L), + Tuple2.of(6, 1738832520441L), + Tuple2.of(2, 1738832521812L), + Tuple2.of(9, 1738832605779L), + Tuple2.of(6, 1738832620247L), + Tuple2.of(5, 1738832667279L), + Tuple2.of(1, 1738832676009L), + Tuple2.of(14, 1738832676806L), + Tuple2.of(14, 1738832752059L), + Tuple2.of(5, 1738832769046L), + Tuple2.of(14, 1738832771831L), + Tuple2.of(2, 1738832910117L), + Tuple2.of(9, 1738832918017L), + Tuple2.of(19, 1738832976934L), + Tuple2.of(18, 1738833051214L), + Tuple2.of(3, 1738833051800L), + Tuple2.of(9, 1738833052733L), + Tuple2.of(10, 1738833089467L), + Tuple2.of(17, 1738833153723L), + Tuple2.of(13, 1738833266122L), + Tuple2.of(2, 1738833277046L), + Tuple2.of(2, 1738833296980L), + Tuple2.of(20, 1738833301030L), + Tuple2.of(13, 1738833335269L), + Tuple2.of(4, 1738833354448L), + Tuple2.of(4, 1738833355149L), + 
Tuple2.of(10, 1738833380455L), + Tuple2.of(4, 1738833394454L), + Tuple2.of(17, 1738833434936L), + Tuple2.of(12, 1738833440089L), + Tuple2.of(18, 1738833488659L), + Tuple2.of(18, 1738833613403L), + Tuple2.of(4, 1738833660877L), + Tuple2.of(17, 1738833673272L), + Tuple2.of(18, 1738833727664L), + Tuple2.of(11, 1738833764724L), + Tuple2.of(8, 1738833812060L), + Tuple2.of(20, 1738833842820L), + Tuple2.of(17, 1738833847021L), + Tuple2.of(2, 1738833860854L), + Tuple2.of(18, 1738833886515L), + Tuple2.of(15, 1738833979346L), + Tuple2.of(10, 1738833997497L), + Tuple2.of(15, 1738834063182L), + Tuple2.of(13, 1738834142541L), + Tuple2.of(5, 1738834175013L), + Tuple2.of(8, 1738834236249L), + Tuple2.of(15, 1738834249623L), + Tuple2.of(9, 1738834249724L), + Tuple2.of(6, 1738834256847L), + Tuple2.of(12, 1738834288210L), + Tuple2.of(16, 1738834295327L), + Tuple2.of(9, 1738834361297L), + Tuple2.of(15, 1738834377938L), + Tuple2.of(5, 1738834389841L), + Tuple2.of(3, 1738834454514L), + Tuple2.of(17, 1738834489949L), + Tuple2.of(3, 1738834650875L), + Tuple2.of(3, 1738834703715L), + Tuple2.of(7, 1738834742847L), + Tuple2.of(15, 1738834762103L), + Tuple2.of(3, 1738834768796L), + Tuple2.of(7, 1738834849578L), + Tuple2.of(3, 1738834883861L), + Tuple2.of(14, 1738834904306L), + Tuple2.of(16, 1738834980011L), + Tuple2.of(12, 1738835005156L), + Tuple2.of(2, 1738835006144L), + Tuple2.of(18, 1738835094576L), + Tuple2.of(14, 1738835134974L), + Tuple2.of(14, 1738835138638L), + Tuple2.of(7, 1738835197712L), + Tuple2.of(7, 1738835205335L), + Tuple2.of(7, 1738835211111L), + Tuple2.of(17, 1738835239469L), + Tuple2.of(12, 1738835301129L), + Tuple2.of(13, 1738835302412L), + Tuple2.of(20, 1738835341487L), + Tuple2.of(12, 1738835411450L), + Tuple2.of(11, 1738835418282L), + Tuple2.of(5, 1738835424892L), + Tuple2.of(3, 1738835457103L), + Tuple2.of(8, 1738835687792L), + Tuple2.of(8, 1738835719691L), + Tuple2.of(6, 1738835774221L), + Tuple2.of(11, 1738835841518L), + Tuple2.of(20, 1738835869567L), + Tuple2.of(2, 
1738835871688L), + Tuple2.of(16, 1738835895756L), + Tuple2.of(20, 1738835955023L), + Tuple2.of(20, 1738835956913L), + Tuple2.of(5, 1738835967235L), + Tuple2.of(14, 1738836216116L), + Tuple2.of(14, 1738836287274L), + Tuple2.of(7, 1738836321224L), + Tuple2.of(16, 1738836326261L), + Tuple2.of(9, 1738836335398L), + Tuple2.of(14, 1738836465820L), + Tuple2.of(17, 1738836493626L), + Tuple2.of(16, 1738836517548L), + Tuple2.of(14, 1738836565632L), + Tuple2.of(19, 1738836596179L), + Tuple2.of(7, 1738836597160L), + Tuple2.of(1, 1738836660691L), + Tuple2.of(2, 1738836671508L), + Tuple2.of(18, 1738836680038L), + Tuple2.of(9, 1738836707494L), + Tuple2.of(12, 1738836802813L), + Tuple2.of(5, 1738836840504L), + Tuple2.of(11, 1738836864245L), + Tuple2.of(10, 1738836880658L), + Tuple2.of(15, 1738836950945L), + Tuple2.of(11, 1738836960087L), + Tuple2.of(10, 1738837083672L), + Tuple2.of(4, 1738837102725L), + Tuple2.of(16, 1738837164965L), + Tuple2.of(5, 1738837195535L), + Tuple2.of(20, 1738837216289L), + Tuple2.of(3, 1738837222591L), + Tuple2.of(5, 1738837473017L), + Tuple2.of(3, 1738837550004L), + Tuple2.of(14, 1738837560225L), + Tuple2.of(15, 1738837591274L), + Tuple2.of(19, 1738837598639L), + Tuple2.of(11, 1738837618254L), + Tuple2.of(7, 1738837637964L), + Tuple2.of(19, 1738837777773L), + Tuple2.of(7, 1738837810384L), + Tuple2.of(1, 1738837847185L), + Tuple2.of(19, 1738837899718L), + Tuple2.of(7, 1738837905864L), + Tuple2.of(16, 1738837917632L), + Tuple2.of(18, 1738837933282L), + Tuple2.of(15, 1738837935277L), + Tuple2.of(10, 1738838005898L), + Tuple2.of(1, 1738838011731L), + Tuple2.of(1, 1738838060235L), + Tuple2.of(20, 1738838069640L), + Tuple2.of(10, 1738838073067L), + Tuple2.of(4, 1738838092212L), + Tuple2.of(6, 1738838118626L), + Tuple2.of(17, 1738838157823L), + Tuple2.of(15, 1738838159422L), + Tuple2.of(2, 1738838298794L), + Tuple2.of(18, 1738838305480L), + Tuple2.of(14, 1738838319826L), + Tuple2.of(13, 1738838445639L), + Tuple2.of(2, 1738838456348L), + Tuple2.of(9, 
1738838532239L), + Tuple2.of(5, 1738838550401L), + Tuple2.of(3, 1738838617326L), + Tuple2.of(1, 1738838645774L), + Tuple2.of(18, 1738838695246L), + Tuple2.of(20, 1738838729692L), + Tuple2.of(20, 1738838743404L), + Tuple2.of(12, 1738838747799L), + Tuple2.of(16, 1738838824985L), + Tuple2.of(7, 1738838838691L), + Tuple2.of(18, 1738838867587L), + Tuple2.of(7, 1738838877078L), + Tuple2.of(7, 1738838900772L), + Tuple2.of(20, 1738838911424L), + Tuple2.of(15, 1738838938157L), + Tuple2.of(10, 1738838945293L), + Tuple2.of(20, 1738838964177L), + Tuple2.of(4, 1738838984218L), + Tuple2.of(1, 1738839045756L), + Tuple2.of(11, 1738839055647L), + Tuple2.of(17, 1738839077564L), + Tuple2.of(13, 1738839126580L), + Tuple2.of(8, 1738839166396L), + Tuple2.of(6, 1738839173923L), + Tuple2.of(4, 1738839186226L), + Tuple2.of(8, 1738839227382L), + Tuple2.of(11, 1738839270967L), + Tuple2.of(5, 1738839288031L), + Tuple2.of(15, 1738839323158L), + Tuple2.of(19, 1738839346731L), + Tuple2.of(6, 1738839416504L), + Tuple2.of(17, 1738839423399L), + Tuple2.of(11, 1738839439627L), + Tuple2.of(12, 1738839513675L), + Tuple2.of(14, 1738839514821L), + Tuple2.of(2, 1738839522063L), + Tuple2.of(12, 1738839538656L), + Tuple2.of(13, 1738839568142L), + Tuple2.of(16, 1738839577035L), + Tuple2.of(4, 1738839606971L), + Tuple2.of(10, 1738839634570L), + Tuple2.of(11, 1738839636848L), + Tuple2.of(10, 1738839637133L), + Tuple2.of(8, 1738839647995L), + Tuple2.of(11, 1738839676645L), + Tuple2.of(16, 1738839717050L), + Tuple2.of(2, 1738839915923L), + Tuple2.of(9, 1738839928187L), + Tuple2.of(6, 1738839963923L), + Tuple2.of(19, 1738839990359L), + Tuple2.of(2, 1738840050323L), + Tuple2.of(2, 1738840051451L), + Tuple2.of(10, 1738840067284L), + Tuple2.of(13, 1738840077035L), + Tuple2.of(15, 1738840087731L), + Tuple2.of(5, 1738840090884L), + Tuple2.of(8, 1738840214971L), + Tuple2.of(15, 1738840274063L), + Tuple2.of(10, 1738840275988L), + Tuple2.of(8, 1738840320657L), + Tuple2.of(6, 1738840357875L), + Tuple2.of(14, 
1738840405138L), + Tuple2.of(12, 1738840413920L), + Tuple2.of(15, 1738840414410L), + Tuple2.of(1, 1738840457487L), + Tuple2.of(14, 1738840518812L), + Tuple2.of(15, 1738840524102L), + Tuple2.of(6, 1738840535756L), + Tuple2.of(14, 1738840565845L), + Tuple2.of(6, 1738840598509L), + Tuple2.of(4, 1738840610788L), + Tuple2.of(10, 1738840708486L), + Tuple2.of(6, 1738840731585L), + Tuple2.of(9, 1738840758294L), + Tuple2.of(6, 1738840776546L), + Tuple2.of(3, 1738840828370L), + Tuple2.of(16, 1738840844239L), + Tuple2.of(14, 1738840856086L), + Tuple2.of(15, 1738840910497L), + Tuple2.of(14, 1738840911851L), + Tuple2.of(5, 1738841144348L), + Tuple2.of(11, 1738841180881L), + Tuple2.of(20, 1738841208378L), + Tuple2.of(13, 1738841288272L), + Tuple2.of(1, 1738841307908L), + Tuple2.of(19, 1738841326457L), + Tuple2.of(3, 1738841361021L), + Tuple2.of(1, 1738841423996L), + Tuple2.of(9, 1738841530442L), + Tuple2.of(17, 1738841665584L), + Tuple2.of(5, 1738841729143L), + Tuple2.of(6, 1738841733469L), + Tuple2.of(18, 1738841852652L), + Tuple2.of(19, 1738841877457L), + Tuple2.of(9, 1738841897520L), + Tuple2.of(19, 1738841906648L), + Tuple2.of(13, 1738841928297L), + Tuple2.of(2, 1738841949768L), + Tuple2.of(10, 1738841957783L), + Tuple2.of(19, 1738841983951L), + Tuple2.of(11, 1738842006637L), + Tuple2.of(5, 1738842013848L), + Tuple2.of(6, 1738842025346L), + Tuple2.of(13, 1738842042970L), + Tuple2.of(9, 1738842134045L), + Tuple2.of(12, 1738842167086L), + Tuple2.of(18, 1738842171590L), + Tuple2.of(14, 1738842233260L), + Tuple2.of(17, 1738842236066L), + Tuple2.of(16, 1738842250813L), + Tuple2.of(9, 1738842271645L), + Tuple2.of(10, 1738842326545L), + Tuple2.of(5, 1738842349475L), + Tuple2.of(9, 1738842387112L), + Tuple2.of(9, 1738842526629L), + Tuple2.of(7, 1738842641394L), + Tuple2.of(5, 1738842827225L), + Tuple2.of(1, 1738842857161L), + Tuple2.of(4, 1738842869189L), + Tuple2.of(6, 1738842894737L), + Tuple2.of(20, 1738842954016L), + Tuple2.of(18, 1738843062283L), + Tuple2.of(3, 1738843071055L), 
+ Tuple2.of(7, 1738843110339L), + Tuple2.of(20, 1738843181706L), + Tuple2.of(16, 1738843189752L), + Tuple2.of(20, 1738843254642L), + Tuple2.of(12, 1738843261805L), + Tuple2.of(20, 1738843288706L), + Tuple2.of(14, 1738843323310L), + Tuple2.of(5, 1738843342296L), + Tuple2.of(1, 1738843351520L), + Tuple2.of(5, 1738843353856L), + Tuple2.of(6, 1738843356311L), + Tuple2.of(16, 1738843424535L), + Tuple2.of(12, 1738843437270L), + Tuple2.of(15, 1738843442059L), + Tuple2.of(5, 1738843470017L), + Tuple2.of(14, 1738843506994L), + Tuple2.of(14, 1738843569956L), + Tuple2.of(11, 1738843594935L), + Tuple2.of(4, 1738843604169L), + Tuple2.of(9, 1738843605954L), + Tuple2.of(5, 1738843606168L), + Tuple2.of(7, 1738843734697L), + Tuple2.of(18, 1738843745272L), + Tuple2.of(8, 1738843814602L), + Tuple2.of(9, 1738843873859L), + Tuple2.of(11, 1738843885694L), + Tuple2.of(15, 1738843903133L), + Tuple2.of(3, 1738843919276L), + Tuple2.of(8, 1738843974952L), + Tuple2.of(14, 1738843991970L), + Tuple2.of(8, 1738843995894L), + Tuple2.of(14, 1738843996724L), + Tuple2.of(19, 1738844084578L), + Tuple2.of(20, 1738844113406L), + Tuple2.of(10, 1738844170699L), + Tuple2.of(3, 1738844239185L), + Tuple2.of(10, 1738844270036L), + Tuple2.of(12, 1738844343365L), + Tuple2.of(17, 1738844361306L), + Tuple2.of(19, 1738844389248L), + Tuple2.of(11, 1738844395635L), + Tuple2.of(15, 1738844465653L), + Tuple2.of(6, 1738844494989L), + Tuple2.of(2, 1738844502021L), + Tuple2.of(8, 1738844506820L), + Tuple2.of(4, 1738844534899L), + Tuple2.of(13, 1738844554162L), + Tuple2.of(19, 1738844602377L), + Tuple2.of(10, 1738844604283L), + Tuple2.of(16, 1738844607841L), + Tuple2.of(1, 1738844620884L), + Tuple2.of(7, 1738844724905L), + Tuple2.of(11, 1738844768055L), + Tuple2.of(1, 1738844795105L), + Tuple2.of(4, 1738844805689L), + Tuple2.of(18, 1738844815720L), + Tuple2.of(9, 1738844861137L), + Tuple2.of(20, 1738844877862L), + Tuple2.of(2, 1738844902789L), + Tuple2.of(12, 1738845009506L), + Tuple2.of(4, 1738845015719L), + 
Tuple2.of(19, 1738845024817L), + Tuple2.of(4, 1738845235378L), + Tuple2.of(16, 1738845260054L), + Tuple2.of(10, 1738845300193L), + Tuple2.of(16, 1738845321660L), + Tuple2.of(16, 1738845323026L), + Tuple2.of(3, 1738845348496L), + Tuple2.of(3, 1738845374864L), + Tuple2.of(2, 1738845395772L), + Tuple2.of(14, 1738845410106L), + Tuple2.of(10, 1738845444812L), + Tuple2.of(5, 1738845466085L), + Tuple2.of(20, 1738845485756L), + Tuple2.of(8, 1738845487462L), + Tuple2.of(17, 1738845514827L), + Tuple2.of(9, 1738845517975L), + Tuple2.of(4, 1738845560229L), + }; + + public static final String SALES_AS_TUPLE = + "(12,1738825200000,1)\n" + + "(12,1738828800000,7)\n" + + "(12,1738832400000,7)\n" + + "(12,1738836000000,4)\n" + + "(12,1738839600000,2)\n" + + "(12,1738843200000,4)\n" + + "(18,1738825200000,2)\n" + + "(18,1738828800000,5)\n" + + "(18,1738832400000,6)\n" + + "(18,1738836000000,5)\n" + + "(18,1738839600000,3)\n" + + "(18,1738843200000,2)\n" + + "(15,1738825200000,5)\n" + + "(15,1738828800000,6)\n" + + "(15,1738832400000,5)\n" + + "(15,1738836000000,6)\n" + + "(15,1738839600000,5)\n" + + "(15,1738843200000,3)\n" + + "(17,1738825200000,2)\n" + + "(17,1738828800000,5)\n" + + "(17,1738832400000,6)\n" + + "(17,1738836000000,4)\n" + + "(17,1738839600000,2)\n" + + "(17,1738843200000,2)\n" + + "(8,1738825200000,2)\n" + + "(8,1738828800000,6)\n" + + "(8,1738832400000,4)\n" + + "(8,1738836000000,2)\n" + + "(8,1738839600000,3)\n" + + "(8,1738843200000,5)\n" + + "(1,1738825200000,1)\n" + + "(1,1738828800000,5)\n" + + "(1,1738832400000,1)\n" + + "(1,1738836000000,6)\n" + + "(1,1738839600000,4)\n" + + "(1,1738843200000,3)\n" + + "(14,1738825200000,1)\n" + + "(14,1738828800000,7)\n" + + "(14,1738832400000,8)\n" + + "(14,1738836000000,7)\n" + + "(14,1738839600000,6)\n" + + "(14,1738843200000,6)\n" + + "(19,1738825200000,1)\n" + + "(19,1738828800000,2)\n" + + "(19,1738832400000,1)\n" + + "(19,1738836000000,5)\n" + + "(19,1738839600000,5)\n" + + "(19,1738843200000,4)\n" + + 
"(2,1738825200000,1)\n" + + "(2,1738828800000,9)\n" + + "(2,1738832400000,7)\n" + + "(2,1738836000000,4)\n" + + "(2,1738839600000,4)\n" + + "(2,1738843200000,3)\n" + + "(13,1738825200000,5)\n" + + "(13,1738828800000,2)\n" + + "(13,1738832400000,4)\n" + + "(13,1738836000000,3)\n" + + "(13,1738839600000,4)\n" + + "(13,1738843200000,1)\n" + + "(20,1738825200000,2)\n" + + "(20,1738828800000,2)\n" + + "(20,1738832400000,6)\n" + + "(20,1738836000000,6)\n" + + "(20,1738839600000,3)\n" + + "(20,1738843200000,5)\n" + + "(5,1738825200000,3)\n" + + "(5,1738828800000,7)\n" + + "(5,1738832400000,6)\n" + + "(5,1738836000000,5)\n" + + "(5,1738839600000,6)\n" + + "(5,1738843200000,5)\n" + + "(3,1738825200000,2)\n" + + "(3,1738828800000,3)\n" + + "(3,1738832400000,7)\n" + + "(3,1738836000000,3)\n" + + "(3,1738839600000,3)\n" + + "(3,1738843200000,4)\n" + + "(11,1738825200000,1)\n" + + "(11,1738828800000,3)\n" + + "(11,1738832400000,4)\n" + + "(11,1738836000000,6)\n" + + "(11,1738839600000,4)\n" + + "(11,1738843200000,4)\n" + + "(6,1738825200000,1)\n" + + "(6,1738828800000,4)\n" + + "(6,1738832400000,4)\n" + + "(6,1738836000000,3)\n" + + "(6,1738839600000,9)\n" + + "(6,1738843200000,2)\n" + + "(7,1738825200000,1)\n" + + "(7,1738828800000,5)\n" + + "(7,1738832400000,5)\n" + + "(7,1738836000000,8)\n" + + "(7,1738839600000,2)\n" + + "(7,1738843200000,2)\n" + + "(10,1738825200000,3)\n" + + "(10,1738828800000,8)\n" + + "(10,1738832400000,3)\n" + + "(10,1738836000000,5)\n" + + "(10,1738839600000,7)\n" + + "(10,1738843200000,5)\n" + + "(16,1738825200000,3)\n" + + "(16,1738828800000,5)\n" + + "(16,1738832400000,3)\n" + + "(16,1738836000000,6)\n" + + "(16,1738839600000,4)\n" + + "(16,1738843200000,5)\n" + + "(4,1738825200000,1)\n" + + "(4,1738828800000,10)\n" + + "(4,1738832400000,4)\n" + + "(4,1738836000000,4)\n" + + "(4,1738839600000,3)\n" + + "(4,1738843200000,6)\n" + + "(9,1738828800000,7)\n" + + "(9,1738832400000,6)\n" + + "(9,1738836000000,3)\n" + + "(9,1738839600000,8)\n" + + 
"(9,1738843200000,4)\n"; +} diff --git a/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java new file mode 100644 index 00000000000000..303c7d91216057 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/main/java/org/apache/flink/streaming/examples/dsv2/wordcount/WordCount.java @@ -0,0 +1,335 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.examples.dsv2.wordcount; + +import org.apache.flink.api.common.RuntimeExecutionMode; +import org.apache.flink.api.common.serialization.SimpleStringEncoder; +import org.apache.flink.api.common.state.StateDeclaration; +import org.apache.flink.api.common.state.StateDeclarations; +import org.apache.flink.api.common.state.ValueStateDeclaration; +import org.apache.flink.api.common.typeinfo.TypeDescriptors; +import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils; +import org.apache.flink.api.connector.dsv2.WrappedSink; +import org.apache.flink.api.connector.dsv2.WrappedSource; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.MemorySize; +import org.apache.flink.connector.file.sink.FileSink; +import org.apache.flink.connector.file.src.FileSource; +import org.apache.flink.connector.file.src.reader.TextLineInputFormat; +import org.apache.flink.datastream.api.ExecutionEnvironment; +import org.apache.flink.datastream.api.common.Collector; +import org.apache.flink.datastream.api.context.PartitionedContext; +import org.apache.flink.datastream.api.function.OneInputStreamProcessFunction; +import org.apache.flink.datastream.api.stream.KeyedPartitionStream; +import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream; +import org.apache.flink.streaming.api.functions.sink.PrintSink; +import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy; +import org.apache.flink.streaming.examples.wordcount.util.CLI; +import org.apache.flink.streaming.examples.wordcount.util.WordCountData; + +import java.time.Duration; +import java.util.Arrays; +import java.util.Set; + +/** + * Implements the "WordCount" program by DataStream API V2 that computes a simple word occurrence + * histogram over text files. This Job can be executed in both streaming and batch execution modes. + * + *

The input is a [list of] plain text file[s] with lines separated by a newline character. + * + *

Usage: + * + *

    + *
  • --input <path>: A list of input files and / or directories to read. If no + * input is provided, the program is run with default data from {@link WordCountData}. + *
  • --discovery-interval <duration>: Turns the file reader into a continuous + * source that will monitor the provided input directories every interval and read any new + * files. + *
  • --output <path>: The output directory where the Job will write the + * results. If no output path is provided, the Job will print the results to stdout. + *
  • --execution-mode <mode>: The execution mode (BATCH, STREAMING, or + * AUTOMATIC) of this pipeline. + *
+ * + *

This example shows how to: + * + *

    + *
  • Write a simple Flink program by DataStream API V2 + *
  • Use tuple data types + *
  • Write and use a user-defined process function + *
+ * + *

Please note that if you intend to run this example in an IDE, you must first add the following + * VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module + * system in JDK 17+ restricts some reflection operations. + * + *

Please note that the DataStream API V2 is a new set of APIs, to gradually replace the original + * DataStream API. It is currently in the experimental stage and is not fully available for + * production. + */ +public class WordCount { + + // ************************************************************************* + // PROGRAM + // ************************************************************************* + + public static void main(String[] args) throws Exception { + final CLI params = CLI.fromArgs(args); + + // Get the execution environment instance. This is the main entrypoint + // to building a Flink application. + final ExecutionEnvironment env = ExecutionEnvironment.getInstance(); + + // Apache Flink’s unified approach to stream and batch processing means that a DataStream + // application executed over bounded input will produce the same final results regardless + // of the configured execution mode. It is important to note what final means here: a job + // executing in STREAMING mode might produce incremental updates (think upserts in + // a database) while in BATCH mode, it would only produce one final result at the end. The + // final result will be the same if interpreted correctly, but getting there can be + // different. + // + // The “classic” execution behavior of the DataStream API is called STREAMING execution + // mode. Applications should use streaming execution for unbounded jobs that require + // continuous incremental processing and are expected to stay online indefinitely. + // + // By enabling BATCH execution, we allow Flink to apply additional optimizations that we + // can only do when we know that our input is bounded. For example, different + // join/aggregation strategies can be used, in addition to a different shuffle + // implementation that allows more efficient task scheduling and failure recovery behavior. 
+ // + // By setting the runtime mode to AUTOMATIC, Flink will choose BATCH if all sources + // are bounded and otherwise STREAMING. + env.setExecutionMode(params.getExecutionMode()); + + NonKeyedPartitionStream text; + + if (params.getInputs().isPresent()) { + // Create a new file source that will read files from a given set of directories. + // Each file will be processed as plain text and split based on newlines. + FileSource.FileSourceBuilder builder = + FileSource.forRecordStreamFormat( + new TextLineInputFormat(), params.getInputs().get()); + + // If a discovery interval is provided, the source will + // continuously watch the given directories for new files. + params.getDiscoveryInterval().ifPresent(builder::monitorContinuously); + + text = env.fromSource(new WrappedSource<>(builder.build()), "file-input"); + } else { + // Create an in-memory source from the default data in {@code WordCountData}. + text = + env.fromSource( + DataStreamV2SourceUtils.fromData(Arrays.asList(WordCountData.WORDS)), + "in-memory-input"); + } + + KeyedPartitionStream> keyedStream = + // The text lines read from the source are split into words + // using a user-defined process function. The tokenizer, implemented below, + // will output each word as a (2-tuple) containing (word, 1) + text.process(new Tokenizer()) + .withName("tokenizer") + // keyBy groups tuples based on the first field, the word. + // Using a keyBy allows performing aggregations and other + // stateful transformations over data on a per-key basis. + // This is similar to a GROUP BY clause in a SQL query. + .keyBy(value -> value.f0); + + // For each key, we sum the second field, the count, with a user-defined process function: + // StreamingCounter or BatchCounter. + // If the execution mode is streaming, StreamingCounter continuously outputs an update each + // time it sees a new instance of a word in the stream. + // In every other mode, BatchCounter outputs a single final count for each word. 
Flink will + // provide more built-in functions, such as aggregation, in the future. This will allow 
+ // users to avoid writing their own StreamingCounter or BatchCounter process functions. + // NOTE(review): AUTOMATIC also selects BatchCounter; if it resolves to STREAMING (unbounded + // source), its Long.MAX_VALUE timer presumably never fires -- TODO confirm. + NonKeyedPartitionStream> counts; + if (params.getExecutionMode() == RuntimeExecutionMode.STREAMING) { + counts = keyedStream.process(new StreamingCounter()); + } else { + counts = keyedStream.process(new BatchCounter()); + } + + if (params.getOutput().isPresent()) { + // Given an output directory, Flink will write the results to a file + // using a simple string encoding. In a production environment, this might + // be something more structured like CSV, Avro, JSON, or Parquet. + counts.toSink( + new WrappedSink<>( + FileSink.>forRowFormat( + params.getOutput().get(), + new SimpleStringEncoder<>()) + .withRollingPolicy( + DefaultRollingPolicy.builder() + .withMaxPartSize( + MemorySize.ofMebiBytes(1)) + .withRolloverInterval( + Duration.ofSeconds(10)) + .build()) + .build())) + .withName("file-sink"); + } else { + // Print the results to STDOUT. + counts.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink"); + } + + // Apache Flink applications are composed lazily. Calling execute + // submits the Job and begins processing. + env.execute("WordCount"); + } + + // ************************************************************************* + // USER PROCESS FUNCTIONS + // ************************************************************************* + + /** + * Implements the string tokenizer that splits sentences into words as a user-defined + * ProcessFunction. The process function takes a line (String) and splits it into multiple pairs + * in the form of "(word,1)" ({@code Tuple2}). 
+ */ + public static final class Tokenizer + implements OneInputStreamProcessFunction> { + + @Override + public void processRecord( + String record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + // lower-case the line and split it on runs of non-word characters + String[] tokens = record.toLowerCase().split("\\W+"); + + // emit a (word, 1) pair for each non-empty token + for (String token : tokens) { + if (!token.isEmpty()) { + output.collect(new Tuple2<>(token, 1)); + } + } + } + } + + /** + * Implements a word counter as a user-defined ProcessFunction that counts received words in + * streaming mode. The function uses a ValueState to store the count of each word; whenever it + * receives a "(word,1)" record, it updates the count of that word and outputs the result. + * + *

Note that this is just an example of how to code a streaming job using the DataStream API + * V2. It currently involves some complexity. In the future, we will provide more user-friendly + * APIs and extensions to simplify the process, eliminating the need for users to distinguish + * between process functions in streaming and batch execution modes. + */ + public static final class StreamingCounter + implements OneInputStreamProcessFunction< + Tuple2, Tuple2> { + + // uses a ValueState to store the count of each word + private final ValueStateDeclaration countStateDeclaration = + StateDeclarations.valueState("count", TypeDescriptors.INT); + + @Override + public Set usesStates() { + // declare the ValueState that stores the count of each word + return Set.of(countStateDeclaration); + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + // calculate the new count; a null state value means no previous count is stored + String word = record.f0; + Integer count = record.f1; + Integer previousCount = + ctx.getStateManager().getState(countStateDeclaration).get().value(); + Integer newlyCount = previousCount == null ? count : previousCount + count; + + // store the new count back into the state + ctx.getStateManager().getState(countStateDeclaration).get().update(newlyCount); + + // output the updated (word, count) pair for every incoming record + output.collect(Tuple2.of(word, newlyCount)); + } + } + + /** + * Implements a word counter as a user-defined ProcessFunction that counts received words in + * batch mode. The function uses a ValueState to store the count of each word and registers a + * processing timer with Long.MAX_VALUE. This timer will be invoked when all data for the same + * key has been processed, at which point it outputs the result. + * + *

Note that this is just an example of how to code a batch job using the DataStream API V2. + * It currently involves some complexity. In the future, we will provide more user-friendly APIs + * and extensions to simplify the process, eliminating the need for users to distinguish between + * process functions in streaming and batch execution modes. + */ + public static final class BatchCounter + implements OneInputStreamProcessFunction< + Tuple2, Tuple2> { + + // uses a ValueState to store the count of each word + private final ValueStateDeclaration countStateDeclaration = + StateDeclarations.valueState("count", TypeDescriptors.INT); + + @Override + public Set usesStates() { + // declare the ValueState that stores the count of each word + return Set.of(countStateDeclaration); + } + + @Override + public void processRecord( + Tuple2 record, + Collector> output, + PartitionedContext> ctx) + throws Exception { + // calculate the new count; a null state value means no previous count is stored + Integer count = record.f1; + Integer previousCount = + ctx.getStateManager().getState(countStateDeclaration).get().value(); + Integer newlyCount = previousCount == null ? 
count : previousCount + count; + + // store the new count back into the state; nothing is output here + ctx.getStateManager().getState(countStateDeclaration).get().update(newlyCount); + + // register a processing timer at Long.MAX_VALUE; onProcessingTimer outputs the result + ctx.getProcessingTimeManager().registerTimer(Long.MAX_VALUE); + } + + @Override + public void onProcessingTimer( + long timestamp, + Collector> output, + PartitionedContext> ctx) { + // this timer will be triggered when all data for the same key has been fully processed + try { + // the current key is the word; read its accumulated count from the state + String word = ctx.getStateManager().getCurrentKey(); + Integer count = ctx.getStateManager().getState(countStateDeclaration).get().value(); + + // output the final (word, count) pair + output.collect(Tuple2.of(word, count)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } +} diff --git a/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java new file mode 100644 index 00000000000000..44a0e28a862b32 --- /dev/null +++ b/flink-examples/flink-examples-streaming/src/test/java/org/apache/flink/streaming/test/examples/DSv2ExamplesITCase.java @@ -0,0 +1,108 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.test.examples; + +import org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData; +import org.apache.flink.streaming.examples.dsv2.join.util.JoinData; +import org.apache.flink.streaming.examples.dsv2.windowing.util.CountProductSalesWindowingData; +import org.apache.flink.test.testdata.WordCountData; +import org.apache.flink.test.util.AbstractTestBase; + +import org.junit.jupiter.api.Test; + +import static org.apache.flink.test.util.TestBaseUtils.compareResultsByLinesInMemory; + +/** Integration tests that run the DataStream API V2 examples and verify the output they write. */ +class DSv2ExamplesITCase extends AbstractTestBase { + + @Test + void testStreamingWordCount() throws Exception { + final String textPath = createTempFile("text.txt", WordCountData.TEXT); + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.wordcount.WordCount.main( + new String[] { + "--input", textPath, + "--output", resultPath, + "--execution-mode", "streaming" + }); + + compareResultsByLinesInMemory(WordCountData.STREAMING_COUNTS_AS_TUPLES, resultPath); + } + + @Test + void testBatchWordCount() throws Exception { + final String textPath = createTempFile("text.txt", WordCountData.TEXT); + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.wordcount.WordCount.main( + new String[] { + "--input", textPath, + "--output", resultPath, + "--execution-mode", "batch" + }); + + compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath); + } + + @Test + void testAutomaticWordCount() throws Exception { + final String textPath = createTempFile("text.txt", WordCountData.TEXT); + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.wordcount.WordCount.main( + new String[] { + 
"--input", textPath, + "--output", resultPath, + 
"--execution-mode", "automatic" + }); + + compareResultsByLinesInMemory(WordCountData.COUNTS_AS_TUPLES, resultPath); + } + + @Test + void testJoin() throws Exception { + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.join.Join.main( + new String[] {"--output", resultPath}); + + compareResultsByLinesInMemory(JoinData.JOINED_DATAS_AS_TUPLES, resultPath); + } + + @Test + void testCountProductSalesWindowing() throws Exception { + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.windowing.CountProductSalesWindowing.main( + new String[] {"--output", resultPath}); + + compareResultsByLinesInMemory(CountProductSalesWindowingData.SALES_AS_TUPLE, resultPath); + } + + @Test + void testStatisticNewsClickNumber() throws Exception { + final String resultPath = getTempDirPath("result"); + + org.apache.flink.streaming.examples.dsv2.eventtime.StatisticNewsClickNumber.main( + new String[] {"--output", resultPath}); + + compareResultsByLinesInMemory(StatisticNewsClickNumberData.STATISTIC_RESULT, resultPath); + } +}