From 2f8d9d2af2064ee7ef17f761e62b281a6fd5d62e Mon Sep 17 00:00:00 2001 From: Ken Wenzel Date: Wed, 13 Dec 2023 09:41:13 +0100 Subject: [PATCH] Added ScyllaDB-based Kvin implementation. --- bundles/io.github.linkedfactory.core/pom.xml | 6 + .../core/kvin/scylladb/KvinScyllaDb.java | 473 ++++++++++++++++++ .../core/kvin/scylladb/KvinScyllaDbTest.java | 174 +++++++ 3 files changed, 653 insertions(+) create mode 100644 bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java create mode 100644 bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java diff --git a/bundles/io.github.linkedfactory.core/pom.xml b/bundles/io.github.linkedfactory.core/pom.xml index 66b28c69..597fec2d 100644 --- a/bundles/io.github.linkedfactory.core/pom.xml +++ b/bundles/io.github.linkedfactory.core/pom.xml @@ -132,6 +132,12 @@ + + com.scylladb + java-driver-core + 4.13.0.0 + + org.openjdk.jmh jmh-core diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java new file mode 100644 index 00000000..70a2d423 --- /dev/null +++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java @@ -0,0 +1,473 @@ +package io.github.linkedfactory.kvin.scylladb; + +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.*; +import io.github.linkedfactory.kvin.Kvin; +import io.github.linkedfactory.kvin.KvinListener; +import io.github.linkedfactory.kvin.KvinTuple; +import io.github.linkedfactory.kvin.Record; +import io.github.linkedfactory.kvin.util.AggregatingIterator; +import io.github.linkedfactory.kvin.util.Values; +import net.enilink.commons.iterator.IExtendedIterator; +import net.enilink.commons.iterator.NiceIterator; +import net.enilink.komma.core.URI; +import net.enilink.komma.core.URIs; + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.sql.SQLException; +import java.time.Instant; +import java.time.YearMonth; +import java.time.ZoneId; +import java.util.*; + +public class KvinScyllaDb implements Kvin { + CqlSession session; + String keyspace; + ArrayList listeners = new ArrayList<>(); + PreparedStatement kvinDataAllTableInsertStatement, KvinMetadataTableInsertStatement; + + public KvinScyllaDb(String keyspace) { + this.keyspace = keyspace; + initializeDatabase(); + } + + @Override + public boolean addListener(KvinListener listener) { + listeners.add(listener); + return true; + } + + @Override + public boolean removeListener(KvinListener listener) { + listeners.remove(listener); + return true; + } + + private void initializeDatabase() { + // will try to connect to the ScyllaDB server with default host (127.0.0.1) and port (9042) + session = CqlSession.builder().build(); + // keyspace creation (with development config) + session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};"); + //kvinData table creation + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".kvinData (" + + "item text," + + "timeRange text," + + "property text," + + "context text," + + "time bigint," + + "seqNr int," + + "value blob," + + "PRIMARY KEY((item, timeRange), context, property, time, seqNr));"); + //metadata table creation + session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".kvinMetadata (" + + "item text," + + "property text," + + "context text," + + "timeRange text," + + "PRIMARY KEY(item, context, property));"); + + // put prepared statements + kvinDataAllTableInsertStatement = session.prepare("INSERT INTO " + keyspace + ".kvinData(" + + "item, timeRange, property, context, time, seqNr, value) " + + "VALUES (?, ?, ?, ?, ?, ?, ?);"); + KvinMetadataTableInsertStatement = session.prepare("INSERT INTO " + keyspace + ".kvinMetadata(" + + "item, property, context, timeRange) " + + "VALUES (?, ?, ?, ?) IF NOT EXISTS;"); + } + + @Override + public void put(KvinTuple... tuples) { + put(Arrays.asList(tuples)); + } + + @Override + public void put(Iterable tuples) { + try { + putInternal(tuples); + } catch (IOException | SQLException jsonProcessingException) { + throw new RuntimeException(jsonProcessingException); + } + } + + private void putInternal(Iterable tuples) throws IOException, SQLException { + boolean isEndOfAllTuples = false; + URI currentItem = null, previousItem = null; + + Set currentItemProperties = new HashSet<>(); + Iterator iterator = tuples.iterator(); + + BatchStatementBuilder kvinDataBatch = BatchStatement.builder(DefaultBatchType.LOGGED); + BatchStatementBuilder kvinMetadataBatch = BatchStatement.builder(DefaultBatchType.LOGGED); + + + while (!isEndOfAllTuples) { + KvinTuple tuple = iterator.next(); + isEndOfAllTuples = !iterator.hasNext(); + + if (currentItem == null) { + currentItem = tuple.item; + previousItem = tuple.item; + } else { + currentItem = tuple.item; + } + + // building kvinData batch + YearMonth tupleYearMonth = getYearMonth(tuple.time); + String timeRange = tupleYearMonth.getMonthValue() + "" + tupleYearMonth.getYear(); + kvinDataBatch.addStatement(kvinDataAllTableInsertStatement.bind( + tuple.item.toString(), + timeRange, + tuple.property.toString(), + tuple.context.toString(), + tuple.time, + tuple.seqNr, + ByteBuffer.wrap(encodeTupleValue(tuple.value)))); + + // inserting metadata on item change + if (isEndOfAllTuples || currentItem != previousItem) { + //batch insert into kvinData table + session.execute(kvinDataBatch.build()); + kvinDataBatch.clearStatements(); + + if (isEndOfAllTuples) { + currentItemProperties.add(tuple.property.toString()); + } + + for (String itemProperty : currentItemProperties) { + // building timeRange kvinMetadata batch + YearMonth propertyYearMonth = getYearMonth(tuple.time); + kvinMetadataBatch.addStatement(KvinMetadataTableInsertStatement.bind( + previousItem.toString(), + itemProperty, + tuple.context.toString(), + propertyYearMonth.getMonthValue() + "" + propertyYearMonth.getYear() + )); + } + // inserting metadata batch + session.execute(kvinMetadataBatch.build()); + kvinMetadataBatch.clearStatements(); + + currentItemProperties.clear(); + currentItem = null; + previousItem = null; + } else { + previousItem = currentItem; + } + currentItemProperties.add(tuple.property.toString()); + } + } + + private YearMonth getYearMonth(long timestamp) { + return YearMonth.from(Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault())); + } + + @Override + public IExtendedIterator fetch(URI item, URI property, URI context, long limit) { + return fetchInternal(item, property, context, Long.MAX_VALUE, 0L, limit, null, null); + } + + @Override + public IExtendedIterator fetch(URI item, URI property, URI context, long end, long begin, long limit, long interval, String op) { + IExtendedIterator internalResult = fetchInternal(item, property, context, end, begin, limit, interval, op); + if (op != null) { + internalResult = new AggregatingIterator<>(internalResult, interval, op.trim().toLowerCase(), limit) { + @Override + protected KvinTuple createElement(URI item, URI property, URI context, long time, int seqNr, Object value) { + return new KvinTuple(item, property, context, time, seqNr, value); + } + }; + } + return internalResult; + } + + private IExtendedIterator fetchInternal(URI item, URI property, URI context, Long end, Long begin, Long limit, Long interval, String op) { + if (item == null) return NiceIterator.emptyIterator(); + + if (limit > 0 || begin > 0) { + return getLimitIterator(item, property, context, end, begin, limit, interval, op); + } else { + return getSimpleIterator(item, property, context); + } + } + + private IExtendedIterator getLimitIterator(URI item, URI property, URI context, Long end, Long begin, Long limit, Long interval, String op) { + return new NiceIterator<>() { + + Iterator itemPropertiesIterator; + Iterator itemIterator; + Row currentEntry; + PreparedStatement kvinDataSelectStatement, kvinPropertiesSelectStatement; + BoundStatement kvinDataSelectBoundStatement, kvinPropertiesSelectBoundStatement; + + @Override + public boolean hasNext() { + if (itemPropertiesIterator == null && property == null) readPropertySet(); + if (itemIterator == null || (!itemIterator.hasNext() && property == null && itemPropertiesIterator.hasNext())) { + readNextProperty(property != null); + } + return itemIterator.hasNext(); + } + + @Override + public KvinTuple next() { + KvinTuple tuple = null; + if (itemIterator.hasNext()) { + currentEntry = itemIterator.next(); + URI item = URIs.createURI(Objects.requireNonNull(currentEntry.getString("item"))); + URI property = URIs.createURI(Objects.requireNonNull(currentEntry.getString("property"))); + URI context = URIs.createURI(Objects.requireNonNull(currentEntry.getString("context"))); + long time = currentEntry.getLong("time"); + int seqNr = currentEntry.getInt("seqNr"); + Object value; + try { + value = decodeTupleValue(currentEntry.getByteBuffer("value").array()); + } catch (IOException e) { + throw new RuntimeException(e); + } + tuple = new KvinTuple(item, property, context, time, seqNr, value); + currentEntry = null; + } + return tuple; + } + + private void readNextProperty(boolean isSingleItemPropertyRead) { + // preparing statement + if (itemIterator == null || !itemIterator.hasNext()) { + if (kvinDataSelectStatement == null) { + String query = "SELECT * FROM " + keyspace + ".kvinData " + "WHERE item = ? AND timeRange IN(?) AND context = ? AND property = ? "; + if (begin >= 0 && end <= Long.MAX_VALUE) query = query + "AND time >= ? AND time <= ? "; + if (limit > 0) query = query + "limit ? ;"; + + kvinDataSelectStatement = session.prepare(query); + } + // binding + String currentProperty = isSingleItemPropertyRead ? property.toString() : itemPropertiesIterator.next().getString("property"); + if (limit > 0) { + kvinDataSelectBoundStatement = kvinDataSelectStatement.bind(item.toString(), getTimeRage(item, URIs.createURI(currentProperty), context), context.toString(), currentProperty, begin, end, limit.intValue()).setPageSize(1000); + } else { + kvinDataSelectBoundStatement = kvinDataSelectStatement.bind(item.toString(), getTimeRage(item, URIs.createURI(currentProperty), context), context.toString(), currentProperty, begin, end).setPageSize(1000); + } + // execution + itemIterator = session.execute(kvinDataSelectBoundStatement).iterator(); + } + } + + private void readPropertySet() { + kvinPropertiesSelectStatement = session.prepare("SELECT property FROM " + keyspace + ".kvinMetadata WHERE item = ? ;"); + kvinPropertiesSelectBoundStatement = kvinPropertiesSelectStatement.bind(item.toString()).setPageSize(1000); + itemPropertiesIterator = session.execute(kvinPropertiesSelectBoundStatement).iterator(); + } + + @Override + public void close() { + super.close(); + } + }; + } + + private IExtendedIterator getSimpleIterator(URI item, URI property, URI context) { + return new NiceIterator<>() { + Iterator entries; + PreparedStatement kvinDataSelectStatement; + BoundStatement kvinDataSelectBoundStatement; + + @Override + public boolean hasNext() { + if (entries == null) loadEntriesFromTable(); + return entries.hasNext(); + } + + @Override + public KvinTuple next() { + KvinTuple tuple = null; + Row currentEntry = entries.next(); + if (currentEntry != null) { + URI item = URIs.createURI(Objects.requireNonNull(currentEntry.getString("item"))); + URI property = URIs.createURI(Objects.requireNonNull(currentEntry.getString("property"))); + URI context = URIs.createURI(Objects.requireNonNull(currentEntry.getString("context"))); + long time = currentEntry.getLong("time"); + int seqNr = currentEntry.getInt("seqNr"); + Object value; + try { + value = decodeTupleValue(currentEntry.getByteBuffer("value").array()); + } catch (IOException e) { + throw new RuntimeException(e); + } + tuple = new KvinTuple(item, property, context, time, seqNr, value); + } + return tuple; + } + + private void loadEntriesFromTable() { + // preparing statement + if (kvinDataSelectStatement == null && property != null) { + kvinDataSelectStatement = session.prepare("SELECT * FROM " + keyspace + ".kvinData " + + "WHERE item = ? AND timeRange IN(?) AND context = ? AND property = ? ;"); + } else if (kvinDataSelectStatement == null && property == null) { + kvinDataSelectStatement = session.prepare("SELECT * FROM " + keyspace + ".kvinData " + + "WHERE item = ? AND timeRange IN(?) AND context = ? ;"); + } + + //binding + if (property != null) { + kvinDataSelectBoundStatement = kvinDataSelectStatement.bind(item.toString(), getTimeRage(item, property, context), context.toString(), property.toString()).setPageSize(1000); + } else if (property == null) { + kvinDataSelectBoundStatement = kvinDataSelectStatement.bind(item.toString(), getTimeRage(item, null, context), context.toString()).setPageSize(1000); + } + // execution + entries = session.execute(kvinDataSelectBoundStatement).iterator(); + } + + @Override + public void close() { + super.close(); + } + }; + } + + @Override + public IExtendedIterator properties(URI item) { + if (item == null) return NiceIterator.emptyIterator(); + return new NiceIterator<>() { + Iterator propertyIterator; + PreparedStatement kvinPropertiesSelectStatement; + BoundStatement kvinPropertiesSelectBoundStatement; + + @Override + public boolean hasNext() { + if (propertyIterator == null) readPropertySet(); + return propertyIterator.hasNext(); + } + + @Override + public URI next() { + return URIs.createURI(propertyIterator.next().getString("property")); + } + + private void readPropertySet() { + kvinPropertiesSelectStatement = session.prepare("SELECT property FROM " + keyspace + ".kvinMetadata WHERE item = ? ;"); + kvinPropertiesSelectBoundStatement = kvinPropertiesSelectStatement.bind(item.toString()).setPageSize(1000); + propertyIterator = session.execute(kvinPropertiesSelectBoundStatement).iterator(); + } + + @Override + public void close() { + super.close(); + } + }; + } + + private String getTimeRage(URI item, URI property, URI context) { + PreparedStatement kvinMetadataSelectStatement; + BoundStatement kvinMetadataBoundStatement; + + //preparing statements + if (item != null && property == null) { + kvinMetadataSelectStatement = session.prepare("SELECT timeRange FROM " + keyspace + ".kvinMetadata WHERE item = ? AND context = ?;"); + kvinMetadataBoundStatement = kvinMetadataSelectStatement.bind(item.toString(), context.toString()); + } else if (item != null && property != null) { + kvinMetadataSelectStatement = session.prepare("SELECT timeRange FROM " + keyspace + ".kvinMetadata WHERE item = ? AND context = ? AND property = ?;"); + kvinMetadataBoundStatement = kvinMetadataSelectStatement.bind(item.toString(), context.toString(), property.toString()); + } else { + throw new RuntimeException("Invalid parameters for timeRange lookup"); + } + + // converting query result to string for IN() operator value + Iterator result = session.execute(kvinMetadataBoundStatement).iterator(); + Set timeRangeSet = new HashSet<>(); + while (result.hasNext()) { + timeRangeSet.add(String.valueOf(result.next().getString("timeRange"))); + } + return String.join(", ", timeRangeSet); + } + + private byte[] encodeTupleValue(Object record) throws IOException { + ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream(); + if (record instanceof Record) { + Record r = (Record) record; + byteArrayOutputStream.write("O".getBytes(StandardCharsets.UTF_8)); + byte[] propertyBytes = r.getProperty().toString().getBytes(); + byteArrayOutputStream.write((byte) propertyBytes.length); + byteArrayOutputStream.write(propertyBytes); + byteArrayOutputStream.write(encodeTupleValue(r.getValue())); + } else if (record instanceof URI) { + URI uri = (URI) record; + byte[] uriIndicatorBytes = "R".getBytes(StandardCharsets.UTF_8); + byte[] uriBytes = new byte[uri.toString().getBytes().length + 1]; + uriBytes[0] = (byte) uri.toString().getBytes().length; + System.arraycopy(uri.toString().getBytes(), 0, uriBytes, 1, uriBytes.length - 1); + + byte[] combinedBytes = new byte[uriIndicatorBytes.length + uriBytes.length]; + System.arraycopy(uriIndicatorBytes, 0, combinedBytes, 0, uriIndicatorBytes.length); + System.arraycopy(uriBytes, 0, combinedBytes, uriIndicatorBytes.length, uriBytes.length); + return combinedBytes; + } else { + return Values.encode(record); + } + return byteArrayOutputStream.toByteArray(); + } + + private Object decodeTupleValue(byte[] data) throws IOException { + Record r = null; + try (ByteArrayInputStream byteArrayInputStream = new ByteArrayInputStream(data)) { + char type = (char) byteArrayInputStream.read(); + if (type == 'O') { + int propertyLength = byteArrayInputStream.read(); + String property = new String(byteArrayInputStream.readNBytes(propertyLength), StandardCharsets.UTF_8); + var value = decodeTupleValue(byteArrayInputStream.readAllBytes()); + if (r != null) { + r.append(new Record(URIs.createURI(property), value)); + } else { + r = new Record(URIs.createURI(property), value); + } + } else if (type == 'R') { + int uriLength = byteArrayInputStream.read(); + String uri = new String(byteArrayInputStream.readNBytes(uriLength), StandardCharsets.UTF_8); + return URIs.createURI(uri); + } else { + return Values.decode(data); + } + } catch (IOException e) { + throw new RuntimeException(e); + } + return r; + } + + @Override + public long delete(URI item, URI property, URI context, long end, long begin) { + return 0; + } + + @Override + public boolean delete(URI item) { + return false; + } + + @Override + public IExtendedIterator descendants(URI item) { + return null; + } + + @Override + public IExtendedIterator descendants(URI item, long limit) { + return null; + } + + @Override + public long approximateSize(URI item, URI property, URI context, long end, long begin) { + return 0; + } + + @Override + public void close() { + session.close(); + } + + public void dropKeyspace() { + session.execute("DROP KEYSPACE " + keyspace + " ;"); + } +} diff --git a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java new file mode 100644 index 00000000..67bd14ad --- /dev/null +++ b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java @@ -0,0 +1,174 @@ +package io.github.linkedfactory.kvin.scylladb; + +import com.datastax.oss.driver.api.core.AllNodesFailedException; +import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.cql.Row; +import io.github.linkedfactory.kvin.Kvin; +import io.github.linkedfactory.kvin.KvinTuple; +import net.enilink.commons.iterator.IExtendedIterator; +import net.enilink.commons.iterator.NiceIterator; +import net.enilink.komma.core.URI; +import net.enilink.komma.core.URIs; +import org.junit.*; + +import java.util.Iterator; +import java.util.Random; + +import static org.junit.Assert.*; + +public class KvinScyllaDbTest extends KvinScyllaDbTestBase { + + final static String keyspace = "kvindata"; + static CqlSession testSession; + static KvinScyllaDb kvinScyllaDb; + static boolean isConnectedToDB = false; + + @BeforeClass + public static void setup() { + try { + kvinScyllaDb = new KvinScyllaDb(keyspace); + testSession = CqlSession.builder().build(); + kvinScyllaDb.put(generateRandomKvinTuples(500, 10, 20)); + kvinScyllaDb.put(generateTuplesForLimitCheck()); + isConnectedToDB = true; + } catch (AllNodesFailedException exception) { + System.err.println("ScyllaDB local server instance connection failed...skipping tests."); + } + } + + @AfterClass + public static void tearDown() { + if (!isConnectedToDB) return; + kvinScyllaDb.dropKeyspace(); + } + + private static IExtendedIterator generateTuplesForLimitCheck() { + return new NiceIterator<>() { + + final int sampleSize = 10; + final int itemCounter = 500; + final Random random = new Random(200); + int tupleCount = 0; + int propertyCounter = 1; + int seqNr = 0; + long time = 1688115537L; + URI currentItem = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + itemCounter); + URI currentProperty = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/febric/" + propertyCounter + "/measured-point-1"); + + @Override + public boolean hasNext() { + return tupleCount < sampleSize; + } + + @Override + public KvinTuple next() { + KvinTuple tuple = null; + time = time + 3600; + + if (tupleCount < 5) { + currentItem = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + itemCounter); + currentProperty = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/febric/" + propertyCounter + "/measured-point-1"); + tuple = new KvinTuple(currentItem, currentProperty, Kvin.DEFAULT_CONTEXT, time, seqNr++, random.nextInt()); + } + + if (tupleCount == 5) { + seqNr = 0; + propertyCounter++; + } + + if (tupleCount >= 5) { + currentItem = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + itemCounter); + currentProperty = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/febric/" + propertyCounter + "/measured-point-1"); + tuple = new KvinTuple(currentItem, currentProperty, Kvin.DEFAULT_CONTEXT, time, seqNr++, random.nextFloat()); + } + tupleCount++; + return tuple; + } + + @Override + public void close() { + super.close(); + } + }; + } + + @Test + public void doSimplePut() { + if (!isConnectedToDB) return; + Iterator kvinDataResult = testSession.execute("SELECT * FROM " + keyspace + ".kvinData;").iterator(); + int kvinDataRowCount = 0; + while (kvinDataResult.hasNext()) { + kvinDataResult.next(); + kvinDataRowCount++; + } + assertTrue(kvinDataRowCount > 0); + + Iterator kvinMetadataResult = testSession.execute("SELECT * FROM " + keyspace + ".kvinMetadata;").iterator(); + int kvinMetadataRowCount = 0; + while (kvinMetadataResult.hasNext()) { + kvinMetadataResult.next(); + kvinMetadataRowCount++; + } + assertTrue(kvinMetadataRowCount > 0); + } + + @Test + public void doSimpleFetch() { + if (!isConnectedToDB) return; + URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 1); + URI property = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/febric/" + 1 + "/measured-point-1"); + URI context = Kvin.DEFAULT_CONTEXT; + + IExtendedIterator singleTupleRead = kvinScyllaDb.fetch(item, property, context, 0); + assertEquals(1, singleTupleRead.toList().size()); + + IExtendedIterator multiTupleRead = kvinScyllaDb.fetch(item, null, context, 0); + assertEquals(9, multiTupleRead.toList().size()); + } + + @Test + public void doFetchWithLimit() { + if (!isConnectedToDB) return; + URI context = Kvin.DEFAULT_CONTEXT; + URI property = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/febric/" + 1 + "/measured-point-1"); + + URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 500); + IExtendedIterator limitRead = kvinScyllaDb.fetch(item, null, context, 2); + assertEquals(4, limitRead.toList().size()); + + IExtendedIterator singleLimitRead = kvinScyllaDb.fetch(item, property, context, 3); + assertEquals(3, singleLimitRead.toList().size()); + } + + @Test + public void doFetchWithOp() { + if (!isConnectedToDB) return; + URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 500); + URI context = Kvin.DEFAULT_CONTEXT; + + IExtendedIterator opRead = kvinScyllaDb.fetch(item, null, context, 1688151537L, 1688115537L, 2, 1000, "avg"); + int count = 0; + while (opRead.hasNext()) { + KvinTuple tuple = opRead.next(); + switch (count) { + case 2: + assertEquals(0.5867009162902832, tuple.value); + break; + case 3: + assertEquals(0.7427548170089722, tuple.value); + break; + } + count++; + } + assertEquals(4, count); + } + + @Test + public void doListProperties() { + if (!isConnectedToDB) return; + URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 1); + IExtendedIterator properties = kvinScyllaDb.properties(item); + assertEquals(9, properties.toList().size()); + } + +}