Skip to content

Commit

Permalink
[FLINK-37247][FileSystems][Tests] Implement common Hadoop file system…
Browse files Browse the repository at this point in the history
… integration tests for GCS.
  • Loading branch information
cnauroth committed Feb 3, 2025
1 parent 795ec7f commit e7c77da
Show file tree
Hide file tree
Showing 9 changed files with 367 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
org.apache.flink.fs.gs.HadoopGSFileSystemBehaviorITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.fs.gs.HadoopGSFileSystemITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.fs.gs.HadoopGSRecoverableWriterExceptionITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
org.apache.flink.fs.gs.HadoopGSRecoverableWriterITCase does not satisfy: only one of the following predicates match:\
* reside in a package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type InternalMiniClusterExtension and annotated with @RegisterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and contain any fields that are static, final, and of type MiniClusterExtension and annotated with @RegisterExtension or are , and of type MiniClusterTestEnvironment and annotated with @TestEnv\
* reside in a package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class InternalMiniClusterExtension\
* reside outside of package 'org.apache.flink.runtime.*' and is annotated with @ExtendWith with class MiniClusterExtension\
or contain any fields that are public, static, and of type MiniClusterWithClientResource and final and annotated with @ClassRule or contain any fields that is of type MiniClusterWithClientResource and public and final and not static and annotated with @Rule
17 changes: 17 additions & 0 deletions flink-filesystems/flink-gs-fs-hadoop/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,23 @@ under the License.
<scope>test</scope>
</dependency>

<!-- for the behavior test suite -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
<version>${project.version}</version>
<scope>test</scope>
<type>test-jar</type>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-hadoop-fs</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.FileSystemBehaviorTestSuite;
import org.apache.flink.core.fs.FileSystemKind;
import org.apache.flink.core.fs.Path;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

/**
* An implementation of the {@link FileSystemBehaviorTestSuite} for the GCS file system.
*/
@ExtendWith(RequireGCSConfiguration.class)
class HadoopGSFileSystemBehaviorITCase extends FileSystemBehaviorTestSuite {

@BeforeAll
static void setup() {
FileSystem.initialize(new Configuration());
}

@Override
protected FileSystem getFileSystem() throws Exception {
return getBasePath().getFileSystem();
}

@Override
protected Path getBasePath() throws Exception {
return RequireGCSConfiguration.getBasePath();
}

@Override
protected FileSystemKind getFileSystemKind() {
return FileSystemKind.OBJECT_STORE;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.runtime.fs.hdfs.AbstractHadoopFileSystemITTest;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration tests for GCS file system support.
*/
@ExtendWith(RequireGCSConfiguration.class)
class HadoopGSFileSystemITCase extends AbstractHadoopFileSystemITTest {

@BeforeAll
static void setup() throws IOException {
basePath = RequireGCSConfiguration.getBasePath();
FileSystem.initialize(new Configuration());
fs = basePath.getFileSystem();
assertThat(fs.exists(basePath)).isFalse();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterExceptionITCase;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Integration tests for exception handling in
* {@link org.apache.flink.fs.gs.writer.GSRecoverableWriter}.
*/
@ExtendWith(RequireGCSConfiguration.class)
class HadoopGSRecoverableWriterExceptionITCase
extends AbstractHadoopRecoverableWriterExceptionITCase {

@BeforeAll
static void setup() throws IOException {
basePath = RequireGCSConfiguration.getBasePath();
FileSystem.initialize(new Configuration());
assertThat(basePath.getFileSystem().exists(basePath)).isFalse();
skipped = false;
}

@Override
protected String getLocalTmpDir() throws Exception {
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.RecoverableWriter;
import org.apache.flink.runtime.fs.hdfs.AbstractHadoopRecoverableWriterITCase;

import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.extension.ExtendWith;

import java.io.IOException;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.fail;

/**
* Integration tests for {@link org.apache.flink.fs.gs.writer.GSRecoverableWriter}.
*/
@ExtendWith(RequireGCSConfiguration.class)
class HadoopGSRecoverableWriterITCase extends AbstractHadoopRecoverableWriterITCase {

@BeforeAll
static void setup() throws IOException {
basePath = RequireGCSConfiguration.getBasePath();
FileSystem.initialize(new Configuration());
assertThat(basePath.getFileSystem().exists(basePath)).isFalse();
bigDataChunk = createBigDataChunk(BIG_CHUNK_DATA_PATTERN, 7L << 20);
skipped = false;
}

@Override
@Disabled("This test is not meaningful. GSRecoverableWriter#cleanupRecoverableState is no-op.")
public void testCleanupRecoverableState() {
}

@Override
@Disabled("This test assumes one file for state, but GSRecoverableWriter uses multiple files.")
public void testCallingDeleteObjectTwiceDoesNotThroughException() {
}

@Override
protected String getLocalTmpDir() throws Exception {
return null;
}

@Override
protected String getIncompleteObjectName(RecoverableWriter.ResumeRecoverable recoverable) {
fail("GCS integration tests should not call this method, because incomplete state may be " +
"stored at multiple paths.");
return null;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.fs.gs;

import static org.junit.Assume.assumeTrue;

import org.apache.flink.core.fs.Path;

import org.junit.jupiter.api.extension.BeforeAllCallback;
import org.junit.jupiter.api.extension.ExtensionContext;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.UUID;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assumptions.assumeThat;

/**
* Extension run before GCS integration tests to check configuration assumptions. The GCS integration
* tests require the GOOGLE_APPLICATION_CREDENTIALS environment variable, set to a path on the local
* file system containing a JSON key file, and the GCS_BASE_PATH environment variable, set to a gs://
* path in a GCS bucket.
*/
public class RequireGCSConfiguration implements BeforeAllCallback {

private static Path basePath;

@Override
public void beforeAll(ExtensionContext context) throws Exception {
String credentialsEnv = System.getenv("GOOGLE_APPLICATION_CREDENTIALS");
assumeThat(credentialsEnv)
.describedAs("GCS integration tests require GOOGLE_APPLICATION_CREDENTIALS " +
"environment variable with path to key file.")
.isNotBlank();

java.nio.file.Path credentialsFile = Paths.get(credentialsEnv);
assertThat(credentialsFile)
.describedAs(String.format("GOOGLE_APPLICATION_CREDENTIALS environment variable is " +
"set to a path (%s) that does not exist or is not readable.",
credentialsFile))
.isReadable();

assertThat(Files.isDirectory(credentialsFile))
.describedAs(String.format("GOOGLE_APPLICATION_CREDENTIALS environment variable is " +
"set to a path (%s) that is a directory.",
credentialsFile))
.isFalse();

String basePathEnv = System.getenv("GCS_BASE_PATH");
assumeThat(basePathEnv)
.describedAs("GCS integration tests require GCS_BASE_PATH " +
"environment variable with gs:// path for storing test data.")
.isNotBlank();

assertThat(basePathEnv)
.describedAs(String.format("GCS_BASE_PATH environment variable is set to a path " +
"(%s) on a different file system. The path must start with gs://.",
basePathEnv))
.startsWith("gs://");

basePath = new Path(String.format("%s/%s-%s", basePathEnv, context.getDisplayName(),
UUID.randomUUID()));
}

public static Path getBasePath() {
return basePath;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@ void prepare() throws Exception {
basePathForTest = new Path(basePath, StringUtils.getRandomString(RND, 16, 16, 'a', 'z'));

final String defaultTmpDir = getLocalTmpDir();
if (defaultTmpDir == null) {
// The suite does not use local storage. No need to create directory.
return;
}
final java.nio.file.Path path = Paths.get(defaultTmpDir);

if (!Files.exists(path)) {
Expand Down
Loading

0 comments on commit e7c77da

Please sign in to comment.