From 740a5674fd92e4a74714329080710023166a82f4 Mon Sep 17 00:00:00 2001
From: Xuannan
Date: Thu, 12 Sep 2024 12:21:57 +0800
Subject: [PATCH] [FLINK-34168][config] Refactor callers that use deprecated
 get/setXXX (#25301)

---
 ...plitFetcherPauseResumeSplitReaderTest.java |   5 +-
 .../table/stream/StreamingFileWriterTest.java |   4 +-
 .../api/common/cache/DistributedCache.java    |  21 +-
 .../api/common/io/BinaryInputFormat.java      |   6 +-
 .../api/common/io/BinaryOutputFormat.java     |   5 +-
 .../flink/api/common/io/FileInputFormat.java  |   4 +-
 .../flink/api/common/operators/Operator.java  |  26 --
 .../configuration/ConfigurationUtils.java     |  20 +
 .../DelegatingConfiguration.java              |   3 +-
 .../api/common/io/BinaryInputFormatTest.java  |   5 +-
 .../common/io/EnumerateNestedFilesTest.java   |  17 +-
 .../ConfigurationConversionsTest.java         | 352 ++++++++----------
 .../configuration/ConfigurationTest.java      | 115 +++---
 .../DelegatingConfigurationTest.java          |  44 +--
 .../LimitedConnectionsConfigurationTest.java  |  11 +-
 .../AzureBlobStorageFSFactoryTest.java        |   3 +-
 .../LimitedConnectionsConfigurationTest.java  |  11 +-
 .../flink/fs/s3/common/FlinkS3FileSystem.java |   2 +-
 .../fs/s3/common/S3EntropyFsFactoryTest.java  |   3 +-
 .../common/io/SequentialFormatTestBase.java   |   4 +-
 .../api/java/io/TextInputFormatTest.java      |   3 +-
 .../KubernetesJobManagerParameters.java       |   2 +-
 ...netesApplicationClusterEntrypointTest.java |   6 +-
 .../InitTaskManagerDecoratorTest.java         |   9 +-
 .../plantranslate/JobGraphGenerator.java      |   3 +-
 .../flink/runtime/jobgraph/JobGraph.java      |  11 +-
 .../runtime/operators/util/TaskConfig.java    | 177 +++++----
 .../AdaptiveBatchSchedulerFactory.java        |   2 +-
 .../util/ConfigurationParserUtils.java        |   2 +-
 .../streaming/api/graph/StreamConfig.java     |  81 ++--
 .../TaskExecutorProcessUtilsTest.java         |   9 +-
 .../ExternalResourceUtilsTest.java            |  22 +-
 .../flink/runtime/jobgraph/JobGraphTest.java  |   3 +-
 .../SlotCountExceedingParallelismTest.java    |  14 +-
 .../event/FileSystemJobEventStoreTest.java    |   4 +-
 .../WorkerResourceSpecTest.java               |   9 +-
 ...exParallelismAndInputInfosDeciderTest.java |   4 +-
 .../DefaultDelegationTokenManagerTest.java    |   9 +-
 ...DelegationTokenReceiverRepositoryTest.java |   3 +-
 .../runtime/shuffle/ShuffleMasterTest.java    |   7 +-
 .../TaskExecutorFileMergingManagerTest.java   |   2 +-
 .../flink/runtime/taskmanager/TaskTest.java   |   4 +-
 .../recordutils/RecordComparatorFactory.java  |  16 +-
 .../forst/ForStNativeMetricOptionsTest.java   |   4 +-
 .../EmbeddedRocksDBStateBackendTest.java      |   7 +-
 .../state/RocksDBNativeMetricOptionsTest.java |   7 +-
 ...cksIncrementalCheckpointRescalingTest.java |   2 +-
 .../StreamExecutionEnvironmentTest.java       |   2 +-
 .../stream/sql/agg/WindowAggregateTest.scala  |   7 +-
 .../StreamingScalabilityAndLatency.java       |   5 +-
 .../flink/test/operators/MapITCase.java       |   3 +-
 .../runtime/NetworkStackThroughputITCase.java |  25 +-
 .../flink/yarn/YarnClusterDescriptor.java     |   6 +-
 53 files changed, 591 insertions(+), 540 deletions(-)

diff --git a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
index 6487802ae30cc..ef219e4a5381c 100644
--- a/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
+++ b/flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/source/reader/fetcher/SplitFetcherPauseResumeSplitReaderTest.java
@@ -45,6 +45,7 @@ import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Supplier;
 
+import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -107,9 +108,7 @@ public void testPauseResumeSplitReaders(boolean individualReader) throws Excepti
     public void testPauseResumeUnsupported(boolean allowUnalignedSourceSplits) throws Exception {
         final AtomicInteger numSplitReaders = new AtomicInteger();
         final Configuration configuration = new Configuration();
-        configuration.setBoolean(
-                "pipeline.watermark-alignment.allow-unaligned-source-splits",
-                allowUnalignedSourceSplits);
+        configuration.set(ALLOW_UNALIGNED_SOURCE_SPLITS, allowUnalignedSourceSplits);
 
         final MockSplitReader.Builder readerBuilder =
                 SteppingSourceReaderTestHarness.createSplitReaderBuilder();
diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
index 7d58d36ccbca2..f5562f1d19672 100644
--- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
+++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
@@ -384,7 +384,7 @@ private Configuration getPartitionCommitTriggerConf(long commitDelay) {
         configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
         configuration.setString(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd");
         configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time");
-        configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
+        configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay));
         configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
         return configuration;
     }
@@ -393,7 +393,7 @@ private Configuration getProcTimeCommitTriggerConf(long commitDelay) {
         Configuration configuration = new Configuration();
         configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
         configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time");
-        configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
+        configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay));
         configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
         return configuration;
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
index b5a4dc76c0c13..a04d1a49d6e8c 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/cache/DistributedCache.java
@@ -41,6 +41,9 @@ import java.util.concurrent.Future;
 import java.util.stream.Collectors;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
+import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;
+
 /**
  * DistributedCache provides static methods to write the registered cache files into job
  * configuration or decode them from job configuration. It also provides user access to the file
@@ -172,12 +175,16 @@ public File getFile(String name) {
 
     public static void writeFileInfoToConfig(
             String name, DistributedCacheEntry e, Configuration conf) {
-        int num = conf.getInteger(CACHE_FILE_NUM, 0) + 1;
-        conf.setInteger(CACHE_FILE_NUM, num);
+        int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0) + 1;
+        conf.set(getIntConfigOption(CACHE_FILE_NUM), num);
         conf.setString(CACHE_FILE_NAME + num, name);
         conf.setString(CACHE_FILE_PATH + num, e.filePath);
-        conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
-        conf.setBoolean(CACHE_FILE_DIR + num, e.isZipped || new File(e.filePath).isDirectory());
+        conf.set(
+                getBooleanConfigOption(CACHE_FILE_EXE + num),
+                e.isExecutable || new File(e.filePath).canExecute());
+        conf.set(
+                getBooleanConfigOption(CACHE_FILE_DIR + num),
+                e.isZipped || new File(e.filePath).isDirectory());
         if (e.blobKey != null) {
             conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
         }
@@ -185,7 +192,7 @@ public static void writeFileInfoToConfig(
 
     public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(
             Configuration conf) {
-        int num = conf.getInteger(CACHE_FILE_NUM, 0);
+        int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0);
         if (num == 0) {
             return Collections.emptySet();
         }
@@ -195,8 +202,8 @@ public static Set> readFileInfoFromConfig(
         for (int i = 1; i <= num; i++) {
             String name = conf.getString(CACHE_FILE_NAME + i, null);
             String filePath = conf.getString(CACHE_FILE_PATH + i, null);
-            boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
-            boolean isDirectory = conf.getBoolean(CACHE_FILE_DIR + i, false);
+            boolean isExecutable = conf.get(getBooleanConfigOption(CACHE_FILE_EXE + i), false);
+            boolean isDirectory = conf.get(getBooleanConfigOption(CACHE_FILE_DIR + i), false);
             byte[] blobKey = conf.getBytes(CACHE_FILE_BLOB_KEY + i, null);
 
             cacheFiles.put(
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
index 6d7c7fcdbb95a..b9ed4f1da71f8 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryInputFormat.java
@@ -42,6 +42,8 @@ import java.util.Arrays;
 import java.util.List;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
+
 /**
  * Base class for all input formats that use blocks of fixed size. The input splits are aligned to
  * these blocks, meaning that each split will consist of one block. Without configuration, these
@@ -90,7 +92,9 @@ public void configure(Configuration parameters) {
 
         // overwriting the value set by the setter
         if (this.blockSize == NATIVE_BLOCK_SIZE) {
-            long blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
+            long blockSize =
+                    parameters.get(
+                            getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE);
             setBlockSize(blockSize);
         }
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
index f2dc9ca88835b..cddeb40c163bb 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/BinaryOutputFormat.java
@@ -27,6 +27,8 @@ import java.io.IOException;
 import java.io.OutputStream;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
+
 @Public
 public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {
 
@@ -63,7 +65,8 @@ public void configure(Configuration parameters) {
         super.configure(parameters);
 
         // read own parameters
-        this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
+        this.blockSize =
+                parameters.get(getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE);
         if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) {
             throw new IllegalArgumentException(
                     "The block size parameter must be set and larger than 0.");
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
index c3fc58fedb04a..547b9bce2a640 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java
@@ -49,6 +49,7 @@ import java.util.Map;
 import java.util.Set;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
 import static org.apache.flink.configuration.TaskManagerOptions.FS_STREAM_OPENING_TIME_OUT;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -459,7 +460,7 @@ public void configure(Configuration parameters) {
         }
 
         if (!this.enumerateNestedFiles) {
-            this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
+            this.enumerateNestedFiles =
+                    parameters.get(getBooleanConfigOption(ENUMERATE_NESTED_FILES_FLAG), false);
         }
     }
diff --git a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
index bad2b1891418c..e32f4032100e0 100644
--- a/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
+++ b/flink-core/src/main/java/org/apache/flink/api/common/operators/Operator.java
@@ -131,32 +131,6 @@ public void setParameter(String key, String value) {
         this.parameters.setString(key, value);
     }
 
-    /**
-     * Sets a stub parameters in the configuration of this contract. The stub parameters are
-     * accessible by the user code at runtime. Parameters that the user code needs to access at
-     * runtime to configure its behavior are typically stored as stub parameters.
-     *
-     * @see #getParameters()
-     * @param key The parameter key.
-     * @param value The parameter value.
-     */
-    public void setParameter(String key, int value) {
-        this.parameters.setInteger(key, value);
-    }
-
-    /**
-     * Sets a stub parameters in the configuration of this contract. The stub parameters are
-     * accessible by the user code at runtime. Parameters that the user code needs to access at
-     * runtime to configure its behavior are typically stored as stub parameters.
-     *
-     * @see #getParameters()
-     * @param key The parameter key.
-     * @param value The parameter value.
-     */
-    public void setParameter(String key, boolean value) {
-        this.parameters.setBoolean(key, value);
-    }
-
     /**
      * Gets the parallelism for this contract instance. The parallelism denotes how many parallel
     * instances of the user function will be spawned during the execution. If this value is {@link
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
index 1802253474ddb..a8ecc139a7930 100755
--- a/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/ConfigurationUtils.java
@@ -650,4 +650,24 @@ static boolean removePrefixMap(Map confData, String key) {
 
     // Make sure that we cannot instantiate this class
     private ConfigurationUtils() {}
+
+    public static ConfigOption<Boolean> getBooleanConfigOption(String key) {
+        return ConfigOptions.key(key).booleanType().noDefaultValue();
+    }
+
+    public static ConfigOption<Double> getDoubleConfigOption(String key) {
+        return ConfigOptions.key(key).doubleType().noDefaultValue();
+    }
+
+    public static ConfigOption<Float> getFloatConfigOption(String key) {
+        return ConfigOptions.key(key).floatType().noDefaultValue();
+    }
+
+    public static ConfigOption<Integer> getIntConfigOption(String key) {
+        return ConfigOptions.key(key).intType().noDefaultValue();
+    }
+
+    public static ConfigOption<Long> getLongConfigOption(String key) {
+        return ConfigOptions.key(key).longType().noDefaultValue();
+    }
 }
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
index 0f3748f62f5cc..8a9c0c6f357f5 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/DelegatingConfiguration.java
@@ -35,6 +35,7 @@ import java.util.Properties;
 import java.util.Set;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
 import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;
 
 /**
@@ -183,7 +184,7 @@ public boolean getBoolean(ConfigOption configOption, boolean overrideDe
 
     @Override
     public float getFloat(String key, float defaultValue) {
-        return this.backingConfig.getFloat(this.prefix + key, defaultValue);
+        return this.backingConfig.get(getFloatConfigOption(this.prefix + key), defaultValue);
     }
 
     @Override
diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
index ab4791f88f2b2..9add3efd7ec63 100644
--- a/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
+++ b/flink-core/src/test/java/org/apache/flink/api/common/io/BinaryInputFormatTest.java
@@ -33,6 +33,7 @@ import java.io.IOException;
 import java.nio.file.Path;
 
+import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Fail.fail;
 
@@ -67,7 +68,7 @@ void testCreateInputSplitsWithOneFile() throws IOException {
"test_create_input_splits_with_one_file", blockSize, numBlocks); final Configuration config = new Configuration(); - config.setLong("input.block_size", blockSize + 10); + config.set(getLongConfigOption("input.block_size"), blockSize + 10L); final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); inputFormat.setFilePath(tempFile.toURI().toString()); @@ -184,7 +185,7 @@ void testCreateInputSplitsWithEmptySplit() throws IOException { "test_create_input_splits_with_empty_split", blockSize, numBlocks); final Configuration config = new Configuration(); - config.setLong("input.block_size", blockSize + 10); + config.set(getLongConfigOption("input.block_size"), blockSize + 10L); final BinaryInputFormat inputFormat = new MyBinaryInputFormat(); inputFormat.setFilePath(tempFile.toURI().toString()); diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java index 815981c2a18e6..1c78287d8cf33 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/EnumerateNestedFilesTest.java @@ -34,6 +34,7 @@ import java.io.File; import java.io.IOException; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.assertj.core.api.Assertions.assertThat; class EnumerateNestedFilesTest { @@ -63,7 +64,7 @@ void testNoNestedDirectoryTrue() throws IOException { String filePath = TestFileUtils.createTempFile("foo"); this.format.setFilePath(new Path(filePath)); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -85,7 +86,7 @@ void testOneNestedDirectoryTrue() throws IOException { TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua"); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -107,7 +108,7 @@ void testOneNestedDirectoryFalse() throws IOException { TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua"); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", false); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), false); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -135,7 +136,7 @@ void testTwoNestedDirectoriesTrue() throws IOException { TestFileUtils.createTempFileInDirectory(nestedNestedDir.getAbsolutePath(), "bravas"); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -165,7 +166,7 @@ void testOnlyLevel2NestedDirectories() throws IOException { TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "bravas"); this.format.setFilePath(new Path(testDir.getAbsolutePath())); - this.config.setBoolean("recursive.file.enumeration", true); + 
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -208,7 +209,7 @@ void testTwoNestedDirectoriesWithFilteredFilesTrue() throws IOException { nestedNestedDirFiltered.getAbsolutePath(), "bravas"); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); FileInputSplit[] splits = format.createInputSplits(1); @@ -229,7 +230,7 @@ void testGetStatisticsOneFileInNestedDir() throws IOException { TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), SIZE); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); BaseStatistics stats = format.getStatistics(null); @@ -262,7 +263,7 @@ void testGetStatisticsMultipleNestedFiles() throws IOException, InterruptedExcep TestFileUtils.createTempFileInDirectory(insideNestedDir2.getAbsolutePath(), SIZE4); this.format.setFilePath(new Path(nestedDir.toURI().toString())); - this.config.setBoolean("recursive.file.enumeration", true); + this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true); format.configure(this.config); BaseStatistics stats = format.getStatistics(null); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java index c88f1037bf7ce..cf59fc0ccbcca 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationConversionsTest.java @@ -34,6 +34,11 @@ import java.util.Objects; import java.util.Optional; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -57,311 +62,250 @@ class ConfigurationConversionsTest { void init() { pc = new Configuration(); - pc.setInteger("int", 5); - pc.setLong("long", 15); - pc.setLong("too_long", TOO_LONG); - pc.setFloat("float", 2.1456775f); - pc.setDouble("double", Math.PI); - pc.setDouble("negative_double", -1.0); - pc.setDouble("zero", 0.0); - pc.setDouble("too_long_double", TOO_LONG_DOUBLE); + pc.set(getIntConfigOption("int"), 5); + pc.set(getLongConfigOption("long"), 15L); + pc.set(getLongConfigOption("too_long"), TOO_LONG); + pc.set(getFloatConfigOption("float"), 2.1456775f); + pc.set(getDoubleConfigOption("double"), Math.PI); + pc.set(getDoubleConfigOption("negative_double"), -1.0); + pc.set(getDoubleConfigOption("zero"), 0.0); + pc.set(getDoubleConfigOption("too_long_double"), TOO_LONG_DOUBLE); pc.setString("string", "42"); pc.setString("non_convertible_string", "bcdefg&&"); - pc.setBoolean("boolean", true); + pc.set(getBooleanConfigOption("boolean"), true); } 
@Parameters(name = "testSpec={0}") private static Collection getSpecs() { return Arrays.asList( // from integer - TestSpec.whenAccessed(conf -> conf.getInteger("int", 0)).expect(5), - TestSpec.whenAccessed(conf -> conf.getLong("int", 0)).expect(5L), - TestSpec.whenAccessed(conf -> conf.getFloat("int", 0)).expect(5f), - TestSpec.whenAccessed(conf -> conf.getDouble("int", 0)).expect(5.0), - TestSpec.whenAccessed(conf -> conf.getBoolean("int", true)) - .expectException( - "Unrecognized option for boolean: 5. Expected either true or false(case insensitive)"), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("int"), 0)).expect(5), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("int"), 0L)).expect(5L), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("int"), 0f)).expect(5f), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("int"), 0.0)) + .expect(5.0), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("int"), true)) + .expectException("Could not parse value '5' for key 'int'."), TestSpec.whenAccessed(conf -> conf.getString("int", "0")).expect("5"), TestSpec.whenAccessed(conf -> conf.getBytes("int", EMPTY_BYTES)) .expectException("Configuration cannot evaluate value 5 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "int", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Integer as a class name"), // from long - TestSpec.whenAccessed(conf -> conf.getInteger("long", 0)).expect(15), - TestSpec.whenAccessed(conf -> conf.getLong("long", 0)).expect(15L), - TestSpec.whenAccessed(conf -> conf.getFloat("long", 0)).expect(15f), - TestSpec.whenAccessed(conf -> conf.getDouble("long", 0)).expect(15.0), - TestSpec.whenAccessed(conf -> conf.getBoolean("long", true)) - .expectException( - "Unrecognized option for boolean: 15. 
Expected either true or false(case insensitive)"), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("long"), 0)).expect(15), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("long"), 0L)) + .expect(15L), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("long"), 0f)) + .expect(15f), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("long"), 0.0)) + .expect(15.0), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("long"), true)) + .expectException("Could not parse value '15' for key 'long'."), TestSpec.whenAccessed(conf -> conf.getString("long", "0")).expect("15"), TestSpec.whenAccessed(conf -> conf.getBytes("long", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 15 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "long", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Long as a class name"), // from too long - TestSpec.whenAccessed(conf -> conf.getInteger("too_long", 0)) - .expectException( - "Configuration value 2147483657 overflows/underflows the integer type"), - TestSpec.whenAccessed(conf -> conf.getLong("too_long", 0)).expect(TOO_LONG), - TestSpec.whenAccessed(conf -> conf.getFloat("too_long", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("too_long"), 0)) + .expectException("Could not parse value '2147483657' for key 'too_long'."), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("too_long"), 0L)) + .expect(TOO_LONG), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("too_long"), 0f)) .expect((float) TOO_LONG), - TestSpec.whenAccessed(conf -> conf.getDouble("too_long", 0)) + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("too_long"), 0.0)) .expect((double) TOO_LONG), - TestSpec.whenAccessed(conf -> conf.getBoolean("too_long", true)) - .expectException( - "Unrecognized option for boolean: 2147483657. 
Expected either true or false(case insensitive)"), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("too_long"), true)) + .expectException("Could not parse value '2147483657' for key 'too_long'."), TestSpec.whenAccessed(conf -> conf.getString("too_long", "0")) .expect(String.valueOf(TOO_LONG)), TestSpec.whenAccessed(conf -> conf.getBytes("too_long", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 2147483657 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "too_long", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Long as a class name"), // from float - TestSpec.whenAccessed(conf -> conf.getInteger("float", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("float"), 0)) .expectException( - "For input string: \"2.1456776\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("float", 0)) + "Could not parse value '2.1456776' for key 'float'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("float"), 0L)) .expectException( - "For input string: \"2.1456776\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("float", 0)).expect(2.1456775f), - TestSpec.whenAccessed(conf -> conf.getDouble("float", 0)) + "Could not parse value '2.1456776' for key 'float'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("float"), 0f)) + .expect(2.1456775f), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("float"), 0.0)) .expect( new Condition<>( d -> Math.abs(d - 2.1456775) < 0.0000001, "Expected value")), - TestSpec.whenAccessed(conf -> conf.getBoolean("float", true)) - .expectException( - "Unrecognized option for boolean: 2.1456776. 
Expected either true or false(case insensitive)"), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("float"), true)) + .expectException("Could not parse value '2.1456776' for key 'float'."), TestSpec.whenAccessed(conf -> conf.getString("float", "0")) .expect(new Condition<>(s -> s.startsWith("2.145677"), "Expected value")), TestSpec.whenAccessed(conf -> conf.getBytes("float", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 2.1456776 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "float", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "onfiguration cannot evaluate object of class class java.lang.Float as a class name"), // from double - TestSpec.whenAccessed(conf -> conf.getInteger("double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("double"), 0)) .expectException( - "For input string: \"3.141592653589793\"", - NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("double", 0)) + "Could not parse value '3.141592653589793' for key 'double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("double"), 0L)) .expectException( - "For input string: \"3.141592653589793\"", - NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("double", 0)) + "Could not parse value '3.141592653589793' for key 'double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("double"), 0f)) .expect(new IsCloseTo(3.141592f, 0.000001f)), - TestSpec.whenAccessed(conf -> conf.getDouble("double", 0)).expect(Math.PI), - TestSpec.whenAccessed(conf -> conf.getBoolean("double", true)) + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("double"), 0.0)) + .expect(Math.PI), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("double"), true)) .expectException( - "Unrecognized option for boolean: 3.141592653589793. Expected either true or false(case insensitive)"), + "Could not parse value '3.141592653589793' for key 'double'."), TestSpec.whenAccessed(conf -> conf.getString("double", "0")) .expect(new Condition<>(s -> s.startsWith("3.141592"), "Expected value")), TestSpec.whenAccessed(conf -> conf.getBytes("double", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 3.141592653589793 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "double", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "onfiguration cannot evaluate object of class class java.lang.Double as a class name"), // from negative double - TestSpec.whenAccessed(conf -> conf.getInteger("negative_double", 0)) - .expectException("For input string: \"-1.0\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("negative_double", 0)) - .expectException("For input string: \"-1.0\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("negative_double", 0)) - .expect(new IsCloseTo(-1f, 0.000001f)), - TestSpec.whenAccessed(conf -> conf.getDouble("negative_double", 0)).expect(-1D), - TestSpec.whenAccessed(conf -> conf.getBoolean("negative_double", true)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("negative_double"), 0)) .expectException( - "Unrecognized option for boolean: -1.0. 
Expected either true or false(case insensitive)"), + "Could not parse value '-1.0' for key 'negative_double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("negative_double"), 0L)) + .expectException( + "Could not parse value '-1.0' for key 'negative_double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("negative_double"), 0f)) + .expect(new IsCloseTo(-1f, 0.000001f)), + TestSpec.whenAccessed( + conf -> conf.get(getDoubleConfigOption("negative_double"), 0.0)) + .expect(-1D), + TestSpec.whenAccessed( + conf -> conf.get(getBooleanConfigOption("negative_double"), true)) + .expectException("Could not parse value '-1.0' for key 'negative_double'."), TestSpec.whenAccessed(conf -> conf.getString("negative_double", "0")) .expect(new Condition<>(s -> s.startsWith("-1.0"), "Expected value")), TestSpec.whenAccessed(conf -> conf.getBytes("negative_double", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value -1.0 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "negative_double", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Double as a class name"), // from zero - TestSpec.whenAccessed(conf -> conf.getInteger("zero", 0)) - .expectException("For input string: \"0.0\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("zero", 0)) - .expectException("For input string: \"0.0\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("zero", 0)) - .expect(new IsCloseTo(0f, 0.000001f)), - TestSpec.whenAccessed(conf -> conf.getDouble("zero", 0)).expect(0D), - TestSpec.whenAccessed(conf -> conf.getBoolean("zero", true)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("zero"), 0)) + .expectException( + "Could not parse value '0.0' for key 'zero'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("zero"), 0L)) .expectException( - "Unrecognized option for boolean: 0.0. 
Expected either true or false(case insensitive)"), + "Could not parse value '0.0' for key 'zero'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("zero"), 0f)) + .expect(new IsCloseTo(0f, 0.000001f)), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("zero"), 0.0)) + .expect(0D), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("zero"), true)) + .expectException("Could not parse value '0.0' for key 'zero'."), TestSpec.whenAccessed(conf -> conf.getString("zero", "0")) .expect(new Condition<>(s -> s.startsWith("0"), "Expected value")), TestSpec.whenAccessed(conf -> conf.getBytes("zero", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 0.0 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "zero", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Double as a class name"), // from too long double - TestSpec.whenAccessed(conf -> conf.getInteger("too_long_double", 0)) + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("too_long_double"), 0)) .expectException( - "For input string: \"1.7976931348623157E308\"", - NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("too_long_double", 0)) + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("too_long_double"), 0L)) .expectException( - "For input string: \"1.7976931348623157E308\"", - NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("too_long_double", 0)) + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'.", + IllegalArgumentException.class), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("too_long_double"), 0f)) .expectException( - "Configuration value 1.7976931348623157E308 overflows/underflows the float type."), - TestSpec.whenAccessed(conf -> conf.getDouble("too_long_double", 0)) + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'."), + TestSpec.whenAccessed( + conf -> conf.get(getDoubleConfigOption("too_long_double"), 0.0)) .expect(TOO_LONG_DOUBLE), - TestSpec.whenAccessed(conf -> conf.getBoolean("too_long_double", true)) + TestSpec.whenAccessed( + conf -> conf.get(getBooleanConfigOption("too_long_double"), true)) .expectException( - "Unrecognized option for boolean: 1.7976931348623157E308. 
Expected either true or false(case insensitive)"), + "Could not parse value '1.7976931348623157E308' for key 'too_long_double'."), TestSpec.whenAccessed(conf -> conf.getString("too_long_double", "0")) .expect(String.valueOf(TOO_LONG_DOUBLE)), TestSpec.whenAccessed(conf -> conf.getBytes("too_long_double", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 1.7976931348623157E308 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "too_long_double", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Double as a class name"), // from string - TestSpec.whenAccessed(conf -> conf.getInteger("string", 0)).expect(42), - TestSpec.whenAccessed(conf -> conf.getLong("string", 0)).expect(42L), - TestSpec.whenAccessed(conf -> conf.getFloat("string", 0)).expect(42f), - TestSpec.whenAccessed(conf -> conf.getDouble("string", 0)).expect(42.0), - TestSpec.whenAccessed(conf -> conf.getBoolean("string", true)) - .expectException( - "Unrecognized option for boolean: 42. Expected either true or false(case insensitive)"), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("string"), 0)).expect(42), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("string"), 0L)) + .expect(42L), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("string"), 0f)) + .expect(42f), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("string"), 0.0)) + .expect(42.0), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("string"), true)) + .expectException("Could not parse value '42' for key 'string'."), TestSpec.whenAccessed(conf -> conf.getString("string", "0")).expect("42"), TestSpec.whenAccessed(conf -> conf.getBytes("string", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value 42 as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "string", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException("42", ClassNotFoundException.class), // from non convertible string - TestSpec.whenAccessed(conf -> conf.getInteger("non_convertible_string", 0)) + TestSpec.whenAccessed( + conf -> conf.get(getIntConfigOption("non_convertible_string"), 0)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getLong("non_convertible_string", 0)) + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), + TestSpec.whenAccessed( + conf -> conf.get(getLongConfigOption("non_convertible_string"), 0L)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getFloat("non_convertible_string", 0)) + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), + TestSpec.whenAccessed( + conf -> + conf.get( + getFloatConfigOption("non_convertible_string"), 0f)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), - TestSpec.whenAccessed(conf -> conf.getDouble("non_convertible_string", 0)) + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), + TestSpec.whenAccessed( + conf -> + conf.get( + getDoubleConfigOption("non_convertible_string"), + 0.0)) .expectException( - "For input string: \"bcdefg&&\"", NumberFormatException.class), - 
TestSpec.whenAccessed(conf -> conf.getBoolean("non_convertible_string", true)) + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'.", + IllegalArgumentException.class), + TestSpec.whenAccessed( + conf -> + conf.get( + getBooleanConfigOption("non_convertible_string"), + true)) .expectException( - "Unrecognized option for boolean: bcdefg&&. Expected either true or false(case insensitive)"), + "Could not parse value 'bcdefg&&' for key 'non_convertible_string'."), TestSpec.whenAccessed(conf -> conf.getString("non_convertible_string", "0")) .expect("bcdefg&&"), TestSpec.whenAccessed(conf -> conf.getBytes("non_convertible_string", EMPTY_BYTES)) .expectException( "Configuration cannot evaluate value bcdefg&& as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "non_convertible_string", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException("bcdefg&&", ClassNotFoundException.class), // from boolean - TestSpec.whenAccessed(conf -> conf.getInteger("boolean", 0)) - .expectException("For input string: \"true\""), - TestSpec.whenAccessed(conf -> conf.getLong("boolean", 0)) - .expectException("For input string: \"true\""), - TestSpec.whenAccessed(conf -> conf.getFloat("boolean", 0)) - .expectException("For input string: \"true\""), - TestSpec.whenAccessed(conf -> conf.getDouble("boolean", 0)) - .expectException("For input string: \"true\""), - TestSpec.whenAccessed(conf -> conf.getBoolean("boolean", false)).expect(true), + TestSpec.whenAccessed(conf -> conf.get(getIntConfigOption("boolean"), 0)) + .expectException("Could not parse value 'true' for key 'boolean'."), + TestSpec.whenAccessed(conf -> conf.get(getLongConfigOption("boolean"), 0L)) + .expectException("Could not parse value 'true' for key 'boolean'."), + TestSpec.whenAccessed(conf -> conf.get(getFloatConfigOption("boolean"), 0f)) + .expectException("Could not parse value 'true' for key 'boolean'."), + TestSpec.whenAccessed(conf -> conf.get(getDoubleConfigOption("boolean"), 0.0)) + .expectException("Could not parse value 'true' for key 'boolean'."), + TestSpec.whenAccessed(conf -> conf.get(getBooleanConfigOption("boolean"), false)) + .expect(true), TestSpec.whenAccessed(conf -> conf.getString("boolean", "0")).expect("true"), TestSpec.whenAccessed(conf -> conf.getBytes("boolean", EMPTY_BYTES)) .expectException( - "Configuration cannot evaluate value true as a byte[] value"), - TestSpec.whenAccessed( - conf -> - conf.getClass( - "boolean", - ConfigurationConversionsTest.class, - ConfigurationConversionsTest.class - .getClassLoader())) - .expectException( - "Configuration cannot evaluate object of class class java.lang.Boolean as a class name")); + "Configuration cannot evaluate value true as a byte[] value")); } @TestTemplate diff --git a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java index 7fde75c5b32de..c61819fa08826 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/ConfigurationTest.java @@ -32,6 +32,11 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import 
static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -70,24 +75,22 @@ class ConfigurationTest { void testConfigurationSerializationAndGetters() throws ClassNotFoundException, IOException { final Configuration orig = new Configuration(); orig.setString("mykey", "myvalue"); - orig.setInteger("mynumber", 100); - orig.setLong("longvalue", 478236947162389746L); - orig.setFloat("PI", 3.1415926f); - orig.setDouble("E", Math.E); - orig.setBoolean("shouldbetrue", true); + orig.set(getIntConfigOption("mynumber"), 100); + orig.set(getLongConfigOption("longvalue"), 478236947162389746L); + orig.set(getFloatConfigOption("PI"), 3.1415926f); + orig.set(getDoubleConfigOption("E"), Math.E); + orig.set(getBooleanConfigOption("shouldbetrue"), true); orig.setBytes("bytes sequence", new byte[] {1, 2, 3, 4, 5}); - orig.setClass("myclass", this.getClass()); final Configuration copy = InstantiationUtil.createCopyWritable(orig); assertThat("myvalue").isEqualTo(copy.getString("mykey", "null")); - assertThat(copy.getInteger("mynumber", 0)).isEqualTo(100); - assertThat(copy.getLong("longvalue", 0L)).isEqualTo(478236947162389746L); - assertThat(copy.getFloat("PI", 3.1415926f)).isCloseTo(3.1415926f, Offset.offset(0.0f)); - assertThat(copy.getDouble("E", 0.0)).isCloseTo(Math.E, Offset.offset(0.0)); - assertThat(copy.getBoolean("shouldbetrue", false)).isTrue(); + assertThat(copy.get(getIntConfigOption("mynumber"), 0)).isEqualTo(100); + assertThat(copy.get(getLongConfigOption("longvalue"), 0L)).isEqualTo(478236947162389746L); + assertThat(copy.get(getFloatConfigOption("PI"), 3.1415926f)) + .isCloseTo(3.1415926f, Offset.offset(0.0f)); + assertThat(copy.get(getDoubleConfigOption("E"), 0.0)).isCloseTo(Math.E, Offset.offset(0.0)); + assertThat(copy.get(getBooleanConfigOption("shouldbetrue"), false)).isTrue(); assertThat(copy.getBytes("bytes sequence", null)).containsExactly(1, 2, 3, 4, 5); - assertThat(getClass()) - .isEqualTo(copy.getClass("myclass", null, getClass().getClassLoader())); assertThat(copy).isEqualTo(orig); assertThat(copy.keySet()).isEqualTo(orig.keySet()); @@ -110,7 +113,7 @@ void testCopyConstructor() { @Test void testOptionWithDefault() { Configuration cfg = new Configuration(); - cfg.setInteger("int-key", 11); + cfg.set(getIntConfigOption("int-key"), 11); cfg.setString("string-key", "abc"); ConfigOption presentStringOption = @@ -118,10 +121,10 @@ void testOptionWithDefault() { ConfigOption presentIntOption = ConfigOptions.key("int-key").intType().defaultValue(87); - assertThat(cfg.getString(presentStringOption)).isEqualTo("abc"); + assertThat(cfg.get(presentStringOption)).isEqualTo("abc"); assertThat(cfg.getValue(presentStringOption)).isEqualTo("abc"); - assertThat(cfg.getInteger(presentIntOption)).isEqualTo(11); + assertThat(cfg.get(presentIntOption)).isEqualTo(11); assertThat(cfg.getValue(presentIntOption)).isEqualTo("11"); // test getting default when no value is present @@ -132,26 +135,26 @@ void testOptionWithDefault() { // getting strings with default value should work assertThat(cfg.getValue(stringOption)).isEqualTo("my-beautiful-default"); - assertThat(cfg.getString(stringOption)).isEqualTo("my-beautiful-default"); + assertThat(cfg.get(stringOption)).isEqualTo("my-beautiful-default"); // overriding the default should work - assertThat(cfg.getString(stringOption, 
"override")).isEqualTo("override"); + assertThat(cfg.get(stringOption, "override")).isEqualTo("override"); // getting a primitive with a default value should work - assertThat(cfg.getInteger(intOption)).isEqualTo(87); + assertThat(cfg.get(intOption)).isEqualTo(87); assertThat(cfg.getValue(intOption)).isEqualTo("87"); } @Test void testOptionWithNoDefault() { Configuration cfg = new Configuration(); - cfg.setInteger("int-key", 11); + cfg.get(getIntConfigOption("int-key"), 11); cfg.setString("string-key", "abc"); ConfigOption presentStringOption = ConfigOptions.key("string-key").stringType().noDefaultValue(); - assertThat(cfg.getString(presentStringOption)).isEqualTo("abc"); + assertThat(cfg.get(presentStringOption)).isEqualTo("abc"); assertThat(cfg.getValue(presentStringOption)).isEqualTo("abc"); // test getting default when no value is present @@ -160,18 +163,18 @@ void testOptionWithNoDefault() { // getting strings for null should work assertThat(cfg.getValue(stringOption)).isNull(); - assertThat(cfg.getString(stringOption)).isNull(); + assertThat(cfg.get(stringOption)).isNull(); // overriding the null default should work - assertThat(cfg.getString(stringOption, "override")).isEqualTo("override"); + assertThat(cfg.get(stringOption, "override")).isEqualTo("override"); } @Test void testDeprecatedKeys() { Configuration cfg = new Configuration(); - cfg.setInteger("the-key", 11); - cfg.setInteger("old-key", 12); - cfg.setInteger("older-key", 13); + cfg.set(getIntConfigOption("the-key"), 11); + cfg.set(getIntConfigOption("old-key"), 12); + cfg.set(getIntConfigOption("older-key"), 13); ConfigOption matchesFirst = ConfigOptions.key("the-key") @@ -197,18 +200,18 @@ void testDeprecatedKeys() { .defaultValue(-1) .withDeprecatedKeys("not-there", "also-not-there"); - assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11); - assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12); - assertThat(cfg.getInteger(matchesThird)).isEqualTo(13); - assertThat(cfg.getInteger(notContained)).isEqualTo(-1); + assertThat(cfg.get(matchesFirst)).isEqualTo(11); + assertThat(cfg.get(matchesSecond)).isEqualTo(12); + assertThat(cfg.get(matchesThird)).isEqualTo(13); + assertThat(cfg.get(notContained)).isEqualTo(-1); } @Test void testFallbackKeys() { Configuration cfg = new Configuration(); - cfg.setInteger("the-key", 11); - cfg.setInteger("old-key", 12); - cfg.setInteger("older-key", 13); + cfg.set(getIntConfigOption("the-key"), 11); + cfg.set(getIntConfigOption("old-key"), 12); + cfg.set(getIntConfigOption("older-key"), 13); ConfigOption matchesFirst = ConfigOptions.key("the-key") @@ -234,10 +237,10 @@ void testFallbackKeys() { .defaultValue(-1) .withFallbackKeys("not-there", "also-not-there"); - assertThat(cfg.getInteger(matchesFirst)).isEqualTo(11); - assertThat(cfg.getInteger(matchesSecond)).isEqualTo(12); - assertThat(cfg.getInteger(matchesThird)).isEqualTo(13); - assertThat(cfg.getInteger(notContained)).isEqualTo(-1); + assertThat(cfg.get(matchesFirst)).isEqualTo(11); + assertThat(cfg.get(matchesSecond)).isEqualTo(12); + assertThat(cfg.get(matchesThird)).isEqualTo(13); + assertThat(cfg.get(notContained)).isEqualTo(-1); } @Test @@ -256,12 +259,12 @@ void testFallbackAndDeprecatedKeys() { .withDeprecatedKeys(deprecated.key()); final Configuration fallbackCfg = new Configuration(); - fallbackCfg.setInteger(fallback, 1); - assertThat(fallbackCfg.getInteger(mainOption)).isOne(); + fallbackCfg.set(fallback, 1); + assertThat(fallbackCfg.get(mainOption)).isOne(); final Configuration deprecatedCfg = new Configuration(); - 
deprecatedCfg.setInteger(deprecated, 2); - assertThat(deprecatedCfg.getInteger(mainOption)).isEqualTo(2); + deprecatedCfg.set(deprecated, 2); + assertThat(deprecatedCfg.get(mainOption)).isEqualTo(2); // reverse declaration of fallback and deprecated keys, fallback keys should always be used // first @@ -273,17 +276,17 @@ void testFallbackAndDeprecatedKeys() { .withFallbackKeys(fallback.key()); final Configuration deprecatedAndFallBackConfig = new Configuration(); - deprecatedAndFallBackConfig.setInteger(fallback, 1); - deprecatedAndFallBackConfig.setInteger(deprecated, 2); - assertThat(deprecatedAndFallBackConfig.getInteger(mainOption)).isOne(); - assertThat(deprecatedAndFallBackConfig.getInteger(reversedMainOption)).isOne(); + deprecatedAndFallBackConfig.set(fallback, 1); + deprecatedAndFallBackConfig.set(deprecated, 2); + assertThat(deprecatedAndFallBackConfig.get(mainOption)).isOne(); + assertThat(deprecatedAndFallBackConfig.get(reversedMainOption)).isOne(); } @Test void testRemove() { Configuration cfg = new Configuration(); - cfg.setInteger("a", 1); - cfg.setInteger("b", 2); + cfg.set(getIntConfigOption("a"), 1); + cfg.set(getIntConfigOption("b"), 2); ConfigOption validOption = ConfigOptions.key("a").intType().defaultValue(-1); @@ -310,11 +313,11 @@ void testRemoveKey() { Configuration cfg = new Configuration(); String key1 = "a.b"; String key2 = "c.d"; - cfg.setInteger(key1, 42); - cfg.setInteger(key2, 44); - cfg.setInteger(key2 + ".f1", 44); - cfg.setInteger(key2 + ".f2", 44); - cfg.setInteger("e.f", 1337); + cfg.set(getIntConfigOption(key1), 42); + cfg.set(getIntConfigOption(key2), 44); + cfg.set(getIntConfigOption(key2 + ".f1"), 44); + cfg.set(getIntConfigOption(key2 + ".f2"), 44); + cfg.set(getIntConfigOption("e.f"), 1337); assertThat(cfg.removeKey("not-existing-key")).isFalse(); assertThat(cfg.removeKey(key1)).isTrue(); @@ -422,7 +425,7 @@ void testMapNotContained() { void testMapWithPrefix() { final Configuration cfg = new Configuration(); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 12); + cfg.set(getIntConfigOption(MAP_PROPERTY_2), 12); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); assertThat(cfg.contains(MAP_OPTION)).isTrue(); @@ -442,7 +445,7 @@ void testMapNonPrefixHasPrecedence() { final Configuration cfg = new Configuration(); cfg.set(MAP_OPTION, PROPERTIES_MAP); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); assertThat(cfg.contains(MAP_OPTION)).isTrue(); @@ -453,7 +456,7 @@ void testMapNonPrefixHasPrecedence() { void testMapThatOverwritesPrefix() { final Configuration cfg = new Configuration(); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); cfg.set(MAP_OPTION, PROPERTIES_MAP); assertThat(cfg.get(MAP_OPTION)).isEqualTo(PROPERTIES_MAP); @@ -465,7 +468,7 @@ void testMapThatOverwritesPrefix() { void testMapRemovePrefix() { final Configuration cfg = new Configuration(); cfg.setString(MAP_PROPERTY_1, "value1"); - cfg.setInteger(MAP_PROPERTY_2, 99999); + cfg.get(getIntConfigOption(MAP_PROPERTY_2), 99999); cfg.removeConfig(MAP_OPTION); assertThat(cfg.contains(MAP_OPTION)).isFalse(); diff --git a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java index 
3ad3fc07bd77a..9ff2719ebf8cf 100644 --- a/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/configuration/DelegatingConfigurationTest.java @@ -155,52 +155,52 @@ void testGetWithOverrideDefault() { ConfigOptions.key("integer.key").intType().noDefaultValue(); // integerOption doesn't exist in delegatingConf, and it should be overrideDefault. - original.setInteger(integerOption, 1); - assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(2); + original.set(integerOption, 1); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2); assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(2); // integerOption exists in delegatingConf, and it should be value that set before. delegatingConf.setInteger(integerOption, 3); - assertThat(delegatingConf.getInteger(integerOption, 2)).isEqualTo(3); + assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3); assertThat(delegatingConf.get(integerOption, 2)).isEqualTo(3); // Test for float ConfigOption floatOption = ConfigOptions.key("float.key").floatType().noDefaultValue(); - original.setFloat(floatOption, 4f); - assertThat(delegatingConf.getFloat(floatOption, 5f)).isEqualTo(5f); + original.set(floatOption, 4f); + assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(5f); assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(5f); - delegatingConf.setFloat(floatOption, 6f); - assertThat(delegatingConf.getFloat(floatOption, 5f)).isEqualTo(6f); + delegatingConf.set(floatOption, 6f); + assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(6f); assertThat(delegatingConf.get(floatOption, 5f)).isEqualTo(6f); // Test for double ConfigOption doubleOption = ConfigOptions.key("double.key").doubleType().noDefaultValue(); - original.setDouble(doubleOption, 7d); - assertThat(delegatingConf.getDouble(doubleOption, 8d)).isEqualTo(8d); + original.set(doubleOption, 7d); + assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(8d); assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(8d); - delegatingConf.setDouble(doubleOption, 9f); - assertThat(delegatingConf.getDouble(doubleOption, 8d)).isEqualTo(9f); + delegatingConf.set(doubleOption, 9.0); + assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(9f); assertThat(delegatingConf.get(doubleOption, 8d)).isEqualTo(9f); // Test for long ConfigOption longOption = ConfigOptions.key("long.key").longType().noDefaultValue(); - original.setLong(longOption, 10L); - assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(11L); + original.set(longOption, 10L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L); assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(11L); - delegatingConf.setLong(longOption, 12L); - assertThat(delegatingConf.getLong(longOption, 11L)).isEqualTo(12L); + delegatingConf.set(longOption, 12L); + assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L); assertThat(delegatingConf.get(longOption, 11L)).isEqualTo(12L); // Test for boolean ConfigOption booleanOption = ConfigOptions.key("boolean.key").booleanType().noDefaultValue(); - original.setBoolean(booleanOption, false); - assertThat(delegatingConf.getBoolean(booleanOption, true)).isEqualTo(true); + original.set(booleanOption, false); assertThat(delegatingConf.get(booleanOption, true)).isEqualTo(true); - delegatingConf.setBoolean(booleanOption, false); - assertThat(delegatingConf.getBoolean(booleanOption, true)).isEqualTo(false); + assertThat(delegatingConf.get(booleanOption, 
true)).isEqualTo(true); + delegatingConf.set(booleanOption, false); + assertThat(delegatingConf.get(booleanOption, true)).isEqualTo(false); assertThat(delegatingConf.get(booleanOption, true)).isEqualTo(false); } @@ -217,13 +217,13 @@ void testRemoveKeyOrConfig() { assertThat(delegatingConf.get(integerOption)).isZero(); delegatingConf.removeConfig(integerOption); assertThat(delegatingConf.getOptional(integerOption)).isEmpty(); - assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero(); + assertThat(delegatingConf.get(integerOption, 0)).isZero(); // Test for removeKey delegatingConf.set(integerOption, 0); - assertThat(delegatingConf.getInteger(integerOption, -1)).isZero(); + assertThat(delegatingConf.get(integerOption, -1)).isZero(); delegatingConf.removeKey(integerOption.key()); assertThat(delegatingConf.getOptional(integerOption)).isEmpty(); - assertThat(delegatingConf.getInteger(integerOption.key(), 0)).isZero(); + assertThat(delegatingConf.get(integerOption, 0)).isZero(); } } diff --git a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java index b9ffc12e293b5..9e3ac40d10111 100644 --- a/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java +++ b/flink-core/src/test/java/org/apache/flink/core/fs/LimitedConnectionsConfigurationTest.java @@ -29,6 +29,7 @@ import java.io.File; import java.net.URI; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests that validate that the configuration for limited FS connections are properly picked up. */ @@ -54,11 +55,11 @@ void testConfiguration() throws Exception { // configure some limits, which should cause "fsScheme" to be limited final Configuration config = new Configuration(); - config.setInteger("fs." + fsScheme + ".limit.total", 42); - config.setInteger("fs." + fsScheme + ".limit.input", 11); - config.setInteger("fs." + fsScheme + ".limit.output", 40); - config.setInteger("fs." + fsScheme + ".limit.timeout", 12345); - config.setInteger("fs." + fsScheme + ".limit.stream-timeout", 98765); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.total"), 42); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.input"), 11); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.output"), 40); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.timeout"), 12345); + config.set(getIntConfigOption("fs." + fsScheme + ".limit.stream-timeout"), 98765); try { FileSystem.initialize(config); diff --git a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java index 4f5cc22e26ef1..6e0f4f3786cbf 100644 --- a/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java +++ b/flink-filesystems/flink-azure-fs-hadoop/src/test/java/org/apache/flink/fs/azurefs/AzureBlobStorageFSFactoryTest.java @@ -29,6 +29,7 @@ import java.net.URI; import java.util.stream.Stream; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThatThrownBy; /** Tests for the AzureFSFactory. 
*/ @@ -63,7 +64,7 @@ void testCreateFsWithAuthorityMissingCreds(AbstractAzureFSFactory factory) throw final URI uri = URI.create(uriString); Configuration config = new Configuration(); - config.setInteger("fs.azure.io.retry.max.retries", 0); + config.set(getIntConfigOption("fs.azure.io.retry.max.retries"), 0); factory.configure(config); assertThatThrownBy(() -> factory.create(uri)).isInstanceOf(AzureException.class); diff --git a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java index af39bfd4295c1..554ad7a5b7347 100644 --- a/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java +++ b/flink-filesystems/flink-hadoop-fs/src/test/java/org/apache/flink/runtime/fs/hdfs/LimitedConnectionsConfigurationTest.java @@ -26,6 +26,7 @@ import java.net.URI; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** @@ -47,11 +48,11 @@ void testConfiguration() throws Exception { // configure some limits, which should cause "fsScheme" to be limited final Configuration config = new Configuration(); - config.setInteger("fs.hdfs.limit.total", 40); - config.setInteger("fs.hdfs.limit.input", 39); - config.setInteger("fs.hdfs.limit.output", 38); - config.setInteger("fs.hdfs.limit.timeout", 23456); - config.setInteger("fs.hdfs.limit.stream-timeout", 34567); + config.set(getIntConfigOption("fs.hdfs.limit.total"), 40); + config.set(getIntConfigOption("fs.hdfs.limit.input"), 39); + config.set(getIntConfigOption("fs.hdfs.limit.output"), 38); + config.set(getIntConfigOption("fs.hdfs.limit.timeout"), 23456); + config.set(getIntConfigOption("fs.hdfs.limit.stream-timeout"), 34567); try { FileSystem.initialize(config); diff --git a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java index 49f4b2c516256..998ca8077dace 100644 --- a/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java +++ b/flink-filesystems/flink-s3-fs-base/src/main/java/org/apache/flink/fs/s3/common/FlinkS3FileSystem.java @@ -148,7 +148,7 @@ public static Optional of(Configuration flinkConfig) { s -> new S5CmdConfiguration( s, - flinkConfig.getString(S5CMD_EXTRA_ARGS), + flinkConfig.get(S5CMD_EXTRA_ARGS), flinkConfig.get(ACCESS_KEY), flinkConfig.get(SECRET_KEY), flinkConfig.get(ENDPOINT), diff --git a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java index 06887d25da446..5b6b60f35aef1 100644 --- a/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java +++ b/flink-filesystems/flink-s3-fs-base/src/test/java/org/apache/flink/fs/s3/common/S3EntropyFsFactoryTest.java @@ -24,6 +24,7 @@ import java.net.URI; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests that the file system factory picks up the entropy configuration properly. 
*/ @@ -33,7 +34,7 @@ class S3EntropyFsFactoryTest { void testEntropyInjectionConfig() throws Exception { final Configuration conf = new Configuration(); conf.setString("s3.entropy.key", "__entropy__"); - conf.setInteger("s3.entropy.length", 7); + conf.set(getIntConfigOption("s3.entropy.length"), 7); TestS3FileSystemFactory factory = new TestS3FileSystemFactory(); factory.configure(conf); diff --git a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java index 048a1d8cec9b1..57560504f69f7 100644 --- a/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java +++ b/flink-java/src/test/java/org/apache/flink/api/common/io/SequentialFormatTestBase.java @@ -42,6 +42,7 @@ import java.util.Comparator; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Test base for {@link BinaryInputFormat} and {@link BinaryOutputFormat}. */ @@ -179,7 +180,8 @@ void writeTuples() throws IOException { this.tempFile = File.createTempFile("BinaryInputFormat", null); this.tempFile.deleteOnExit(); Configuration configuration = new Configuration(); - configuration.setLong(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY, this.blockSize); + configuration.set( + getLongConfigOption(BinaryOutputFormat.BLOCK_SIZE_PARAMETER_KEY), this.blockSize); if (this.parallelism == 1) { BinaryOutputFormat output = createOutputFormat(this.tempFile.toURI().toString(), configuration); diff --git a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java index cbf5ab3d0f1f7..a7403971c235d 100644 --- a/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java +++ b/flink-java/src/test/java/org/apache/flink/api/java/io/TextInputFormatTest.java @@ -39,6 +39,7 @@ import java.util.Collections; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link TextInputFormat}. 
*/ @@ -103,7 +104,7 @@ void testNestedFileRead(@TempDir File parentDir) throws IOException { // this is to check if the setter overrides the configuration (as expected) Configuration config = new Configuration(); - config.setBoolean("recursive.file.enumeration", false); + config.set(getBooleanConfigOption("recursive.file.enumeration"), false); config.setString("delimited-format.numSamples", "20"); inputFormat.configure(config); diff --git a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java index 8fc09caa58adc..81bfde80bfe85 100644 --- a/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java +++ b/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/parameters/KubernetesJobManagerParameters.java @@ -213,6 +213,6 @@ public String getEntrypointArgs() { } public String getUserArtifactsBaseDir() { - return flinkConfig.getString(ArtifactFetchOptions.BASE_DIR); + return flinkConfig.get(ArtifactFetchOptions.BASE_DIR); } } diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java index 6ea22c6b3bb20..ac3ef5c6fdc93 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/entrypoint/KubernetesApplicationClusterEntrypointTest.java @@ -44,9 +44,9 @@ public class KubernetesApplicationClusterEntrypointTest { @BeforeEach public void setup() { configuration = new Configuration(); - configuration.setString(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString()); - configuration.setString(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE); - configuration.setString(KubernetesConfigOptions.CLUSTER_ID, TEST_CLUSTER_ID); + configuration.set(ArtifactFetchOptions.BASE_DIR, tempDir.toAbsolutePath().toString()); + configuration.set(KubernetesConfigOptions.NAMESPACE, TEST_NAMESPACE); + configuration.set(KubernetesConfigOptions.CLUSTER_ID, TEST_CLUSTER_ID); } @Test diff --git a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java index 526783a67b450..4caab5c57bf64 100644 --- a/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java +++ b/flink-kubernetes/src/test/java/org/apache/flink/kubernetes/kubeclient/decorators/InitTaskManagerDecoratorTest.java @@ -45,6 +45,7 @@ import java.util.Map; import java.util.stream.Collectors; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** General tests for the {@link InitJobManagerDecorator}. 
*/ @@ -90,9 +91,11 @@ protected void setupFlinkConfig() { // Set up external resource configs flinkConfig.setString(ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), RESOURCE_NAME); - flinkConfig.setLong( - ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( - RESOURCE_NAME, ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX), + flinkConfig.set( + getLongConfigOption( + ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( + RESOURCE_NAME, + ExternalResourceOptions.EXTERNAL_RESOURCE_AMOUNT_SUFFIX)), RESOURCE_AMOUNT); flinkConfig.setString( ExternalResourceOptions.getSystemConfigKeyConfigOptionForResource( diff --git a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java index da19a3f209ef9..9f674c8047881 100644 --- a/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java +++ b/flink-optimizer/src/main/java/org/apache/flink/optimizer/plantranslate/JobGraphGenerator.java @@ -105,6 +105,7 @@ import java.util.stream.Collectors; import static org.apache.flink.configuration.AlgorithmOptions.USE_LARGE_RECORDS_HANDLER; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.util.Preconditions.checkState; /** @@ -129,7 +130,7 @@ public class JobGraphGenerator implements Visitor { private static final boolean mergeIterationAuxTasks = GlobalConfiguration.loadConfiguration() - .getBoolean(MERGE_ITERATION_AUX_TASKS_KEY, false); + .get(getBooleanConfigOption(MERGE_ITERATION_AUX_TASKS_KEY), false); private static final TaskInChain ALREADY_VISITED_PLACEHOLDER = new TaskInChain(null, null, null, null); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 419d805a63d01..99c2fb75bd7e2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -22,6 +22,8 @@ import org.apache.flink.api.common.InvalidProgramException; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.cache.DistributedCache; +import org.apache.flink.configuration.ConfigOption; +import org.apache.flink.configuration.ConfigOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.JobStatusHook; import org.apache.flink.core.fs.Path; @@ -68,7 +70,10 @@ public class JobGraph implements Serializable { private static final long serialVersionUID = 1L; - private static final String INITIAL_CLIENT_HEARTBEAT_TIMEOUT = "initialClientHeartbeatTimeout"; + private static final ConfigOption INITIAL_CLIENT_HEARTBEAT_TIMEOUT = + ConfigOptions.key("initialClientHeartbeatTimeout") + .longType() + .defaultValue(Long.MIN_VALUE); // --- job and configuration --- @@ -656,10 +661,10 @@ public List getJobStatusHooks() { } public void setInitialClientHeartbeatTimeout(long initialClientHeartbeatTimeout) { - jobConfiguration.setLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, initialClientHeartbeatTimeout); + jobConfiguration.set(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, initialClientHeartbeatTimeout); } public long getInitialClientHeartbeatTimeout() { - return jobConfiguration.getLong(INITIAL_CLIENT_HEARTBEAT_TIMEOUT, Long.MIN_VALUE); + return jobConfiguration.get(INITIAL_CLIENT_HEARTBEAT_TIMEOUT); } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java index 23050d59a0141..3dce6c3d2f2a9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/util/TaskConfig.java @@ -49,6 +49,11 @@ import java.util.Collections; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; + /** Configuration class which stores all relevant parameters required to set up the Pact tasks. */ public class TaskConfig implements Serializable { @@ -344,11 +349,11 @@ public Class> getDriver() { } public void setDriverStrategy(DriverStrategy strategy) { - this.config.setInteger(DRIVER_STRATEGY, strategy.ordinal()); + this.config.set(getIntConfigOption(DRIVER_STRATEGY), strategy.ordinal()); } public DriverStrategy getDriverStrategy() { - final int ls = this.config.getInteger(DRIVER_STRATEGY, -1); + final int ls = this.config.get(getIntConfigOption(DRIVER_STRATEGY), -1); if (ls == -1) { return DriverStrategy.NONE; } else if (ls < 0 || ls >= DriverStrategy.values().length) { @@ -411,11 +416,13 @@ public TypePairComparatorFactory getPairComparatorFactory(Class // -------------------------------------------------------------------------------------------- public void setInputLocalStrategy(int inputNum, LocalStrategy strategy) { - this.config.setInteger(INPUT_LOCAL_STRATEGY_PREFIX + inputNum, strategy.ordinal()); + this.config.set( + getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX + inputNum), strategy.ordinal()); } public LocalStrategy getInputLocalStrategy(int inputNum) { - final int ls = this.config.getInteger(INPUT_LOCAL_STRATEGY_PREFIX + inputNum, -1); + final int ls = + this.config.get(getIntConfigOption(INPUT_LOCAL_STRATEGY_PREFIX + inputNum), -1); if (ls == -1) { return LocalStrategy.NONE; } else if (ls < 0 || ls >= LocalStrategy.values().length) { @@ -469,58 +476,62 @@ public TypeComparatorFactory getInputComparator(int inputNum, ClassLoader } public int getNumInputs() { - return this.config.getInteger(NUM_INPUTS, 0); + return this.config.get(getIntConfigOption(NUM_INPUTS), 0); } public int getNumBroadcastInputs() { - return this.config.getInteger(NUM_BROADCAST_INPUTS, 0); + return this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 0); } public int getGroupSize(int groupIndex) { - return this.config.getInteger(INPUT_GROUP_SIZE_PREFIX + groupIndex, -1); + return this.config.get(getIntConfigOption(INPUT_GROUP_SIZE_PREFIX + groupIndex), -1); } public int getBroadcastGroupSize(int groupIndex) { - return this.config.getInteger(BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex, -1); + return this.config.get( + getIntConfigOption(BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex), -1); } public void addInputToGroup(int groupIndex) { final String grp = INPUT_GROUP_SIZE_PREFIX + groupIndex; - this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1); - this.config.setInteger(NUM_INPUTS, this.config.getInteger(NUM_INPUTS, 0) + 1); + this.config.set(getIntConfigOption(grp), this.config.get(getIntConfigOption(grp), 0) + 1); + this.config.set( + getIntConfigOption(NUM_INPUTS), + 
this.config.get(getIntConfigOption(NUM_INPUTS), 0) + 1); } public void addBroadcastInputToGroup(int groupIndex) { final String grp = BROADCAST_INPUT_GROUP_SIZE_PREFIX + groupIndex; if (!this.config.containsKey(grp)) { - this.config.setInteger( - NUM_BROADCAST_INPUTS, this.config.getInteger(NUM_BROADCAST_INPUTS, 0) + 1); + this.config.set( + getIntConfigOption(NUM_BROADCAST_INPUTS), + this.config.get(getIntConfigOption(NUM_BROADCAST_INPUTS), 0) + 1); } - this.config.setInteger(grp, this.config.getInteger(grp, 0) + 1); + this.config.set(getIntConfigOption(grp), this.config.get(getIntConfigOption(grp), 0) + 1); } public void setInputAsynchronouslyMaterialized(int inputNum, boolean temp) { - this.config.setBoolean(INPUT_DAM_PREFIX + inputNum, temp); + this.config.set(getBooleanConfigOption(INPUT_DAM_PREFIX + inputNum), temp); } public boolean isInputAsynchronouslyMaterialized(int inputNum) { - return this.config.getBoolean(INPUT_DAM_PREFIX + inputNum, false); + return this.config.get(getBooleanConfigOption(INPUT_DAM_PREFIX + inputNum), false); } public void setInputCached(int inputNum, boolean persistent) { - this.config.setBoolean(INPUT_REPLAYABLE_PREFIX + inputNum, persistent); + this.config.set(getBooleanConfigOption(INPUT_REPLAYABLE_PREFIX + inputNum), persistent); } public boolean isInputCached(int inputNum) { - return this.config.getBoolean(INPUT_REPLAYABLE_PREFIX + inputNum, false); + return this.config.get(getBooleanConfigOption(INPUT_REPLAYABLE_PREFIX + inputNum), false); } public void setRelativeInputMaterializationMemory(int inputNum, double relativeMemory) { - this.config.setDouble(INPUT_DAM_MEMORY_PREFIX + inputNum, relativeMemory); + this.config.set(getDoubleConfigOption(INPUT_DAM_MEMORY_PREFIX + inputNum), relativeMemory); } public double getRelativeInputMaterializationMemory(int inputNum) { - return this.config.getDouble(INPUT_DAM_MEMORY_PREFIX + inputNum, 0); + return this.config.get(getDoubleConfigOption(INPUT_DAM_MEMORY_PREFIX + inputNum), 0.0); } public void setBroadcastInputName(String name, int groupIndex) { @@ -538,18 +549,19 @@ public String getBroadcastInputName(int groupIndex) { // -------------------------------------------------------------------------------------------- public void addOutputShipStrategy(ShipStrategyType strategy) { - final int outputCnt = this.config.getInteger(OUTPUTS_NUM, 0); - this.config.setInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt, strategy.ordinal()); - this.config.setInteger(OUTPUTS_NUM, outputCnt + 1); + final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), 0); + this.config.set( + getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX + outputCnt), strategy.ordinal()); + this.config.set(getIntConfigOption(OUTPUTS_NUM), outputCnt + 1); } public int getNumOutputs() { - return this.config.getInteger(OUTPUTS_NUM, 0); + return this.config.get(getIntConfigOption(OUTPUTS_NUM), 0); } public ShipStrategyType getOutputShipStrategy(int outputNum) { // check how many outputs are encoded in the config - final int outputCnt = this.config.getInteger(OUTPUTS_NUM, -1); + final int outputCnt = this.config.get(getIntConfigOption(OUTPUTS_NUM), -1); if (outputCnt < 1) { throw new CorruptConfigurationException( "No output ship strategies are specified in the configuration."); @@ -560,7 +572,8 @@ public ShipStrategyType getOutputShipStrategy(int outputNum) { throw new IllegalArgumentException("Invalid index for output shipping strategy."); } - final int strategy = this.config.getInteger(OUTPUT_SHIP_STRATEGY_PREFIX + outputNum, -1); + final int 
strategy = + this.config.get(getIntConfigOption(OUTPUT_SHIP_STRATEGY_PREFIX + outputNum), -1); if (strategy == -1) { throw new CorruptConfigurationException( "No output shipping strategy in configuration for output " + outputNum); @@ -680,19 +693,19 @@ public Partitioner getOutputPartitioner(int outputNum, final ClassLoader cl) // -------------------------------------------------------------------------------------------- public void setRelativeMemoryDriver(double relativeMemorySize) { - this.config.setDouble(MEMORY_DRIVER, relativeMemorySize); + this.config.set(getDoubleConfigOption(MEMORY_DRIVER), relativeMemorySize); } public double getRelativeMemoryDriver() { - return this.config.getDouble(MEMORY_DRIVER, 0); + return this.config.get(getDoubleConfigOption(MEMORY_DRIVER), 0.0); } public void setRelativeMemoryInput(int inputNum, double relativeMemorySize) { - this.config.setDouble(MEMORY_INPUT_PREFIX + inputNum, relativeMemorySize); + this.config.set(getDoubleConfigOption(MEMORY_INPUT_PREFIX + inputNum), relativeMemorySize); } public double getRelativeMemoryInput(int inputNum) { - return this.config.getDouble(MEMORY_INPUT_PREFIX + inputNum, 0); + return this.config.get(getDoubleConfigOption(MEMORY_INPUT_PREFIX + inputNum), 0.0); } // -------------------------------------------------------------------------------------------- @@ -701,22 +714,22 @@ public void setFilehandlesDriver(int filehandles) { if (filehandles < 2) { throw new IllegalArgumentException(); } - this.config.setInteger(FILEHANDLES_DRIVER, filehandles); + this.config.set(getIntConfigOption(FILEHANDLES_DRIVER), filehandles); } public int getFilehandlesDriver() { - return this.config.getInteger(FILEHANDLES_DRIVER, -1); + return this.config.get(getIntConfigOption(FILEHANDLES_DRIVER), -1); } public void setFilehandlesInput(int inputNum, int filehandles) { if (filehandles < 2) { throw new IllegalArgumentException(); } - this.config.setInteger(FILEHANDLES_INPUT_PREFIX + inputNum, filehandles); + this.config.set(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + inputNum), filehandles); } public int getFilehandlesInput(int inputNum) { - return this.config.getInteger(FILEHANDLES_INPUT_PREFIX + inputNum, -1); + return this.config.get(getIntConfigOption(FILEHANDLES_INPUT_PREFIX + inputNum), -1); } // -------------------------------------------------------------------------------------------- @@ -725,30 +738,33 @@ public void setSpillingThresholdDriver(float threshold) { if (threshold < 0.0f || threshold > 1.0f) { throw new IllegalArgumentException(); } - this.config.setFloat(SORT_SPILLING_THRESHOLD_DRIVER, threshold); + this.config.set(getFloatConfigOption(SORT_SPILLING_THRESHOLD_DRIVER), threshold); } public float getSpillingThresholdDriver() { - return this.config.getFloat(SORT_SPILLING_THRESHOLD_DRIVER, 0.7f); + return this.config.get(getFloatConfigOption(SORT_SPILLING_THRESHOLD_DRIVER), 0.7f); } public void setSpillingThresholdInput(int inputNum, float threshold) { if (threshold < 0.0f || threshold > 1.0f) { throw new IllegalArgumentException(); } - this.config.setFloat(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum, threshold); + this.config.set( + getFloatConfigOption(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum), threshold); } public float getSpillingThresholdInput(int inputNum) { - return this.config.getFloat(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum, 0.7f); + return this.config.get( + getFloatConfigOption(SORT_SPILLING_THRESHOLD_INPUT_PREFIX + inputNum), 0.7f); } public void setUseLargeRecordHandler(boolean 
useLargeRecordHandler) { - this.config.setBoolean(USE_LARGE_RECORD_HANDLER, useLargeRecordHandler); + this.config.set(getBooleanConfigOption(USE_LARGE_RECORD_HANDLER), useLargeRecordHandler); } public boolean getUseLargeRecordHandler() { - return this.config.getBoolean(USE_LARGE_RECORD_HANDLER, USE_LARGE_RECORD_HANDLER_DEFAULT); + return this.config.get( + getBooleanConfigOption(USE_LARGE_RECORD_HANDLER), USE_LARGE_RECORD_HANDLER_DEFAULT); } // -------------------------------------------------------------------------------------------- @@ -756,20 +772,20 @@ public boolean getUseLargeRecordHandler() { // -------------------------------------------------------------------------------------------- public int getNumberOfChainedStubs() { - return this.config.getInteger(CHAINING_NUM_STUBS, 0); + return this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0); } public void addChainedTask( @SuppressWarnings("rawtypes") Class chainedTaskClass, TaskConfig conf, String taskName) { - int numChainedYet = this.config.getInteger(CHAINING_NUM_STUBS, 0); + int numChainedYet = this.config.get(getIntConfigOption(CHAINING_NUM_STUBS), 0); this.config.setString(CHAINING_TASK_PREFIX + numChainedYet, chainedTaskClass.getName()); this.config.addAll(conf.config, CHAINING_TASKCONFIG_PREFIX + numChainedYet + SEPARATOR); this.config.setString(CHAINING_TASKNAME_PREFIX + numChainedYet, taskName); - this.config.setInteger(CHAINING_NUM_STUBS, ++numChainedYet); + this.config.set(getIntConfigOption(CHAINING_NUM_STUBS), ++numChainedYet); } public TaskConfig getChainedStubConfig(int chainPos) { @@ -806,11 +822,11 @@ public void setNumberOfIterations(int numberOfIterations) { if (numberOfIterations <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_ITERATIONS, numberOfIterations); + this.config.set(getIntConfigOption(NUMBER_OF_ITERATIONS), numberOfIterations); } public int getNumberOfIterations() { - int numberOfIterations = this.config.getInteger(NUMBER_OF_ITERATIONS, 0); + int numberOfIterations = this.config.get(getIntConfigOption(NUMBER_OF_ITERATIONS), 0); if (numberOfIterations <= 0) { throw new IllegalArgumentException(); } @@ -821,11 +837,12 @@ public void setIterationHeadPartialSolutionOrWorksetInputIndex(int inputIndex) { if (inputIndex < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, inputIndex); + this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), inputIndex); } public int getIterationHeadPartialSolutionOrWorksetInputIndex() { - int index = this.config.getInteger(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION, -1); + int index = + this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_PARTIAL_SOLUTION), -1); if (index < 0) { throw new IllegalArgumentException(); } @@ -836,11 +853,11 @@ public void setIterationHeadSolutionSetInputIndex(int inputIndex) { if (inputIndex < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, inputIndex); + this.config.set(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), inputIndex); } public int getIterationHeadSolutionSetInputIndex() { - int index = this.config.getInteger(ITERATION_HEAD_INDEX_OF_SOLUTIONSET, -1); + int index = this.config.get(getIntConfigOption(ITERATION_HEAD_INDEX_OF_SOLUTIONSET), -1); if (index < 0) { throw new IllegalArgumentException(); } @@ -851,12 +868,12 @@ public void setRelativeBackChannelMemory(double relativeMemory) { if (relativeMemory < 0) { throw new 
IllegalArgumentException(); } - this.config.setDouble(ITERATION_HEAD_BACKCHANNEL_MEMORY, relativeMemory); + this.config.set(getDoubleConfigOption(ITERATION_HEAD_BACKCHANNEL_MEMORY), relativeMemory); } public double getRelativeBackChannelMemory() { double relativeBackChannelMemory = - this.config.getDouble(ITERATION_HEAD_BACKCHANNEL_MEMORY, 0); + this.config.get(getDoubleConfigOption(ITERATION_HEAD_BACKCHANNEL_MEMORY), 0.0); if (relativeBackChannelMemory <= 0) { throw new IllegalArgumentException(); } @@ -867,11 +884,12 @@ public void setRelativeSolutionSetMemory(double relativeMemory) { if (relativeMemory < 0) { throw new IllegalArgumentException(); } - this.config.setDouble(ITERATION_HEAD_SOLUTION_SET_MEMORY, relativeMemory); + this.config.set(getDoubleConfigOption(ITERATION_HEAD_SOLUTION_SET_MEMORY), relativeMemory); } public double getRelativeSolutionSetMemory() { - double backChannelMemory = this.config.getDouble(ITERATION_HEAD_SOLUTION_SET_MEMORY, 0); + double backChannelMemory = + this.config.get(getDoubleConfigOption(ITERATION_HEAD_SOLUTION_SET_MEMORY), 0.0); if (backChannelMemory <= 0) { throw new IllegalArgumentException(); } @@ -890,14 +908,15 @@ public void setGateIterativeWithNumberOfEventsUntilInterrupt( if (numEvents <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, numEvents); + this.config.set( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex), numEvents); } public int getNumberOfEventsUntilInterruptInIterativeGate(int inputGateIndex) { if (inputGateIndex < 0) { throw new IllegalArgumentException(); } - return this.config.getInteger(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex, 0); + return this.config.get(getIntConfigOption(NUMBER_OF_EOS_EVENTS_PREFIX + inputGateIndex), 0); } public void setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt( @@ -908,25 +927,27 @@ public void setBroadcastGateIterativeWithNumberOfEventsUntilInterrupt( if (numEvents <= 0) { throw new IllegalArgumentException(); } - this.config.setInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, numEvents); + this.config.set( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex), numEvents); } public int getNumberOfEventsUntilInterruptInIterativeBroadcastGate(int bcGateIndex) { if (bcGateIndex < 0) { throw new IllegalArgumentException(); } - return this.config.getInteger(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex, 0); + return this.config.get( + getIntConfigOption(NUMBER_OF_EOS_EVENTS_BROADCAST_PREFIX + bcGateIndex), 0); } public void setIterationId(int id) { if (id < 0) { throw new IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_ID, id); + this.config.set(getIntConfigOption(ITERATION_HEAD_ID), id); } public int getIterationId() { - int id = this.config.getInteger(ITERATION_HEAD_ID, -1); + int id = this.config.get(getIntConfigOption(ITERATION_HEAD_ID), -1); if (id == -1) { throw new CorruptConfigurationException("Iteration head ID is missing."); } @@ -934,22 +955,22 @@ public int getIterationId() { } public void setIsWorksetIteration() { - this.config.setBoolean(ITERATION_WORKSET_MARKER, true); + this.config.set(getBooleanConfigOption(ITERATION_WORKSET_MARKER), true); } public boolean getIsWorksetIteration() { - return this.config.getBoolean(ITERATION_WORKSET_MARKER, false); + return this.config.get(getBooleanConfigOption(ITERATION_WORKSET_MARKER), false); } public void setIterationHeadIndexOfSyncOutput(int outputIndex) { if (outputIndex < 0) { throw new 
IllegalArgumentException(); } - this.config.setInteger(ITERATION_HEAD_SYNC_OUT_INDEX, outputIndex); + this.config.set(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), outputIndex); } public int getIterationHeadIndexOfSyncOutput() { - int outputIndex = this.config.getInteger(ITERATION_HEAD_SYNC_OUT_INDEX, -1); + int outputIndex = this.config.get(getIntConfigOption(ITERATION_HEAD_SYNC_OUT_INDEX), -1); if (outputIndex < 0) { throw new IllegalArgumentException(); } @@ -994,7 +1015,7 @@ public TypeComparatorFactory getSolutionSetComparator(ClassLoader cl) { } public void addIterationAggregator(String name, Aggregator aggregator) { - int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + int num = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, name); try { InstantiationUtil.writeObjectToConfig( @@ -1003,11 +1024,11 @@ public void addIterationAggregator(String name, Aggregator aggregator) { throw new RuntimeException( "Error while writing the aggregator object to the task configuration."); } - this.config.setInteger(ITERATION_NUM_AGGREGATORS, num + 1); + this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num + 1); } public void addIterationAggregators(Collection> aggregators) { - int num = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + int num = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); for (AggregatorWithName awn : aggregators) { this.config.setString(ITERATION_AGGREGATOR_NAME_PREFIX + num, awn.getName()); try { @@ -1019,12 +1040,12 @@ public void addIterationAggregators(Collection> aggregator } num++; } - this.config.setInteger(ITERATION_NUM_AGGREGATORS, num); + this.config.set(getIntConfigOption(ITERATION_NUM_AGGREGATORS), num); } @SuppressWarnings("unchecked") public Collection> getIterationAggregators(ClassLoader cl) { - final int numAggs = this.config.getInteger(ITERATION_NUM_AGGREGATORS, 0); + final int numAggs = this.config.get(getIntConfigOption(ITERATION_NUM_AGGREGATORS), 0); if (numAggs == 0) { return Collections.emptyList(); } @@ -1146,35 +1167,31 @@ public String getImplicitConvergenceCriterionAggregatorName() { } public void setIsSolutionSetUpdate() { - this.config.setBoolean(ITERATION_SOLUTION_SET_UPDATE, true); + this.config.set(getBooleanConfigOption(ITERATION_SOLUTION_SET_UPDATE), true); } public boolean getIsSolutionSetUpdate() { - return this.config.getBoolean(ITERATION_SOLUTION_SET_UPDATE, false); + return this.config.get(getBooleanConfigOption(ITERATION_SOLUTION_SET_UPDATE), false); } public void setIsSolutionSetUpdateWithoutReprobe() { - this.config.setBoolean(ITERATION_SOLUTION_SET_UPDATE_SKIP_REPROBE, true); - } - - public boolean getIsSolutionSetUpdateWithoutReprobe() { - return this.config.getBoolean(ITERATION_SOLUTION_SET_UPDATE_SKIP_REPROBE, false); + this.config.set(getBooleanConfigOption(ITERATION_SOLUTION_SET_UPDATE_SKIP_REPROBE), true); } public void setWaitForSolutionSetUpdate() { - this.config.setBoolean(ITERATION_SOLUTION_SET_UPDATE_WAIT, true); + this.config.set(getBooleanConfigOption(ITERATION_SOLUTION_SET_UPDATE_WAIT), true); } public boolean getWaitForSolutionSetUpdate() { - return this.config.getBoolean(ITERATION_SOLUTION_SET_UPDATE_WAIT, false); + return this.config.get(getBooleanConfigOption(ITERATION_SOLUTION_SET_UPDATE_WAIT), false); } public void setIsWorksetUpdate() { - this.config.setBoolean(ITERATION_WORKSET_UPDATE, true); + this.config.set(getBooleanConfigOption(ITERATION_WORKSET_UPDATE), true); } public 
boolean getIsWorksetUpdate() { - return this.config.getBoolean(ITERATION_WORKSET_UPDATE, false); + return this.config.get(getBooleanConfigOption(ITERATION_WORKSET_UPDATE), false); } // -------------------------------------------------------------------------------------------- @@ -1294,10 +1311,10 @@ private TypeComparatorFactory getTypeComparatorFactory( } public void setSolutionSetUnmanaged(boolean unmanaged) { - config.setBoolean(SOLUTION_SET_OBJECTS, unmanaged); + config.set(getBooleanConfigOption(SOLUTION_SET_OBJECTS), unmanaged); } public boolean isSolutionSetUnmanaged() { - return config.getBoolean(SOLUTION_SET_OBJECTS, false); + return config.get(getBooleanConfigOption(SOLUTION_SET_OBJECTS), false); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java index e87cd6b660b51..993004459cc22 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveBatchSchedulerFactory.java @@ -150,7 +150,7 @@ public SchedulerNG createInstance( jobGraph.getJobID()); final boolean isJobRecoveryEnabled = - jobMasterConfiguration.getBoolean(BatchExecutionOptions.JOB_RECOVERY_ENABLED) + jobMasterConfiguration.get(BatchExecutionOptions.JOB_RECOVERY_ENABLED) && shuffleMaster.supportsBatchSnapshot(); BatchJobRecoveryHandler jobRecoveryHandler; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java index 7d645dcf6050b..3e1f65483d34c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/ConfigurationParserUtils.java @@ -199,7 +199,7 @@ public static List loadAndModifyConfiguration(String[] args, String cmdL if (removeKeyValues .getProperty(propertyName) .equals( - configuration.getString( + configuration.get( ConfigOptions.key(propertyName) .stringType() .noDefaultValue()))) { diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index aa466e16b3948..7414b82263964 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -85,28 +85,40 @@ public class StreamConfig implements Serializable { */ public static final String SERIALIZED_UDF_CLASS = "serializedUdfClass"; - private static final String NUMBER_OF_OUTPUTS = "numberOfOutputs"; - private static final String NUMBER_OF_NETWORK_INPUTS = "numberOfNetworkInputs"; + private static final ConfigOption NUMBER_OF_OUTPUTS = + ConfigOptions.key("numberOfOutputs").intType().defaultValue(0); + private static final ConfigOption NUMBER_OF_NETWORK_INPUTS = + ConfigOptions.key("numberOfNetworkInputs").intType().defaultValue(0); private static final String CHAINED_OUTPUTS = "chainedOutputs"; private static final String CHAINED_TASK_CONFIG = "chainedTaskConfig_"; - private static final String IS_CHAINED_VERTEX = "isChainedSubtask"; - private static final String CHAIN_INDEX = "chainIndex"; - private static final String VERTEX_NAME = "vertexID"; 
+ private static final ConfigOption IS_CHAINED_VERTEX = + ConfigOptions.key("isChainedSubtask").booleanType().defaultValue(false); + private static final ConfigOption CHAIN_INDEX = + ConfigOptions.key("chainIndex").intType().defaultValue(0); + private static final ConfigOption VERTEX_NAME = + ConfigOptions.key("vertexID").intType().defaultValue(-1); private static final String ITERATION_ID = "iterationId"; private static final String INPUTS = "inputs"; private static final String TYPE_SERIALIZER_OUT_1 = "typeSerializer_out"; private static final String TYPE_SERIALIZER_SIDEOUT_PREFIX = "typeSerializer_sideout_"; - private static final String ITERATON_WAIT = "iterationWait"; + private static final ConfigOption ITERATON_WAIT = + ConfigOptions.key("iterationWait").longType().defaultValue(0L); private static final String OP_NONCHAINED_OUTPUTS = "opNonChainedOutputs"; private static final String VERTEX_NONCHAINED_OUTPUTS = "vertexNonChainedOutputs"; private static final String IN_STREAM_EDGES = "inStreamEdges"; private static final String OPERATOR_NAME = "operatorName"; private static final String OPERATOR_ID = "operatorID"; - private static final String CHAIN_END = "chainEnd"; - private static final String GRAPH_CONTAINING_LOOPS = "graphContainingLoops"; + private static final ConfigOption CHAIN_END = + ConfigOptions.key("chainEnd").booleanType().defaultValue(false); - private static final String CHECKPOINTING_ENABLED = "checkpointing"; - private static final String CHECKPOINT_MODE = "checkpointMode"; + private static final ConfigOption GRAPH_CONTAINING_LOOPS = + ConfigOptions.key("graphContainingLoops").booleanType().defaultValue(false); + + private static final ConfigOption CHECKPOINTING_ENABLED = + ConfigOptions.key("checkpointing").booleanType().defaultValue(false); + + private static final ConfigOption CHECKPOINT_MODE = + ConfigOptions.key("checkpointMode").intType().defaultValue(-1); private static final String SAVEPOINT_DIR = "savepointdir"; private static final String CHECKPOINT_STORAGE = "checkpointstorage"; @@ -117,7 +129,8 @@ public class StreamConfig implements Serializable { private static final String STATE_KEY_SERIALIZER = "statekeyser"; - private static final String TIME_CHARACTERISTIC = "timechar"; + private static final ConfigOption TIME_CHARACTERISTIC = + ConfigOptions.key("timechar").intType().defaultValue(-1); private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; @@ -232,11 +245,11 @@ public void setAndSerializeTransitiveChainedTaskConfigs( // ------------------------------------------------------------------------ public void setVertexID(Integer vertexID) { - config.setInteger(VERTEX_NAME, vertexID); + config.set(VERTEX_NAME, vertexID); } public Integer getVertexID() { - return config.getInteger(VERTEX_NAME, -1); + return config.get(VERTEX_NAME); } /** Fraction of managed memory reserved for the given use case that this operator should use. 
*/ @@ -292,11 +305,11 @@ private Set getAllManagedMemoryUseCases() { } public void setTimeCharacteristic(TimeCharacteristic characteristic) { - config.setInteger(TIME_CHARACTERISTIC, characteristic.ordinal()); + config.set(TIME_CHARACTERISTIC, characteristic.ordinal()); } public TimeCharacteristic getTimeCharacteristic() { - int ordinal = config.getInteger(TIME_CHARACTERISTIC, -1); + int ordinal = config.get(TIME_CHARACTERISTIC, -1); if (ordinal >= 0) { return TimeCharacteristic.values()[ordinal]; } else { @@ -439,27 +452,27 @@ public String getIterationId() { } public void setIterationWaitTime(long time) { - config.setLong(ITERATON_WAIT, time); + config.set(ITERATON_WAIT, time); } public long getIterationWaitTime() { - return config.getLong(ITERATON_WAIT, 0); + return config.get(ITERATON_WAIT); } public void setNumberOfNetworkInputs(int numberOfInputs) { - config.setInteger(NUMBER_OF_NETWORK_INPUTS, numberOfInputs); + config.set(NUMBER_OF_NETWORK_INPUTS, numberOfInputs); } public int getNumberOfNetworkInputs() { - return config.getInteger(NUMBER_OF_NETWORK_INPUTS, 0); + return config.get(NUMBER_OF_NETWORK_INPUTS); } public void setNumberOfOutputs(int numberOfOutputs) { - config.setInteger(NUMBER_OF_OUTPUTS, numberOfOutputs); + config.set(NUMBER_OF_OUTPUTS, numberOfOutputs); } public int getNumberOfOutputs() { - return config.getInteger(NUMBER_OF_OUTPUTS, 0); + return config.get(NUMBER_OF_OUTPUTS); } /** Sets the operator level non-chained outputs. */ @@ -508,19 +521,19 @@ public List getInPhysicalEdges(ClassLoader cl) { // --------------------- checkpointing ----------------------- public void setCheckpointingEnabled(boolean enabled) { - config.setBoolean(CHECKPOINTING_ENABLED, enabled); + config.set(CHECKPOINTING_ENABLED, enabled); } public boolean isCheckpointingEnabled() { - return config.getBoolean(CHECKPOINTING_ENABLED, false); + return config.get(CHECKPOINTING_ENABLED); } public void setCheckpointMode(CheckpointingMode mode) { - config.setInteger(CHECKPOINT_MODE, mode.ordinal()); + config.set(CHECKPOINT_MODE, mode.ordinal()); } public CheckpointingMode getCheckpointMode() { - int ordinal = config.getInteger(CHECKPOINT_MODE, -1); + int ordinal = config.get(CHECKPOINT_MODE, -1); if (ordinal >= 0) { return CheckpointingMode.values()[ordinal]; } else { @@ -537,7 +550,7 @@ public boolean isUnalignedCheckpointsEnabled() { } public void setUnalignedCheckpointsSplittableTimersEnabled(boolean enabled) { - config.setBoolean(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, enabled); + config.set(CheckpointingOptions.ENABLE_UNALIGNED_INTERRUPTIBLE_TIMERS, enabled); } public boolean isUnalignedCheckpointsSplittableTimersEnabled() { @@ -640,11 +653,11 @@ public String getOperatorName() { } public void setChainIndex(int index) { - this.config.setInteger(CHAIN_INDEX, index); + this.config.set(CHAIN_INDEX, index); } public int getChainIndex() { - return this.config.getInteger(CHAIN_INDEX, 0); + return this.config.get(CHAIN_INDEX); } // ------------------------------------------------------------------------ @@ -745,19 +758,19 @@ public TypeSerializer getStateKeySerializer(ClassLoader cl) { // ------------------------------------------------------------------------ public void setChainStart() { - config.setBoolean(IS_CHAINED_VERTEX, true); + config.set(IS_CHAINED_VERTEX, true); } public boolean isChainStart() { - return config.getBoolean(IS_CHAINED_VERTEX, false); + return config.get(IS_CHAINED_VERTEX); } public void setChainEnd() { - config.setBoolean(CHAIN_END, true); + 
config.set(CHAIN_END, true); } public boolean isChainEnd() { - return config.getBoolean(CHAIN_END, false); + return config.get(CHAIN_END); } @Override @@ -797,11 +810,11 @@ public String toString() { } public void setGraphContainingLoops(boolean graphContainingLoops) { - config.setBoolean(GRAPH_CONTAINING_LOOPS, graphContainingLoops); + config.set(GRAPH_CONTAINING_LOOPS, graphContainingLoops); } public boolean isGraphContainingLoops() { - return config.getBoolean(GRAPH_CONTAINING_LOOPS, false); + return config.get(GRAPH_CONTAINING_LOOPS); } public void setAttribute(Attribute attribute) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java index 257c309e3a489..d8e91b6b3ba8e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/TaskExecutorProcessUtilsTest.java @@ -37,6 +37,7 @@ import java.util.Map; import java.util.function.Consumer; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_LEGACY_HEAP_OPTIONS; import static org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils.TM_PROCESS_MEMORY_OPTIONS; import static org.assertj.core.api.Assertions.assertThat; @@ -635,9 +636,11 @@ void testProcessSpecFromConfigWithExternalResource() { final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME_1); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME_1), - 1); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource( + EXTERNAL_RESOURCE_NAME_1)), + 1L); config.set(TaskManagerOptions.TOTAL_PROCESS_MEMORY, MemorySize.ofMebiBytes(4096)); final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.processSpecFromConfig(config); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java index f3895a3e5f5bf..5f3b38060b044 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/externalresource/ExternalResourceUtilsTest.java @@ -38,6 +38,7 @@ import java.util.List; import java.util.Map; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.empty; import static org.hamcrest.core.Is.is; @@ -228,8 +229,9 @@ public void testGetExternalResourceAmountMap() { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Map externalResourceAmountMap = @@ -246,8 +248,10 @@ public void testGetExternalResourceAmountMapWithIllegalAmount() { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, 
Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), 0); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), + 0L); final Map externalResourceAmountMap = ExternalResourceUtils.getExternalResourceAmountMap(config); @@ -261,8 +265,9 @@ public void testGetExternalResourcesCollection() { config.set( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST, Collections.singletonList(RESOURCE_NAME_1)); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Collection externalResources = @@ -279,8 +284,9 @@ public void testRecognizeEmptyResourceList() { final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), ExternalResourceOptions.NONE); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1), + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource(RESOURCE_NAME_1)), RESOURCE_AMOUNT_1); final Collection externalResources = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java index 58cd04f998762..19f3f11a53ad0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobgraph/JobGraphTest.java @@ -37,6 +37,7 @@ import java.util.Arrays; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getDoubleConfigOption; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.hasSize; import static org.junit.Assert.assertEquals; @@ -57,7 +58,7 @@ public void testSerialization() { // add some configuration values { jg.getJobConfiguration().setString("some key", "some value"); - jg.getJobConfiguration().setDouble("Life of ", Math.PI); + jg.getJobConfiguration().set(getDoubleConfigOption("Life of "), Math.PI); } // add some vertices diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java index 53822036e33eb..e19be728a3bf0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/SlotCountExceedingParallelismTest.java @@ -43,6 +43,8 @@ import java.util.Arrays; import java.util.BitSet; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; + /** * Tests that Flink can execute jobs with a higher parallelism than available number of slots. This * effectively tests that Flink can execute jobs with blocking results in a staged fashion. 
@@ -108,12 +110,15 @@ private JobGraph createTestJobGraph( final JobVertex sender = new JobVertex("Sender"); sender.setInvokableClass(RoundRobinSubtaskIndexSender.class); sender.getConfiguration() - .setInteger(RoundRobinSubtaskIndexSender.CONFIG_KEY, receiverParallelism); + .set( getIntConfigOption(RoundRobinSubtaskIndexSender.CONFIG_KEY), receiverParallelism); sender.setParallelism(senderParallelism); final JobVertex receiver = new JobVertex("Receiver"); receiver.setInvokableClass(SubtaskIndexReceiver.class); - receiver.getConfiguration().setInteger(SubtaskIndexReceiver.CONFIG_KEY, senderParallelism); + receiver.getConfiguration() + .set(getIntConfigOption(SubtaskIndexReceiver.CONFIG_KEY), senderParallelism); receiver.setParallelism(receiverParallelism); receiver.connectNewDataSetAsInput( @@ -138,7 +143,8 @@ public RoundRobinSubtaskIndexSender(Environment environment) { public void invoke() throws Exception { RecordWriter writer = new RecordWriterBuilder().build(getEnvironment().getWriter(0)); - final int numberOfTimesToSend = getTaskConfiguration().getInteger(CONFIG_KEY, 0); + final int numberOfTimesToSend = + getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 0); final IntValue subtaskIndex = new IntValue(getEnvironment().getTaskInfo().getIndexOfThisSubtask()); @@ -173,7 +179,7 @@ public void invoke() throws Exception { try { final int numberOfSubtaskIndexesToReceive = - getTaskConfiguration().getInteger(CONFIG_KEY, 0); + getTaskConfiguration().get(getIntConfigOption(CONFIG_KEY), 0); final BitSet receivedSubtaskIndexes = new BitSet(numberOfSubtaskIndexesToReceive); IntValue record; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/event/FileSystemJobEventStoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/event/FileSystemJobEventStoreTest.java index e3bcd4ff23b7d..d018adbf6693a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/event/FileSystemJobEventStoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/event/FileSystemJobEventStoreTest.java @@ -245,8 +245,8 @@ void testReadUnexpectedLengthEvent() throws Exception { @Test void testGenerateWorkingDirCorrectly() throws IOException { final Configuration configuration = new Configuration(); - configuration.setString(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///tmp/flink"); - configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, "cluster_id"); + configuration.set(HighAvailabilityOptions.HA_STORAGE_PATH, "file:///tmp/flink"); + configuration.set(HighAvailabilityOptions.HA_CLUSTER_ID, "cluster_id"); final JobID jobID = new JobID(); FileSystemJobEventStore store = new FileSystemJobEventStore(jobID, configuration); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java index 79d458073e6f6..0bd1d4264fe94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/WorkerResourceSpecTest.java @@ -28,6 +28,7 @@ import org.junit.jupiter.api.Test; +import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link WorkerResourceSpec}.
*/ @@ -267,9 +268,11 @@ void testCreateFromTaskExecutorProcessSpec() { final Configuration config = new Configuration(); config.setString( ExternalResourceOptions.EXTERNAL_RESOURCE_LIST.key(), EXTERNAL_RESOURCE_NAME); - config.setLong( - ExternalResourceOptions.getAmountConfigOptionForResource(EXTERNAL_RESOURCE_NAME), - 1); + config.set( + getLongConfigOption( + ExternalResourceOptions.getAmountConfigOptionForResource( + EXTERNAL_RESOURCE_NAME)), + 1L); final TaskExecutorProcessSpec taskExecutorProcessSpec = TaskExecutorProcessUtils.newProcessSpecBuilder(config) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java index 1c26a4e32d311..d1b24d862f4cd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptivebatch/DefaultVertexParallelismAndInputInfosDeciderTest.java @@ -383,7 +383,7 @@ void testEvenlyDistributeDataWithMaxSubpartitionLimitation() { @Test void testComputeSourceParallelismUpperBound() { Configuration configuration = new Configuration(); - configuration.setInteger( + configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, DEFAULT_SOURCE_PARALLELISM); VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = @@ -408,7 +408,7 @@ void testComputeSourceParallelismUpperBoundFallback() { @Test void testComputeSourceParallelismUpperBoundNotExceedMaxParallelism() { Configuration configuration = new Configuration(); - configuration.setInteger( + configuration.set( BatchExecutionOptions.ADAPTIVE_AUTO_PARALLELISM_DEFAULT_SOURCE_PARALLELISM, VERTEX_MAX_PARALLELISM * 2); VertexParallelismAndInputInfosDecider vertexParallelismAndInputInfosDecider = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java index 0fbf1c5bdb689..c7a4cf6f9587d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DefaultDelegationTokenManagerTest.java @@ -38,6 +38,7 @@ import java.util.concurrent.atomic.AtomicInteger; import static java.time.Instant.ofEpochMilli; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.configuration.SecurityOptions.DELEGATION_TOKENS_RENEWAL_TIME_RATIO; import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -70,7 +71,7 @@ public void isProviderEnabledMustGiveBackTrueByDefault() { @Test public void isProviderEnabledMustGiveBackFalseWhenDisabled() { Configuration configuration = new Configuration(); - configuration.setBoolean(CONFIG_PREFIX + ".test.enabled", false); + configuration.set(getBooleanConfigOption(CONFIG_PREFIX + ".test.enabled"), false); assertFalse(DefaultDelegationTokenManager.isProviderEnabled(configuration, "test")); } @@ -94,7 +95,7 @@ public void oneProviderThrowsExceptionMustFailFast() { @Test public void testAllProvidersLoaded() { Configuration configuration = 
new Configuration(); - configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); + configuration.set(getBooleanConfigOption(CONFIG_PREFIX + ".throw.enabled"), false); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager(configuration, null, null, null); @@ -195,7 +196,7 @@ public void startTokensUpdateShouldScheduleRenewal() { ExceptionThrowingDelegationTokenProvider.addToken.set(true); Configuration configuration = new Configuration(); - configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", true); + configuration.set(getBooleanConfigOption(CONFIG_PREFIX + ".throw.enabled"), true); AtomicInteger startTokensUpdateCallCount = new AtomicInteger(0); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager( @@ -222,7 +223,7 @@ void startTokensUpdate() { @Test public void calculateRenewalDelayShouldConsiderRenewalRatio() { Configuration configuration = new Configuration(); - configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); + configuration.set(getBooleanConfigOption(CONFIG_PREFIX + ".throw.enabled"), false); configuration.set(DELEGATION_TOKENS_RENEWAL_TIME_RATIO, 0.5); DefaultDelegationTokenManager delegationTokenManager = new DefaultDelegationTokenManager(configuration, null, null, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java index 4b997fa81e6c6..61ed85a50274e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/security/token/DelegationTokenReceiverRepositoryTest.java @@ -24,6 +24,7 @@ import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.apache.flink.core.security.token.DelegationTokenProvider.CONFIG_PREFIX; import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; @@ -61,7 +62,7 @@ public void oneReceiverThrowsExceptionMustFailFast() { @Test public void testAllReceiversLoaded() { Configuration configuration = new Configuration(); - configuration.setBoolean(CONFIG_PREFIX + ".throw.enabled", false); + configuration.set(getBooleanConfigOption(CONFIG_PREFIX + ".throw.enabled"), false); DelegationTokenReceiverRepository delegationTokenReceiverRepository = new DelegationTokenReceiverRepository(configuration, null); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java index d1d0795aa53e5..8889ce688e267 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/shuffle/ShuffleMasterTest.java @@ -44,6 +44,7 @@ import java.util.concurrent.atomic.AtomicReference; import static org.apache.flink.api.common.restartstrategy.RestartStrategies.fixedDelayRestart; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; import static org.assertj.core.api.Assertions.assertThat; /** Tests for {@link ShuffleMaster}. 
*/ @@ -97,7 +98,8 @@ private MiniClusterConfiguration createClusterConfiguration(boolean stopTracking configuration.set( ShuffleServiceOptions.SHUFFLE_SERVICE_FACTORY_CLASS, TestShuffleServiceFactory.class.getName()); - configuration.setBoolean(STOP_TRACKING_PARTITION_KEY, stopTrackingPartition); + configuration.set( + getBooleanConfigOption(STOP_TRACKING_PARTITION_KEY), stopTrackingPartition); return new MiniClusterConfiguration.Builder() .withRandomPorts() .setNumTaskManagers(1) @@ -153,7 +155,8 @@ private static class TestShuffleMaster extends NettyShuffleMaster { public TestShuffleMaster(Configuration conf) { super(new ShuffleMasterContextImpl(conf, throwable -> {})); - this.stopTrackingPartition = conf.getBoolean(STOP_TRACKING_PARTITION_KEY, false); + this.stopTrackingPartition = + conf.get(getBooleanConfigOption(STOP_TRACKING_PARTITION_KEY), false); currentInstance.set(this); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java index de59d1cea4f0b..4a3015d62e764 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/state/TaskExecutorFileMergingManagerTest.java @@ -53,7 +53,7 @@ public void testCheckpointScope(@TempDir java.nio.file.Path testBaseDir) throws Path checkpointDir2 = new Path(testBaseDir.toString(), "job2"); int writeBufferSize = 4096; Configuration jobConfig = new Configuration(); - jobConfig.setBoolean(FILE_MERGING_ENABLED, true); + jobConfig.set(FILE_MERGING_ENABLED, true); Configuration clusterConfig = new Configuration(); ExecutionAttemptID executionID1 = ExecutionAttemptID.randomId(); FileMergingSnapshotManager manager1 = diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java index 4aae1d1941468..45dfcd32c0df7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskTest.java @@ -917,8 +917,8 @@ public void testWatchDogInterruptsTask() throws Exception { final TaskManagerActions taskManagerActions = new ProhibitFatalErrorTaskManagerActions(); final Configuration config = new Configuration(); - config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5); - config.setLong(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT.key(), 60 * 1000); + config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5)); + config.set(TaskManagerOptions.TASK_CANCELLATION_TIMEOUT, Duration.ofMillis(60 * 1000)); final Task task = createTaskBuilder() diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java index 4a9f7e15d0ee6..dfa187736a918 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/testutils/recordutils/RecordComparatorFactory.java @@ -26,6 +26,9 @@ import java.util.Arrays; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; + /** * A factory for a {@link 
org.apache.flink.api.common.typeutils.TypeComparator} for {@link Record}. * The comparator uses a subset of the fields for the comparison. That subset of fields (positions @@ -99,11 +102,12 @@ public void writeParametersToConfig(Configuration config) { } // write the config - config.setInteger(NUM_KEYS, this.positions.length); + config.set(getIntConfigOption(NUM_KEYS), this.positions.length); for (int i = 0; i < this.positions.length; i++) { - config.setInteger(KEY_POS_PREFIX + i, this.positions[i]); + config.set(getIntConfigOption(KEY_POS_PREFIX + i), this.positions[i]); config.setString(KEY_CLASS_PREFIX + i, this.types[i].getName()); - config.setBoolean(KEY_SORT_DIRECTION_PREFIX + i, this.sortDirections[i]); + config.set( + getBooleanConfigOption(KEY_SORT_DIRECTION_PREFIX + i), this.sortDirections[i]); } } @@ -112,7 +116,7 @@ public void writeParametersToConfig(Configuration config) { public void readParametersFromConfig(Configuration config, ClassLoader cl) throws ClassNotFoundException { // figure out how many key fields there are - final int numKeyFields = config.getInteger(NUM_KEYS, -1); + final int numKeyFields = config.get(getIntConfigOption(NUM_KEYS), -1); if (numKeyFields < 0) { throw new IllegalConfigurationException( "The number of keys for the comparator is invalid: " + numKeyFields); @@ -125,7 +129,7 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl) // read the individual key positions and types for (int i = 0; i < numKeyFields; i++) { // next key position - final int p = config.getInteger(KEY_POS_PREFIX + i, -1); + final int p = config.get(getIntConfigOption(KEY_POS_PREFIX + i), -1); if (p >= 0) { positions[i] = p; } else { @@ -145,7 +149,7 @@ public void readParametersFromConfig(Configuration config, ClassLoader cl) } // next key sort direction - direction[i] = config.getBoolean(KEY_SORT_DIRECTION_PREFIX + i, true); + direction[i] = config.get(getBooleanConfigOption(KEY_SORT_DIRECTION_PREFIX + i), true); } this.positions = positions; diff --git a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java index 1c8a3bddb6148..a7f9cd2240377 100644 --- a/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java +++ b/flink-state-backends/flink-statebackend-forst/src/test/java/org/apache/flink/state/forst/ForStNativeMetricOptionsTest.java @@ -23,6 +23,8 @@ import org.junit.Assert; import org.junit.Test; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; + /** Test all native metrics can be set using configuration. 
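The RecordComparatorFactory hunk above applies the same migration on both sides of a write/read round trip: indexed keys are written through typed options and read back with a sentinel default (-1) to detect missing entries. A hedged sketch of that round trip, using hypothetical key names rather than the factory's real constants:

import org.apache.flink.configuration.Configuration;

import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;

public class IndexedKeyRoundTripSketch {
    // Hypothetical key names for illustration only.
    private static final String NUM_KEYS = "sketch.num-keys";
    private static final String KEY_POS_PREFIX = "sketch.key-pos.";

    public static void main(String[] args) {
        Configuration config = new Configuration();
        int[] positions = {3, 1, 4};

        // Write: one count entry plus one indexed entry per key position.
        config.set(getIntConfigOption(NUM_KEYS), positions.length);
        for (int i = 0; i < positions.length; i++) {
            config.set(getIntConfigOption(KEY_POS_PREFIX + i), positions[i]);
        }

        // Read back, treating -1 as "not present", as the factory does.
        int numKeys = config.get(getIntConfigOption(NUM_KEYS), -1);
        for (int i = 0; i < numKeys; i++) {
            int pos = config.get(getIntConfigOption(KEY_POS_PREFIX + i), -1);
            System.out.println("key " + i + " -> position " + pos);
        }
    }
}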
*/ public class ForStNativeMetricOptionsTest { @Test @@ -32,7 +34,7 @@ public void testNativeMetricsConfigurable() { if (property.getConfigKey().contains("num-files-at-level")) { config.set(ForStNativeMetricOptions.MONITOR_NUM_FILES_AT_LEVEL, true); } else { - config.setBoolean(property.getConfigKey(), true); + config.set(getBooleanConfigOption(property.getConfigKey()), true); } ForStNativeMetricOptions options = ForStNativeMetricOptions.fromConfig(config); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java index a8277fa07fca6..6d9bbb9cbb47f 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/EmbeddedRocksDBStateBackendTest.java @@ -680,12 +680,11 @@ public void testConfigureTernaryBooleanConfigs() throws Exception { (EmbeddedRocksDBStateBackend) stateBackend; Configuration baseConfig = createBackendConfig(); Configuration testConfig = new Configuration(); - testConfig.setBoolean( - USE_INGEST_DB_RESTORE_MODE, !USE_INGEST_DB_RESTORE_MODE.defaultValue()); - testConfig.setBoolean( + testConfig.set(USE_INGEST_DB_RESTORE_MODE, !USE_INGEST_DB_RESTORE_MODE.defaultValue()); + testConfig.set( INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE, !INCREMENTAL_RESTORE_ASYNC_COMPACT_AFTER_RESCALE.defaultValue()); - testConfig.setBoolean( + testConfig.set( USE_DELETE_FILES_IN_RANGE_DURING_RESCALING, !USE_DELETE_FILES_IN_RANGE_DURING_RESCALING.defaultValue()); EmbeddedRocksDBStateBackend configuredBackend = diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java index a2a89e700a321..d14f09e3a7a18 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksDBNativeMetricOptionsTest.java @@ -23,6 +23,8 @@ import org.junit.Assert; import org.junit.Test; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; + /** Test all native metrics can be set using configuration. 
*/ public class RocksDBNativeMetricOptionsTest { @Test @@ -30,10 +32,9 @@ public void testNativeMetricsConfigurable() { for (RocksDBProperty property : RocksDBProperty.values()) { Configuration config = new Configuration(); if (property.getConfigKey().contains("num-files-at-level")) { - config.setBoolean( - RocksDBNativeMetricOptions.MONITOR_NUM_FILES_AT_LEVEL.key(), true); + config.set(RocksDBNativeMetricOptions.MONITOR_NUM_FILES_AT_LEVEL, true); } else { - config.setBoolean(property.getConfigKey(), true); + config.set(getBooleanConfigOption(property.getConfigKey()), true); } RocksDBNativeMetricOptions options = RocksDBNativeMetricOptions.fromConfig(config); diff --git a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java index 38b8d3e8c92d2..b4e0eff83b8e3 100644 --- a/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java +++ b/flink-state-backends/flink-statebackend-rocksdb/src/test/java/org/apache/flink/contrib/streaming/state/RocksIncrementalCheckpointRescalingTest.java @@ -435,7 +435,7 @@ private StateBackend getStateBackend() throws Exception { RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend("file://" + rootFolder.newFolder().getAbsolutePath(), true); Configuration configuration = new Configuration(); - configuration.setBoolean( + configuration.set( RocksDBConfigurableOptions.USE_INGEST_DB_RESTORE_MODE, useIngestDbRestoreMode); return rocksDBStateBackend.configure(configuration, getClass().getClassLoader()); } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java index aaed34a2ac878..50503ca3da14f 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironmentTest.java @@ -395,7 +395,7 @@ void testPeriodicMaterializeEnabled() { assertThat(env.getConfig().isPeriodicMaterializeEnabled()) .isEqualTo(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED.defaultValue()); - config.setBoolean(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED.key(), false); + config.set(StateChangelogOptions.PERIODIC_MATERIALIZATION_ENABLED, false); env.configure(config, this.getClass().getClassLoader()); assertThat(env.getConfig().isPeriodicMaterializeEnabled()).isFalse(); } diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala index e2193bfa2a8a9..2a1270f08c225 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/plan/stream/sql/agg/WindowAggregateTest.scala @@ -17,7 +17,7 @@ */ package org.apache.flink.table.planner.plan.stream.sql.agg -import org.apache.flink.configuration.Configuration +import 
org.apache.flink.configuration.{ConfigOption, Configuration} import org.apache.flink.table.api.{ExplainDetail, TableException, ValidationException} import org.apache.flink.table.api.config.{AggregatePhaseStrategy, OptimizerConfigOptions} import org.apache.flink.table.planner.plan.utils.JavaUserDefinedAggFunctions.{WeightedAvg, WeightedAvgWithMerge} @@ -1480,7 +1480,10 @@ class WindowAggregateTest(aggPhaseEnforcer: AggregatePhaseStrategy) extends Tabl def testSession_DistinctSplitEnabled(): Unit = { // Session window does not support split-distinct optimization util.tableEnv.getConfig.getConfiguration - .setBoolean(OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED, true) + .set( + OptimizerConfigOptions.TABLE_OPTIMIZER_DISTINCT_AGG_SPLIT_ENABLED + .asInstanceOf[ConfigOption[Any]], + true) val sql = """ |SELECT diff --git a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java index 9e769c9937913..1ed6867d100bc 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java +++ b/flink-tests/src/test/java/org/apache/flink/test/manual/StreamingScalabilityAndLatency.java @@ -31,6 +31,7 @@ import org.apache.flink.streaming.api.functions.source.ParallelSourceFunction; import org.apache.flink.test.util.MiniClusterWithClientResource; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.junit.Assert.fail; /** Manual test to evaluate impact of checkpointing on latency. */ @@ -53,8 +54,8 @@ public static void main(String[] args) throws Exception { config.set(TaskManagerOptions.MANAGED_MEMORY_SIZE, MemorySize.parse("80m")); config.set(NettyShuffleEnvironmentOptions.NETWORK_NUM_BUFFERS, 20000); - config.setInteger("taskmanager.net.server.numThreads", 1); - config.setInteger("taskmanager.net.client.numThreads", 1); + config.set(getIntConfigOption("taskmanager.net.server.numThreads"), 1); + config.set(getIntConfigOption("taskmanager.net.client.numThreads"), 1); cluster = new MiniClusterWithClientResource( diff --git a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java index 0c2b2c716f778..ecb3bed4702b1 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/operators/MapITCase.java @@ -38,6 +38,7 @@ import java.util.Collections; import java.util.List; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; import static org.apache.flink.test.util.TestBaseUtils.compareResultAsText; import static org.apache.flink.test.util.TestBaseUtils.compareResultAsTuples; @@ -512,7 +513,7 @@ public void testPassingConfigurationObject() throws Exception { DataSet> ds = CollectionDataSets.getSmall3TupleDataSet(env); Configuration conf = new Configuration(); - conf.setInteger(TEST_KEY, TEST_VALUE); + conf.set(getIntConfigOption(TEST_KEY), TEST_VALUE); DataSet> bcMapDs = ds.map(new RichMapper2()).withParameters(conf); List> result = bcMapDs.collect(); diff --git a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java index 6d53c2fadc71e..545f6e048d28f 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java +++ 
b/flink-tests/src/test/java/org/apache/flink/test/runtime/NetworkStackThroughputITCase.java @@ -48,6 +48,9 @@ import java.util.List; import java.util.concurrent.TimeUnit; +import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption; +import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption; + /** Manually test the throughput of the network stack. */ public class NetworkStackThroughputITCase extends TestLogger { @@ -88,8 +91,11 @@ public void invoke() throws Exception { // Determine the amount of data to send per subtask int dataVolumeGb = getTaskConfiguration() - .getInteger( - NetworkStackThroughputITCase.DATA_VOLUME_GB_CONFIG_KEY, 1); + .get( + getIntConfigOption( + NetworkStackThroughputITCase + .DATA_VOLUME_GB_CONFIG_KEY), + 1); long dataMbPerSubtask = (dataVolumeGb * 10) / getCurrentNumberOfSubtasks(); long numRecordsToEmit = @@ -105,7 +111,8 @@ public void invoke() throws Exception { dataMbPerSubtask / 1024.0)); boolean isSlow = - getTaskConfiguration().getBoolean(IS_SLOW_SENDER_CONFIG_KEY, false); + getTaskConfiguration() + .get(getBooleanConfigOption(IS_SLOW_SENDER_CONFIG_KEY), false); int numRecords = 0; SpeedTestRecord record = new SpeedTestRecord(); @@ -180,7 +187,8 @@ public void invoke() throws Exception { try { boolean isSlow = - getTaskConfiguration().getBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, false); + getTaskConfiguration() + .get(getBooleanConfigOption(IS_SLOW_RECEIVER_CONFIG_KEY), false); int numRecords = 0; while (reader.next() != null) { @@ -327,8 +335,10 @@ private JobGraph createJobGraph( producer.setInvokableClass(SpeedTestProducer.class); producer.setParallelism(numSubtasks); - producer.getConfiguration().setInteger(DATA_VOLUME_GB_CONFIG_KEY, dataVolumeGb); - producer.getConfiguration().setBoolean(IS_SLOW_SENDER_CONFIG_KEY, isSlowSender); + producer.getConfiguration() + .set(getIntConfigOption(DATA_VOLUME_GB_CONFIG_KEY), dataVolumeGb); + producer.getConfiguration() + .set(getBooleanConfigOption(IS_SLOW_SENDER_CONFIG_KEY), isSlowSender); jobVertices.add(producer); @@ -347,7 +357,8 @@ private JobGraph createJobGraph( consumer.setInvokableClass(SpeedTestConsumer.class); consumer.setParallelism(numSubtasks); - consumer.getConfiguration().setBoolean(IS_SLOW_RECEIVER_CONFIG_KEY, isSlowReceiver); + consumer.getConfiguration() + .set(getBooleanConfigOption(IS_SLOW_RECEIVER_CONFIG_KEY), isSlowReceiver); jobVertices.add(consumer); diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java index bc65a5ee93deb..20548c619ead5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnClusterDescriptor.java @@ -915,15 +915,15 @@ private ApplicationReport startAppMaster( if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) { // activate re-execution of failed applications appContext.setMaxAppAttempts( - configuration.getInteger( - YarnConfigOptions.APPLICATION_ATTEMPTS.key(), + configuration.get( + YarnConfigOptions.APPLICATION_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS)); activateHighAvailabilitySupport(appContext); } else { // set number of application retries to 1 in the default case appContext.setMaxAppAttempts( - configuration.getInteger(YarnConfigOptions.APPLICATION_ATTEMPTS.key(), 1)); + configuration.get(YarnConfigOptions.APPLICATION_ATTEMPTS, 1)); } final Set userJarFiles = new HashSet<>();
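Two variants of the migration recur in the hunks above and are worth reading together: options whose value type changed along with the accessor (the task-cancellation interval and timeout are now set as a Duration instead of raw milliseconds), and reads that pass a caller-side fallback through get(option, fallback), as the YARN application-attempts lookup does. A small hedged sketch combining both, standalone and not taken from the patch:

import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.TaskManagerOptions;

import java.time.Duration;

public class TypedValueMigrationSketch {
    public static void main(String[] args) {
        Configuration config = new Configuration();

        // Before (deprecated): config.setLong(TaskManagerOptions.TASK_CANCELLATION_INTERVAL.key(), 5);
        // After: the option is Duration-typed, so the value is expressed as a Duration.
        config.set(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofMillis(5));

        // get(option, fallback) returns the configured value, or the fallback when the
        // option is unset; this is the same overload the YarnClusterDescriptor hunk uses.
        Duration interval =
                config.get(TaskManagerOptions.TASK_CANCELLATION_INTERVAL, Duration.ofSeconds(30));
        System.out.println(interval.toMillis()); // prints 5, since the option was set above
    }
}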