diff --git a/bundles/io.github.linkedfactory.core/pom.xml b/bundles/io.github.linkedfactory.core/pom.xml
index 66b28c69..9c2859be 100644
--- a/bundles/io.github.linkedfactory.core/pom.xml
+++ b/bundles/io.github.linkedfactory.core/pom.xml
@@ -132,6 +132,12 @@ + + com.scylladb + java-driver-core + 4.17.0.0 + + org.openjdk.jmh jmh-core
diff --git a/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java
new file mode 100644
index 00000000..0edd3d4b
--- /dev/null
+++ b/bundles/io.github.linkedfactory.core/src/main/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDb.java
@@ -0,0 +1,522 @@
+package io.github.linkedfactory.core.kvin.scylladb;
+
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.*;
+import io.github.linkedfactory.core.kvin.Kvin;
+import io.github.linkedfactory.core.kvin.KvinListener;
+import io.github.linkedfactory.core.kvin.KvinTuple;
+import io.github.linkedfactory.core.kvin.Record;
+import io.github.linkedfactory.core.kvin.util.AggregatingIterator;
+import io.github.linkedfactory.core.kvin.util.Values;
+import net.enilink.commons.iterator.IExtendedIterator;
+import net.enilink.commons.iterator.NiceIterator;
+import net.enilink.komma.core.URI;
+import net.enilink.komma.core.URIs;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.sql.SQLException;
+import java.time.Instant;
+import java.time.YearMonth;
+import java.time.ZoneId;
+import java.util.*;
+
+/**
+ * {@link Kvin} implementation that stores tuples in ScyllaDB through the CQL driver.
+ * <p>
+ * Values are kept in the {@code kvinData} table, partitioned by item and a coarse time
+ * bucket ("timeRange"). The {@code kvinMetadata} table remembers, per item, context and
+ * property, which bucket has to be queried when reading.
+ */
+public class KvinScyllaDb implements Kvin {
+    /** Maximum number of inserts sent to the server in one batch. */
+    private static final int MAX_BATCH_SIZE = 100;
+    /** Page size used for all read queries. */
+    private static final int PAGE_SIZE = 1000;
+
+    private final String keyspace;
+    private final List<KvinListener> listeners = new ArrayList<>();
+    private CqlSession session;
+    private PreparedStatement insertData, insertMetadata;
+    private PreparedStatement selectByContext, selectByProperty, selectInRange, selectInRangeLimited;
+    private PreparedStatement selectProperties, selectPropertiesByContext;
+    private PreparedStatement selectTimeRanges, selectTimeRangesByProperty;
+
+    /**
+     * Connects to the ScyllaDB server and creates keyspace and tables if necessary.
+     *
+     * @param keyspace name of the keyspace to use; must be a plain (unquoted) CQL identifier
+     * @throws IllegalArgumentException if the keyspace name is not a valid identifier
+     */
+    public KvinScyllaDb(String keyspace) {
+        // the name is concatenated into CQL statements, so only accept plain identifiers
+        if (keyspace == null || !keyspace.matches("[A-Za-z][A-Za-z0-9_]*")) {
+            throw new IllegalArgumentException("Invalid keyspace name: " + keyspace);
+        }
+        this.keyspace = keyspace;
+        initializeDatabase();
+    }
+
+    @Override
+    public boolean addListener(KvinListener listener) {
+        listeners.add(listener);
+        return true;
+    }
+
+    @Override
+    public boolean removeListener(KvinListener listener) {
+        return listeners.remove(listener);
+    }
+
+    /** Opens the session, creates the schema and prepares all statements. */
+    private void initializeDatabase() {
+        // connects to the default host (127.0.0.1) and port (9042)
+        session = CqlSession.builder().build();
+        try {
+            // keyspace creation (with development config)
+            session.execute("CREATE KEYSPACE IF NOT EXISTS " + keyspace
+                    + " WITH replication = {'class': 'SimpleStrategy', 'replication_factor' : 1};");
+            session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".kvinData ("
+                    + "item text,"
+                    + "timeRange text,"
+                    + "property text,"
+                    + "context text,"
+                    + "time bigint,"
+                    + "seqNr int,"
+                    + "value blob,"
+                    + "PRIMARY KEY((item, timeRange), context, property, time, seqNr));");
+            session.execute("CREATE TABLE IF NOT EXISTS " + keyspace + ".kvinMetadata ("
+                    + "item text,"
+                    + "property text,"
+                    + "context text,"
+                    + "timeRange text,"
+                    + "PRIMARY KEY(item, context, property));");
+
+            String data = keyspace + ".kvinData";
+            String metadata = keyspace + ".kvinMetadata";
+            insertData = session.prepare("INSERT INTO " + data
+                    + "(item, timeRange, property, context, time, seqNr, value) VALUES (?, ?, ?, ?, ?, ?, ?);");
+            insertMetadata = session.prepare("INSERT INTO " + metadata
+                    + "(item, property, context, timeRange) VALUES (?, ?, ?, ?) IF NOT EXISTS;");
+
+            // "timeRange IN ?" takes a list so that several buckets can be queried at once
+            String byContext = "SELECT * FROM " + data + " WHERE item = ? AND timeRange IN ? AND context = ?";
+            String byProperty = byContext + " AND property = ?";
+            String inRange = byProperty + " AND time >= ? AND time <= ?";
+            selectByContext = session.prepare(byContext + ";");
+            selectByProperty = session.prepare(byProperty + ";");
+            selectInRange = session.prepare(inRange + ";");
+            selectInRangeLimited = session.prepare(inRange + " LIMIT ?;");
+
+            selectProperties = session.prepare("SELECT property FROM " + metadata + " WHERE item = ?;");
+            selectPropertiesByContext = session.prepare(
+                    "SELECT property FROM " + metadata + " WHERE item = ? AND context = ?;");
+            selectTimeRanges = session.prepare(
+                    "SELECT timeRange FROM " + metadata + " WHERE item = ? AND context = ?;");
+            selectTimeRangesByProperty = session.prepare(
+                    "SELECT timeRange FROM " + metadata + " WHERE item = ? AND context = ? AND property = ?;");
+        } catch (RuntimeException e) {
+            // do not leak the session if the schema setup fails
+            session.close();
+            session = null;
+            throw e;
+        }
+    }
+
+    @Override
+    public void put(KvinTuple... tuples) {
+        put(Arrays.asList(tuples));
+    }
+
+    @Override
+    public void put(Iterable<KvinTuple> tuples) {
+        try {
+            putInternal(tuples);
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Writes the tuples in batches of at most {@link #MAX_BATCH_SIZE} inserts, starting a
+     * new batch whenever the item changes, and records each new (item, context, property)
+     * series in the metadata table after its data has been written.
+     *
+     * @param tuples the tuples to store
+     * @throws IOException if a value cannot be encoded
+     */
+    private void putInternal(Iterable<KvinTuple> tuples) throws IOException {
+        BatchStatementBuilder dataBatch = BatchStatement.builder(DefaultBatchType.LOGGED);
+        BatchStatementBuilder metadataBatch = BatchStatement.builder(DefaultBatchType.LOGGED);
+        // (context, property) pairs of the current item that are already part of a metadata batch
+        Set<List<String>> knownSeries = new HashSet<>();
+        URI currentItem = null;
+        int dataCount = 0, metadataCount = 0;
+
+        for (KvinTuple tuple : tuples) {
+            boolean itemChanged = currentItem != null && !currentItem.equals(tuple.item);
+            if (itemChanged || dataCount >= MAX_BATCH_SIZE) {
+                flush(dataBatch, dataCount, metadataBatch, metadataCount);
+                dataCount = metadataCount = 0;
+            }
+            if (itemChanged) {
+                knownSeries.clear();
+            }
+            currentItem = tuple.item;
+
+            String item = tuple.item.toString();
+            String property = tuple.property.toString();
+            String context = tuple.context.toString();
+            String timeRange = toTimeRange(tuple.time);
+            dataBatch.addStatement(insertData.bind(item, timeRange, property, context,
+                    tuple.time, tuple.seqNr, ByteBuffer.wrap(encodeTupleValue(tuple.value))));
+            dataCount++;
+
+            // all metadata rows of a batch share the partition key (item), as required
+            // for a batch containing conditional statements
+            if (knownSeries.add(List.of(context, property))) {
+                metadataBatch.addStatement(insertMetadata.bind(item, property, context, timeRange));
+                metadataCount++;
+            }
+        }
+        flush(dataBatch, dataCount, metadataBatch, metadataCount);
+    }
+
+    /** Executes the non-empty batches (data first, then metadata) and resets both builders. */
+    private void flush(BatchStatementBuilder dataBatch, int dataCount,
+                       BatchStatementBuilder metadataBatch, int metadataCount) {
+        if (dataCount > 0) {
+            session.execute(dataBatch.build());
+            dataBatch.clearStatements();
+        }
+        if (metadataCount > 0) {
+            session.execute(metadataBatch.build());
+            metadataBatch.clearStatements();
+        }
+    }
+
+    /** Returns the partition bucket ("timeRange") for a tuple timestamp. */
+    private String toTimeRange(long timestamp) {
+        YearMonth yearMonth = getYearMonth(timestamp);
+        return yearMonth.getMonthValue() + "" + yearMonth.getYear();
+    }
+
+    // NOTE(review): the timestamp is interpreted as epoch seconds in the system time zone,
+    // while the tests write values that look like epoch milliseconds - confirm the unit.
+    // Switching to real monthly buckets also requires kvinMetadata to keep every bucket
+    // of a series (its primary key currently allows only the first one).
+    private YearMonth getYearMonth(long timestamp) {
+        return YearMonth.from(Instant.ofEpochSecond(timestamp).atZone(ZoneId.systemDefault()));
+    }
+
+    @Override
+    public IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, long limit) {
+        return fetchInternal(item, property, context, Long.MAX_VALUE, 0L, limit, null, null);
+    }
+
+    @Override
+    public IExtendedIterator<KvinTuple> fetch(URI item, URI property, URI context, long end, long begin,
+                                              long limit, long interval, String op) {
+        IExtendedIterator<KvinTuple> result = fetchInternal(item, property, context, end, begin, limit, interval, op);
+        if (op != null) {
+            result = new AggregatingIterator<>(result, interval, op.trim().toLowerCase(Locale.ROOT), limit) {
+                @Override
+                protected KvinTuple createElement(URI item, URI property, URI context, long time, int seqNr, Object value) {
+                    return new KvinTuple(item, property, context, time, seqNr, value);
+                }
+            };
+        }
+        return result;
+    }
+
+    /**
+     * Selects the read strategy: the plain iterator when there are no restrictions at all,
+     * otherwise the iterator that applies time range and limit per property.
+     */
+    private IExtendedIterator<KvinTuple> fetchInternal(URI item, URI property, URI context, Long end, Long begin,
+                                                       Long limit, Long interval, String op) {
+        if (item == null) {
+            return NiceIterator.emptyIterator();
+        }
+        URI effectiveContext = context != null ? context : Kvin.DEFAULT_CONTEXT;
+        // an upper time bound alone must also be honoured
+        if (limit > 0 || begin > 0 || end < Long.MAX_VALUE) {
+            return getLimitIterator(item, property, effectiveContext, end, begin, limit, interval, op);
+        }
+        return getSimpleIterator(item, property, effectiveContext);
+    }
+
+    /**
+     * Returns an iterator that reads the requested property (or, if {@code property} is
+     * {@code null}, every property known for item and context) one after the other,
+     * restricted to {@code [begin, end]} and at most {@code limit} rows per property
+     * ({@code limit <= 0} means unlimited).
+     */
+    private IExtendedIterator<KvinTuple> getLimitIterator(URI item, URI property, URI context, Long end, Long begin,
+                                                          Long limit, Long interval, String op) {
+        return new NiceIterator<>() {
+            Iterator<String> propertyNames;
+            Iterator<Row> rows = Collections.emptyIterator();
+
+            @Override
+            public boolean hasNext() {
+                if (propertyNames == null) {
+                    propertyNames = property != null
+                            ? Collections.singleton(property.toString()).iterator()
+                            : readPropertyNames(item, context).iterator();
+                }
+                // advance to the next property that actually has rows
+                while (!rows.hasNext() && propertyNames.hasNext()) {
+                    rows = readRows(propertyNames.next());
+                }
+                return rows.hasNext();
+            }
+
+            @Override
+            public KvinTuple next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return toTuple(rows.next());
+            }
+
+            private Iterator<Row> readRows(String propertyName) {
+                List<String> timeRanges = getTimeRanges(item, URIs.createURI(propertyName), context);
+                if (timeRanges.isEmpty()) {
+                    return Collections.emptyIterator();
+                }
+                BoundStatement statement = limit > 0
+                        ? selectInRangeLimited.bind(item.toString(), timeRanges, context.toString(), propertyName,
+                        begin, end, (int) Math.min(limit, Integer.MAX_VALUE))
+                        : selectInRange.bind(item.toString(), timeRanges, context.toString(), propertyName, begin, end);
+                return session.execute(statement.setPageSize(PAGE_SIZE)).iterator();
+            }
+        };
+    }
+
+    /** Returns an iterator over all stored rows of the item/context, optionally for one property. */
+    private IExtendedIterator<KvinTuple> getSimpleIterator(URI item, URI property, URI context) {
+        return new NiceIterator<>() {
+            Iterator<Row> rows;
+
+            @Override
+            public boolean hasNext() {
+                if (rows == null) {
+                    rows = loadRows();
+                }
+                return rows.hasNext();
+            }
+
+            @Override
+            public KvinTuple next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return toTuple(rows.next());
+            }
+
+            private Iterator<Row> loadRows() {
+                List<String> timeRanges = getTimeRanges(item, property, context);
+                if (timeRanges.isEmpty()) {
+                    return Collections.emptyIterator();
+                }
+                BoundStatement statement = property != null
+                        ? selectByProperty.bind(item.toString(), timeRanges, context.toString(), property.toString())
+                        : selectByContext.bind(item.toString(), timeRanges, context.toString());
+                return session.execute(statement.setPageSize(PAGE_SIZE)).iterator();
+            }
+        };
+    }
+
+    @Override
+    public IExtendedIterator<URI> properties(URI item) {
+        if (item == null) {
+            return NiceIterator.emptyIterator();
+        }
+        return new NiceIterator<>() {
+            Iterator<String> names;
+
+            @Override
+            public boolean hasNext() {
+                if (names == null) {
+                    // a property may be stored for several contexts, report it only once
+                    names = readPropertyNames(item, null).iterator();
+                }
+                return names.hasNext();
+            }
+
+            @Override
+            public URI next() {
+                if (!hasNext()) {
+                    throw new NoSuchElementException();
+                }
+                return URIs.createURI(names.next());
+            }
+        };
+    }
+
+    /**
+     * Reads the distinct property names recorded for an item.
+     *
+     * @param context restricts the lookup to this context, {@code null} for all contexts
+     */
+    private Set<String> readPropertyNames(URI item, URI context) {
+        BoundStatement statement = context == null
+                ? selectProperties.bind(item.toString())
+                : selectPropertiesByContext.bind(item.toString(), context.toString());
+        Set<String> names = new LinkedHashSet<>();
+        for (Row row : session.execute(statement.setPageSize(PAGE_SIZE))) {
+            names.add(row.getString("property"));
+        }
+        return names;
+    }
+
+    /**
+     * Looks up the time buckets recorded for the item/context and, if given, the property.
+     *
+     * @return the distinct buckets, ready to be bound to a {@code timeRange IN ?} restriction
+     */
+    private List<String> getTimeRanges(URI item, URI property, URI context) {
+        if (item == null) {
+            throw new IllegalArgumentException("Invalid parameters for timeRange lookup");
+        }
+        BoundStatement statement = property == null
+                ? selectTimeRanges.bind(item.toString(), context.toString())
+                : selectTimeRangesByProperty.bind(item.toString(), context.toString(), property.toString());
+        Set<String> timeRanges = new LinkedHashSet<>();
+        for (Row row : session.execute(statement)) {
+            timeRanges.add(row.getString("timeRange"));
+        }
+        return new ArrayList<>(timeRanges);
+    }
+
+    /** Converts a {@code kvinData} row into a tuple. */
+    private KvinTuple toTuple(Row row) {
+        URI item = URIs.createURI(Objects.requireNonNull(row.getString("item")));
+        URI property = URIs.createURI(Objects.requireNonNull(row.getString("property")));
+        URI context = URIs.createURI(Objects.requireNonNull(row.getString("context")));
+        ByteBuffer value = Objects.requireNonNull(row.getByteBuffer("value"));
+        try {
+            return new KvinTuple(item, property, context, row.getLong("time"), row.getInt("seqNr"),
+                    decodeTupleValue(toByteArray(value)));
+        } catch (IOException e) {
+            throw new UncheckedIOException(e);
+        }
+    }
+
+    /**
+     * Copies the readable bytes of a buffer. {@link ByteBuffer#array()} must not be used for
+     * buffers returned by the driver since the backing array can be larger than the value.
+     */
+    private static byte[] toByteArray(ByteBuffer buffer) {
+        ByteBuffer view = buffer.duplicate();
+        byte[] bytes = new byte[view.remaining()];
+        view.get(bytes);
+        return bytes;
+    }
+
+    /**
+     * Encodes a value. Records are written as {@code 'O'}, URIs as {@code 'R'}, each followed
+     * by a length-prefixed UTF-8 string; all other values are delegated to {@link Values}.
+     * <p>
+     * NOTE(review): only the first property of a record is written - confirm whether
+     * records with several properties have to be supported.
+     */
+    private byte[] encodeTupleValue(Object value) throws IOException {
+        if (value instanceof Record) {
+            Record record = (Record) value;
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            out.write('O');
+            writeString(out, record.getProperty().toString());
+            out.write(encodeTupleValue(record.getValue()));
+            return out.toByteArray();
+        }
+        if (value instanceof URI) {
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            out.write('R');
+            writeString(out, value.toString());
+            return out.toByteArray();
+        }
+        return Values.encode(value);
+    }
+
+    /** Writes a string as one length byte followed by its UTF-8 bytes. */
+    private static void writeString(ByteArrayOutputStream out, String text) {
+        byte[] bytes = text.getBytes(StandardCharsets.UTF_8);
+        if (bytes.length > 255) {
+            // the length has to fit into a single unsigned byte
+            throw new IllegalArgumentException("String too long to encode (" + bytes.length + " bytes): " + text);
+        }
+        out.write(bytes.length);
+        out.write(bytes, 0, bytes.length);
+    }
+
+    /** Decodes a value written by {@link #encodeTupleValue(Object)}. */
+    private Object decodeTupleValue(byte[] data) throws IOException {
+        ByteArrayInputStream in = new ByteArrayInputStream(data);
+        int type = in.read();
+        if (type == 'O') {
+            String property = readString(in);
+            Object value = decodeTupleValue(in.readAllBytes());
+            return new Record(URIs.createURI(property), value);
+        }
+        if (type == 'R') {
+            return URIs.createURI(readString(in));
+        }
+        return Values.decode(data);
+    }
+
+    /** Reads a string written by {@link #writeString(ByteArrayOutputStream, String)}. */
+    private static String readString(ByteArrayInputStream in) throws IOException {
+        int length = in.read();
+        if (length < 0) {
+            throw new IOException("Truncated value: missing string length");
+        }
+        byte[] bytes = in.readNBytes(length);
+        if (bytes.length != length) {
+            throw new IOException("Truncated value: expected " + length + " bytes but got " + bytes.length);
+        }
+        return new String(bytes, StandardCharsets.UTF_8);
+    }
+
+    // NOTE: deletion is not implemented yet, nothing is removed.
+    @Override
+    public long delete(URI item, URI property, URI context, long end, long begin) {
+        return 0;
+    }
+
+    // NOTE: deletion is not implemented yet, nothing is removed.
+    @Override
+    public boolean delete(URI item) {
+        return false;
+    }
+
+    // NOTE: not implemented yet; returns an empty iterator instead of null so callers can iterate safely.
+    @Override
+    public IExtendedIterator<URI> descendants(URI item) {
+        return NiceIterator.emptyIterator();
+    }
+
+    @Override
+    public IExtendedIterator<URI> descendants(URI item, long limit) {
+        return descendants(item);
+    }
+
+    // NOTE: size estimation is not implemented yet.
+    @Override
+    public long approximateSize(URI item, URI property, URI context, long end, long begin) {
+        return 0;
+    }
+
+    @Override
+    public void close() {
+        if (session != null) {
+            session.close();
+            session = null;
+        }
+    }
+}
diff --git a/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java
new file mode 100644
index 00000000..d1fc92b1
--- /dev/null
+++ b/bundles/io.github.linkedfactory.core/src/test/java/io/github/linkedfactory/core/kvin/scylladb/KvinScyllaDbTest.java
@@ -0,0 +1,134 @@
+package io.github.linkedfactory.core.kvin.scylladb;
+
+import com.datastax.oss.driver.api.core.AllNodesFailedException;
+import com.datastax.oss.driver.api.core.CqlSession;
+import com.datastax.oss.driver.api.core.cql.Row;
+import io.github.linkedfactory.core.kvin.Kvin;
+import io.github.linkedfactory.core.kvin.KvinTuple;
+import io.github.linkedfactory.core.kvin.util.KvinTupleGenerator;
+import net.enilink.commons.iterator.IExtendedIterator;
+import net.enilink.komma.core.URI;
+import net.enilink.komma.core.URIs;
+import org.junit.AfterClass;
+import org.junit.Assume;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.util.Iterator;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Integration tests for {@link KvinScyllaDb}. They need a ScyllaDB instance on the default
+ * host and port and are reported as skipped when none is reachable.
+ */
+public class KvinScyllaDbTest {
+
+    final static String keyspace = "test";
+    static CqlSession testSession;
+    static KvinScyllaDb kvinScyllaDb;
+    static KvinTupleGenerator tupleGenerator;
+
+    @BeforeClass
+    public static void setup() {
+        try {
+            testSession = CqlSession.builder().build();
+            testSession.execute("DROP KEYSPACE IF EXISTS " + keyspace + " ;");
+            kvinScyllaDb = new KvinScyllaDb(keyspace);
+            tupleGenerator = new KvinTupleGenerator();
+            kvinScyllaDb.put(tupleGenerator.generate(1696197600000L, 500, 10, 10,
+                    "http://localhost:8080/linkedfactory/demofactory/{}",
+                    "http://example.org/{}"));
+        } catch (AllNodesFailedException exception) {
+            System.err.println("ScyllaDB local server instance connection failed...skipping tests.");
+            // report the tests as skipped instead of letting them pass without running
+            Assume.assumeNoException(exception);
+        }
+    }
+
+    @AfterClass
+    public static void tearDown() {
+        // also runs when setup() was aborted, so everything may still be null
+        if (kvinScyllaDb != null) {
+            kvinScyllaDb.close();
+        }
+        if (testSession != null) {
+            testSession.execute("DROP KEYSPACE IF EXISTS " + keyspace + " ;");
+            testSession.close();
+        }
+    }
+
+    @Test
+    public void doSimplePut() {
+        assertEquals(500 * 10 * 10, countRows("kvinData"));
+        assertTrue(countRows("kvinMetadata") > 0);
+    }
+
+    /** Counts all rows of a table in the test keyspace. */
+    private static int countRows(String table) {
+        Iterator<Row> rows = testSession.execute("SELECT * FROM " + keyspace + "." + table + ";").iterator();
+        int count = 0;
+        while (rows.hasNext()) {
+            rows.next();
+            count++;
+        }
+        return count;
+    }
+
+    @Test
+    public void doSimpleFetch() {
+        URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 1);
+        URI property = URIs.createURI("http://example.org/" + 1);
+        URI context = Kvin.DEFAULT_CONTEXT;
+
+        IExtendedIterator<KvinTuple> valuesSingleProperty = kvinScyllaDb.fetch(item, property, context, 0);
+        assertEquals(10, valuesSingleProperty.toList().size());
+
+        IExtendedIterator<KvinTuple> valuesMultipleProperties = kvinScyllaDb.fetch(item, null, context, 0);
+        assertEquals(100, valuesMultipleProperties.toList().size());
+    }
+
+    @Test
+    public void doFetchWithLimit() {
+        URI context = Kvin.DEFAULT_CONTEXT;
+        URI property = URIs.createURI("http://example.org/" + 1);
+
+        URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 400);
+        IExtendedIterator<KvinTuple> limitRead = kvinScyllaDb.fetch(item, null, context, 2);
+        assertEquals(2 * 10, limitRead.toList().size());
+
+        IExtendedIterator<KvinTuple> singleLimitRead = kvinScyllaDb.fetch(item, property, context, 3);
+        assertEquals(3, singleLimitRead.toList().size());
+    }
+
+    @Test
+    public void doFetchWithOp() {
+        URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 400);
+        URI context = Kvin.DEFAULT_CONTEXT;
+
+        IExtendedIterator<KvinTuple> opRead = kvinScyllaDb.fetch(item, null, context, 1688151537L, 1688115537L, 2, 1000, "avg");
+        int count = 0;
+        while (opRead.hasNext()) {
+            KvinTuple tuple = opRead.next();
+            switch (count) {
+                case 2:
+                    assertEquals(0.5867009162902832, tuple.value);
+                    break;
+                case 3:
+                    assertEquals(0.7427548170089722, tuple.value);
+                    break;
+            }
+            count++;
+        }
+        assertEquals(4, count);
+    }
+
+    @Test
+    public void doListProperties() {
+        URI item = URIs.createURI("http://localhost:8080/linkedfactory/demofactory/" + 1);
+        IExtendedIterator<URI> properties = kvinScyllaDb.properties(item);
+        assertEquals(10, properties.toList().size());
+    }
+
+}
diff --git a/pom.xml b/pom.xml
index f6b48ceb..8caf37fb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -206,16 +206,14 @@ - - - - org.slf4j - slf4j-simple - 1.7.30 - test - - - + + + org.slf4j + slf4j-simple + 1.7.30 + test + +