diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java
index 170acf76..74be1f0d 100644
--- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java
+++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/Compactor.java
@@ -25,8 +25,8 @@ public class Compactor {
 	int dataFileCompactionTrigger = 2, mappingFileCompactionTrigger = 3;
 	File compactionFolder;
 
-	public Compactor(String archiveLocation, KvinParquet kvinParquet) {
-		this.archiveLocation = archiveLocation;
+	public Compactor(KvinParquet kvinParquet) {
+		this.archiveLocation = kvinParquet.archiveLocation;
 		this.compactionFolder = new File(archiveLocation, ".compaction");
 		this.kvinParquet = kvinParquet;
 	}
@@ -42,36 +42,47 @@ public void execute() throws IOException {
 			}
 		}
 
-		kvinParquet.writeLock.lock();
-		clearCache();
-		// replace existing files with compacted files
-		FileUtils.cleanDirectory(new File(archiveLocation, "metadata"));
-		for (File weekFolder : weekFolders) {
-			FileUtils.cleanDirectory(weekFolder);
+		if (!compactionFolder.exists()) {
+			// nothing to do, compaction was not necessary
+			return;
+		}
+
+		try {
+			kvinParquet.writeLock.lock();
+			clearCache();
+			// replace existing files with compacted files
+			FileUtils.cleanDirectory(new File(archiveLocation, "metadata"));
+			for (File weekFolder : weekFolders) {
+				FileUtils.cleanDirectory(weekFolder);
+			}
+			java.nio.file.Path source = compactionFolder.toPath();
+			java.nio.file.Path destination = Paths.get(archiveLocation);
+			Files.walk(source)
+					.skip(1)
+					.filter(p -> Files.isRegularFile(p))
+					.forEach(p -> {
+						java.nio.file.Path dest = destination.resolve(source.relativize(p));
+						try {
+							Files.createDirectories(dest.getParent());
+							Files.move(p, dest);
+						} catch (IOException e) {
+							throw new UncheckedIOException(e);
+						}
+					});
+		} finally {
+			kvinParquet.writeLock.unlock();
 		}
-		java.nio.file.Path source = compactionFolder.toPath();
-		java.nio.file.Path destination = Paths.get(archiveLocation);
-		Files.walk(source)
-				.skip(1)
-				.filter(p -> Files.isRegularFile(p))
-				.forEach(p -> {
-					java.nio.file.Path dest = destination.resolve(source.relativize(p));
-					try {
-						Files.createDirectories(dest.getParent());
-						Files.move(p, dest);
-					} catch (IOException e) {
-						throw new UncheckedIOException(e);
-					}
-				});
-		kvinParquet.writeLock.unlock();
 	}
 
 	private void performMappingFileCompaction() throws IOException {
 		Map>> mappingFiles = getMappingFiles(Paths.get(archiveLocation, "metadata"));
 		if (mappingFiles.get("items").size() >= mappingFileCompactionTrigger) {
-			kvinParquet.readLock.lock();
-			generateCompactedMappingFiles(mappingFiles, new File(compactionFolder, "metadata"));
-			kvinParquet.readLock.unlock();
+			try {
+				kvinParquet.readLock.lock();
+				generateCompactedMappingFiles(mappingFiles, new File(compactionFolder, "metadata"));
+			} finally {
+				kvinParquet.readLock.unlock();
+			}
 		}
 	}
@@ -139,50 +150,53 @@ private ParquetReader getParquetDataReader(HadoopInputFile fi
 	}
 
 	private void compactDataFiles(File weekFolder) throws IOException {
-		kvinParquet.readLock.lock();
-		List dataFiles = Files.walk(weekFolder.toPath(), 1)
-				.skip(1)
-				.filter(path -> path.getFileName().toString().startsWith("data_"))
-				.collect(Collectors.toList());
-
-		java.nio.file.Path targetFolder = compactionFolder.toPath().resolve(
-				Paths.get(archiveLocation).relativize(weekFolder.toPath()));
-
-		Path compactionFile = new Path(targetFolder.toAbsolutePath().toString(), "data__1.parquet");
-		ParquetWriter compactionFileWriter = getParquetDataWriter(compactionFile);
-
-		PriorityQueue>> nextTuples =
-				new PriorityQueue<>(Comparator.comparing(Pair::getFirst));
-		for (java.nio.file.Path dataFile : dataFiles) {
-			ParquetReader reader = getParquetDataReader(
-					HadoopInputFile.fromPath(new Path(dataFile.toString()), new Configuration()));
-			KvinTupleInternal tuple = reader.read();
-			if (tuple != null) {
-				nextTuples.add(new Pair<>(tuple, reader));
-			} else {
-				try {
-					reader.close();
-				} catch (IOException e) {
+		try {
+			kvinParquet.readLock.lock();
+			List dataFiles = Files.walk(weekFolder.toPath(), 1)
+					.skip(1)
+					.filter(path -> path.getFileName().toString().startsWith("data_"))
+					.collect(Collectors.toList());
+
+			java.nio.file.Path targetFolder = compactionFolder.toPath().resolve(
+					Paths.get(archiveLocation).relativize(weekFolder.toPath()));
+
+			Path compactionFile = new Path(targetFolder.toAbsolutePath().toString(), "data__1.parquet");
+			ParquetWriter compactionFileWriter = getParquetDataWriter(compactionFile);
+
+			PriorityQueue>> nextTuples =
+					new PriorityQueue<>(Comparator.comparing(Pair::getFirst));
+			for (java.nio.file.Path dataFile : dataFiles) {
+				ParquetReader reader = getParquetDataReader(
+						HadoopInputFile.fromPath(new Path(dataFile.toString()), new Configuration()));
+				KvinTupleInternal tuple = reader.read();
+				if (tuple != null) {
+					nextTuples.add(new Pair<>(tuple, reader));
+				} else {
+					try {
+						reader.close();
+					} catch (IOException e) {
+					}
 				}
 			}
-		}
 
-		while (!nextTuples.isEmpty()) {
-			var pair = nextTuples.poll();
-			compactionFileWriter.write(pair.getFirst());
-			KvinTupleInternal tuple = pair.getSecond().read();
-			if (tuple != null) {
-				nextTuples.add(new Pair<>(tuple, pair.getSecond()));
-			} else {
-				try {
-					pair.getSecond().close();
-				} catch (IOException e) {
+			while (!nextTuples.isEmpty()) {
+				var pair = nextTuples.poll();
+				compactionFileWriter.write(pair.getFirst());
+				KvinTupleInternal tuple = pair.getSecond().read();
+				if (tuple != null) {
+					nextTuples.add(new Pair<>(tuple, pair.getSecond()));
+				} else {
+					try {
+						pair.getSecond().close();
+					} catch (IOException e) {
+					}
 				}
 			}
-		}
-		compactionFileWriter.close();
-		kvinParquet.readLock.unlock();
+			compactionFileWriter.close();
+		} finally {
+			kvinParquet.readLock.unlock();
+		}
 	}
 
 	private void clearCache() {
diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java
index 3f2c09db..803db215 100644
--- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java
+++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/parquet/KvinParquet.java
@@ -14,12 +14,19 @@
 import net.enilink.komma.core.URIs;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
 import org.apache.parquet.avro.AvroParquetReader;
 import org.apache.parquet.filter2.compat.FilterCompat;
 import org.apache.parquet.filter2.predicate.FilterApi;
 import org.apache.parquet.filter2.predicate.FilterPredicate;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.BlockMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
+import org.apache.parquet.hadoop.metadata.ColumnPath;
+import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
@@ -27,6 +34,7 @@
 
 import java.io.File;
 import java.io.IOException;
+import java.io.UncheckedIOException;
 import java.math.BigDecimal;
 import java.math.BigInteger;
 import java.nio.ByteBuffer;
@@ -63,9 +71,6 @@ public class KvinParquet implements Kvin {
 	Cache metaCache = CacheBuilder.newBuilder().maximumSize(10000).build();
 	String archiveLocation;
 
-	long itemIdCounter = 0, propertyIdCounter = 0, contextIdCounter = 0; // global id counter
-	WriteContext writeContext = new WriteContext();
-
 	public KvinParquet(String archiveLocation) {
 		this.archiveLocation = archiveLocation;
 		if (!this.archiveLocation.endsWith("/")) {
@@ -86,6 +91,41 @@ private IdMapping fetchMappingIds(Path mappingFile, FilterPredicate filter) thro
 		return id;
 	}
 
+	private void readMaxIds(WriteContext writeContext, java.nio.file.Path metadataPath) throws IOException {
+		Map>> mappingFiles = getMappingFiles(metadataPath);
+		ColumnPath idPath = ColumnPath.get("id");
+		for (Map.Entry>> entry : mappingFiles.entrySet()) {
+			long maxId = 0L;
+			for (Pair mappingFile : entry.getValue()) {
+				HadoopInputFile inputFile = getFile(new Path(metadataPath.resolve(mappingFile.getFirst()).toString()));
+				ParquetReadOptions readOptions = HadoopReadOptions
+						.builder(inputFile.getConfiguration(), inputFile.getPath())
+						.build();
+
+				ParquetMetadata meta = ParquetFileReader.readFooter(inputFile, readOptions, inputFile.newStream());
+				for (BlockMetaData blockMeta : meta.getBlocks()) {
+					for (ColumnChunkMetaData columnMeta : blockMeta.getColumns()) {
+						if (columnMeta.getPath().equals(idPath)) {
+							// get max id from statistics
+							maxId = Math.max(maxId, ((Number) columnMeta.getStatistics().genericGetMax()).longValue());
+						}
+					}
+				}
+			}
+			switch(entry.getKey()) {
+				case "items":
+					writeContext.itemIdCounter = maxId;
+					break;
+				case "properties":
+					writeContext.propertyIdCounter = maxId;
+					break;
+				case "contexts":
+					writeContext.contextIdCounter = maxId;
+					break;
+			}
+		}
+	}
+
 	private HadoopInputFile getFile(Path path) {
 		HadoopInputFile inputFile;
 		synchronized (inputFileCache) {
@@ -129,6 +169,13 @@ public void put(Iterable tuples) {
 
 	private void putInternal(Iterable tuples) throws IOException {
 		try {
 			writeLock.lock();
+			java.nio.file.Path metadataPath = Paths.get(archiveLocation, "metadata");
+			WriteContext writeContext = new WriteContext();
+			writeContext.hasExistingData = Files.exists(metadataPath);
+			if (writeContext.hasExistingData) {
+				readMaxIds(writeContext, metadataPath);
+			}
+
 			Map writers = new HashMap<>();
 			java.nio.file.Path tempPath = Paths.get(archiveLocation, ".tmp");
@@ -186,8 +233,8 @@ private void putInternal(Iterable tuples) throws IOException {
 					internalTuple.setValueObject(null);
 				}
 				writerState.writer.write(internalTuple);
-				writerState.minMax[0] = Math.min(writerState.minMax[0], itemIdCounter);
-				writerState.minMax[1] = Math.max(writerState.minMax[1], itemIdCounter);
+				writerState.minMax[0] = Math.min(writerState.minMax[0], writeContext.itemIdCounter);
+				writerState.minMax[1] = Math.max(writerState.minMax[1], writeContext.itemIdCounter);
 			}
 
 			for (WriterState state : writers.values()) {
@@ -295,7 +342,6 @@ private void putInternal(Iterable tuples) throws IOException {
 			}
 
 			java.nio.file.Path tempMetadataPath = tempPath.resolve("metadata");
-			java.nio.file.Path metadataPath = Paths.get(archiveLocation, "metadata");
 			Files.createDirectories(metadataPath);
 			Map>> newMappingFiles = getMappingFiles(tempMetadataPath);
 			Map>> existingMappingFiles = getMappingFiles(metadataPath);
@@ -315,6 +361,11 @@ private void putInternal(Iterable tuples) throws IOException {
 
 			// clear cache with meta data
 			metaCache.invalidateAll();
+
+			// invalidate id caches - TODO could be improved by directly updating the caches
+			itemIdCache.invalidateAll();
+			propertyIdCache.invalidateAll();
+			contextIdCache.invalidateAll();
 		} finally {
 			writeLock.unlock();
 		}
@@ -328,13 +379,19 @@ private Calendar getDate(long timestamp) {
 		return calendar;
 	}
 
-	private byte[] generateId(KvinTuple currentTuple,
+	private byte[] generateId(KvinTuple tuple,
+	                          WriteContext writeContext,
 	                          ParquetWriter itemMappingWriter,
 	                          ParquetWriter propertyMappingWriter,
 	                          ParquetWriter contextMappingWriter) {
-		long itemId = writeContext.itemMap.computeIfAbsent(currentTuple.item.toString(), key -> {
-			long newId = ++itemIdCounter;
+		long itemId = writeContext.itemMap.computeIfAbsent(tuple.item.toString(), key -> {
+			if (writeContext.hasExistingData) {
+				long id = getId(tuple.item, IdType.ITEM_ID);
+				if (id != 0L) {
+					return id;
+				}
+			}
+			long newId = ++writeContext.itemIdCounter;
 			IdMapping mapping = new SimpleMapping();
 			mapping.setId(newId);
 			mapping.setValue(key);
@@ -345,8 +402,14 @@ private byte[] generateId(KvinTuple currentTuple,
 			}
 			return newId;
 		});
-		long propertyId = writeContext.propertyMap.computeIfAbsent(currentTuple.property.toString(), key -> {
-			long newId = ++propertyIdCounter;
+		long propertyId = writeContext.propertyMap.computeIfAbsent(tuple.property.toString(), key -> {
+			if (writeContext.hasExistingData) {
+				long id = getId(tuple.property, IdType.PROPERTY_ID);
+				if (id != 0L) {
+					return id;
+				}
+			}
+			long newId = ++writeContext.propertyIdCounter;
 			IdMapping mapping = new SimpleMapping();
 			mapping.setId(newId);
 			mapping.setValue(key);
@@ -358,8 +421,14 @@ private byte[] generateId(KvinTuple currentTuple,
 			return newId;
 		});
 
-		long contextId = writeContext.contextMap.computeIfAbsent(currentTuple.context.toString(), key -> {
-			long newId = ++contextIdCounter;
+		long contextId = writeContext.contextMap.computeIfAbsent(tuple.context.toString(), key -> {
+			if (writeContext.hasExistingData) {
+				long id = getId(tuple.context, IdType.CONTEXT_ID);
+				if (id != 0L) {
+					return id;
+				}
+			}
+			long newId = ++writeContext.contextIdCounter;
 			IdMapping mapping = new SimpleMapping();
 			mapping.setId(newId);
 			mapping.setValue(key);
@@ -507,7 +576,7 @@ public String getProperty(KvinTupleInternal tuple) {
 				cachedProperty = propertyMapping.getValue();
 				propertyIdReverseLookUpCache.put(propertyId, propertyMapping.getValue());
 			} catch (IOException e) {
-				throw new RuntimeException(e);
+				throw new UncheckedIOException(e);
 			}
 		}
 		return cachedProperty;
@@ -554,7 +623,7 @@ private synchronized IExtendedIterator fetchInternal(URI item, URI pr
 				}
 			}
 		} catch (IOException e) {
-			throw new RuntimeException(e);
+			throw new UncheckedIOException(e);
 		}
 	}
 
@@ -599,7 +668,7 @@ public boolean hasNext() {
 					}
 					nextTuple = selectNextTuple();
 				} catch (IOException e) {
-					throw new RuntimeException(e);
+					throw new UncheckedIOException(e);
 				}
 				return nextTuple != null;
 			}
@@ -680,8 +749,9 @@ void nextReaders() throws IOException {
 					}
 				}
 			};
-		} catch (Exception e) {
-			throw new RuntimeException(e);
+		} catch (IOException e) {
+			readLock.unlock();
+			throw new UncheckedIOException(e);
 		}
 	}
 
@@ -710,16 +780,21 @@ private List getDataFiles(String path) throws IOException {
 				.collect(Collectors.toList());
 	}
 
-	private List getDataFolders(IdMappings idMappings) throws IOException, ExecutionException {
+	private List getDataFolders(IdMappings idMappings) throws IOException {
 		long itemId = idMappings.itemId;
 		java.nio.file.Path metaPath = Paths.get(archiveLocation, "meta.properties");
-		Properties meta = metaCache.get(metaPath, () -> {
-			Properties p = new Properties();
-			if (Files.exists(metaPath)) {
-				p.load(Files.newInputStream(metaPath));
-			}
-			return p;
-		});
+		Properties meta = null;
+		try {
+			meta = metaCache.get(metaPath, () -> {
+				Properties p = new Properties();
+				if (Files.exists(metaPath)) {
+					p.load(Files.newInputStream(metaPath));
+				}
+				return p;
+			});
+		} catch (ExecutionException e) {
+			throw new IOException(e);
+		}
 		return meta.entrySet().stream().flatMap(entry -> {
 			String idRange = (String) entry.getValue();
 			long[] minMax = splitRange(idRange);
@@ -872,11 +947,11 @@ static class WriterState {
 		int week;
 		long[] minMax = {Long.MAX_VALUE, Long.MIN_VALUE};
 
-		WriterState(java.nio.file.Path file, ParquetWriter writer, int year, int weak) {
+		WriterState(java.nio.file.Path file, ParquetWriter writer, int year, int week) {
 			this.file = file;
 			this.writer = writer;
 			this.year = year;
-			this.week = weak;
+			this.week = week;
 		}
 	}
 
@@ -885,7 +960,8 @@ static class IdMappings {
 	}
 
 	class WriteContext {
-		// used by writer
+		boolean hasExistingData;
+		long itemIdCounter = 0, propertyIdCounter = 0, contextIdCounter = 0;
 		Map itemMap = new HashMap<>();
 		Map propertyMap = new HashMap<>();
 		Map contextMap = new HashMap<>();
diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java
index f38ae7f5..827d4b79 100644
--- a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java
+++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitioned.java
@@ -5,6 +5,7 @@
 import io.github.linkedfactory.core.kvin.KvinTuple;
 import io.github.linkedfactory.core.kvin.leveldb.KvinLevelDbArchiver;
 import io.github.linkedfactory.core.kvin.leveldb.KvinLevelDb;
+import io.github.linkedfactory.core.kvin.parquet.Compactor;
 import io.github.linkedfactory.core.kvin.parquet.KvinParquet;
 import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
 import net.enilink.commons.iterator.IExtendedIterator;
@@ -13,6 +14,8 @@
 import net.enilink.commons.util.Pair;
 import net.enilink.komma.core.URI;
 import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 import java.io.File;
 import java.io.IOException;
@@ -23,6 +26,8 @@
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class KvinPartitioned implements Kvin {
+	static final Logger log = LoggerFactory.getLogger(KvinPartitioned.class);
+
 	final ReadWriteLock storeLock = new ReentrantReadWriteLock();
 	protected List listeners = new ArrayList<>();
 	protected File path;
@@ -51,14 +56,23 @@ public CompletableFuture runArchival() {
 			createNewHotDataStore();
 		}
 		return CompletableFuture.supplyAsync(() -> {
-			new KvinLevelDbArchiver(hotStoreArchive, archiveStore).archive();
+			try {
+				new KvinLevelDbArchiver(hotStoreArchive, archiveStore).archive();
+				try {
+					new Compactor(archiveStore).execute();
+				} catch (IOException e) {
+					log.error("Compacting archive store failed", e);
+				}
+			} catch (Exception e) {
+				log.error("Archiving data to archive store failed", e);
+			}
 			try {
 				storeLock.writeLock().lock();
 				this.hotStoreArchive.close();
 				this.hotStoreArchive = null;
 				FileUtils.deleteDirectory(this.currentStoreArchivePath);
 			} catch (IOException e) {
-				throw new RuntimeException(e);
+				log.error("Deleting hot store archive failed", e);
 			} finally {
 				storeLock.writeLock().unlock();
 			}
diff --git a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDbArchiver.scala b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDbArchiver.scala
index 279e3d78..0e55f8a3 100644
--- a/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDbArchiver.scala
+++ b/bundles/io.github.linkedfactory.core/src/main/scala/io/github/linkedfactory/core/kvin/leveldb/KvinLevelDbArchiver.scala
@@ -18,7 +18,7 @@ class KvinLevelDbArchiver(var databaseStore: KvinLevelDb, var archiveStore: Kvin
   idsSnapshotOption.snapshot(idsSnapshot)
 
-  def getDatabaseIterator: NiceIterator[KvinTuple] = {
+  def getDatabaseIterator: IExtendedIterator[KvinTuple] = {
     new NiceIterator[KvinTuple] {
       // initial store iterators
       val idIterator: DBIterator = ids.iterator(idsSnapshotOption)
@@ -89,7 +89,7 @@ class KvinLevelDbArchiver(var databaseStore: KvinLevelDb, var archiveStore: Kvin
   }
 
   def archive(): Unit = {
-    val dbIterator: NiceIterator[KvinTuple] = getDatabaseIterator
+    val dbIterator: IExtendedIterator[KvinTuple] = getDatabaseIterator
     try {
       archiveStore.put(dbIterator)
     } finally {
diff --git a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java
index 9ab5dfee..944bdd0d 100644
--- a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java
+++ b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/parquet/KvinParquetTest.java
@@ -124,7 +124,7 @@ public void shouldDoFetchForNonSeqEntry() {
 
 	@Test
 	public void mappingFileCompactionTest() throws IOException {
-		new Compactor(kvinParquet.archiveLocation, kvinParquet).execute();
+		new Compactor(kvinParquet).execute();
 
 		File[] metadataFiles = new File(kvinParquet.archiveLocation, "metadata")
 				.listFiles((file, s) -> s.endsWith(".parquet"));
diff --git a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitionedTest.java b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitionedTest.java
index 2ef2ef84..e359b08f 100644
--- a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitionedTest.java
+++ b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/partitioned/KvinPartitionedTest.java
@@ -53,7 +53,7 @@ public void shouldDoPut() throws ExecutionException, InterruptedException {
 		// continuing incremental put on kvinPartitioned
 		kvinPartitioned.put(tupleGenerator.setStartTime(1672614000000L).generate());
 		kvinPartitioned.put(tupleGenerator.setStartTime(1673218800000L).generate());
-		NiceIterator storeIterator = new KvinLevelDbArchiver(kvinPartitioned.hotStore, null)
+		IExtendedIterator storeIterator = new KvinLevelDbArchiver(kvinPartitioned.hotStore, null)
 				.getDatabaseIterator();
 		int recordCount = 0;
 		while (storeIterator.hasNext()) {