Improve compaction and KvinParquet.
kenwenzel committed Jun 25, 2024
1 parent d52de61 commit 11bb7ec
Showing 4 changed files with 128 additions and 57 deletions.
Compactor.java
@@ -20,10 +20,10 @@
import static io.github.linkedfactory.core.kvin.parquet.ParquetHelpers.*;

public class Compactor {
final KvinParquet kvinParquet;
final File compactionFolder;
String archiveLocation;
KvinParquet kvinParquet;
int dataFileCompactionTrigger = 2, mappingFileCompactionTrigger = 3;
File compactionFolder;

public Compactor(KvinParquet kvinParquet) {
this.archiveLocation = kvinParquet.archiveLocation;
@@ -32,14 +32,21 @@ public Compactor(KvinParquet kvinParquet) {
}

public void execute() throws IOException {
boolean mappingFilesCompacted = performMappingFileCompaction();
List<File> weekFolders = getCompactionEligibleWeekFolders();
for (File weekFolder : weekFolders) {
try {
compactDataFiles(weekFolder);
} catch (IOException e) {
throw new RuntimeException(e);
Set<String> compactedMappings;
List<File> weekFolders;
kvinParquet.readLock.lock();
try {
compactedMappings = compactMappingFiles();
weekFolders = getCompactionEligibleWeekFolders();
for (File weekFolder : weekFolders) {
try {
compactDataFiles(weekFolder);
} catch (IOException e) {
throw new RuntimeException(e);
}
}
} finally {
kvinParquet.readLock.unlock();
}

if (!compactionFolder.exists()) {
@@ -51,8 +58,9 @@ public void execute() throws IOException {
kvinParquet.writeLock.lock();
clearCache();
// replace existing files with compacted files
if (mappingFilesCompacted) {
FileUtils.cleanDirectory(new File(archiveLocation, "metadata"));
if (!compactedMappings.isEmpty()) {
// delete compacted mapping files
deleteMappingFiles(Paths.get(archiveLocation, "metadata"), compactedMappings);
}
for (File weekFolder : weekFolders) {
FileUtils.cleanDirectory(weekFolder);
Expand All @@ -71,26 +79,13 @@ public void execute() throws IOException {
throw new UncheckedIOException(e);
}
});
// completely delete compaction folder
FileUtils.deleteDirectory(compactionFolder);
} finally {
kvinParquet.writeLock.unlock();
}
}

private boolean performMappingFileCompaction() throws IOException {
Map<String, List<Pair<String, Integer>>> mappingFiles = getMappingFiles(Paths.get(archiveLocation, "metadata"));
List<Pair<String, Integer>> itemsFiles = mappingFiles.get("items");
if (itemsFiles != null && itemsFiles.size() >= mappingFileCompactionTrigger) {
try {
kvinParquet.readLock.lock();
generateCompactedMappingFiles(mappingFiles, new File(compactionFolder, "metadata"));
return true;
} finally {
kvinParquet.readLock.unlock();
}
}
return false;
}

private List<File> getCompactionEligibleWeekFolders() {
List<File> weekFolderList = new ArrayList<>();
File[] yearFolders = new File(archiveLocation).listFiles((file, s) ->
@@ -107,10 +102,17 @@ private List<File> getCompactionEligibleWeekFolders() {
return weekFolderList;
}

private void generateCompactedMappingFiles(Map<String, List<Pair<String, Integer>>> mappingFiles,
File compactionFolder) throws IOException {
private Set<String> compactMappingFiles() throws IOException {
Set<String> compacted = new HashSet<>();
Map<String, List<Pair<String, Integer>>> mappingFiles = getMappingFiles(Paths.get(archiveLocation, "metadata"));
for (Map.Entry<String, List<Pair<String, Integer>>> mapping : mappingFiles.entrySet()) {
Path compactedFile = new Path(compactionFolder.toString(), mapping.getKey() + "__1.parquet");
if (mapping.getValue().size() < mappingFileCompactionTrigger) {
// do nothing if number of files for compaction is not yet reached
continue;
}
compacted.add(mapping.getKey());

Path compactedFile = new Path(new File(compactionFolder, "metadata").toString(), mapping.getKey() + "__1.parquet");
ParquetWriter<Object> compactedFileWriter = getParquetMappingWriter(compactedFile);

PriorityQueue<Pair<IdMapping, ParquetReader<IdMapping>>> nextMappings =
@@ -128,16 +130,18 @@ private void generateCompactedMappingFiles(Map<String, List<Pair<String, Integer

while (!nextMappings.isEmpty()) {
var pair = nextMappings.poll();
compactedFileWriter.write(pair.getFirst());

IdMapping idMapping = pair.getSecond().read();
if (idMapping != null) {
compactedFileWriter.write(idMapping);
nextMappings.add(new Pair<>(idMapping, pair.getSecond()));
} else {
pair.getSecond().close();
}
}
compactedFileWriter.close();
}
return compacted;
}

private ParquetReader<IdMapping> getParquetMappingReader(HadoopInputFile file) throws IOException {
@@ -187,6 +191,7 @@ private void compactDataFiles(File weekFolder) throws IOException {
while (!nextTuples.isEmpty()) {
var pair = nextTuples.poll();
compactionFileWriter.write(pair.getFirst());

KvinTupleInternal tuple = pair.getSecond().read();
if (tuple != null) {
nextTuples.add(new Pair<>(tuple, pair.getSecond()));
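
Both compactMappingFiles and compactDataFiles above use the same k-way merge: every input Parquet file gets its own reader, the readers' current records sit in a priority queue ordered by the sort key, and the smallest record is repeatedly written to a single compacted output file until all readers are drained. The following is a minimal, self-contained sketch of that merge pattern; it substitutes in-memory lists and iterators for the ParquetReader/ParquetWriter pairs, and the class and method names are illustrative only, not part of this project.

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

public class KWayMergeSketch {

    // One queue entry: the current record plus the reader it came from.
    record Head<T>(T value, Iterator<T> source) {}

    static <T> List<T> merge(List<List<T>> sortedInputs, Comparator<T> order) {
        PriorityQueue<Head<T>> queue =
                new PriorityQueue<Head<T>>((a, b) -> order.compare(a.value(), b.value()));

        // Seed the queue with the first record of every non-empty input.
        for (List<T> input : sortedInputs) {
            Iterator<T> reader = input.iterator();
            if (reader.hasNext()) {
                queue.add(new Head<>(reader.next(), reader));
            }
        }

        List<T> compacted = new ArrayList<>();
        while (!queue.isEmpty()) {
            Head<T> head = queue.poll();
            compacted.add(head.value());          // "write" the smallest record
            if (head.source().hasNext()) {        // pull the next record from the same reader
                queue.add(new Head<>(head.source().next(), head.source()));
            }                                     // otherwise that input is exhausted
        }
        return compacted;
    }

    public static void main(String[] args) {
        List<List<Integer>> inputs =
                List.of(List.of(1, 4, 9), List.of(2, 3, 10), List.of(5));
        System.out.println(merge(inputs, Comparator.naturalOrder()));
        // prints [1, 2, 3, 4, 5, 9, 10]
    }
}
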
KvinParquet.java
@@ -245,10 +245,24 @@ private synchronized void putInternal(Iterable<KvinTuple> tuples) throws IOExcep
for (WriterState state : writers.values()) {
state.writer.close();
}

boolean itemsWritten = itemMappingWriter.getDataSize() > 0;
itemMappingWriter.close();
boolean contextsWritten = contextMappingWriter.getDataSize() > 0;
contextMappingWriter.close();
boolean propertiesWritten = propertyMappingWriter.getDataSize() > 0;
propertyMappingWriter.close();

if (!itemsWritten) {
Files.delete(Paths.get(itemMappingFile.toString()));
}
if (!contextsWritten) {
Files.delete(Paths.get(contextMappingFile.toString()));
}
if (!propertiesWritten) {
Files.delete(Paths.get(propertyMappingFile.toString()));
}

Map<Integer, long[]> minMaxYears = new HashMap<>();
for (WriterState state : writers.values()) {
minMaxYears.compute(state.year, (k, v) -> {
@@ -565,29 +579,29 @@ protected KvinTuple createElement(URI item, URI property, URI context, long time
return internalResult;
}

public String getProperty(KvinTupleInternal tuple) {
public String getProperty(KvinTupleInternal tuple) throws IOException {
ByteBuffer idBuffer = ByteBuffer.wrap(tuple.getId());
idBuffer.getLong();
Long propertyId = idBuffer.getLong();
String cachedProperty = propertyIdReverseLookUpCache.getIfPresent(propertyId);

if (cachedProperty == null) {
try {
FilterPredicate filter = eq(FilterApi.longColumn("id"), propertyId);
Path metadataFolder = new Path(this.archiveLocation + "metadata/");
File[] mappingFiles = new File(metadataFolder.toString()).listFiles((file, s) -> s.startsWith("properties"));
IdMapping propertyMapping = null;

for (File mappingFile : mappingFiles) {
propertyMapping = fetchMappingIds(new Path(mappingFile.getPath()), filter);
if (propertyMapping != null) break;
}
FilterPredicate filter = eq(FilterApi.longColumn("id"), propertyId);
Path metadataFolder = new Path(this.archiveLocation + "metadata/");
File[] mappingFiles = new File(metadataFolder.toString()).listFiles((file, s) -> s.startsWith("properties"));
IdMapping propertyMapping = null;

for (File mappingFile : mappingFiles) {
propertyMapping = fetchMappingIds(new Path(mappingFile.getPath()), filter);
if (propertyMapping != null) break;
}

if (propertyMapping == null) {
throw new IOException("Unknown property with id: " + propertyId);
} else {
cachedProperty = propertyMapping.getValue();
propertyIdReverseLookUpCache.put(propertyId, propertyMapping.getValue());
} catch (IOException e) {
throw new UncheckedIOException(e);
}
propertyIdReverseLookUpCache.put(propertyId, cachedProperty);
}
return cachedProperty;
}
ParquetHelpers.java
@@ -1,5 +1,6 @@
package io.github.linkedfactory.core.kvin.parquet;

import io.github.linkedfactory.core.kvin.partitioned.KvinPartitioned;
import net.enilink.commons.util.Pair;
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
@@ -10,6 +11,8 @@
import org.apache.parquet.hadoop.ParquetWriter;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.nio.file.Files;
@@ -18,7 +21,8 @@
import java.util.regex.Pattern;

public class ParquetHelpers {
final static ReflectData reflectData = new ReflectData(ParquetHelpers.class.getClassLoader());
static final Logger log = LoggerFactory.getLogger(ParquetHelpers.class);
static final ReflectData reflectData = new ReflectData(ParquetHelpers.class.getClassLoader());

// parquet file writer config
static final long ROW_GROUP_SIZE = 1048576; // 1 MB
@@ -45,6 +49,7 @@ public class ParquetHelpers {
.name("valueObject").type().nullable().bytesType().noDefault().endRecord();

static Pattern fileWithSeqNr = Pattern.compile("^([^.].*)__([0-9]+)\\..*$");
static Pattern fileOrDotFileWithSeqNr = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$");

static ParquetWriter<KvinTupleInternal> getParquetDataWriter(Path dataFile) throws IOException {
return AvroParquetWriter.<KvinTupleInternal>builder(HadoopOutputFile.fromPath(dataFile, configuration))
@@ -100,4 +105,25 @@ public static Map<String, List<Pair<String, Integer>>> getMappingFiles(java.nio.
}
return fileMap;
}

public static void deleteMappingFiles(java.nio.file.Path folder, Set<String> types)
throws IOException {
if (Files.isDirectory(folder)) {
Files.list(folder).forEach(p -> {
String name = p.getFileName().toString();
Matcher m = fileOrDotFileWithSeqNr.matcher(name);
if (m.matches()) {
String type = m.group(1);
if (types.contains(type)) {
// delete file
try {
Files.delete(p);
} catch (IOException e) {
log.error("Unable to delete file", e);
}
}
}
});
}
}
}
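
The new fileOrDotFileWithSeqNr pattern differs from fileWithSeqNr only by the optional leading dot, so deleteMappingFiles can remove both the visible mapping files and any hidden companions with the same base name (for example Hadoop-style checksum files such as .items__2.parquet.crc; that use case is an assumption here, not something shown in the commit). A short sketch of what the regex extracts, using made-up file names:

import java.util.Set;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class MappingFilePatternDemo {
    // Same expression as ParquetHelpers.fileOrDotFileWithSeqNr: an optional leading dot,
    // the mapping type, "__", a sequence number, and an arbitrary extension.
    static final Pattern FILE_OR_DOT_FILE = Pattern.compile("^\\.?([^.].*)__([0-9]+)\\..*$");

    public static void main(String[] args) {
        // Hypothetical result of compactMappingFiles: only these types were compacted.
        Set<String> compactedTypes = Set.of("items", "properties");
        String[] listing = {
                "items__2.parquet", ".items__2.parquet.crc",
                "properties__1.parquet", "contexts__3.parquet", "items__1"
        };
        for (String name : listing) {
            Matcher m = FILE_OR_DOT_FILE.matcher(name);
            if (m.matches()) {
                boolean delete = compactedTypes.contains(m.group(1));
                System.out.println(name + " -> type=" + m.group(1)
                        + ", seq=" + m.group(2) + ", delete=" + delete);
            } else {
                System.out.println(name + " -> ignored (no sequence number or extension)");
            }
        }
    }
}
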
KvinPartitioned.java
@@ -301,22 +301,35 @@ public IExtendedIterator<URI> descendants(URI item) {
storeLock.readLock().lock();
IExtendedIterator<URI> results = hotStore.descendants(item);
return new NiceIterator<>() {
boolean closed;

@Override
public boolean hasNext() {
return results.hasNext();
if (results.hasNext()) {
return true;
} else {
close();
return false;
}
}

@Override
public URI next() {
return results.next();
if (hasNext()) {
return results.next();
}
throw new NoSuchElementException();
}

@Override
public void close() {
try {
results.close();
} finally {
storeLock.readLock().unlock();
if (!closed) {
try {
results.close();
} finally {
storeLock.readLock().unlock();
closed = true;
}
}
}
};
@@ -327,22 +340,35 @@ public IExtendedIterator<URI> descendants(URI item, long limit) {
storeLock.readLock().lock();
IExtendedIterator<URI> results = hotStore.descendants(item, limit);
return new NiceIterator<>() {
boolean closed;

@Override
public boolean hasNext() {
return results.hasNext();
if (results.hasNext()) {
return true;
} else {
close();
return false;
}
}

@Override
public URI next() {
return results.next();
if (hasNext()) {
return results.next();
}
throw new NoSuchElementException();
}

@Override
public void close() {
try {
results.close();
} finally {
storeLock.readLock().unlock();
if (!closed) {
try {
results.close();
} finally {
storeLock.readLock().unlock();
closed = true;
}
}
}
};
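
Both descendants overloads now wrap the hot-store iterator so that the read lock taken before hotStore.descendants(...) is released exactly once: close() is guarded by the closed flag, and hasNext() calls close() as soon as the underlying results are exhausted, so callers that simply iterate to the end no longer keep holding the lock. Below is a stripped-down sketch of the same pattern; the class and field names are illustrative, not the project's API, and it assumes the lock is unlocked by the same thread that acquired it.

import java.util.Iterator;
import java.util.NoSuchElementException;
import java.util.concurrent.locks.Lock;

// Illustrative wrapper: releases a lock exactly once, either when the caller
// calls close() or as soon as the wrapped iterator runs out of elements.
public class LockReleasingIterator<T> implements Iterator<T>, AutoCloseable {
    private final Iterator<T> delegate;
    private final Lock lock;
    private boolean closed;

    public LockReleasingIterator(Iterator<T> delegate, Lock lock) {
        this.delegate = delegate;
        this.lock = lock; // assumed to be already held by the creating thread
    }

    @Override
    public boolean hasNext() {
        if (!closed && delegate.hasNext()) {
            return true;
        }
        close(); // auto-release once the results are exhausted
        return false;
    }

    @Override
    public T next() {
        if (hasNext()) {
            return delegate.next();
        }
        throw new NoSuchElementException();
    }

    @Override
    public void close() {
        if (!closed) {
            closed = true;
            lock.unlock(); // the flag keeps close() idempotent
        }
    }
}
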
