Skip to content

Commit

Permalink
[FLINK-34168][config] Refactor callers that use deprecated get/setXXX (
Browse files Browse the repository at this point in the history
  • Loading branch information
Sxnan authored Sep 12, 2024
1 parent 4d13170 commit 740a567
Show file tree
Hide file tree
Showing 53 changed files with 591 additions and 540 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

import static org.apache.flink.configuration.PipelineOptions.ALLOW_UNALIGNED_SOURCE_SPLITS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

Expand Down Expand Up @@ -107,9 +108,7 @@ public void testPauseResumeSplitReaders(boolean individualReader) throws Excepti
public void testPauseResumeUnsupported(boolean allowUnalignedSourceSplits) throws Exception {
final AtomicInteger numSplitReaders = new AtomicInteger();
final Configuration configuration = new Configuration();
configuration.setBoolean(
"pipeline.watermark-alignment.allow-unaligned-source-splits",
allowUnalignedSourceSplits);
configuration.set(ALLOW_UNALIGNED_SOURCE_SPLITS, allowUnalignedSourceSplits);
final MockSplitReader.Builder readerBuilder =
SteppingSourceReaderTestHarness.createSplitReaderBuilder();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -384,7 +384,7 @@ private Configuration getPartitionCommitTriggerConf(long commitDelay) {
configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
configuration.setString(PARTITION_TIME_EXTRACTOR_TIMESTAMP_FORMATTER.key(), "yyyy-MM-dd");
configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "partition-time");
configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay));
configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
return configuration;
}
Expand All @@ -393,7 +393,7 @@ private Configuration getProcTimeCommitTriggerConf(long commitDelay) {
Configuration configuration = new Configuration();
configuration.set(SINK_PARTITION_COMMIT_POLICY_KIND, "success-file");
configuration.setString(SINK_PARTITION_COMMIT_TRIGGER.key(), "process-time");
configuration.setLong(SINK_PARTITION_COMMIT_DELAY.key(), commitDelay);
configuration.set(SINK_PARTITION_COMMIT_DELAY, Duration.ofMillis(commitDelay));
configuration.setString(SINK_PARTITION_COMMIT_WATERMARK_TIME_ZONE.key(), "UTC");
return configuration;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,9 @@
import java.util.concurrent.Future;
import java.util.stream.Collectors;

import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
import static org.apache.flink.configuration.ConfigurationUtils.getIntConfigOption;

/**
* DistributedCache provides static methods to write the registered cache files into job
* configuration or decode them from job configuration. It also provides user access to the file
Expand Down Expand Up @@ -172,20 +175,24 @@ public File getFile(String name) {

public static void writeFileInfoToConfig(
String name, DistributedCacheEntry e, Configuration conf) {
int num = conf.getInteger(CACHE_FILE_NUM, 0) + 1;
conf.setInteger(CACHE_FILE_NUM, num);
int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0) + 1;
conf.set(getIntConfigOption(CACHE_FILE_NUM), num);
conf.setString(CACHE_FILE_NAME + num, name);
conf.setString(CACHE_FILE_PATH + num, e.filePath);
conf.setBoolean(CACHE_FILE_EXE + num, e.isExecutable || new File(e.filePath).canExecute());
conf.setBoolean(CACHE_FILE_DIR + num, e.isZipped || new File(e.filePath).isDirectory());
conf.set(
getBooleanConfigOption(CACHE_FILE_EXE + num),
e.isExecutable || new File(e.filePath).canExecute());
conf.set(
getBooleanConfigOption(CACHE_FILE_DIR + num),
e.isZipped || new File(e.filePath).isDirectory());
if (e.blobKey != null) {
conf.setBytes(CACHE_FILE_BLOB_KEY + num, e.blobKey);
}
}

public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(
Configuration conf) {
int num = conf.getInteger(CACHE_FILE_NUM, 0);
int num = conf.get(getIntConfigOption(CACHE_FILE_NUM), 0);
if (num == 0) {
return Collections.emptySet();
}
Expand All @@ -195,8 +202,8 @@ public static Set<Entry<String, DistributedCacheEntry>> readFileInfoFromConfig(
for (int i = 1; i <= num; i++) {
String name = conf.getString(CACHE_FILE_NAME + i, null);
String filePath = conf.getString(CACHE_FILE_PATH + i, null);
boolean isExecutable = conf.getBoolean(CACHE_FILE_EXE + i, false);
boolean isDirectory = conf.getBoolean(CACHE_FILE_DIR + i, false);
boolean isExecutable = conf.get(getBooleanConfigOption(CACHE_FILE_EXE + i), false);
boolean isDirectory = conf.get(getBooleanConfigOption(CACHE_FILE_DIR + i), false);

byte[] blobKey = conf.getBytes(CACHE_FILE_BLOB_KEY + i, null);
cacheFiles.put(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
import java.util.Arrays;
import java.util.List;

import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;

/**
* Base class for all input formats that use blocks of fixed size. The input splits are aligned to
* these blocks, meaning that each split will consist of one block. Without configuration, these
Expand Down Expand Up @@ -90,7 +92,9 @@ public void configure(Configuration parameters) {
// overwriting the value set by the setter

if (this.blockSize == NATIVE_BLOCK_SIZE) {
long blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
long blockSize =
parameters.get(
getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE);
setBlockSize(blockSize);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
import java.io.IOException;
import java.io.OutputStream;

import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;

@Public
public abstract class BinaryOutputFormat<T> extends FileOutputFormat<T> {

Expand Down Expand Up @@ -63,7 +65,8 @@ public void configure(Configuration parameters) {
super.configure(parameters);

// read own parameters
this.blockSize = parameters.getLong(BLOCK_SIZE_PARAMETER_KEY, NATIVE_BLOCK_SIZE);
this.blockSize =
parameters.get(getLongConfigOption(BLOCK_SIZE_PARAMETER_KEY), NATIVE_BLOCK_SIZE);
if (this.blockSize < 1 && this.blockSize != NATIVE_BLOCK_SIZE) {
throw new IllegalArgumentException(
"The block size parameter must be set and larger than 0.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
import java.util.Map;
import java.util.Set;

import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
import static org.apache.flink.configuration.TaskManagerOptions.FS_STREAM_OPENING_TIME_OUT;
import static org.apache.flink.util.Preconditions.checkNotNull;

Expand Down Expand Up @@ -459,7 +460,8 @@ public void configure(Configuration parameters) {
}

if (!this.enumerateNestedFiles) {
this.enumerateNestedFiles = parameters.getBoolean(ENUMERATE_NESTED_FILES_FLAG, false);
this.enumerateNestedFiles =
parameters.get(getBooleanConfigOption(ENUMERATE_NESTED_FILES_FLAG), false);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,32 +131,6 @@ public void setParameter(String key, String value) {
this.parameters.setString(key, value);
}

/**
* Sets a stub parameters in the configuration of this contract. The stub parameters are
* accessible by the user code at runtime. Parameters that the user code needs to access at
* runtime to configure its behavior are typically stored as stub parameters.
*
* @see #getParameters()
* @param key The parameter key.
* @param value The parameter value.
*/
public void setParameter(String key, int value) {
this.parameters.setInteger(key, value);
}

/**
* Sets a stub parameters in the configuration of this contract. The stub parameters are
* accessible by the user code at runtime. Parameters that the user code needs to access at
* runtime to configure its behavior are typically stored as stub parameters.
*
* @see #getParameters()
* @param key The parameter key.
* @param value The parameter value.
*/
public void setParameter(String key, boolean value) {
this.parameters.setBoolean(key, value);
}

/**
* Gets the parallelism for this contract instance. The parallelism denotes how many parallel
* instances of the user function will be spawned during the execution. If this value is {@link
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -650,4 +650,24 @@ static boolean removePrefixMap(Map<String, Object> confData, String key) {

// Make sure that we cannot instantiate this class
private ConfigurationUtils() {}

public static ConfigOption<Boolean> getBooleanConfigOption(String key) {
return ConfigOptions.key(key).booleanType().noDefaultValue();
}

public static ConfigOption<Double> getDoubleConfigOption(String key) {
return ConfigOptions.key(key).doubleType().noDefaultValue();
}

public static ConfigOption<Float> getFloatConfigOption(String key) {
return ConfigOptions.key(key).floatType().noDefaultValue();
}

public static ConfigOption<Integer> getIntConfigOption(String key) {
return ConfigOptions.key(key).intType().noDefaultValue();
}

public static ConfigOption<Long> getLongConfigOption(String key) {
return ConfigOptions.key(key).longType().noDefaultValue();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.Properties;
import java.util.Set;

import static org.apache.flink.configuration.ConfigurationUtils.getFloatConfigOption;
import static org.apache.flink.configuration.FallbackKey.createDeprecatedKey;

/**
Expand Down Expand Up @@ -183,7 +184,7 @@ public boolean getBoolean(ConfigOption<Boolean> configOption, boolean overrideDe

@Override
public float getFloat(String key, float defaultValue) {
return this.backingConfig.getFloat(this.prefix + key, defaultValue);
return this.backingConfig.get(getFloatConfigOption(this.prefix + key), defaultValue);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import java.io.IOException;
import java.nio.file.Path;

import static org.apache.flink.configuration.ConfigurationUtils.getLongConfigOption;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Fail.fail;

Expand Down Expand Up @@ -67,7 +68,7 @@ void testCreateInputSplitsWithOneFile() throws IOException {
"test_create_input_splits_with_one_file", blockSize, numBlocks);

final Configuration config = new Configuration();
config.setLong("input.block_size", blockSize + 10);
config.set(getLongConfigOption("input.block_size"), blockSize + 10L);

final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat();
inputFormat.setFilePath(tempFile.toURI().toString());
Expand Down Expand Up @@ -184,7 +185,7 @@ void testCreateInputSplitsWithEmptySplit() throws IOException {
"test_create_input_splits_with_empty_split", blockSize, numBlocks);

final Configuration config = new Configuration();
config.setLong("input.block_size", blockSize + 10);
config.set(getLongConfigOption("input.block_size"), blockSize + 10L);

final BinaryInputFormat<Record> inputFormat = new MyBinaryInputFormat();
inputFormat.setFilePath(tempFile.toURI().toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import java.io.File;
import java.io.IOException;

import static org.apache.flink.configuration.ConfigurationUtils.getBooleanConfigOption;
import static org.assertj.core.api.Assertions.assertThat;

class EnumerateNestedFilesTest {
Expand Down Expand Up @@ -63,7 +64,7 @@ void testNoNestedDirectoryTrue() throws IOException {
String filePath = TestFileUtils.createTempFile("foo");

this.format.setFilePath(new Path(filePath));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand All @@ -85,7 +86,7 @@ void testOneNestedDirectoryTrue() throws IOException {
TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand All @@ -107,7 +108,7 @@ void testOneNestedDirectoryFalse() throws IOException {
TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), "fideua");

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", false);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), false);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand Down Expand Up @@ -135,7 +136,7 @@ void testTwoNestedDirectoriesTrue() throws IOException {
TestFileUtils.createTempFileInDirectory(nestedNestedDir.getAbsolutePath(), "bravas");

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand Down Expand Up @@ -165,7 +166,7 @@ void testOnlyLevel2NestedDirectories() throws IOException {
TestFileUtils.createTempFileInDirectory(nestedNestedDir2.getAbsolutePath(), "bravas");

this.format.setFilePath(new Path(testDir.getAbsolutePath()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand Down Expand Up @@ -208,7 +209,7 @@ void testTwoNestedDirectoriesWithFilteredFilesTrue() throws IOException {
nestedNestedDirFiltered.getAbsolutePath(), "bravas");

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

FileInputSplit[] splits = format.createInputSplits(1);
Expand All @@ -229,7 +230,7 @@ void testGetStatisticsOneFileInNestedDir() throws IOException {
TestFileUtils.createTempFileInDirectory(insideNestedDir.getAbsolutePath(), SIZE);

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

BaseStatistics stats = format.getStatistics(null);
Expand Down Expand Up @@ -262,7 +263,7 @@ void testGetStatisticsMultipleNestedFiles() throws IOException, InterruptedExcep
TestFileUtils.createTempFileInDirectory(insideNestedDir2.getAbsolutePath(), SIZE4);

this.format.setFilePath(new Path(nestedDir.toURI().toString()));
this.config.setBoolean("recursive.file.enumeration", true);
this.config.set(getBooleanConfigOption("recursive.file.enumeration"), true);
format.configure(this.config);

BaseStatistics stats = format.getStatistics(null);
Expand Down
Loading

0 comments on commit 740a567

Please sign in to comment.