diff --git a/plugin/trino-delta-lake/pom.xml b/plugin/trino-delta-lake/pom.xml
index fe83c339f4cb..7d82dbe78aee 100644
--- a/plugin/trino-delta-lake/pom.xml
+++ b/plugin/trino-delta-lake/pom.xml
@@ -249,12 +249,24 @@
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-cache-alluxio</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-memory-context</artifactId>
             <scope>runtime</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.alluxio</groupId>
+            <artifactId>alluxio-core-common</artifactId>
+            <scope>runtime</scope>
+        </dependency>
+
         <dependency>
             <groupId>org.jetbrains</groupId>
             <artifactId>annotations</artifactId>
@@ -279,6 +291,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-client</artifactId>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-exchange-filesystem</artifactId>
@@ -299,10 +317,23 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>io.trino</groupId>
+            <artifactId>trino-filesystem-cache-alluxio</artifactId>
+            <type>test-jar</type>
+            <scope>test</scope>
+        </dependency>
+
         <dependency>
             <groupId>io.trino</groupId>
             <artifactId>trino-hdfs</artifactId>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.qubole.rubix</groupId>
+                    <artifactId>rubix-presto-shaded</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
index 2801e8067b85..05955dc1c7e7 100644
--- a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/DeltaLakeModule.java
@@ -21,9 +21,11 @@
 import com.google.inject.Singleton;
 import com.google.inject.multibindings.Multibinder;
 import io.airlift.configuration.AbstractConfigurationAwareModule;
+import io.trino.filesystem.cache.CacheKeyProvider;
 import io.trino.plugin.base.CatalogName;
 import io.trino.plugin.base.security.ConnectorAccessControlModule;
 import io.trino.plugin.base.session.SessionPropertiesProvider;
+import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
 import io.trino.plugin.deltalake.functions.tablechanges.TableChangesFunctionProvider;
 import io.trino.plugin.deltalake.functions.tablechanges.TableChangesProcessorProvider;
 import io.trino.plugin.deltalake.procedure.DropExtendedStatsProcedure;
@@ -148,6 +150,8 @@ public void setup(Binder binder)
         newSetBinder(binder, ConnectorTableFunction.class).addBinding().toProvider(TableChangesFunctionProvider.class).in(Scopes.SINGLETON);
         binder.bind(FunctionProvider.class).to(DeltaLakeFunctionProvider.class).in(Scopes.SINGLETON);
         binder.bind(TableChangesProcessorProvider.class).in(Scopes.SINGLETON);
+
+        newOptionalBinder(binder, CacheKeyProvider.class).setBinding().to(DeltaLakeCacheKeyProvider.class).in(Scopes.SINGLETON);
     }
 
     @Singleton
diff --git a/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java
new file mode 100644
index 000000000000..9f62e38e4235
--- /dev/null
+++ b/plugin/trino-delta-lake/src/main/java/io/trino/plugin/deltalake/cache/DeltaLakeCacheKeyProvider.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake.cache;
+
+import io.trino.filesystem.TrinoInputFile;
+import io.trino.filesystem.cache.CacheKeyProvider;
+
+import java.util.Optional;
+
+public class DeltaLakeCacheKeyProvider
+        implements CacheKeyProvider
+{
+    /**
+     * Get the cache key of a TrinoInputFile.
+     * Returns Optional.empty() if the file is not cacheable.
+     */
+    @Override
+    public Optional<String> getCacheKey(TrinoInputFile delegate)
+    {
+        // TODO: Consider caching of the Parquet checkpoint files within _delta_log
+        if (!delegate.location().path().contains("/_delta_log/")) {
+            return Optional.of(delegate.location().path());
+        }
+        return Optional.empty();
+    }
+}
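Note: the provider above is the entire caching policy for the connector: a file is keyed by its full path unless it sits under the transaction log. A standalone sketch of the same rule, handy for checking which paths end up cacheable (the class name and sample paths are illustrative only; the real method receives a TrinoInputFile and consults location().path()):

import java.util.Optional;

class CacheKeyRuleSketch
{
    // Mirrors the predicate in DeltaLakeCacheKeyProvider.getCacheKey
    static Optional<String> cacheKeyForPath(String path)
    {
        if (!path.contains("/_delta_log/")) {
            return Optional.of(path);
        }
        return Optional.empty();
    }

    public static void main(String[] args)
    {
        // Data file: cacheable, keyed by its full path
        System.out.println(cacheKeyForPath("bucket/db/table/key=p1/data.parquet"));
        // Transaction log JSON and checkpoint Parquet: not cacheable (see the TODO above)
        System.out.println(cacheKeyForPath("bucket/db/table/_delta_log/00000000000000000000.json"));
        System.out.println(cacheKeyForPath("bucket/db/table/_delta_log/00000000000000000002.checkpoint.parquet"));
    }
}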
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
new file mode 100644
index 000000000000..6fba7873a6b4
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheFileOperations.java
@@ -0,0 +1,389 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.google.common.collect.HashMultiset;
+import com.google.common.collect.ImmutableMultiset;
+import com.google.common.collect.Multiset;
+import io.trino.Session;
+import io.trino.filesystem.TrackingFileSystemFactory;
+import io.trino.filesystem.TrackingFileSystemFactory.OperationType;
+import io.trino.filesystem.TrinoFileSystemFactory;
+import io.trino.filesystem.alluxio.AlluxioFileSystemCacheConfig;
+import io.trino.filesystem.alluxio.AlluxioFileSystemCacheModule;
+import io.trino.filesystem.alluxio.TestingAlluxioFileSystemCache;
+import io.trino.filesystem.cache.CacheFileSystemFactory;
+import io.trino.filesystem.cache.CachingHostAddressProvider;
+import io.trino.filesystem.cache.NoneCachingHostAddressProvider;
+import io.trino.filesystem.hdfs.HdfsFileSystemFactory;
+import io.trino.plugin.deltalake.cache.DeltaLakeCacheKeyProvider;
+import io.trino.testing.AbstractTestQueryFramework;
+import io.trino.testing.DistributedQueryRunner;
+import org.intellij.lang.annotations.Language;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Execution;
+import org.junit.jupiter.api.parallel.ExecutionMode;
+
+import java.io.File;
+import java.nio.file.Path;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import static com.google.inject.Scopes.SINGLETON;
+import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_GET_LENGTH;
+import static io.trino.filesystem.TrackingFileSystemFactory.OperationType.INPUT_FILE_NEW_STREAM;
+import static io.trino.filesystem.alluxio.TestingAlluxioFileSystemCache.OperationType.CACHE_READ;
+import static io.trino.filesystem.alluxio.TestingAlluxioFileSystemCache.OperationType.EXTERNAL_READ;
+import static io.trino.plugin.base.util.Closables.closeAllSuppress;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.CDF_DATA;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.CHECKPOINT;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.DATA;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.LAST_CHECKPOINT;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.TRANSACTION_LOG_JSON;
+import static io.trino.plugin.deltalake.TestDeltaLakeAlluxioCacheFileOperations.FileType.TRINO_EXTENDED_STATS_JSON;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_ENVIRONMENT;
+import static io.trino.plugin.hive.HiveTestUtils.HDFS_FILE_SYSTEM_STATS;
+import static io.trino.testing.MultisetAssertions.assertMultisetsEqual;
+import static io.trino.testing.TestingSession.testSessionBuilder;
+import static java.util.Collections.nCopies;
+import static java.util.Objects.requireNonNull;
+import static java.util.stream.Collectors.toCollection;
+
+// single-threaded AccessTrackingFileSystemFactory is shared mutable state
+@Execution(ExecutionMode.SAME_THREAD)
+public class TestDeltaLakeAlluxioCacheFileOperations
+        extends AbstractTestQueryFramework
+{
+    private TrackingFileSystemFactory trackingFileSystemFactory;
+    private TestingAlluxioFileSystemCache alluxioFileSystemCache;
+
+    @Override
+    protected DistributedQueryRunner createQueryRunner()
+            throws Exception
+    {
+        Session session = testSessionBuilder()
+                .setCatalog("delta_lake")
+                .setSchema("default")
+                .build();
+        DistributedQueryRunner queryRunner = DistributedQueryRunner.builder(session)
+                .build();
+        try {
+            File metastoreDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_metastore").toFile().getAbsoluteFile();
+            trackingFileSystemFactory = new TrackingFileSystemFactory(new HdfsFileSystemFactory(HDFS_ENVIRONMENT, HDFS_FILE_SYSTEM_STATS));
+            AlluxioFileSystemCacheConfig alluxioFileSystemCacheConfiguration = new AlluxioFileSystemCacheConfig()
+                    .setCacheDirectories(metastoreDirectory.getAbsolutePath() + "/cache")
+                    .disableTTL()
+                    .setMaxCacheSizes("100MB");
+            alluxioFileSystemCache = new TestingAlluxioFileSystemCache(AlluxioFileSystemCacheModule.getAlluxioConfiguration(alluxioFileSystemCacheConfiguration), new DeltaLakeCacheKeyProvider());
+            TrinoFileSystemFactory fileSystemFactory = new CacheFileSystemFactory(trackingFileSystemFactory, alluxioFileSystemCache, alluxioFileSystemCache.getCacheKeyProvider());
+
+            Path dataDirectory = queryRunner.getCoordinator().getBaseDataDir().resolve("delta_lake_data");
+            queryRunner.installPlugin(new TestingDeltaLakePlugin(dataDirectory, Optional.empty(), Optional.of(fileSystemFactory), binder -> binder.bind(CachingHostAddressProvider.class).to(NoneCachingHostAddressProvider.class).in(SINGLETON)));
+            queryRunner.createCatalog(
+                    "delta_lake",
+                    "delta_lake",
+                    Map.of(
+                            "hive.metastore", "file",
+                            "hive.metastore.catalog.dir", metastoreDirectory.toURI().toString(),
+                            "delta.enable-non-concurrent-writes", "true"));
+
+            queryRunner.execute("CREATE SCHEMA " + session.getSchema().orElseThrow());
+            return queryRunner;
+        }
+        catch (Throwable e) {
+            closeAllSuppress(e, queryRunner);
+            throw e;
+        }
+    }
+
+    @Test
+    public void testCacheFileOperations()
+    {
+        assertUpdate("DROP TABLE IF EXISTS test_cache_file_operations");
+        assertUpdate("CREATE TABLE test_cache_file_operations(key varchar, data varchar) with (partitioned_by=ARRAY['key'])");
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p1', '1-abc')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p2', '2-xyz')", 1);
+        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_cache_file_operations')");
+        alluxioFileSystemCache.clear();
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
+                        // All data cached when reading parquet file footers to collect statistics when writing
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p2/"), 1)
+                        .build());
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p2/"), 1)
+                        .build());
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p3', '3-xyz')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p4', '4-xyz')", 1);
+        assertUpdate("INSERT INTO test_cache_file_operations VALUES ('p5', '5-xyz')", 1);
+        alluxioFileSystemCache.clear();
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p4/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p5/", INPUT_FILE_NEW_STREAM), 1)
+                        // All data cached when reading parquet file footers to collect statistics when writing
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p2/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p3/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p4/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p5/"), 1)
+                        .build());
+        assertFileSystemAccesses(
+                "SELECT * FROM test_cache_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000000.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000001.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000002.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000004.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", INPUT_FILE_NEW_STREAM), 1)
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p2/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p3/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p4/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p5/"), 1)
+                        .build());
+    }
+
+    @Test
+    public void testCacheCheckpointFileOperations()
+    {
+        assertUpdate("DROP TABLE IF EXISTS test_checkpoint_file_operations");
+        assertUpdate("CREATE TABLE test_checkpoint_file_operations(key varchar, data varchar) with (checkpoint_interval = 2, partitioned_by=ARRAY['key'])");
+        assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p1', '1-abc')", 1);
+        assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p2', '2-xyz')", 1);
+        assertUpdate("CALL system.flush_metadata_cache(schema_name => CURRENT_SCHEMA, table_name => 'test_checkpoint_file_operations')");
+        alluxioFileSystemCache.clear();
+        assertFileSystemAccesses(
+                "SELECT * FROM test_checkpoint_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4)
+                        .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
+                        // All data cached when reading parquet file footers to collect statistics when writing
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p2/"), 1)
+                        .build());
+        assertFileSystemAccesses(
+                "SELECT * FROM test_checkpoint_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000003.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000002.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4)
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p2/"), 1)
+                        .build());
+        assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p3', '3-xyz')", 1);
+        assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p4', '4-xyz')", 1);
+        assertUpdate("INSERT INTO test_checkpoint_file_operations VALUES ('p5', '5-xyz')", 1);
+        alluxioFileSystemCache.clear();
+        assertFileSystemAccesses(
+                "SELECT * FROM test_checkpoint_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000004.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000004.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4)
+                        .addCopies(new FileOperation(DATA, "key=p1/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p2/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p3/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p4/", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(DATA, "key=p5/", INPUT_FILE_NEW_STREAM), 1)
+                        // All data cached when reading parquet file footers to collect statistics when writing
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p2/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p3/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p4/"), 1)
+                        .addCopies(new CacheOperation(EXTERNAL_READ, "key=p5/"), 1)
+                        .build());
+        assertFileSystemAccesses(
+                "SELECT * FROM test_checkpoint_file_operations",
+                ImmutableMultiset.<FileOperation>builder()
+                        .addCopies(new FileOperation(LAST_CHECKPOINT, "_last_checkpoint", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000005.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(TRANSACTION_LOG_JSON, "00000000000000000006.json", INPUT_FILE_NEW_STREAM), 1)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000004.checkpoint.parquet", INPUT_FILE_NEW_STREAM), 2)
+                        .addCopies(new FileOperation(CHECKPOINT, "00000000000000000004.checkpoint.parquet", INPUT_FILE_GET_LENGTH), 4)
+                        .build(),
+                ImmutableMultiset.<CacheOperation>builder()
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p1/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p2/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p3/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p4/"), 1)
+                        .addCopies(new CacheOperation(CACHE_READ, "key=p5/"), 1)
+                        .build());
+    }
+
+    private void assertFileSystemAccesses(@Language("SQL") String query, Multiset<FileOperation> expectedAccesses, Multiset<CacheOperation> expectedCacheAccesses)
+    {
+        assertUpdate("CALL system.flush_metadata_cache()");
+        DistributedQueryRunner queryRunner = getDistributedQueryRunner();
+        trackingFileSystemFactory.reset();
+        alluxioFileSystemCache.reset();
+        queryRunner.executeWithPlan(queryRunner.getDefaultSession(), query);
+        assertMultisetsEqual(getOperations(), expectedAccesses);
+        assertMultisetsEqual(getCacheOperations(), expectedCacheAccesses);
+    }
+
+    private Multiset<CacheOperation> getCacheOperations()
+    {
+        return alluxioFileSystemCache.getOperationCounts()
+                .entrySet().stream()
+                .filter(entry -> {
+                    String path = entry.getKey().location().path();
+                    return !path.endsWith(".trinoSchema") && !path.contains(".trinoPermissions");
+                })
+                .flatMap(entry -> nCopies((int) entry.getValue().stream().filter(l -> l > 0).count(), CacheOperation.create(
+                        entry.getKey().type(),
+                        entry.getKey().location().path())).stream())
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    private static Pattern dataFilePattern = Pattern.compile(".*?/(?<partition>key=[^/]*/)?(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");
+
+    private record CacheOperation(TestingAlluxioFileSystemCache.OperationType type, String fileId)
+    {
+        public static CacheOperation create(TestingAlluxioFileSystemCache.OperationType operationType, String path)
+        {
+            String fileName = path.replaceFirst(".*/", "");
+            if (!path.contains("_delta_log") && !path.contains("/.trino")) {
+                Matcher matcher = dataFilePattern.matcher(path);
+                if (matcher.matches()) {
+                    return new CacheOperation(operationType, matcher.group("partition"));
+                }
+            }
+            else {
+                return new CacheOperation(operationType, fileName);
+            }
+            throw new IllegalArgumentException("File not recognized: " + path);
+        }
+    }
+
+    private Multiset<FileOperation> getOperations()
+    {
+        return trackingFileSystemFactory.getOperationCounts()
+                .entrySet().stream()
+                .filter(entry -> {
+                    String path = entry.getKey().location().path();
+                    return !path.endsWith(".trinoSchema") && !path.contains(".trinoPermissions");
+                })
+                .flatMap(entry -> nCopies(entry.getValue(), FileOperation.create(
+                        entry.getKey().location().path(),
+                        entry.getKey().operationType())).stream())
+                .collect(toCollection(HashMultiset::create));
+    }
+
+    private record FileOperation(FileType fileType, String fileId, OperationType operationType)
+    {
+        public static FileOperation create(String path, OperationType operationType)
+        {
+            String fileName = path.replaceFirst(".*/", "");
+            if (path.matches(".*/_delta_log/_last_checkpoint")) {
+                return new FileOperation(LAST_CHECKPOINT, fileName, operationType);
+            }
+            if (path.matches(".*/_delta_log/\\d+\\.json")) {
+                return new FileOperation(TRANSACTION_LOG_JSON, fileName, operationType);
+            }
+            if (path.matches(".*/_delta_log/\\d+\\.checkpoint.parquet")) {
+                return new FileOperation(CHECKPOINT, fileName, operationType);
+            }
+            if (path.matches(".*/_delta_log/_trino_meta/extended_stats.json")) {
+                return new FileOperation(TRINO_EXTENDED_STATS_JSON, fileName, operationType);
+            }
+            if (path.matches(".*/_change_data/.*")) {
+                Matcher matcher = dataFilePattern.matcher(path);
+                if (matcher.matches()) {
+                    return new FileOperation(CDF_DATA, matcher.group("partition"), operationType);
+                }
+            }
+            if (!path.contains("_delta_log")) {
+                Matcher matcher = dataFilePattern.matcher(path);
+                if (matcher.matches()) {
+                    return new FileOperation(DATA, matcher.group("partition"), operationType);
+                }
+            }
+            throw new IllegalArgumentException("File not recognized: " + path);
+        }
+
+        public FileOperation
+        {
+            requireNonNull(fileType, "fileType is null");
+            requireNonNull(fileId, "fileId is null");
+            requireNonNull(operationType, "operationType is null");
+        }
+    }
+
+    enum FileType
+    {
+        LAST_CHECKPOINT,
+        TRANSACTION_LOG_JSON,
+        CHECKPOINT,
+        TRINO_EXTENDED_STATS_JSON,
+        DATA,
+        CDF_DATA,
+        /**/;
+    }
+}
diff --git a/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.java b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.java
new file mode 100644
index 000000000000..265cf92488d7
--- /dev/null
+++ b/plugin/trino-delta-lake/src/test/java/io/trino/plugin/deltalake/TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.plugin.deltalake;
+
+import com.google.common.collect.ImmutableMap;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.BeforeAll;
+
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.stream.Stream;
+
+/**
+ * Delta Lake connector smoke test exercising Hive metastore and MinIO storage with Alluxio caching.
+ */
+public class TestDeltaLakeAlluxioCacheMinioAndHmsConnectorSmokeTest
+        extends TestDeltaLakeMinioAndHmsConnectorSmokeTest
+{
+    private Path cacheDirectory;
+
+    @BeforeAll
+    @Override
+    public void init()
+            throws Exception
+    {
+        cacheDirectory = Files.createTempDirectory("cache");
+        super.init();
+    }
+
+    @AfterAll
+    @Override
+    public void cleanUp()
+    {
+        try (Stream<Path> walk = Files.walk(cacheDirectory)) {
+            Iterator<Path> iterator = walk.sorted(Comparator.reverseOrder()).iterator();
+            while (iterator.hasNext()) {
+                Path path = iterator.next();
+                Files.delete(path);
+            }
+        }
+        catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+        super.cleanUp();
+    }
+
+    @Override
+    protected Map<String, String> deltaStorageConfiguration()
+    {
+        return ImmutableMap.<String, String>builder()
+                .putAll(super.deltaStorageConfiguration())
+                .put("fs.cache", "alluxio")
+                .put("fs.cache.directories", cacheDirectory.toAbsolutePath().toString())
+                .put("fs.cache.max-sizes", "100MB")
+                .buildOrThrow();
+    }
+}
diff --git a/pom.xml b/pom.xml
index b7a69348a584..c3a1534a8fd7 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1157,6 +1157,13 @@
                 <version>${project.version}</version>
             </dependency>
 
+            <dependency>
+                <groupId>io.trino</groupId>
+                <artifactId>trino-hdfs</artifactId>
+                <version>${project.version}</version>
+                <type>test-jar</type>
+            </dependency>
+
             <dependency>
                 <groupId>io.trino</groupId>
                 <artifactId>trino-hive</artifactId>
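Aside for reviewers: the data-file bookkeeping in TestDeltaLakeAlluxioCacheFileOperations hinges on dataFilePattern matching Trino's data file naming scheme (optional partition directory, query id, random UUID). A standalone way to sanity-check the regex, with a sample path invented purely for illustration:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

class DataFilePatternSketch
{
    // Same pattern as dataFilePattern in the test above
    private static final Pattern DATA_FILE_PATTERN = Pattern.compile(
            ".*?/(?<partition>key=[^/]*/)?(?<queryId>\\d{8}_\\d{6}_\\d{5}_\\w{5})_(?<uuid>[0-9a-f]{8}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{4}-[0-9a-f]{12})");

    public static void main(String[] args)
    {
        String path = "/data/db/table/key=p1/20240101_120000_00000_abcde_01234567-89ab-cdef-0123-456789abcdef";
        Matcher matcher = DATA_FILE_PATTERN.matcher(path);
        if (matcher.matches()) {
            // FileOperation and CacheOperation aggregate on the partition directory,
            // so expected counts stay stable across reruns even though file names are random
            System.out.println(matcher.group("partition")); // prints "key=p1/"
        }
    }
}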
"delta-lake-exclude-91"; + public static final String DELTA_LAKE_ALLUXIO_CACHING = "delta-lake-alluxio-caching"; public static final String HUDI = "hudi"; public static final String PARQUET = "parquet"; public static final String IGNITE = "ignite"; diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMinioDataLakeCaching.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMinioDataLakeCaching.java new file mode 100644 index 000000000000..ad70f50d8340 --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/env/environment/EnvMultinodeMinioDataLakeCaching.java @@ -0,0 +1,51 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.trino.tests.product.launcher.env.environment; + +import com.google.common.collect.ImmutableMap; +import com.google.inject.Inject; +import io.trino.tests.product.launcher.docker.DockerFiles; +import io.trino.tests.product.launcher.env.Environment; +import io.trino.tests.product.launcher.env.EnvironmentProvider; +import io.trino.tests.product.launcher.env.common.Hadoop; +import io.trino.tests.product.launcher.env.common.Minio; +import io.trino.tests.product.launcher.env.common.StandardMultinode; +import io.trino.tests.product.launcher.env.common.TestsEnvironment; + +import static io.trino.tests.product.launcher.env.common.Standard.CONTAINER_TRINO_ETC; +import static org.testcontainers.utility.MountableFile.forHostPath; + +@TestsEnvironment +public final class EnvMultinodeMinioDataLakeCaching + extends EnvironmentProvider +{ + private static final String CONTAINER_TRINO_DELTA_LAKE_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta.properties"; + private static final String CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES = CONTAINER_TRINO_ETC + "/catalog/delta_non_cached.properties"; + private final DockerFiles.ResourceProvider configDir; + + @Inject + public EnvMultinodeMinioDataLakeCaching(StandardMultinode standardMultinode, Hadoop hadoop, Minio minio, DockerFiles dockerFiles) + { + super(standardMultinode, hadoop, minio); + this.configDir = dockerFiles.getDockerFilesHostDirectory("conf/environment"); + } + + @Override + public void extendEnvironment(Environment.Builder builder) + { + builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_NON_CACHED_PROPERTIES); + builder.addConnector("delta_lake", forHostPath(configDir.getPath("multinode-minio-data-lake-cached/delta.properties")), CONTAINER_TRINO_DELTA_LAKE_PROPERTIES); + builder.configureContainers(container -> container.withTmpFs(ImmutableMap.of("/tmp/cache", "rw"))); + } +} diff --git a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeOss.java b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeOss.java index 
fc3e0e47a30e..73f6e36f9eb3 100644 --- a/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeOss.java +++ b/testing/trino-product-tests-launcher/src/main/java/io/trino/tests/product/launcher/suite/suites/SuiteDeltaLakeOss.java @@ -16,6 +16,7 @@ import com.google.common.collect.ImmutableList; import io.trino.tests.product.launcher.env.EnvironmentConfig; import io.trino.tests.product.launcher.env.environment.EnvMultinodeMinioDataLake; +import io.trino.tests.product.launcher.env.environment.EnvMultinodeMinioDataLakeCaching; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeKerberizedHdfs; import io.trino.tests.product.launcher.env.environment.EnvSinglenodeDeltaLakeOss; import io.trino.tests.product.launcher.suite.Suite; @@ -24,6 +25,7 @@ import java.util.List; import static io.trino.tests.product.TestGroups.CONFIGURED_FEATURES; +import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING; import static io.trino.tests.product.TestGroups.DELTA_LAKE_HDFS; import static io.trino.tests.product.TestGroups.DELTA_LAKE_MINIO; import static io.trino.tests.product.TestGroups.DELTA_LAKE_OSS; @@ -48,6 +50,10 @@ public List getTestRuns(EnvironmentConfig config) // TODO: make the list of tests run here as close to those run on SinglenodeDeltaLakeDatabricks // e.g. replace `delta-lake-oss` group with `delta-lake-databricks` + any exclusions, of needed .withGroups(CONFIGURED_FEATURES, DELTA_LAKE_OSS) + .build(), + + testOnEnvironment(EnvMultinodeMinioDataLakeCaching.class) + .withGroups(CONFIGURED_FEATURES, DELTA_LAKE_ALLUXIO_CACHING) .build()); } } diff --git a/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.properties b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.properties new file mode 100644 index 000000000000..6b1da12198eb --- /dev/null +++ b/testing/trino-product-tests-launcher/src/main/resources/docker/presto-product-tests/conf/environment/multinode-minio-data-lake-cached/delta.properties @@ -0,0 +1,11 @@ +connector.name=delta_lake +hive.metastore.uri=thrift://hadoop-master:9083 +hive.s3.aws-access-key=minio-access-key +hive.s3.aws-secret-key=minio-secret-key +hive.s3.endpoint=http://minio:9080/ +hive.s3.path-style-access=true +hive.s3.ssl.enabled=false +delta.register-table-procedure.enabled=true +fs.cache=alluxio +fs.cache.directories=/tmp/cache/delta +fs.cache.max-disk-usage-percentages=90 diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java new file mode 100644 index 000000000000..a098fe8c334d --- /dev/null +++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/TestDeltaLakeAlluxioCaching.java @@ -0,0 +1,91 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.deltalake;
+
+import io.airlift.units.Duration;
+import io.trino.tempto.ProductTest;
+import io.trino.tests.product.deltalake.util.CachingTestUtils.CacheStats;
+import org.testng.annotations.Test;
+
+import static io.airlift.testing.Assertions.assertGreaterThan;
+import static io.airlift.testing.Assertions.assertGreaterThanOrEqual;
+import static io.trino.tests.product.TestGroups.DELTA_LAKE_ALLUXIO_CACHING;
+import static io.trino.tests.product.TestGroups.PROFILE_SPECIFIC_TESTS;
+import static io.trino.tests.product.deltalake.util.CachingTestUtils.getCacheStats;
+import static io.trino.tests.product.utils.QueryAssertions.assertEventually;
+import static io.trino.tests.product.utils.QueryExecutors.onTrino;
+import static java.util.concurrent.TimeUnit.SECONDS;
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.testng.Assert.assertEquals;
+
+public class TestDeltaLakeAlluxioCaching
+        extends ProductTest
+{
+    @Test(groups = {DELTA_LAKE_ALLUXIO_CACHING, PROFILE_SPECIFIC_TESTS})
+    public void testReadFromCache()
+    {
+        testReadFromTable("table1");
+        testReadFromTable("table2");
+    }
+
+    private void testReadFromTable(String tableNameSuffix)
+    {
+        String cachedTableName = "delta.default.test_cache_read" + tableNameSuffix;
+        String nonCachedTableName = "delta_non_cached.default.test_cache_read" + tableNameSuffix;
+
+        createTestTable(cachedTableName);
+
+        CacheStats beforeCacheStats = getCacheStats("delta");
+
+        long tableSize = (Long) onTrino().executeQuery("SELECT SUM(size) as size FROM (SELECT \"$path\", \"$file_size\" AS size FROM " + nonCachedTableName + " GROUP BY 1, 2)").getOnlyValue();
+
+        assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();
+
+        assertEventually(
+                new Duration(20, SECONDS),
+                () -> {
+                    // first query via caching catalog should fetch external data
+                    CacheStats afterQueryCacheStats = getCacheStats("delta");
+                    assertGreaterThanOrEqual(afterQueryCacheStats.cacheSpaceUsed(), beforeCacheStats.cacheSpaceUsed() + tableSize);
+                    assertGreaterThan(afterQueryCacheStats.externalReads(), beforeCacheStats.externalReads());
+                    assertGreaterThanOrEqual(afterQueryCacheStats.cacheReads(), beforeCacheStats.cacheReads());
+                });
+
+        assertEventually(
+                new Duration(10, SECONDS),
+                () -> {
+                    CacheStats beforeQueryCacheStats = getCacheStats("delta");
+
+                    assertThat(onTrino().executeQuery("SELECT * FROM " + cachedTableName)).hasAnyRows();
+
+                    // query via caching catalog should read exclusively from cache
+                    CacheStats afterQueryCacheStats = getCacheStats("delta");
+                    assertGreaterThan(afterQueryCacheStats.cacheReads(), beforeQueryCacheStats.cacheReads());
+                    assertEquals(afterQueryCacheStats.externalReads(), beforeQueryCacheStats.externalReads());
+                    assertEquals(afterQueryCacheStats.cacheSpaceUsed(), beforeQueryCacheStats.cacheSpaceUsed());
+                });
+
+        onTrino().executeQuery("DROP TABLE " + nonCachedTableName);
+    }
+
+    /**
+     * Creates a table which should contain around six 2 MB Parquet files
+     */
+    private void createTestTable(String tableName)
+    {
+        onTrino().executeQuery("DROP TABLE IF EXISTS " + tableName);
+        onTrino().executeQuery("SET SESSION delta.target_max_file_size = '2MB'");
+        onTrino().executeQuery("CREATE TABLE " + tableName + " AS SELECT * FROM tpch.sf1.customer");
+    }
+}
diff --git a/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java
new file mode 100644
index 000000000000..39e73ec24e83
--- /dev/null
+++ b/testing/trino-product-tests/src/main/java/io/trino/tests/product/deltalake/util/CachingTestUtils.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.trino.tests.product.deltalake.util;
+
+import io.trino.tempto.query.QueryResult;
+
+import static com.google.common.collect.Iterables.getOnlyElement;
+import static io.trino.tests.product.utils.QueryExecutors.onTrino;
+
+public final class CachingTestUtils
+{
+    private CachingTestUtils() {}
+
+    public static CacheStats getCacheStats(String catalog)
+    {
+        QueryResult queryResult = onTrino().executeQuery("SELECT " +
+                "  sum(\"cachereads.alltime.count\") as cachereads, " +
+                "  sum(\"externalreads.alltime.count\") as externalreads " +
+                "FROM jmx.current.\"io.trino.filesystem.alluxio:name=" + catalog + ",type=alluxiocachestats\";");
+
+        double cacheReads = (Double) getOnlyElement(queryResult.rows())
+                .get(queryResult.tryFindColumnIndex("cachereads").get() - 1);
+
+        double externalReads = (Double) getOnlyElement(queryResult.rows())
+                .get(queryResult.tryFindColumnIndex("externalreads").get() - 1);
+
+        long cacheSpaceUsed = (Long) onTrino().executeQuery("SELECT sum(count) FROM " +
+                "jmx.current.\"org.alluxio:name=client.cachespaceusedcount,type=counters\"").getOnlyValue();
+
+        return new CacheStats(cacheReads, externalReads, cacheSpaceUsed);
+    }
+
+    public record CacheStats(double cacheReads, double externalReads, long cacheSpaceUsed) {}
+}
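For reviewers following the product test end to end, the expected movement of the JMX counters across a cold and a warm read looks roughly like this. This is a hypothetical fragment in the style of TestDeltaLakeAlluxioCaching above: the table name is invented, and it assumes the EnvMultinodeMinioDataLakeCaching environment with its jmx catalog is running.

// Hypothetical sketch, not part of the PR
CacheStats cold = getCacheStats("delta");

onTrino().executeQuery("SELECT * FROM delta.default.test_cache_read_example");
CacheStats afterCold = getCacheStats("delta");
// Cold read: data is fetched from MinIO and written into the Alluxio cache,
// so external reads and cache space used both grow
assertGreaterThan(afterCold.externalReads(), cold.externalReads());
assertGreaterThan(afterCold.cacheSpaceUsed(), cold.cacheSpaceUsed());

onTrino().executeQuery("SELECT * FROM delta.default.test_cache_read_example");
CacheStats afterWarm = getCacheStats("delta");
// Warm read: served entirely from the cache, so external reads stay flat
assertGreaterThan(afterWarm.cacheReads(), afterCold.cacheReads());
assertEquals(afterWarm.externalReads(), afterCold.externalReads());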