
Commit

Extend records test with parallel execution and more items.
kenwenzel committed Mar 18, 2024
1 parent 0e869f0 commit 3b778ca
Showing 2 changed files with 47 additions and 15 deletions.
@@ -24,9 +24,9 @@ import net.enilink.komma.core.{URI, URIs}
 import org.iq80.leveldb.impl.Iq80DBFactory.{bytes, factory}
 import org.iq80.leveldb.{CompressionType, DB, Options, Range, WriteBatch, WriteOptions}

-import java.io.{ByteArrayOutputStream, File}
+import java.io.{ByteArrayOutputStream, File, IOException, UncheckedIOException}
 import java.nio.{ByteBuffer, ByteOrder}
-import java.util
+import java.{io, util}
 import java.util.concurrent.CopyOnWriteArraySet
 import java.util.concurrent.locks.{ReadWriteLock, ReentrantReadWriteLock}
 import scala.collection.mutable
@@ -695,7 +695,17 @@ class KvinLevelDb(path: File) extends KvinLevelDbBase with Kvin {
   }

   override def close(): Unit = {
-    ids.close()
-    values.close()
+    var errors: List[IOException] = Nil
+    try {
+      ids.close()
+    } catch {
+      case e: IOException => errors ::= e
+    }
+    try {
+      values.close()
+    } catch {
+      case e: IOException => errors ::= e
+    }
+    errors.headOption.foreach(e => throw new UncheckedIOException(e))
   }
 }
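
The new close() always attempts to close both LevelDB handles: an IOException from ids.close() no longer prevents values.close() from running, and the most recently caught exception is rethrown afterwards as an UncheckedIOException. The same pattern extends to any number of resources; the sketch below is illustrative only (SafeClose is not part of the project) and assumes the handles implement AutoCloseable.

import java.io.{IOException, UncheckedIOException}

// Illustrative helper, not project code: close every resource even if some fail,
// then rethrow one of the collected IOExceptions wrapped as an UncheckedIOException.
object SafeClose {
  def closeAll(resources: AutoCloseable*): Unit = {
    var errors: List[IOException] = Nil
    resources.foreach { r =>
      try {
        r.close()
      } catch {
        case e: IOException => errors ::= e
      }
    }
    // ::= prepends, so headOption holds the most recently caught exception,
    // matching the convention in the commit's close() method
    errors.headOption.foreach(e => throw new UncheckedIOException(e))
  }
}
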
@@ -22,6 +22,7 @@ import org.junit.{After, Before, Test}

 import java.io.File
 import scala.util.Random
+import scala.jdk.CollectionConverters._

 /**
  * Tests for the LevelDB-based time series store.
@@ -34,34 +35,55 @@ class KvinLevelDbTest extends KvinTestBase {
     store = new KvinLevelDb(storeDirectory)
   }

+  def recreateStore: Unit = {
+    store.close()
+    store = new KvinLevelDb(storeDirectory)
+  }
+
   @After
   def closeStore {
     store.close
     store = null
     deleteDirectory(storeDirectory.toPath)
   }

   @Test
-  def testEvents {
+  def testRecordsParallel {
     val ctx = null

-    var rand = new Random(seed)
-
     val points = 100
     val pointDistance: Long = 100
     val startTime: Long = pointDistance * points
-    for (nr <- 1 to 2) {
+    val items = 50
+    // insert items in parallel
+    (1 to items).asJava.stream().parallel().forEach((nr: Int) => {
       val uri = itemUri(nr)
       for (i <- 0 until points) {
         val time = startTime - i * pointDistance
-        val value = new Record(URIs.createURI("prop:itemNr"), nr).append(new Record(URIs.createURI("prop:pointNr"), i)).append(new Record(URIs.createURI("prop:value"), URIs.createURI("some:value")))
+        val value = new Record(URIs.createURI("prop:itemNr"), nr)
+          .append(new Record(URIs.createURI("prop:pointNr"), i))
+          .append(new Record(URIs.createURI("prop:value"), URIs.createURI("some:value#" + time)))
         // println("ADD: " + new KvinTuple(uri, valueProperty, ctx, time, value))
         store.put(new KvinTuple(uri, valueProperty, ctx, time, value))
       }
-    }
-    var interval = 2 * pointDistance
-    // println(store.fetch(itemUri(1), valueProperty, ctx, startTime, startTime - interval, 0, 0, null).toList)
+    })

-    assertEquals(3, store.fetch(itemUri(1), valueProperty, ctx, startTime, startTime - interval, 0, 0, null).toList.size)
+    // close and re-open store
+    recreateStore
+
+    val queryPoints = points / 2
+    val interval = (queryPoints - 1) * pointDistance
+    for (nr <- 1 to items) {
+      assertEquals(queryPoints, store.fetch(itemUri(nr), valueProperty, ctx,
+        startTime, startTime - interval, 0, 0, null).toList.size)
+
+      store.fetch(itemUri(nr), valueProperty, ctx, startTime, startTime - interval, 0, 0, null)
+        .iterator.asScala
+        .foreach { tuple =>
+          val record = tuple.value.asInstanceOf[Record]
+          assertEquals(URIs.createURI("some:value#" + tuple.time),
+            record.first(URIs.createURI("prop:value")).getValue())
+        }
+    }
   }
 }
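
The rewritten test writes 50 items concurrently by turning the Scala range into a java.util.List and driving it through a parallel Java stream, re-opens the store via recreateStore, and then verifies each item: with pointDistance = 100 the interval (queryPoints - 1) * pointDistance = 4900 spans exactly queryPoints = 50 of the 100 written timestamps (the assertion implies both fetch bounds are inclusive), and every tuple's prop:value must come back as some:value#<time>. A minimal, self-contained sketch of just the range-to-parallel-stream idiom used for the concurrent writes (ParallelInsertSketch and the println body are illustrative stand-ins for the test's store.put calls):

import scala.jdk.CollectionConverters._

object ParallelInsertSketch {
  def main(args: Array[String]): Unit = {
    val items = 50
    // Scala Range -> java.util.List -> parallel java.util.stream.Stream:
    // forEach may invoke the body for different item numbers on different
    // common-pool threads, so the real test's store.put must tolerate concurrent writers.
    (1 to items).asJava.stream().parallel().forEach((nr: Int) => {
      println(s"would insert item $nr on ${Thread.currentThread().getName}")
    })
  }
}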
