Skip to content

Commit

Permalink
[FLINK-14068][core] Removes API that uses org.apache.flink.api.common…
Browse files Browse the repository at this point in the history
….time.Time
  • Loading branch information
XComp committed Sep 8, 2024
1 parent ad9c4f6 commit 4765e90
Show file tree
Hide file tree
Showing 382 changed files with 1,297 additions and 1,596 deletions.
8 changes: 4 additions & 4 deletions docs/content.zh/docs/dev/datastream/fault-tolerance/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ env.execute()
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand All @@ -318,7 +318,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand Down Expand Up @@ -438,7 +438,7 @@ ttl_config = StateTtlConfig \
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand All @@ -449,7 +449,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand Down
8 changes: 4 additions & 4 deletions docs/content/docs/dev/datastream/fault-tolerance/state.md
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ functionality can then be enabled in any state descriptor by passing the configu
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand All @@ -357,7 +357,7 @@ stateDescriptor.enableTimeToLive(ttlConfig);
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.state.ValueStateDescriptor
import org.apache.flink.api.common.time.Time
import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand Down Expand Up @@ -488,7 +488,7 @@ It can be configured in `StateTtlConfig`:
{{< tab "Java" >}}
```java
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;

StateTtlConfig ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand All @@ -499,7 +499,7 @@ StateTtlConfig ttlConfig = StateTtlConfig
{{< tab "Scala" >}}
```scala
import org.apache.flink.api.common.state.StateTtlConfig
import org.apache.flink.api.common.time.Time
import java.time.Duration

val ttlConfig = StateTtlConfig
.newBuilder(Duration.ofSeconds(1))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.executors.EmbeddedExecutor;
Expand Down Expand Up @@ -50,6 +49,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
Expand Down Expand Up @@ -364,10 +364,8 @@ private CompletableFuture<JobResult> getJobResult(
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
final boolean tolerateMissingResult) {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Time retryPeriod =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_RETRY_PERIOD).toMillis());
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
final Duration retryPeriod = configuration.get(ClientOptions.CLIENT_RETRY_PERIOD);
final CompletableFuture<JobResult> jobResultFuture =
JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, scheduledExecutor, timeout, retryPeriod);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
Expand All @@ -40,6 +39,7 @@
import javax.annotation.Nullable;

import java.io.IOException;
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
Expand All @@ -59,15 +59,15 @@ public class EmbeddedJobClient implements JobClient, CoordinationRequestGateway

private final ScheduledExecutor retryExecutor;

private final Time timeout;
private final Duration timeout;

private final ClassLoader classLoader;

public EmbeddedJobClient(
final JobID jobId,
final DispatcherGateway dispatcherGateway,
final ScheduledExecutor retryExecutor,
final Time rpcTimeout,
final Duration rpcTimeout,
final ClassLoader classLoader) {
this.jobId = checkNotNull(jobId);
this.dispatcherGateway = checkNotNull(dispatcherGateway);
Expand Down Expand Up @@ -136,7 +136,7 @@ public CompletableFuture<Map<String, Object>> getAccumulators() {
public CompletableFuture<JobExecutionResult> getJobExecutionResult() {
checkNotNull(classLoader);

final Time retryPeriod = Time.milliseconds(100L);
final Duration retryPeriod = Duration.ofMillis(100L);
return JobStatusPollingUtils.getJobResult(
dispatcherGateway, jobId, retryExecutor, timeout, retryPeriod)
.thenApply(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,11 @@
import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
Expand All @@ -50,14 +50,14 @@ static CompletableFuture<JobResult> getJobResult(
final DispatcherGateway dispatcherGateway,
final JobID jobId,
final ScheduledExecutor scheduledExecutor,
final Time rpcTimeout,
final Time retryPeriod) {
final Duration rpcTimeout,
final Duration retryPeriod) {

return pollJobResultAsync(
() -> dispatcherGateway.requestJobStatus(jobId, rpcTimeout),
() -> dispatcherGateway.requestJobResult(jobId, rpcTimeout),
scheduledExecutor,
retryPeriod.toMilliseconds());
retryPeriod.toMillis());
}

@VisibleForTesting
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.dag.Pipeline;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.executors.PipelineExecutorUtils;
Expand All @@ -43,6 +42,7 @@

import java.net.InetSocketAddress;
import java.net.MalformedURLException;
import java.time.Duration;
import java.util.Collection;
import java.util.List;
import java.util.Optional;
Expand Down Expand Up @@ -136,8 +136,7 @@ private CompletableFuture<JobClient> submitAndGetJobClientFuture(
final Configuration configuration,
final ClassLoader userCodeClassloader)
throws MalformedURLException {
final Time timeout =
Time.milliseconds(configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);

final JobGraph jobGraph =
PipelineExecutorUtils.getJobGraph(pipeline, configuration, userCodeClassloader);
Expand Down Expand Up @@ -191,7 +190,7 @@ private static CompletableFuture<JobID> submitJob(
final Configuration configuration,
final DispatcherGateway dispatcherGateway,
final JobGraph jobGraph,
final Time rpcTimeout) {
final Duration rpcTimeout) {
checkNotNull(jobGraph);

LOG.info("Submitting Job with JobId={}.", jobGraph.getJobID());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.JobID;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.cli.ClientOptions;
import org.apache.flink.client.deployment.application.EmbeddedJobClient;
import org.apache.flink.configuration.Configuration;
Expand All @@ -29,6 +28,7 @@
import org.apache.flink.runtime.dispatcher.DispatcherGateway;
import org.apache.flink.util.concurrent.ScheduledExecutor;

import java.time.Duration;
import java.util.Collection;

import static org.apache.flink.util.Preconditions.checkNotNull;
Expand Down Expand Up @@ -81,9 +81,7 @@ public PipelineExecutor getExecutor(final Configuration configuration) {
dispatcherGateway,
configuration,
(jobId, userCodeClassloader) -> {
final Time timeout =
Time.milliseconds(
configuration.get(ClientOptions.CLIENT_TIMEOUT).toMillis());
final Duration timeout = configuration.get(ClientOptions.CLIENT_TIMEOUT);
return new EmbeddedJobClient(
jobId, dispatcherGateway, retryExecutor, timeout, userCodeClassloader);
});
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.accumulators.AccumulatorHelper;
import org.apache.flink.api.common.cache.DistributedCache;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.ClientUtils;
import org.apache.flink.client.program.ClusterClient;
Expand Down Expand Up @@ -300,7 +299,7 @@ public void close() {
TimeUnit.MILLISECONDS,
retryExecutorService);

this.restClient.shutdown(Time.seconds(5));
this.restClient.shutdown(Duration.ofSeconds(5));
ExecutorUtils.gracefulShutdown(5, TimeUnit.SECONDS, this.executorService);

try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@
package org.apache.flink.connector.base.sink;

import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.runtime.client.JobExecutionException;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
Expand All @@ -29,6 +28,8 @@
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.api.extension.RegisterExtension;

import java.time.Duration;

import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Integration tests of a baseline generic sink that implements the AsyncSinkBase. */
Expand Down Expand Up @@ -66,7 +67,7 @@ public void testFailuresOnPersistingToDestinationAreCaughtAndRaised() {
@Test
public void testThatNoIssuesOccurWhenCheckpointingIsEnabled() throws Exception {
env.enableCheckpointing(20);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(200)));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(200)));
env.fromSequence(1, 10_000).map(Object::toString).sinkTo(new ArrayListAsyncSink());
env.execute("Integration Test: AsyncSinkBaseITCase");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.connector.source.Boundedness;
import org.apache.flink.configuration.Configuration;
Expand All @@ -34,6 +33,8 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;

import java.time.Duration;

/** Tests the functionality of the {@link FileSink} in BATCH mode. */
class BatchExecutionFileSinkITCase extends FileSinkITBase {

Expand All @@ -49,7 +50,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
env.configure(config, getClass().getClassLoader());

if (triggerFailover) {
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
} else {
env.setRestartStrategy(RestartStrategies.noRestart());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ExecutionOptions;
import org.apache.flink.core.execution.CheckpointingMode;
Expand All @@ -39,6 +38,7 @@
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;

import java.time.Duration;
import java.util.Collections;
import java.util.Map;
import java.util.UUID;
Expand Down Expand Up @@ -82,7 +82,7 @@ protected JobGraph createJobGraph(boolean triggerFailover, String path) {
env.enableCheckpointing(10, CheckpointingMode.EXACTLY_ONCE);

if (triggerFailover) {
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Time.milliseconds(100)));
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(1, Duration.ofMillis(100)));
} else {
env.setRestartStrategy(RestartStrategies.noRestart());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -236,11 +236,7 @@ void testTableRollingOnProcessingTime(@TempDir java.nio.file.Path tmpDir) throws
Path path = new Path(outDir.toURI());

FileSystemTableSink.TableRollingPolicy tableRollingPolicy =
new FileSystemTableSink.TableRollingPolicy(
false,
Long.MAX_VALUE,
Duration.ofMillis(20).toMillis(),
Duration.ofMillis(10).toMillis());
new FileSystemTableSink.TableRollingPolicy(false, Long.MAX_VALUE, 20L, 10L);

TestRecoverableWriter recoverableWriter = getRecoverableWriter(path);
FileWriterBucket<RowData> bucket =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

package org.apache.flink.streaming.connectors.wikiedits;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.testutils.junit.RetryOnFailure;
Expand All @@ -31,6 +30,7 @@

import java.net.InetSocketAddress;
import java.net.Socket;
import java.time.Duration;
import java.util.Objects;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
Expand All @@ -53,7 +53,7 @@ class WikipediaEditsSourceTest {
@RetryOnFailure(times = 1)
void testWikipediaEditsSource() throws Exception {
if (canConnect(1, TimeUnit.SECONDS)) {
final Time testTimeout = Time.seconds(60);
final Duration testTimeout = Duration.ofSeconds(60);
final WikipediaEditsSource wikipediaEditsSource = new WikipediaEditsSource();

ExecutorService executorService = null;
Expand Down Expand Up @@ -116,8 +116,8 @@ void testWikipediaEditsSource() throws Exception {
}
}

private long deadlineNanos(Time testTimeout) {
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMilliseconds());
private long deadlineNanos(Duration testTimeout) {
return System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(testTimeout.toMillis());
}

private static class CollectingSourceContext<T> implements SourceFunction.SourceContext<T> {
Expand Down
Loading

0 comments on commit 4765e90

Please sign in to comment.