Correctly initialize ids for multiple writes in KvinParquet.
kenwenzel committed Apr 19, 2024
1 parent f7d56f8 commit c6e4a48
Showing 6 changed files with 203 additions and 99 deletions.
@@ -25,8 +25,8 @@ public class Compactor {
 	int dataFileCompactionTrigger = 2, mappingFileCompactionTrigger = 3;
 	File compactionFolder;
 
-	public Compactor(String archiveLocation, KvinParquet kvinParquet) {
-		this.archiveLocation = archiveLocation;
+	public Compactor(KvinParquet kvinParquet) {
+		this.archiveLocation = kvinParquet.archiveLocation;
 		this.compactionFolder = new File(archiveLocation, ".compaction");
 		this.kvinParquet = kvinParquet;
 	}
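
Editor's note: the constructor now derives the archive location from the KvinParquet instance instead of taking it as a separate argument, so the two objects can no longer disagree about where the archive lives. A hypothetical wiring sketch under the new signature; the KvinParquet constructor and the package imports are assumptions, only the one-argument Compactor constructor comes from this commit:

// Hypothetical wiring sketch: the KvinParquet constructor shown here is
// assumed; only the Compactor signature comes from this commit.
class CompactionJob {
	public static void main(String[] args) throws Exception {
		KvinParquet kvinParquet = new KvinParquet("/var/data/archive"); // assumed constructor
		Compactor compactor = new Compactor(kvinParquet); // new one-argument constructor
		compactor.execute(); // compacts mapping and data files under the archive
	}
}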
@@ -42,36 +42,47 @@ public void execute() throws IOException {
 			}
 		}
 
-		kvinParquet.writeLock.lock();
-		clearCache();
-		// replace existing files with compacted files
-		FileUtils.cleanDirectory(new File(archiveLocation, "metadata"));
-		for (File weekFolder : weekFolders) {
-			FileUtils.cleanDirectory(weekFolder);
+		if (!compactionFolder.exists()) {
+			// nothing to do, compaction was not necessary
+			return;
 		}
 
-		java.nio.file.Path source = compactionFolder.toPath();
-		java.nio.file.Path destination = Paths.get(archiveLocation);
-		Files.walk(source)
-				.skip(1)
-				.filter(p -> Files.isRegularFile(p))
-				.forEach(p -> {
-					java.nio.file.Path dest = destination.resolve(source.relativize(p));
-					try {
-						Files.createDirectories(dest.getParent());
-						Files.move(p, dest);
-					} catch (IOException e) {
-						throw new UncheckedIOException(e);
-					}
-				});
-		kvinParquet.writeLock.unlock();
+		try {
+			kvinParquet.writeLock.lock();
+			clearCache();
+			// replace existing files with compacted files
+			FileUtils.cleanDirectory(new File(archiveLocation, "metadata"));
+			for (File weekFolder : weekFolders) {
+				FileUtils.cleanDirectory(weekFolder);
+			}
+			java.nio.file.Path source = compactionFolder.toPath();
+			java.nio.file.Path destination = Paths.get(archiveLocation);
+			Files.walk(source)
+					.skip(1)
+					.filter(p -> Files.isRegularFile(p))
+					.forEach(p -> {
+						java.nio.file.Path dest = destination.resolve(source.relativize(p));
+						try {
+							Files.createDirectories(dest.getParent());
+							Files.move(p, dest);
+						} catch (IOException e) {
+							throw new UncheckedIOException(e);
+						}
+					});
+		} finally {
+			kvinParquet.writeLock.unlock();
+		}
 	}
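
Editor's note: after compaction succeeds, execute() relocates everything under the hidden .compaction folder back into the archive by walking the tree and re-rooting each file path with relativize/resolve. The sketch below isolates that move pattern; it is a generic illustration with made-up paths, not code from the repository, and unlike the diff it closes the Files.walk stream via try-with-resources:

import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.stream.Stream;

class TreeMove {
	// Moves every regular file under 'source' to the same relative
	// location under 'destination', creating parent directories on demand.
	static void moveTree(Path source, Path destination) throws IOException {
		try (Stream<Path> paths = Files.walk(source)) {
			paths.filter(Files::isRegularFile).forEach(p -> {
				Path dest = destination.resolve(source.relativize(p));
				try {
					Files.createDirectories(dest.getParent());
					Files.move(p, dest);
				} catch (IOException e) {
					// stream lambdas cannot throw checked exceptions, so wrap
					throw new UncheckedIOException(e);
				}
			});
		}
	}

	public static void main(String[] args) throws IOException {
		// Made-up locations mirroring the ".compaction" layout in the diff.
		moveTree(Paths.get("/tmp/archive/.compaction"), Paths.get("/tmp/archive"));
	}
}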

 	private void performMappingFileCompaction() throws IOException {
 		Map<String, List<Pair<String, Integer>>> mappingFiles = getMappingFiles(Paths.get(archiveLocation, "metadata"));
 		if (mappingFiles.get("items").size() >= mappingFileCompactionTrigger) {
-			kvinParquet.readLock.lock();
-			generateCompactedMappingFiles(mappingFiles, new File(compactionFolder, "metadata"));
-			kvinParquet.readLock.unlock();
+			try {
+				kvinParquet.readLock.lock();
+				generateCompactedMappingFiles(mappingFiles, new File(compactionFolder, "metadata"));
+			} finally {
+				kvinParquet.readLock.unlock();
+			}
 		}
 	}
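
Editor's note: all three methods now wrap their critical sections in try/finally, so the read or write lock is released even when an exception escapes; in the old code an IOException thrown between lock() and unlock() left the lock held forever, blocking every later writer. For reference, the canonical idiom from the java.util.concurrent.locks.Lock documentation acquires the lock just before the try block rather than inside it, so a failed acquisition can never trigger an unmatched unlock(). A minimal standalone sketch, not the project's code:

import java.util.concurrent.locks.ReentrantReadWriteLock;

class LockIdiom {
	private final ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();

	void replaceArchiveFiles() {
		// Acquire before the try block: if lock() itself failed, the
		// finally clause would otherwise call unlock() on a lock that
		// is not held and throw IllegalMonitorStateException.
		rwLock.writeLock().lock();
		try {
			// ... mutate shared files/state here ...
		} finally {
			rwLock.writeLock().unlock(); // released on every exit path
		}
	}
}

With ReentrantReadWriteLock the two placements behave the same in practice, since its lock() does not throw checked exceptions, but the acquire-before-try form is the safer habit.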

@@ -139,50 +150,53 @@ private ParquetReader<KvinTupleInternal> getParquetDataReader(HadoopInputFile fi
 	}
 
 	private void compactDataFiles(File weekFolder) throws IOException {
-		kvinParquet.readLock.lock();
-		List<java.nio.file.Path> dataFiles = Files.walk(weekFolder.toPath(), 1)
-				.skip(1)
-				.filter(path -> path.getFileName().toString().startsWith("data_"))
-				.collect(Collectors.toList());
-
-		java.nio.file.Path targetFolder = compactionFolder.toPath().resolve(
-				Paths.get(archiveLocation).relativize(weekFolder.toPath()));
-
-		Path compactionFile = new Path(targetFolder.toAbsolutePath().toString(), "data__1.parquet");
-		ParquetWriter<KvinTupleInternal> compactionFileWriter = getParquetDataWriter(compactionFile);
-
-		PriorityQueue<Pair<KvinTupleInternal, ParquetReader<KvinTupleInternal>>> nextTuples =
-				new PriorityQueue<>(Comparator.comparing(Pair::getFirst));
-		for (java.nio.file.Path dataFile : dataFiles) {
-			ParquetReader<KvinTupleInternal> reader = getParquetDataReader(
-					HadoopInputFile.fromPath(new Path(dataFile.toString()), new Configuration()));
-			KvinTupleInternal tuple = reader.read();
-			if (tuple != null) {
-				nextTuples.add(new Pair<>(tuple, reader));
-			} else {
-				try {
-					reader.close();
-				} catch (IOException e) {
-				}
-			}
-		}
-
-		while (!nextTuples.isEmpty()) {
-			var pair = nextTuples.poll();
-			compactionFileWriter.write(pair.getFirst());
-			KvinTupleInternal tuple = pair.getSecond().read();
-			if (tuple != null) {
-				nextTuples.add(new Pair<>(tuple, pair.getSecond()));
-			} else {
-				try {
-					pair.getSecond().close();
-				} catch (IOException e) {
-				}
-			}
-		}
-
-		compactionFileWriter.close();
-		kvinParquet.readLock.unlock();
+		try {
+			kvinParquet.readLock.lock();
+			List<java.nio.file.Path> dataFiles = Files.walk(weekFolder.toPath(), 1)
+					.skip(1)
+					.filter(path -> path.getFileName().toString().startsWith("data_"))
+					.collect(Collectors.toList());
+
+			java.nio.file.Path targetFolder = compactionFolder.toPath().resolve(
+					Paths.get(archiveLocation).relativize(weekFolder.toPath()));
+
+			Path compactionFile = new Path(targetFolder.toAbsolutePath().toString(), "data__1.parquet");
+			ParquetWriter<KvinTupleInternal> compactionFileWriter = getParquetDataWriter(compactionFile);
+
+			PriorityQueue<Pair<KvinTupleInternal, ParquetReader<KvinTupleInternal>>> nextTuples =
+					new PriorityQueue<>(Comparator.comparing(Pair::getFirst));
+			for (java.nio.file.Path dataFile : dataFiles) {
+				ParquetReader<KvinTupleInternal> reader = getParquetDataReader(
+						HadoopInputFile.fromPath(new Path(dataFile.toString()), new Configuration()));
+				KvinTupleInternal tuple = reader.read();
+				if (tuple != null) {
+					nextTuples.add(new Pair<>(tuple, reader));
+				} else {
+					try {
+						reader.close();
+					} catch (IOException e) {
+					}
+				}
+			}
+
+			while (!nextTuples.isEmpty()) {
+				var pair = nextTuples.poll();
+				compactionFileWriter.write(pair.getFirst());
+				KvinTupleInternal tuple = pair.getSecond().read();
+				if (tuple != null) {
+					nextTuples.add(new Pair<>(tuple, pair.getSecond()));
+				} else {
+					try {
+						pair.getSecond().close();
+					} catch (IOException e) {
+					}
+				}
+			}
+
+			compactionFileWriter.close();
+		} finally {
+			kvinParquet.readLock.unlock();
+		}
 	}
 
 	private void clearCache() {
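
Editor's note: compactDataFiles() is a k-way merge. It seeds a PriorityQueue with the first tuple from each sorted data_* file, then repeatedly writes the smallest tuple and refills the queue from the reader that supplied it, producing one globally sorted data__1.parquet. A stripped-down sketch of the same technique over plain iterators; the names and types here are simplified illustrations, not the project's API:

import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

final class KWayMerge {

	// One pending element together with the input it came from.
	private record Pending<T>(T value, Iterator<T> source) {}

	// Merge k individually sorted inputs into one sorted output: always
	// poll the smallest pending element, then refill the queue from the
	// iterator that produced it. The compactor applies the same pattern
	// with one pending KvinTupleInternal per ParquetReader.
	static <T extends Comparable<T>> List<T> merge(List<Iterator<T>> inputs) {
		PriorityQueue<Pending<T>> queue =
				new PriorityQueue<>(Comparator.comparing((Pending<T> p) -> p.value()));
		for (Iterator<T> input : inputs) {
			if (input.hasNext()) {
				queue.add(new Pending<>(input.next(), input));
			}
		}
		List<T> result = new ArrayList<>();
		while (!queue.isEmpty()) {
			Pending<T> smallest = queue.poll();
			result.add(smallest.value());
			if (smallest.source().hasNext()) {
				queue.add(new Pending<>(smallest.source().next(), smallest.source()));
			}
		}
		return result;
	}

	public static void main(String[] args) {
		List<Iterator<Integer>> inputs = List.of(
				List.of(1, 4, 7).iterator(),
				List.of(2, 5).iterator(),
				List.of(3, 6, 8).iterator());
		System.out.println(merge(inputs)); // [1, 2, 3, 4, 5, 6, 7, 8]
	}
}

Each element is pushed and popped once, so merging n tuples from k files costs O(n log k) while holding only k tuples in memory at a time, which keeps the compactor's memory usage independent of file size.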
