Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-14068] Replaces org.apache.flink.api.common.time.Time with imp…
Browse files Browse the repository at this point in the history
…ort java.time.Duration.

$ find . -name "*.java" | xargs sed -i 's/import org.apache.flink.api.common.time.Time;/import java.time.Duration;/g'
$ git status | grep modified | grep -o "[^ ]*$" | xargs sed -r -i \
    -e "s/(.*[^a-zA-Z0-9_])Time ([a-zA-Z_,\)]*[\),].*)/\1Duration \2/g" \
    -e 's/(.*) Time (.*)/\1 Duration \2/g' \
    -e 's/(.*)<Time>(.*)/\1<Duration>\2/g' \
    -e 's~(.*[^a-zA-Z0-9_])Time.fromDuration\(([a-zA-Z0-9_\.\(\)]*)\)(.*)~\1\2\3~g' \
    -e 's/(.*[a-zA-Z0-9_\( ])Time\.days\((.*)/\1Duration.ofDays(\2/g' \
    -e 's/(.*[a-zA-Z0-9_\( ])Time\.hours\((.*)/\1Duration.ofHours(\2/g' \
    -e 's/(.*[a-zA-Z0-9_\( ])Time\.minutes\((.*)/\1Duration.ofMinutes(\2/g' \
    -e 's/(.*[a-zA-Z0-9_\( ])Time\.seconds\((.*)/\1Duration.ofSeconds(\2/g' \
    -e 's/(.*[a-zA-Z0-9_\( ])Time\.milliseconds\((.*)/\1Duration.ofMillis(\2/g' \
    -e "s/(.*[^a-zA-Z0-9_])Time\.toDuration\(([a-zA-Z0-9_\.\(\)]*)\)(.*)/\1\2\3/g"
XComp committed Aug 24, 2024
1 parent 8f3aa8b commit c79fa91
Showing 293 changed files with 877 additions and 877 deletions.
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.api.common.state;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.util.Preconditions;

import javax.annotation.Nonnegative;
@@ -126,8 +126,8 @@ public StateVisibility getStateVisibility() {
/** @deprecated Use {@link #getTimeToLive()} */
@Deprecated
@Nonnull
public Time getTtl() {
return Time.fromDuration(getTimeToLive());
public Duration getTtl() {
return getTimeToLive();
}

public Duration getTimeToLive() {
@@ -165,7 +165,7 @@ public String toString() {
/** @deprecated Use {@link #newBuilder(Duration)} */
@Deprecated
@Nonnull
public static Builder newBuilder(@Nonnull Time ttl) {
public static Builder newBuilder(@Nonnull Duration ttl) {
return new Builder(ttl);
}

@@ -187,8 +187,8 @@ public static class Builder {

/** @deprecated Use {@link #newBuilder(Duration)} */
@Deprecated
public Builder(@Nonnull Time ttl) {
this(Time.toDuration(ttl));
public Builder(@Nonnull Duration ttl) {
this(ttl);
}

private Builder(Duration ttl) {
@@ -279,7 +279,7 @@ public Builder cleanupFullSnapshot() {
* <p>Note: if no access happens to this state or no records are processed in case of {@code
* runCleanupForEveryRecord}, expired state will persist.
*
* <p>Note: Time spent for the incremental cleanup increases record processing latency.
* <p>Note: Duration spent for the incremental cleanup increases record processing latency.
*
* <p>Note: At the moment incremental cleanup is implemented only for Heap state backend.
* Setting it for RocksDB will have no effect.
@@ -370,8 +370,8 @@ public Builder disableCleanupInBackground() {
*/
@Deprecated
@Nonnull
public Builder setTtl(@Nonnull Time ttl) {
return setTimeToLive(Time.toDuration(ttl));
public Builder setTtl(@Nonnull Duration ttl) {
return setTimeToLive(ttl);
}

public Builder setTimeToLive(Duration ttl) {
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.configuration;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;

@@ -53,13 +53,13 @@ public class ConfigurationUtils {
* @return extracted {@link MetricOptions#SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL} or {@code
* Optional.empty()} if {@link MetricOptions#SYSTEM_RESOURCE_METRICS} are disabled.
*/
public static Optional<Time> getSystemResourceMetricsProbingInterval(
public static Optional<Duration> getSystemResourceMetricsProbingInterval(
Configuration configuration) {
if (!configuration.get(SYSTEM_RESOURCE_METRICS)) {
return Optional.empty();
} else {
return Optional.of(
Time.fromDuration(configuration.get(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL)));
configuration.get(SYSTEM_RESOURCE_METRICS_PROBING_INTERVAL));
}
}

@@ -138,15 +138,15 @@ public static String parseMapToString(Map<String, String> map) {
return convertToString(map, GlobalConfiguration.isStandardYaml());
}

public static Time getStandaloneClusterStartupPeriodTime(Configuration configuration) {
final Time timeout;
public static Duration getStandaloneClusterStartupPeriodTime(Configuration configuration) {
final Duration timeout;
Duration standaloneClusterStartupPeriodTime =
configuration.get(ResourceManagerOptions.STANDALONE_CLUSTER_STARTUP_PERIOD_TIME);
if (standaloneClusterStartupPeriodTime != null
&& !standaloneClusterStartupPeriodTime.isNegative()) {
timeout = Time.fromDuration(standaloneClusterStartupPeriodTime);
timeout = standaloneClusterStartupPeriodTime;
} else {
timeout = Time.fromDuration(configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT));
timeout = configuration.get(JobManagerOptions.SLOT_REQUEST_TIMEOUT);
}
return timeout;
}
4 changes: 2 additions & 2 deletions flink-core/src/main/java/org/apache/flink/util/TimeUtils.java
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.util;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;

import java.math.BigInteger;
import java.time.Duration;
@@ -279,7 +279,7 @@ private static String createTimeUnitString(TimeUnit timeUnit) {
* @deprecated Use {@link Duration} APIs
*/
@Deprecated
public static Duration toDuration(Time time) {
public static Duration toDuration(Duration time) {
return Duration.of(time.getSize(), toChronoUnit(time.getUnit()));
}

Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@

import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.serialization.SerializerConfigImpl;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
@@ -249,7 +249,7 @@ public void go() {
void testStateTTlConfig() {
ValueStateDescriptor<Integer> stateDescriptor =
new ValueStateDescriptor<>("test-state", IntSerializer.INSTANCE);
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Time.minutes(60)).build());
stateDescriptor.enableTimeToLive(StateTtlConfig.newBuilder(Duration.ofMinutes(60)).build());
assertThat(stateDescriptor.getTtlConfig().isEnabled()).isTrue();

stateDescriptor.enableTimeToLive(StateTtlConfig.DISABLED);
Original file line number Diff line number Diff line change
@@ -20,7 +20,7 @@
import org.apache.flink.api.common.state.StateTtlConfig.CleanupStrategies;
import org.apache.flink.api.common.state.StateTtlConfig.IncrementalCleanupStrategy;
import org.apache.flink.api.common.state.StateTtlConfig.RocksdbCompactFilterCleanupStrategy;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;

import org.junit.jupiter.api.Test;

@@ -37,7 +37,7 @@ class StateTtlConfigTest {
@Test
void testStateTtlConfigBuildWithoutCleanupInBackground() {
StateTtlConfig ttlConfig =
StateTtlConfig.newBuilder(Time.seconds(1)).disableCleanupInBackground().build();
StateTtlConfig.newBuilder(Duration.ofSeconds(1)).disableCleanupInBackground().build();

assertThat(ttlConfig.getCleanupStrategies()).isNotNull();

@@ -55,7 +55,7 @@ void testStateTtlConfigBuildWithoutCleanupInBackground() {

@Test
void testStateTtlConfigBuildWithCleanupInBackground() {
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Time.seconds(1)).build();
StateTtlConfig ttlConfig = StateTtlConfig.newBuilder(Duration.ofSeconds(1)).build();

assertThat(ttlConfig.getCleanupStrategies()).isNotNull();

@@ -83,7 +83,7 @@ void testStateTtlConfigBuildWithNonPositiveCleanupIncrementalSize() {
for (Integer illegalCleanUpSize : illegalCleanUpSizes) {
assertThatThrownBy(
() ->
StateTtlConfig.newBuilder(Time.seconds(1))
StateTtlConfig.newBuilder(Duration.ofSeconds(1))
.cleanupIncrementally(illegalCleanUpSize, false)
.build())
.isInstanceOf(IllegalArgumentException.class);
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.util;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;

import org.junit.jupiter.api.Test;

@@ -166,7 +166,7 @@ void testGetStringInMillis() {

@Test
void testToDuration() {
final Time time = Duration.of(1337, TimeUnit.MICROSECONDS);
final Duration time = Duration.of(1337, TimeUnit.MICROSECONDS);
final Duration duration = TimeUtils.toDuration(time);

assertThat(duration.toNanos()).isEqualTo(time.getUnit().toNanos(time.getSize()));
Original file line number Diff line number Diff line change
@@ -22,7 +22,7 @@
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
Original file line number Diff line number Diff line change
@@ -23,7 +23,7 @@
import org.apache.flink.api.common.serialization.Encoder;
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.streaming.tests;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
@@ -44,14 +44,14 @@ class TtlTestConfig {
final int keySpace;
final long sleepAfterElements;
final long sleepTime;
final Time ttl;
final Duration ttl;
final long reportStatAfterUpdatesNum;

private TtlTestConfig(
int keySpace,
long sleepAfterElements,
long sleepTime,
Time ttl,
Duration ttl,
long reportStatAfterUpdatesNum) {
this.keySpace = keySpace;
this.sleepAfterElements = sleepAfterElements;
@@ -73,8 +73,8 @@ static TtlTestConfig fromArgs(ParameterTool pt) {
pt.getLong(
UPDATE_GENERATOR_SRC_SLEEP_TIME.key(),
UPDATE_GENERATOR_SRC_SLEEP_TIME.defaultValue());
Time ttl =
Time.milliseconds(
Duration ttl =
Duration.ofMillis(
pt.getLong(
STATE_TTL_VERIFIER_TTL_MILLI.key(),
STATE_TTL_VERIFIER_TTL_MILLI.defaultValue()));
Original file line number Diff line number Diff line change
@@ -21,7 +21,7 @@
import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.streaming.api.functions.python.DataStreamPythonFunctionInfo;
@@ -439,7 +439,7 @@ private static FlinkFnApi.CoderInfoDescriptor createCoderInfoDescriptorProto(
public static StateTtlConfig parseStateTtlConfigFromProto(
FlinkFnApi.StateDescriptor.StateTTLConfig stateTTLConfigProto) {
StateTtlConfig.Builder builder =
StateTtlConfig.newBuilder(Time.milliseconds(stateTTLConfigProto.getTtl()))
StateTtlConfig.newBuilder(Duration.ofMillis(stateTTLConfigProto.getTtl()))
.setUpdateType(
parseUpdateTypeFromProto(stateTTLConfigProto.getUpdateType()))
.setStateVisibility(
Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.streaming.api.utils;

import org.apache.flink.api.common.state.StateTtlConfig;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.fnexecution.v1.FlinkFnApi;
import org.apache.flink.python.util.ProtoUtils;

@@ -99,7 +99,7 @@ void testParseStateTtlConfigFromProto() {
.isEqualTo(StateTtlConfig.UpdateType.OnCreateAndWrite);
assertThat(stateTTLConfig.getStateVisibility())
.isEqualTo(StateTtlConfig.StateVisibility.NeverReturnExpired);
assertThat(stateTTLConfig.getTtl()).isEqualTo(Time.milliseconds(1000));
assertThat(stateTTLConfig.getTtl()).isEqualTo(Duration.ofMillis(1000));
assertThat(stateTTLConfig.getTtlTimeCharacteristic())
.isEqualTo(StateTtlConfig.TtlTimeCharacteristic.ProcessingTime);

Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.runtime.rpc.pekko;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.FlinkAssertions;
import org.apache.flink.core.testutils.OneShotLatch;
@@ -372,9 +372,9 @@ void testOnStopFutureCompletionDirectlyTerminatesRpcActor() throws Exception {
assertThat(terminationFuture).isNotDone();

final CompletableFuture<Integer> firstAsyncOperationFuture =
asyncOperationGateway.asyncOperation(Time.fromDuration(timeout));
asyncOperationGateway.asyncOperation(timeout);
final CompletableFuture<Integer> secondAsyncOperationFuture =
asyncOperationGateway.asyncOperation(Time.fromDuration(timeout));
asyncOperationGateway.asyncOperation(timeout);

endpoint.awaitEnterAsyncOperation();

@@ -845,7 +845,7 @@ public CompletableFuture<Void> onStop() {
// ------------------------------------------------------------------------

interface AsyncOperationGateway extends RpcGateway {
CompletableFuture<Integer> asyncOperation(@RpcTimeout Time timeout);
CompletableFuture<Integer> asyncOperation(@RpcTimeout Duration timeout);
}

private static class TerminatingAfterOnStopFutureCompletionEndpoint extends RpcEndpoint
@@ -866,7 +866,7 @@ protected TerminatingAfterOnStopFutureCompletionEndpoint(
}

@Override
public CompletableFuture<Integer> asyncOperation(Time timeout) {
public CompletableFuture<Integer> asyncOperation(Duration timeout) {
asyncOperationCounter.incrementAndGet();
enterAsyncOperation.trigger();

Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.runtime.rpc.pekko;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.runtime.concurrent.pekko.ScalaFutureUtils;
import org.apache.flink.runtime.rpc.RpcEndpoint;
import org.apache.flink.runtime.rpc.RpcGateway;
@@ -80,7 +80,7 @@ void stopTestEndpoints() {

@Test
void testTimeoutExceptionWithTime() throws Exception {
testTimeoutException(gateway -> gateway.callThatTimesOut(Time.milliseconds(1)));
testTimeoutException(gateway -> gateway.callThatTimesOut(Duration.ofMillis(1)));
}

@Test
@@ -120,7 +120,7 @@ private TestingGateway createTestingGateway() throws Exception {

private interface TestingGateway extends RpcGateway {

CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout);
CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout);

CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout);
}
@@ -132,7 +132,7 @@ private static final class TestingRpcEndpoint extends RpcEndpoint implements Tes
}

@Override
public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Time timeout) {
public CompletableFuture<Void> callThatTimesOut(@RpcTimeout Duration timeout) {
// return a future that never completes, so the call is guaranteed to time out
return new CompletableFuture<>();
}
Original file line number Diff line number Diff line change
@@ -18,7 +18,7 @@

package org.apache.flink.runtime.rpc;

import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.TimeUtils;

Original file line number Diff line number Diff line change
@@ -19,7 +19,7 @@
package org.apache.flink.runtime.webmonitor;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.api.common.time.Time;
import java.time.Duration;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.client.deployment.application.ApplicationRunner;
import org.apache.flink.client.deployment.application.DetachedApplicationRunner;
@@ -68,7 +68,7 @@ public WebSubmissionExtension(
CompletableFuture<String> localAddressFuture,
Path jarDir,
Executor executor,
Time timeout)
Duration timeout)
throws Exception {
this(
configuration,
@@ -89,7 +89,7 @@ public WebSubmissionExtension(
CompletableFuture<String> localAddressFuture,
Path jarDir,
Executor executor,
Time timeout,
Duration timeout,
Supplier<ApplicationRunner> applicationRunnerSupplier)
throws Exception {

Loading

0 comments on commit c79fa91

Please sign in to comment.