Skip to content

Commit

Permalink
Restructured modules and fixed KvinParquet properties method.
Browse files (browse the repository at this point in the history)
  • Loading branch information
kenwenzel committed Dec 1, 2023
1 parent 7be9238 commit dbb6187
Show file tree
Hide file tree
Showing 75 changed files with 191 additions and 200 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
<version>0.0.1-SNAPSHOT</version>
<relativePath>../../pom.xml</relativePath>
</parent>
<artifactId>io.github.linkedfactory.kvin</artifactId>
<artifactId>io.github.linkedfactory.core</artifactId>

<properties>
<parquet.version>1.13.1</parquet.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import java.util.Arrays;
import java.util.Iterator;
Expand All @@ -23,7 +23,6 @@
import java.util.stream.StreamSupport;

import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;

/**
* A linked list of (property, value) pairs where properties are {@link URI}s
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.komma.core.URI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import java.io.Closeable;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import net.enilink.komma.core.URI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import java.util.Formatter;
import java.util.Objects;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin;
package io.github.linkedfactory.core.kvin;

import net.enilink.komma.core.URI;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.http;
package io.github.linkedfactory.core.kvin.http;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonParser;
Expand All @@ -7,11 +7,11 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.io.ByteStreams;
import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.kvin.KvinListener;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.kvin.Record;
import io.github.linkedfactory.kvin.util.JsonFormatParser;
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinListener;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.util.JsonFormatParser;
import net.enilink.commons.iterator.*;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
Expand All @@ -25,15 +25,12 @@
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.impl.conn.PoolingHttpClientConnectionManager;
import org.apache.http.util.EntityUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.*;

public class KvinHttp implements Kvin {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

import net.enilink.commons.util.Pair;
import org.apache.commons.io.FileUtils;
Expand All @@ -17,7 +17,7 @@
import java.util.*;
import java.util.stream.Collectors;

import static io.github.linkedfactory.kvin.parquet.ParquetHelpers.*;
import static io.github.linkedfactory.core.kvin.parquet.ParquetHelpers.*;

public class Compactor {
String archiveLocation;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

public interface IdMapping {
long getId();
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.kvin.KvinListener;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.kvin.Record;
import io.github.linkedfactory.kvin.util.AggregatingIterator;
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinListener;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.util.Pair;
Expand Down Expand Up @@ -42,9 +42,9 @@
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static io.github.linkedfactory.kvin.parquet.ParquetHelpers.*;
import static io.github.linkedfactory.kvin.parquet.Records.decodeRecord;
import static io.github.linkedfactory.kvin.parquet.Records.encodeRecord;
import static io.github.linkedfactory.core.kvin.parquet.ParquetHelpers.*;
import static io.github.linkedfactory.core.kvin.parquet.Records.decodeRecord;
import static io.github.linkedfactory.core.kvin.parquet.Records.encodeRecord;
import static org.apache.parquet.filter2.predicate.FilterApi.*;

public class KvinParquet implements Kvin {
Expand All @@ -68,7 +68,7 @@ public class KvinParquet implements Kvin {

public KvinParquet(String archiveLocation) {
this.archiveLocation = archiveLocation;
if (! this.archiveLocation.endsWith("/")) {
if (!this.archiveLocation.endsWith("/")) {
this.archiveLocation = this.archiveLocation + "/";
}
}
Expand Down Expand Up @@ -437,24 +437,13 @@ private IdMappings getIdMappings(URI item, URI property, URI context) throws IOE
}

private FilterPredicate generateFetchFilter(IdMappings idMappings) {
if (idMappings.propertyId != 0L) {
if (idMappings.propertyId != 0L && idMappings.contextId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 3);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.propertyId);
keyBuffer.putLong(idMappings.contextId);
return eq(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array()));
} else {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES);
keyBuffer.putLong(idMappings.itemId);
return and(gt(FilterApi.binaryColumn("id"), Binary.fromConstantByteArray(keyBuffer.array())),
lt(FilterApi.binaryColumn("id"),
Binary.fromConstantByteArray(ByteBuffer.allocate(Long.BYTES)
.putLong(idMappings.itemId + 1).array())));
}
}

private FilterPredicate generatePropertyFetchFilter(IdMappings idMappings) {
if (idMappings.propertyId != 0L) {
} else if (idMappings.propertyId != 0L) {
ByteBuffer keyBuffer = ByteBuffer.allocate(Long.BYTES * 2);
keyBuffer.putLong(idMappings.itemId);
keyBuffer.putLong(idMappings.propertyId);
Expand Down Expand Up @@ -768,36 +757,50 @@ private KvinTupleMetadata getFirstTuple(URI item, Long itemId, Long propertyId,
KvinTupleMetadata foundTuple = null;
try {
if (itemId == null) {
idMappings = getIdMappings(item, null, Kvin.DEFAULT_CONTEXT);
idMappings = getIdMappings(item, null, null);
if (propertyId != null) {
idMappings.propertyId = propertyId;
}
if (contextId != null) {
idMappings.contextId = contextId;
}
} else if (item != null && itemId != null && propertyId != null) {
idMappings = new IdMappings();
idMappings.itemId = itemId;
idMappings.propertyId = propertyId;
idMappings.contextId = contextId;
idMappings.contextId = contextId != null ? contextId : 0L;
}
if (idMappings.itemId == 0L) {
return null;
}

FilterPredicate filter = generatePropertyFetchFilter(idMappings);
FilterPredicate filter = generateFetchFilter(idMappings);
List<java.nio.file.Path> dataFolders = getDataFolders(idMappings);
ParquetReader<KvinTupleInternal> reader;

HadoopInputFile inputFile = getFile(getDataFiles(dataFolders.get(0).toString()).get(0));
reader = AvroParquetReader.<KvinTupleInternal>builder(inputFile)
.withDataModel(reflectData)
.useStatsFilter()
.withFilter(FilterCompat.get(filter))
.build();
KvinTupleInternal firstTuple = reader.read();
reader.close();
KvinTupleInternal firstTuple = null;
for (java.nio.file.Path dataFolder : dataFolders) {
for (Path dataFile : getDataFiles(dataFolder.toString())) {
reader = AvroParquetReader.<KvinTupleInternal>builder(getFile(dataFile))
.withDataModel(reflectData)
.useStatsFilter()
.withFilter(FilterCompat.get(filter))
.build();
KvinTupleInternal tuple = reader.read();
if (firstTuple == null || tuple != null && firstTuple != null && tuple.compareTo(firstTuple) < 0) {
firstTuple = tuple;
}
reader.close();
}
}

if (firstTuple != null) {
URI firstTupleProperty = URIs.createURI(getProperty(firstTuple));
if (itemId == null) {
idMappings.propertyId = getId(firstTupleProperty, IdType.PROPERTY_ID);
}
foundTuple = new KvinTupleMetadata(item.toString(), firstTupleProperty.toString(), idMappings.itemId, idMappings.propertyId, idMappings.contextId);
foundTuple = new KvinTupleMetadata(item.toString(), firstTupleProperty.toString(),
idMappings.itemId, idMappings.propertyId, idMappings.contextId);
}

return foundTuple;
Expand All @@ -816,7 +819,8 @@ public synchronized IExtendedIterator<URI> properties(URI item) {
@Override
public boolean hasNext() {
if (currentTuple == null && previousTuple != null) {
currentTuple = getFirstTuple(URIs.createURI(previousTuple.getItem()), previousTuple.getItemId(), previousTuple.getPropertyId() + 1, previousTuple.contextId);
currentTuple = getFirstTuple(URIs.createURI(previousTuple.getItem()), previousTuple.getItemId(),
previousTuple.getPropertyId() + 1, null);
}
return currentTuple != null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

import java.util.Arrays;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

public class KvinTupleMetadata {
String item;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

import net.enilink.commons.util.Pair;
import org.apache.avro.Schema;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

import io.github.linkedfactory.kvin.Record;
import io.github.linkedfactory.kvin.util.Values;
import io.github.linkedfactory.core.kvin.Record;
import io.github.linkedfactory.core.kvin.util.Values;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;

Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package io.github.linkedfactory.kvin.parquet;
package io.github.linkedfactory.core.kvin.parquet;

public class SimpleMapping implements IdMapping {
long id;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package io.github.linkedfactory.kvin.partitioned;
package io.github.linkedfactory.core.kvin.partitioned;

import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.kvin.KvinListener;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.kvin.leveldb.KvinLevelDbArchiver;
import io.github.linkedfactory.kvin.leveldb.KvinLevelDb;
import io.github.linkedfactory.kvin.parquet.KvinParquet;
import io.github.linkedfactory.kvin.util.AggregatingIterator;
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinListener;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.leveldb.KvinLevelDbArchiver;
import io.github.linkedfactory.core.kvin.leveldb.KvinLevelDb;
import io.github.linkedfactory.core.kvin.parquet.KvinParquet;
import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.iterator.WrappedIterator;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin.util;
package io.github.linkedfactory.core.kvin.util;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.core.kvin.Kvin;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import net.enilink.commons.iterator.NiceIterator;
import net.enilink.commons.util.ValueUtils;
import net.enilink.komma.core.URI;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.KvinTuple;

/**
* An iterator for KVIN tuples supporting a set of aggregation operators (min,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,17 +1,16 @@
package io.github.linkedfactory.kvin.util;
package io.github.linkedfactory.core.kvin.util;

import com.fasterxml.jackson.core.JsonFactory;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.core.JsonParser;
import com.fasterxml.jackson.core.JsonParser.Feature;
import com.fasterxml.jackson.core.JsonToken;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import io.github.linkedfactory.kvin.Kvin;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.kvin.Record;
import io.github.linkedfactory.core.kvin.Kvin;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import net.enilink.commons.iterator.NiceIterator;
import net.enilink.komma.core.URI;
import net.enilink.komma.core.URIs;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
package io.github.linkedfactory.kvin.util;
package io.github.linkedfactory.core.kvin.util;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.github.linkedfactory.kvin.KvinTuple;
import io.github.linkedfactory.kvin.Record;
import io.github.linkedfactory.core.kvin.KvinTuple;
import io.github.linkedfactory.core.kvin.Record;
import net.enilink.commons.iterator.IExtendedIterator;
import net.enilink.komma.core.URI;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.github.linkedfactory.kvin.util;
package io.github.linkedfactory.core.kvin.util;

import java.io.IOException;
import java.math.BigDecimal;
Expand Down
Loading

0 comments on commit dbb6187

Please sign in to comment.