Skip to content

Commit

Permalink
[FLINK-37274][datastream] Add examples for DataStream V2
Browse files Browse the repository at this point in the history
  • Loading branch information
codenohup committed Feb 7, 2025
1 parent 5a1ef84 commit cf71206
Show file tree
Hide file tree
Showing 10 changed files with 2,644 additions and 0 deletions.
4 changes: 4 additions & 0 deletions flink-examples/flink-examples-streaming/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,10 @@ under the License.
<!-- Allow users to pass custom jcuda versions -->
<properties>
<jcuda.version>10.0.0</jcuda.version>
<surefire.module.config><!--
DataStream V2 API examples
-->--add-opens=java.base/java.util=ALL-UNNAMED
</surefire.module.config>
</properties>

<dependencies>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,278 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.examples.dsv2.eventtime;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.connector.dsv2.DataStreamV2SourceUtils;
import org.apache.flink.api.connector.dsv2.WrappedSink;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.datastream.api.ExecutionEnvironment;
import org.apache.flink.datastream.api.common.Collector;
import org.apache.flink.datastream.api.context.PartitionedContext;
import org.apache.flink.datastream.api.extension.eventtime.EventTimeExtension;
import org.apache.flink.datastream.api.extension.eventtime.function.OneInputEventTimeStreamProcessFunction;
import org.apache.flink.datastream.api.extension.eventtime.timer.EventTimeManager;
import org.apache.flink.datastream.api.stream.NonKeyedPartitionStream;
import org.apache.flink.streaming.api.functions.sink.PrintSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.examples.dsv2.eventtime.util.StatisticNewsClickNumberData;
import org.apache.flink.util.ParameterTool;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* This example illustrates how to count the number of clicks on each news at 1 minute, 5 minutes,
* 10 minutes, 30 minutes, and 1 hour after news publication.
*
* <p>The input consists of a series of {@link NewsEvent}s, which fall into two categories: news
* releases and news clicks. Each {@link NewsEvent} contains three components: the event type, the
* news ID and the timestamp. Notably, there is only one event of type {@link NewsEventType#RELEASE}
* for each news.
*
* <p>Usage:
*
* <ul>
* <li><code>--output &lt;path&gt;</code>: The output directory where the Job will write the
* results. If no output path is provided, the Job will print the results to <code>stdout
* </code>.
* </ul>
*
* <p>This example shows how to:
*
* <ul>
* <li>Use the Event Time extension in DataStream API V2
* </ul>
*
* <p>Please note that if you intend to run this example in an IDE, you must first add the following
* VM options: "--add-opens=java.base/java.util=ALL-UNNAMED". This is necessary because the module
* system in JDK 17+ restricts some reflection operations.
*
* <p>Please note that the DataStream API V2 is a new set of APIs, to gradually replace the original
* DataStream API. It is currently in the experimental stage and is not fully available for
* production.
*/
public class StatisticNewsClickNumber {

// Offsets after a news release at which the accumulated click count is reported:
// 1 minute, 5 minutes, 10 minutes, 30 minutes and 1 hour.
private static final Duration[] TIMES_AFTER_NEWS_RELEASE = {
    Duration.ofMinutes(1),
    Duration.ofMinutes(5),
    Duration.ofMinutes(10),
    Duration.ofMinutes(30),
    Duration.ofHours(1)
};

/**
 * The kind of a {@link NewsEvent}. Each news is expected to have exactly one event of type
 * {@link NewsEventType#RELEASE}.
 */
public enum NewsEventType {
    /** The news has been published. */
    RELEASE,

    /** The news has been clicked. */
    CLICK
}

/**
 * A single event concerning one piece of news. It carries the kind of event (release or click),
 * the id of the news it refers to, and the timestamp of the event.
 */
public static class NewsEvent {
    /** Whether this event is the release of the news or a click on it. */
    public NewsEventType type;

    /** Identifier of the news this event refers to. */
    public long newsId;

    /** Timestamp of the event; the example job uses it as the event time. */
    public long timestamp;

    /**
     * Creates an event from its three components.
     *
     * @param type the kind of event
     * @param newsId the id of the news the event refers to
     * @param timestamp the timestamp of the event
     */
    public NewsEvent(NewsEventType type, long newsId, long timestamp) {
        this.timestamp = timestamp;
        this.newsId = newsId;
        this.type = type;
    }
}

/**
 * The {@link NewsClickNumber} represents the number of clicks on news within a specified
 * duration following its release. For example, NewsClickNumber{newsId=12345678,
 * timeAfterRelease=60000, clickNumber=132} indicates that the news 12345678 has been clicked
 * 132 times within 60,000 milliseconds after its release.
 */
public static class NewsClickNumber {
    /** Identifier of the news the count belongs to. */
    public long newsId;

    /** Length of the counting window, measured from the news release, in milliseconds. */
    public long timeAfterRelease;

    /** Number of clicks counted within the window. */
    public long clickNumber;

    /**
     * Creates a result record.
     *
     * @param newsId the id of the news
     * @param timeAfterRelease the window length after the release, in milliseconds
     * @param clickCount the number of clicks counted within the window
     */
    public NewsClickNumber(long newsId, long timeAfterRelease, long clickCount) {
        this.newsId = newsId;
        this.timeAfterRelease = timeAfterRelease;
        this.clickNumber = clickCount;
    }

    /**
     * Renders the record as {@code (newsId,timeAfterRelease,clickNumber)}.
     *
     * @return the textual form of this record
     */
    @Override
    public String toString() {
        return String.format(
                "(%d,%d,%d)", this.newsId, this.timeAfterRelease, this.clickNumber);
    }
}

/**
* Builds and runs the example job: reads the predefined news events, assigns event time
* watermarks based on each event's timestamp, keys the stream by news id, counts the clicks per
* news with {@link CountNewsClickNumberProcessFunction}, and writes the results either to files
* or to stdout.
*
* @param args command-line arguments; the optional <code>--output &lt;path&gt;</code> selects the
*     file sink, otherwise the results are printed
* @throws Exception if setting up or executing the job fails
*/
public static void main(String[] args) throws Exception {
// parse the parameters
final ParameterTool params = ParameterTool.fromArgs(args);
final boolean fileOutput = params.has("output");

// obtain execution environment
ExecutionEnvironment env = ExecutionEnvironment.getInstance();

// the input consists of a series of {@code NewsEvent}s, which include two types: news
// release event and news click event.
NonKeyedPartitionStream<NewsEvent> source =
env.fromSource(
DataStreamV2SourceUtils.fromData(
Arrays.asList(StatisticNewsClickNumberData.NEWS_EVENTS)),
"news event source");

NonKeyedPartitionStream<NewsClickNumber> clickNumberStream =
// extract event time and generate the event time watermark
source.process(
// the timestamp field of the input is considered to be the
// event time
EventTimeExtension.<NewsEvent>newWatermarkGeneratorBuilder(
event -> event.timestamp)
// generate event time watermarks every 200ms
.periodicWatermark(Duration.ofMillis(200))
// if the input is idle for more than 30 seconds, it
// is ignored during the event time watermark
// combination process
.withIdleness(Duration.ofSeconds(30))
// set the maximum out-of-order time of the event to
// 10 seconds, meaning that if an event is received
// at 12:00:00, then no further events should be
// received earlier than 11:59:50
.withMaxOutOfOrderTime(Duration.ofSeconds(10))
// build the event time watermark generator as
// ProcessFunction
.buildAsProcessFunction())
// key by the news id
.keyBy(event -> event.newsId)
// count the click number of each news
.process(
EventTimeExtension.wrapProcessFunction(
new CountNewsClickNumberProcessFunction()));

if (fileOutput) {
// write the click number results to files under the given output path; part files
// roll over once they reach 1 MiB or after 10 seconds
clickNumberStream
.toSink(
new WrappedSink<>(
FileSink.<NewsClickNumber>forRowFormat(
new Path(params.get("output")),
new SimpleStringEncoder<>())
.withRollingPolicy(
DefaultRollingPolicy.builder()
.withMaxPartSize(
MemorySize.ofMebiBytes(1))
.withRolloverInterval(
Duration.ofSeconds(10))
.build())
.build()))
.withName("output");
} else {
// Print the results to the STDOUT.
clickNumberStream.toSink(new WrappedSink<>(new PrintSink<>())).withName("print-sink");
}

env.execute("StatisticNewsClickNumberExample");
}

/**
 * This process function will consume {@link NewsEvent} and count the number of clicks within 1
 * minute, 5 minutes, 10 minutes, 30 minutes and 1 hour of the news releasing and send the
 * results {@link NewsClickNumber} to the output.
 *
 * <p>To achieve the goal, we will register a series of timers for the news, which will be
 * triggered at the time of the news's release time + 1 minute/5 minutes/10 minutes/30 minutes/1
 * hour, and record a list of the click times of each news. In the timer callback {@code
 * onEventTimer}, we will count the number of clicks recorded up to the timer trigger time and
 * send the result to the output. A news without any recorded click is reported with a click
 * number of 0.
 *
 * <p>NOTE(review): the release times and click times are kept in plain {@link HashMap} fields
 * rather than in state obtained from the state manager; presumably this is a simplification for
 * the example and the data is not restored after a failure - confirm before reusing.
 */
public static class CountNewsClickNumberProcessFunction
        implements OneInputEventTimeStreamProcessFunction<NewsEvent, NewsClickNumber> {

    private EventTimeManager eventTimeManager;

    // news id to release time
    private final Map<Long, Long> releaseTimeOfNews = new HashMap<>();

    // news id to click time list
    private final Map<Long, List<Long>> clickTimeListOfNews = new HashMap<>();

    /**
     * Stores the {@link EventTimeManager} that is later used to register event timers.
     *
     * @param eventTimeManager the manager handed in by the event time extension
     */
    @Override
    public void initEventTimeProcessFunction(EventTimeManager eventTimeManager) {
        this.eventTimeManager = eventTimeManager;
    }

    /**
     * Handles one input event: a release event records the release time and registers one
     * timer per configured offset, a click event appends its timestamp to the click list of
     * the news.
     *
     * @param record the incoming news event
     * @param output the collector for results (not used here; results are emitted by timers)
     * @param ctx the context of the current partition
     */
    @Override
    public void processRecord(
            NewsEvent record,
            Collector<NewsClickNumber> output,
            PartitionedContext<NewsClickNumber> ctx)
            throws Exception {
        if (record.type == NewsEventType.RELEASE) {
            // for the news release event, record the release time and register timers
            long releaseTime = record.timestamp;
            releaseTimeOfNews.put(record.newsId, releaseTime);
            for (Duration targetTime : TIMES_AFTER_NEWS_RELEASE) {
                eventTimeManager.registerTimer(releaseTime + targetTime.toMillis());
            }
        } else {
            // for the news click event, record the click time
            clickTimeListOfNews
                    .computeIfAbsent(record.newsId, k -> new ArrayList<>())
                    .add(record.timestamp);
        }
    }

    /**
     * Emits the number of clicks recorded for the current news with a click time not later
     * than the timer's timestamp.
     *
     * @param timestamp the event time at which the timer fires
     * @param output the collector the {@link NewsClickNumber} result is sent to
     * @param ctx the context of the current partition, used to look up the current key
     */
    @Override
    public void onEventTimer(
            long timestamp,
            Collector<NewsClickNumber> output,
            PartitionedContext<NewsClickNumber> ctx) {
        // get the news that the current event timer belongs to
        long newsId = ctx.getStateManager().getCurrentKey();

        // calculate the difference between the current time and the news release time
        Duration diffTime = Duration.ofMillis(timestamp - releaseTimeOfNews.get(newsId));

        // calculate the number of clicks on the news at the current time; the list is absent
        // when no click has been recorded for this news yet, which counts as zero clicks
        List<Long> clickTimeList = clickTimeListOfNews.get(newsId);
        long clickCount = 0;
        if (clickTimeList != null) {
            for (long clickTime : clickTimeList) {
                if (clickTime <= timestamp) {
                    clickCount++;
                }
            }
        }

        // send the result to output
        output.collect(new NewsClickNumber(newsId, diffTime.toMillis(), clickCount));
    }
}
}
Loading

0 comments on commit cf71206

Please sign in to comment.