-
Notifications
You must be signed in to change notification settings - Fork 3.1k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Co-authored-by: Florent Delannoy <[email protected]>
- Loading branch information
Showing
12 changed files
with
745 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
36 changes: 36 additions & 0 deletions
36
...o-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,36 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.plugin.deltalake.cache; | ||
|
||
import io.trino.filesystem.TrinoInputFile; | ||
import io.trino.filesystem.cache.CacheKeyProvider; | ||
|
||
import java.util.Optional; | ||
|
||
public class DeltaLakeCacheKeyProvider | ||
implements CacheKeyProvider | ||
{ | ||
/** | ||
* Get the cache key of a TrinoInputFile. Returns Optional.empty() if the file is not cacheable. | ||
*/ | ||
@Override | ||
public Optional<String> getCacheKey(TrinoInputFile delegate) | ||
{ | ||
// TODO: Consider caching of the Parquet checkpoint files within _delta_log | ||
if (!delegate.location().path().contains("/_delta_log/")) { | ||
return Optional.of(delegate.location().path()); | ||
} | ||
return Optional.empty(); | ||
} | ||
} |
389 changes: 389 additions & 0 deletions
389
...lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
Large diffs are not rendered by default.
Oops, something went wrong.
73 changes: 73 additions & 0 deletions
73
...ava/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,73 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.plugin.deltalake; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import org.junit.jupiter.api.AfterAll; | ||
import org.junit.jupiter.api.BeforeAll; | ||
|
||
import java.io.IOException; | ||
import java.io.UncheckedIOException; | ||
import java.nio.file.Files; | ||
import java.nio.file.Path; | ||
import java.util.Comparator; | ||
import java.util.Iterator; | ||
import java.util.Map; | ||
import java.util.stream.Stream; | ||
|
||
/** | ||
* Delta Lake connector smoke test exercising Hive metastore and MinIO storage with Alluxio caching. | ||
*/ | ||
public class TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest | ||
extends TestDeltaLakeMinioAndHmsConnectorSmokeTest | ||
{ | ||
private Path cacheDirectory; | ||
|
||
@BeforeAll | ||
@Override | ||
public void init() | ||
throws Exception | ||
{ | ||
cacheDirectory = Files.createTempDirectory("cache"); | ||
super.init(); | ||
} | ||
|
||
@AfterAll | ||
@Override | ||
public void cleanUp() | ||
{ | ||
try (Stream<Path> walk = Files.walk(cacheDirectory)) { | ||
Iterator<Path> iterator = walk.sorted(Comparator.reverseOrder()).iterator(); | ||
while (iterator.hasNext()) { | ||
Path path = iterator.next(); | ||
Files.delete(path); | ||
} | ||
} | ||
catch (IOException e) { | ||
throw new UncheckedIOException(e); | ||
} | ||
super.cleanUp(); | ||
} | ||
|
||
@Override | ||
protected Map<String, String> deltaStorageConfiguration() | ||
{ | ||
return ImmutableMap.<String, String>builder() | ||
.putAll(super.deltaStorageConfiguration()) | ||
.put("fs.cache", "alluxio") | ||
.put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString()) | ||
.put("fs.cache.max-sizes", "100MB") | ||
.buildOrThrow(); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
51 changes: 51 additions & 0 deletions
51
...ava/io/trino/tests/product/launcher/env/environment/EnvMultinodeMinioDataLakeCaching.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,51 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.tests.product.launcher.env.environment; | ||
|
||
import com.google.common.collect.ImmutableMap; | ||
import com.google.inject.Inject; | ||
import io.trino.tests.product.launcher.docker.DockerFiles; | ||
import io.trino.tests.product.launcher.env.Environment; | ||
import io.trino.tests.product.launcher.env.EnvironmentProvider; | ||
import io.trino.tests.product.launcher.env.common.Hadoop; | ||
import io.trino.tests.product.launcher.env.common.Minio; | ||
import io.trino.tests.product.launcher.env.common.StandardMultinode; | ||
import io.trino.tests.product.launcher.env.common.TestsEnvironment; | ||
|
||
import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC; | ||
import static org.testcontainers.utility.MountableFile.forHostPath; | ||
|
||
@TestsEnvironment | ||
public final class EnvMultinodeMinioDataLakeCaching | ||
extends EnvironmentProvider | ||
{ | ||
private static final String CONTAINER_TRINO_DELTA_LAKE_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta.properties"; | ||
private static final String CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta_non_cached.properties"; | ||
private final DockerFiles.ResourceProvider configDir; | ||
|
||
@Inject | ||
public EnvMultinodeMinioDataLakeCaching(StandardMultinode standardMultinode, Hadoop hadoop, Minio minio, DockerFiles dockerFiles) | ||
{ | ||
super(standardMultinode, hadoop, minio); | ||
this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment"); | ||
} | ||
|
||
@Override | ||
public void extendEnvironment(Environment.Builder builder) | ||
{ | ||
builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES); | ||
builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake-cached/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_PROPERTIES); | ||
builder.configureContainers(container -> container.withTmpFs(ImmutableMap.of("/tmp/cache", "rw"))); | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
11 changes: 11 additions & 0 deletions
11
...r/presto-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.properties
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,11 @@ | ||
connector.name=delta_lake | ||
hive.metastore.uri=thrift://hadoop-master:9083 | ||
hive.s3.aws-access-key=minio-access-key | ||
hive.s3.aws-secret-key=minio-secret-key | ||
hive.s3.endpoint=http://minio:9080/ | ||
hive.s3.path-style-access=true | ||
hive.s3.ssl.enabled=false | ||
delta.register-table-procedure.enabled=true | ||
fs.cache=alluxio | ||
fs.cache.directories=/tmp/cache/delta | ||
fs.cache.max-disk-usage-percentages=90 |
91 changes: 91 additions & 0 deletions
91
...uct-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,91 @@ | ||
/* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.trino.tests.product.deltalake; | ||
|
||
import io.airlift.units.Duration; | ||
import io.trino.tempto.ProductTest; | ||
import io.trino.tests.product.deltalake.util.CachingTestUtils.CacheStats; | ||
import org.testng.annotations.Test; | ||
|
||
import static io.airlift.testing.Assertions.assertGreaterThan; | ||
import static io.airlift.testing.Assertions.assertGreaterThanOrEqual; | ||
import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING; | ||
import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS; | ||
import static io.trino.tests.product.deltalake.util.CachingTestUtils.getCacheStats; | ||
import static io.trino.tests.product.utils.QueryAssertions.assertEventually; | ||
import static io.trino.tests.product.utils.QueryExecutors.onTrino; | ||
import static java.util.concurrent.TimeUnit.SECONDS; | ||
import static org.assertj.core.api.Assertions.assertThat; | ||
import static org.testng.Assert.assertEquals; | ||
|
||
public class TestDeltaLakeAlluxioCaching | ||
extends ProductTest | ||
{ | ||
@Test(groups = {DELTA_LAKE_ALLUXIO_CACHING, PROFILE_SPECIFIC_TESTS}) | ||
public void testReadFromCache() | ||
{ | ||
testReadFromTable("table1"); | ||
testReadFromTable("table2"); | ||
} | ||
|
||
private void testReadFromTable(String tableNameSuffix) | ||
{ | ||
String cachedTableName = "delta.default.test_cache_read" + tableNameSuffix; | ||
String nonCachedTableName = "delta_non_cached.default.test_cache_read" + tableNameSuffix; | ||
|
||
createTestTable(cachedTableName); | ||
|
||
CacheStats beforeCacheStats = getCacheStats("delta"); | ||
|
||
long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue(); | ||
|
||
assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows(); | ||
|
||
assertEventually( | ||
new Duration(20, SECONDS), | ||
() -> { | ||
// first query via caching catalog should fetch external data | ||
CacheStats afterQueryCacheStats = getCacheStats("delta"); | ||
assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize); | ||
assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads()); | ||
assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads()); | ||
}); | ||
|
||
assertEventually( | ||
new Duration(10, SECONDS), | ||
() -> { | ||
CacheStats beforeQueryCacheStats = getCacheStats("delta"); | ||
|
||
assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows(); | ||
|
||
// query via caching catalog should read exclusively from cache | ||
CacheStats afterQueryCacheStats = getCacheStats("delta"); | ||
assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads()); | ||
assertEquals(afterQueryCacheStats.externalReads(), beforeQueryCacheStats.externalReads()); | ||
assertEquals(afterQueryCacheStats.cacheSpaceUsed(), beforeQueryCacheStats.cacheSpaceUsed()); | ||
}); | ||
|
||
onTrino().executeQuery("DROP TABLE " + nonCachedTableName); | ||
} | ||
|
||
/** | ||
* Creates a table which should contain around 6 2 MB parquet files | ||
*/ | ||
private void createTestTable(String tableName) | ||
{ | ||
onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName); | ||
onTrino().executeQuery("SET SESSION delta.target_max_file_size = '2MB'"); | ||
onTrino().executeQuery("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.customer"); | ||
} | ||
} |
Oops, something went wrong.