Skip to content

Commit

Permalink
Add further simplifications to KvinParquet.
Browse files Browse the repository at this point in the history
  • Loading branch information
kenwenzel committed Aug 1, 2024
1 parent 0e3a1d3 commit 4996e67
Showing 1 changed file with 38 additions and 59 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,7 @@
import java.math.BigDecimal;
import java.math.BigInteger;
import java.nio.ByteBuffer;
import java.nio.file.CopyOption;
import java.nio.file.Files;
import java.nio.file.OpenOption;
import java.nio.file.Paths;
import java.sql.Timestamp;
import java.util.*;
Expand All @@ -63,6 +61,7 @@

public class KvinParquet implements Kvin {
static final Logger log = LoggerFactory.getLogger(KvinParquet.class);
static final long[] EMPTY_IDS = {0};
static Comparator<GenericRecord> RECORD_COMPARATOR = (a, b) -> {
int diff = ((ByteBuffer) a.get(0)).compareTo((ByteBuffer) b.get(0));
if (diff != 0) {
Expand All @@ -80,7 +79,6 @@ public class KvinParquet implements Kvin {
}
return 0;
};
static final long[] EMPTY_IDS = { 0 };
// used by reader
final Cache<URI, Long> itemIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
final Cache<URI, Long> propertyIdCache = CacheBuilder.newBuilder().maximumSize(10000).build();
Expand All @@ -106,6 +104,15 @@ public KvinParquet(String archiveLocation) {
}
}

static boolean anyBetween(long[] values, long min, long max) {
for (int i = 0; i < values.length; i++) {
if (values[i] >= min && values[i] <= max) {
return true;
}
}
return false;
}

private List<IdMapping> fetchMappingIds(Path mappingFile, FilterPredicate filter) throws IOException {
List<IdMapping> mappings = null;
HadoopInputFile inputFile = getFile(mappingFile);
Expand Down Expand Up @@ -639,7 +646,7 @@ private long getId(URI entity, IdType idType) {
IdMapping mapping = null;
for (File mappingFile : mappingFiles) {
var mappings = fetchMappingIds(new Path(mappingFile.getPath()), filter);
if (! mappings.isEmpty()) {
if (!mappings.isEmpty()) {
mapping = mappings.get(0);
break;
}
Expand Down Expand Up @@ -680,7 +687,7 @@ private long[] getIds(List<URI> entities, IdType idType) {
}
i++;
}
if (! toFetch.isEmpty()) {
if (!toFetch.isEmpty()) {
// read from files
String name;
switch (idType) {
Expand Down Expand Up @@ -730,64 +737,45 @@ private IdMappings getIdMappings(URI item, URI property, URI context) throws IOE
return mappings;
}

private List<IdMappings> getIdMappings(List<URI> items, List<URI> properties, URI context) throws IOException {
long[] itemIds = getIds(items, IdType.ITEM_ID);
long[] propertyIds = EMPTY_IDS;
if (! properties.isEmpty()) {
propertyIds = getIds(properties, IdType.PROPERTY_ID);
}
long contextId = 0;
if (context != null) {
contextId = getId(context, IdType.CONTEXT_ID);
}
if (contextId == 0L) {
return Collections.emptyList();
}
return getIdMappings(itemIds, propertyIds, contextId);
}

private List<IdMappings> getIdMappings(long[] itemIds, long[] propertyIds, long contextId) {
List<IdMappings> result = new ArrayList<>(itemIds.length * propertyIds.length);
private FilterPredicate generateFetchFilter(long[] itemIds, long[] propertyIds, long contextId) {
FilterPredicate filter = null;
for (long itemId : itemIds) {
if (itemId == 0L) {
continue;
}
for (long propertyId : propertyIds) {
if (propertyId == 0L && propertyIds!= EMPTY_IDS) {
if (propertyId == 0L && propertyIds != EMPTY_IDS) {
continue;
}
IdMappings mappings = new IdMappings();
mappings.itemId = itemId;
mappings.propertyId = propertyId;
mappings.contextId = contextId;
result.add(mappings);
var fetchFilter = createIdFilter(itemId, propertyId, contextId);
filter = filter == null ? fetchFilter : or(filter, fetchFilter);
}
}
return result;
return filter;
}

private FilterPredicate generateFetchFilter(IdMappings idMappings) {
if (idMappings.itemId != 0L && idMappings.propertyId != 0L && idMappings.contextId != 0L) {
private FilterPredicate createIdFilter(long itemId, long propertyId, long contextId) {
if (itemId != 0L && propertyId != 0L && contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 3);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.contextId);
keyBuffer.putLong(idMappings.propertyId);
keyBuffer.putLong(itemId);
keyBuffer.putLong(contextId);
keyBuffer.putLong(propertyId);
return eq(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array()));
} else if (idMappings.contextId != 0L) {
} else if (contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 2);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.contextId);
keyBuffer.putLong(itemId);
keyBuffer.putLong(contextId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES * 2)
.putLong(idMappings.itemId).putLong(idMappings.contextId + 1).array())));
.putLong(itemId).putLong(contextId + 1).array())));
} else {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(itemId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES)
.putLong(idMappings.itemId + 1).array())));
.putLong(itemId + 1).array())));
}
}

Expand Down Expand Up @@ -822,7 +810,7 @@ public IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, l
@Override
public IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, long end, long begin, long limit, long interval, String op) {
var properties = property == null ? Collections.<URI>emptyList() : List.of(property);
return fetch(List.of(item), properties, context, end, begin, limit, interval, op);
return fetch(List.of(item), properties, context, end, begin, limit, interval, op);
}

public URI getProperty(long propertyId) throws IOException {
Expand All @@ -835,7 +823,7 @@ public URI getProperty(long propertyId) throws IOException {

for (File mappingFile : mappingFiles) {
var mappings = fetchMappingIds(new Path(mappingFile.getPath()), filter);
if (! mappings.isEmpty()) {
if (!mappings.isEmpty()) {
propertyMapping = mappings.get(0);
break;
}
Expand Down Expand Up @@ -883,18 +871,12 @@ private IExtendedIterator<KvinTuple> fetchInternal(List<URI> items, List<URI> pr
return NiceIterator.emptyIterator();
}
// filters
List<IdMappings> mappingsList = getIdMappings(itemIds, propertyIds, contextId);
if (mappingsList.isEmpty()) {
FilterPredicate filter = generateFetchFilter(itemIds, propertyIds, contextId);
if (filter == null) {
// ensure read lock is freed
readLock.release();
return NiceIterator.emptyIterator();
}

FilterPredicate filter = null;
for (IdMappings idMappings : mappingsList) {
var fetchFilter = generateFetchFilter(idMappings);
filter = filter == null ? fetchFilter : or(filter, fetchFilter);
}
if (begin != null) {
filter = and(filter, gtEq(FilterApi.longColumn("time"), begin));
}
Expand All @@ -903,7 +885,7 @@ private IExtendedIterator<KvinTuple> fetchInternal(List<URI> items, List<URI> pr
}

final FilterPredicate filterFinal = filter;
List<java.nio.file.Path> dataFolders = getDataFolders(mappingsList);
List<java.nio.file.Path> dataFolders = getDataFolders(itemIds);
if (dataFolders.isEmpty()) {
// ensure read lock is freed
readLock.release();
Expand Down Expand Up @@ -1097,7 +1079,7 @@ private List<Path> getDataFiles(String path) throws IOException {
}
}

private List<java.nio.file.Path> getDataFolders(List<IdMappings> idMappings) throws IOException {
private List<java.nio.file.Path> getDataFolders(long[] itemIds) throws IOException {
java.nio.file.Path metaPath = Paths.get(archiveLocation, "meta.properties");
Properties meta;
try {
Expand All @@ -1114,7 +1096,7 @@ private List<java.nio.file.Path> getDataFolders(List<IdMappings> idMappings) thr
return meta.entrySet().stream().flatMap(entry -> {
String idRange = (String) entry.getValue();
long[] minMax = splitRange(idRange);
if (minMax != null && idMappings.stream().anyMatch(m -> m.itemId >= minMax[0] && m.itemId <= minMax[1])) {
if (minMax != null && anyBetween(itemIds, minMax[0], minMax[1])) {
java.nio.file.Path yearFolder = Paths.get(archiveLocation, entry.getKey().toString());
java.nio.file.Path yearMetaPath = yearFolder.resolve("meta.properties");
try {
Expand All @@ -1128,7 +1110,7 @@ private List<java.nio.file.Path> getDataFolders(List<IdMappings> idMappings) thr
return yearMeta.entrySet().stream().filter(weekEntry -> {
String weekIdRange = (String) weekEntry.getValue();
long[] weekMinMax = splitRange(weekIdRange);
return weekMinMax != null && idMappings.stream().anyMatch(m -> m.itemId >= weekMinMax[0] && m.itemId <= weekMinMax[1]);
return weekMinMax != null && anyBetween(itemIds, weekMinMax[0], weekMinMax[1]);
}).map(weekEntry -> yearFolder.resolve(weekEntry.getKey().toString()));
} catch (Exception e) {
log.error("Error while loading meta data", e);
Expand Down Expand Up @@ -1164,10 +1146,7 @@ private List<URI> getProperties(long itemId, long contextId) {
gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(lowKey.array())),
lt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(highKey.array()))));

IdMappings idMappings = new IdMappings();
idMappings.itemId = itemId;
idMappings.contextId = contextId;
List<java.nio.file.Path> dataFolders = getDataFolders(List.of(idMappings));
List<java.nio.file.Path> dataFolders = getDataFolders(new long[]{itemId});
Set<Long> propertyIds = new LinkedHashSet<>();

for (java.nio.file.Path dataFolder : dataFolders) {
Expand Down

0 comments on commit 4996e67

Please sign in to comment.