From a3eb4dee52cbbe70b8243f0a969d03825cfebaa9 Mon Sep 17 00:00:00 2001 From: Matt Hicks Date: Tue, 31 Dec 2024 17:38:17 -0600 Subject: [PATCH] Migration complete --- .../test/scala/spec/AbstractAsyncSpec.scala | 254 ------------------ all/src/test/scala/spec/AirportSpec.scala | 120 +++++---- all/src/test/scala/spec/H2AsyncSpec.scala | 8 - build.sbt | 20 +- core/src/main/scala/lightdb/LightDB.scala | 2 +- .../scala/lightdb/backup/DatabaseBackup.scala | 2 +- .../lightdb/backup/DatabaseRestore.scala | 2 +- .../scala/lightdb/collection/Collection.scala | 2 +- .../lightdb/doc/MaterializedBatchModel.scala | 2 +- .../lightdb/transaction/Transaction.scala | 2 +- .../lightdb/trigger/CollectionTriggers.scala | 2 +- .../lucene/LucenePaginatedIterator.scala | 2 +- .../main/scala/lightdb/mapdb/MapDBStore.scala | 43 +-- .../main/scala/lightdb/redis/RedisStore.scala | 39 +-- .../scala/lightdb/rocksdb/RocksDBStore.scala | 35 +-- sql/src/main/scala/lightdb/sql/SQLState.scala | 37 +-- sql/src/main/scala/lightdb/sql/SQLStore.scala | 59 ++-- .../sql/connect/ConnectionManager.scala | 3 +- .../sql/connect/DBCPConnectionManager.scala | 3 +- .../sql/connect/HikariConnectionManager.scala | 3 +- .../sql/connect/SingleConnectionManager.scala | 5 +- .../main/scala/lightdb/sql/SQLiteStore.scala | 7 +- 22 files changed, 210 insertions(+), 442 deletions(-) delete mode 100644 all/src/test/scala/spec/AbstractAsyncSpec.scala delete mode 100644 all/src/test/scala/spec/H2AsyncSpec.scala diff --git a/all/src/test/scala/spec/AbstractAsyncSpec.scala b/all/src/test/scala/spec/AbstractAsyncSpec.scala deleted file mode 100644 index 70993c7c..00000000 --- a/all/src/test/scala/spec/AbstractAsyncSpec.scala +++ /dev/null @@ -1,254 +0,0 @@ -package spec - -import fabric.rw._ -import lightdb.async.{AsyncCollection, AsyncDatabaseUpgrade, AsyncLightDB, AsyncStoredValue} -import lightdb.collection.Collection -import lightdb.doc.{Document, DocumentModel, JsonConversion} -import lightdb.feature.DBFeatureKey -import lightdb.store.StoreManager -import lightdb.upgrade.DatabaseUpgrade -import lightdb.{Id, LightDB, Sort, StoredValue} -import org.scalatest.matchers.should.Matchers -import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} -import perfolation.double2Implicits -import rapid.Task - -import java.nio.file.Path - -abstract class AbstractAsyncSpec extends AnyWordSpec with Matchers { spec => - protected def aggregationSupported: Boolean = true - - private val adam = Person("Adam", 21, Person.id("adam")) - private val brenda = Person("Brenda", 11, Person.id("brenda")) - private val charlie = Person("Charlie", 35, Person.id("charlie")) - private val diana = Person("Diana", 15, Person.id("diana")) - private val evan = Person("Evan", 53, Person.id("evan")) - private val fiona = Person("Fiona", 23, Person.id("fiona")) - private val greg = Person("Greg", 12, Person.id("greg")) - private val hanna = Person("Hanna", 62, Person.id("hanna")) - private val ian = Person("Ian", 89, Person.id("ian")) - private val jenna = Person("Jenna", 4, Person.id("jenna")) - private val kevin = Person("Kevin", 33, Person.id("kevin")) - private val linda = Person("Linda", 72, Person.id("linda")) - private val mike = Person("Mike", 42, Person.id("mike")) - private val nancy = Person("Nancy", 22, Person.id("nancy")) - private val oscar = Person("Oscar", 21, Person.id("oscar")) - private val penny = Person("Penny", 2, Person.id("penny")) - private val quintin = Person("Quintin", 99, Person.id("quintin")) - private val ruth = Person("Ruth", 102, Person.id("ruth")) - private val sam = Person("Sam", 81, Person.id("sam")) - private val tori = Person("Tori", 30, Person.id("tori")) - private val uba = Person("Uba", 21, Person.id("uba")) - private val veronica = Person("Veronica", 13, Person.id("veronica")) - private val wyatt = Person("Wyatt", 30, Person.id("wyatt")) - private val xena = Person("Xena", 63, Person.id("xena")) - private val yuri = Person("Yuri", 30, Person.id("yuri")) - private val zoey = Person("Zoey", 101, Person.id("zoey")) - - private val names = List( - adam, brenda, charlie, diana, evan, fiona, greg, hanna, ian, jenna, kevin, linda, mike, nancy, oscar, penny, - quintin, ruth, sam, tori, uba, veronica, wyatt, xena, yuri, zoey - ) - - private var features = Map.empty[DBFeatureKey[Any], Any] - protected def addFeature[T](key: DBFeatureKey[T], value: T): Unit = - features += key.asInstanceOf[DBFeatureKey[Any]] -> value - - protected lazy val specName: String = getClass.getSimpleName - - protected var db: DB = new DB - - specName should { - "initialize the database" in { - db.init.map(b => b should be(true)).sync() - } - "verify the database is empty" in { - db.people.transaction { implicit transaction => - db.people.count.map(c => c should be(0)) - }.sync() - } - "insert the records" in { - db.people.transaction { implicit transaction => - db.people.insert(names).map(_ should not be None) - }.sync() - } - "retrieve the first record by _id -> id" in { - db.people.transaction { implicit transaction => - db.people(_._id -> adam._id).map(_ should be(adam)) - }.sync() - } - "retrieve the first record by id" in { - db.people.transaction { implicit transaction => - db.people(adam._id).map(_ should be(adam)) - }.sync() - } - "count the records in the database" in { - db.people.transaction { implicit transaction => - db.people.count.map(_ should be(26)) - }.sync() - } - "stream the ids in the database" in { - db.people.transaction { implicit transaction => - db.people.query.search.id.flatMap(_.stream.toList).map(_.toSet).map { ids => - ids should be(names.map(_._id).toSet) - } - }.sync() - } - "stream the records in the database" in { - db.people.transaction { implicit transaction => - db.people.stream.toList.map(_.map(_.age).toSet).map { ages => - ages should be(Set(101, 42, 89, 102, 53, 13, 2, 22, 12, 81, 35, 63, 99, 23, 30, 4, 21, 33, 11, 72, 15, 62)) - } - }.sync() - } - "query with aggregate functions" in { - if (aggregationSupported) { - db.people.transaction { implicit transaction => - db.people.query - .aggregate(p => List( - p.age.min, - p.age.max, - p.age.avg, - p.age.sum - )) - .toList - .map { list => - list.map(m => m(_.age.min)).toSet should be(Set(2)) - list.map(m => m(_.age.max)).toSet should be(Set(102)) - list.map(m => m(_.age.avg).f(f = 6)).toSet should be(Set("41.807692")) - list.map(m => m(_.age.sum)).toSet should be(Set(1087)) - } - }.sync() - } else { - succeed - } - } - "search by age range" in { - db.people.transaction { implicit transaction => - db.people.query.filter(_.age BETWEEN 19 -> 22).search.id.flatMap(_.stream.toList).map { ids => - ids.toSet should be(Set(adam._id, nancy._id, oscar._id, uba._id)) - } - }.sync() - } - "sort by age" in { - db.people.transaction { implicit transaction => - db.people.query.sort(Sort.ByField(Person.age).descending).search.docs.flatMap(_.stream.toList).map { people => - people.map(_.name).take(3) should be(List("Ruth", "Zoey", "Quintin")) - } - }.sync() - } - "group by age" in { - db.people.transaction { implicit transaction => - db.people.query.grouped(_.age).toList.map { list => - list.map(_._1) should be(List(2, 4, 11, 12, 13, 15, 21, 22, 23, 30, 33, 35, 42, 53, 62, 63, 72, 81, 89, 99, 101, 102)) - list.map(_._2.map(_.name).toSet) should be(List( - Set("Penny"), Set("Jenna"), Set("Brenda"), Set("Greg"), Set("Veronica"), Set("Diana"), - Set("Adam", "Uba", "Oscar"), Set("Nancy"), Set("Fiona"), Set("Tori", "Yuri", "Wyatt"), Set("Kevin"), - Set("Charlie"), Set("Mike"), Set("Evan"), Set("Hanna"), Set("Xena"), Set("Linda"), Set("Sam"), Set("Ian"), - Set("Quintin"), Set("Zoey"), Set("Ruth") - )) - } - }.sync() - } - "delete some records" in { - db.people.transaction { implicit transaction => - for { - b1 <- db.people.delete(_._id -> linda._id) - b2 <- db.people.delete(_._id -> yuri._id) - } yield (b1, b2) should be((true, true)) - }.sync() - } - "verify the records were deleted" in { - db.people.transaction { implicit transaction => - db.people.count.map(_ should be(24)) - }.sync() - } - "modify a record" in { - db.people.transaction { implicit transaction => - db.people.modify(adam._id) { - case Some(p) => Task(Some(p.copy(name = "Allan"))) - case None => fail("Adam was not found!") - } - }.map { - case Some(p) => p.name should be("Allan") - case None => fail("Allan was not returned!") - }.sync() - } - "verify the record has been renamed" in { - db.people.transaction { implicit transaction => - db.people(_._id -> adam._id).map(_.name should be("Allan")) - }.sync() - } - "verify start time has been set" in { - db.startTime.get.map(_ should be > 0L).sync() - } - "dispose the database before creating a new instance" in { - db.dispose().map(_ should be(true)).sync() - } - "prepare a new instance" in { - db = new DB - db.init.map(_ should be(true)).sync() - } - "query the database to verify records were persisted properly" in { - db.people.transaction { implicit transaction => - db.people.stream.toList.map(_.map(_.name).toSet).map(_ should be(Set( - "Tori", "Ruth", "Nancy", "Jenna", "Hanna", "Wyatt", "Diana", "Ian", "Quintin", "Uba", "Oscar", "Kevin", - "Penny", "Charlie", "Evan", "Sam", "Mike", "Brenda", "Zoey", "Allan", "Xena", "Fiona", "Greg", "Veronica" - ))) - }.sync() - } - "truncate the collection" in { - db.people.transaction { implicit transaction => - db.people.truncate().map(_ should be(24)) - }.sync() - } - "verify the collection is empty" in { - db.people.transaction { implicit transaction => - db.people.count.map(_ should be(0)) - }.sync() - } - "dispose the database" in { - db.dispose().map(_ should be(true)).sync() - } - } - - def storeManager: StoreManager - - class DB extends AsyncLightDB { - spec.features.foreach { - case (key, value) => - put(key, value) - } - - lazy val directory: Option[Path] = Some(Path.of(s"db/$specName")) - - val startTime: AsyncStoredValue[Long] = stored[Long]("startTime", -1L) - - val people: AsyncCollection[Person, Person.type] = collection(Person) - - override def storeManager: StoreManager = spec.storeManager - - override def upgrades: List[AsyncDatabaseUpgrade] = List(InitialSetupUpgrade) - } - - case class Person(name: String, age: Int, _id: Id[Person] = Person.id()) extends Document[Person] - - object Person extends DocumentModel[Person] with JsonConversion[Person] { - implicit val rw: RW[Person] = RW.gen - - val name: F[String] = field("name", (p: Person) => p.name) - val age: F[Int] = field.index("age", (p: Person) => p.age) - } - - object InitialSetupUpgrade extends AsyncDatabaseUpgrade { - override def applyToNew: Boolean = true - - override def blockStartup: Boolean = true - - override def alwaysRun: Boolean = false - - override def upgrade(ldb: AsyncLightDB): Task[Unit] = { - db.startTime.set(System.currentTimeMillis()).unit - } - } -} diff --git a/all/src/test/scala/spec/AirportSpec.scala b/all/src/test/scala/spec/AirportSpec.scala index 37195353..6bb12c0d 100644 --- a/all/src/test/scala/spec/AirportSpec.scala +++ b/all/src/test/scala/spec/AirportSpec.scala @@ -12,20 +12,23 @@ import lightdb.store.{StoreManager, StoreMode} import lightdb.upgrade.DatabaseUpgrade import org.scalatest.matchers.should.Matchers import org.scalatest.wordspec.{AnyWordSpec, AsyncWordSpec} +import rapid.{AsyncTaskSpec, Task} import java.nio.file.{Path, Paths} import scala.collection.mutable import scala.collection.mutable.ListBuffer import scala.io.Source +import scribe.{rapid => logger} @EmbeddedTest -class AirportSpec extends AnyWordSpec with Matchers { +class AirportSpec extends AsyncWordSpec with AsyncTaskSpec with Matchers { "AirportSpec" should { "initialize the database" in { - db.init + DB.init.succeed } "have two collections" in { DB.collections.map(_.name).toSet should be(Set("_backingStore", "Flight", "Airport")) + Task.unit.succeed } // "query VIP airports" in { // Airport.vipKeys.values.map { keys => @@ -34,28 +37,33 @@ class AirportSpec extends AnyWordSpec with Matchers { // } "query JFK airport" in { val jfk = Airport.id("JFK") - val airport = DB.airports.t(jfk) - airport.name should be("John F Kennedy Intl") + DB.airports.t(jfk).map { airport => + airport.name should be("John F Kennedy Intl") + } } "query the airports by id filter" in { val keys = List("JFK", "LAX") DB.airports.transaction { implicit transaction => - val airports = DB.airports.query + DB.airports.query .filter(_._id IN keys.map(Airport.id)) .toList - airports.map(_.name).toSet should be(Set("John F Kennedy Intl", "Los Angeles International")) + .map { airports => + airports.map(_.name).toSet should be(Set("John F Kennedy Intl", "Los Angeles International")) + } } } "query by airport name" in { DB.airports.transaction { implicit transaction => - val airport = DB.airports.query + DB.airports.query .filter(_.name === "John F Kennedy Intl") - .one - airport._id should be(Airport.id("JFK")) + .first + .map { airport => + airport._id should be(Airport.id("JFK")) + } } } "count all the airports" in { - DB.airports.t.count should be(3375) + DB.airports.t.count.map(_ should be(3375)) } // "validate airport references" in { // Flight.airportReferences.facet(Airport.id("JFK")).map { facet => @@ -103,7 +111,7 @@ class AirportSpec extends AnyWordSpec with Matchers { // TODO: Test ValueStore // TODO: the other stuff "dispose" in { - DB.dispose() + DB.dispose().succeed } } @@ -198,8 +206,34 @@ class AirportSpec extends AnyWordSpec with Matchers { override def blockStartup: Boolean = true override def alwaysRun: Boolean = false - override def upgrade(db: LightDB): Unit = { - val airports = csv2Iterator("airports.csv").map { d => + def csv2Iterator(fileName: String): Iterator[Vector[String]] = { + val source = Source.fromURL(getClass.getClassLoader.getResource(fileName)) + val iterator = source.getLines() + iterator.next() // Skip heading + iterator.map { s => + var open = false + val entries = ListBuffer.empty[String] + val b = new mutable.StringBuilder + s.foreach { c => + if (c == '"') { + open = !open + } else if (c == ',' && !open) { + if (b.nonEmpty) { + entries += b.toString().trim + b.clear() + } + } else { + b.append(c) + } + } + if (b.nonEmpty) entries += b.toString().trim + entries.toVector + } + } + + override def upgrade(db: LightDB): Task[Unit] = for { + _ <- logger.info("Data Importing...") + airports = rapid.Stream.fromIterator(Task(csv2Iterator("airports.csv").map { d => Airport( name = d(1), city = d(2), @@ -210,17 +244,16 @@ class AirportSpec extends AnyWordSpec with Matchers { vip = d(7).toBoolean, _id = Airport.id(d(0)) ) + })) + insertedAirports <- DB.airports.transaction { implicit transaction => + airports + .evalForeach { airport => + DB.airports.insert(airport).unit + } + .count } - var insertedAirports = 0 - DB.airports.transaction { implicit transaction => - airports.foreach { airport => - DB.airports.insert(airport) - insertedAirports += 1 - } - } - insertedAirports should be(3375) - - val flights = csv2Iterator("flights.csv").map { d => + _ = insertedAirports should be(3375) + flights = rapid.Stream.fromIterator(Task(csv2Iterator("flights.csv").map { d => Flight( from = Airport.id(d(0)), to = Airport.id(d(1)), @@ -237,40 +270,15 @@ class AirportSpec extends AnyWordSpec with Matchers { tailNum = d(12), distance = d(13).toInt ) - } - var insertedFlights = 0 - DB.flights.transaction { implicit transaction => - flights.foreach { flight => - DB.flights.insert(flight) - insertedFlights += 1 - } - } - insertedFlights should be(286463) - } - - def csv2Iterator(fileName: String): Iterator[Vector[String]] = { - val source = Source.fromURL(getClass.getClassLoader.getResource(fileName)) - val iterator = source.getLines() - iterator.next() // Skip heading - iterator.map { s => - var open = false - val entries = ListBuffer.empty[String] - val b = new mutable.StringBuilder - s.foreach { c => - if (c == '"') { - open = !open - } else if (c == ',' && !open) { - if (b.nonEmpty) { - entries += b.toString().trim - b.clear() - } - } else { - b.append(c) + })) + insertedFlights <- DB.flights.transaction { implicit transaction => + flights + .evalForeach { flight => + DB.flights.insert(flight).unit } - } - if (b.nonEmpty) entries += b.toString().trim - entries.toVector + .count } - } + _ = insertedFlights should be(286463) + } yield () } } diff --git a/all/src/test/scala/spec/H2AsyncSpec.scala b/all/src/test/scala/spec/H2AsyncSpec.scala deleted file mode 100644 index b6acf817..00000000 --- a/all/src/test/scala/spec/H2AsyncSpec.scala +++ /dev/null @@ -1,8 +0,0 @@ -package spec -import lightdb.h2.H2Store -import lightdb.store.StoreManager - -@EmbeddedTest -class H2AsyncSpec extends AbstractAsyncSpec { - override lazy val storeManager: StoreManager = H2Store -} \ No newline at end of file diff --git a/build.sbt b/build.sbt index e08836e9..807d891e 100644 --- a/build.sbt +++ b/build.sbt @@ -15,7 +15,7 @@ val developerURL: String = "https://matthicks.com" name := projectName ThisBuild / organization := org -ThisBuild / version := "1.3.0-SNAPSHOT" +ThisBuild / version := "2.0.0-SNAPSHOT" ThisBuild / scalaVersion := scala213 ThisBuild / crossScalaVersions := allScalaVersions ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation") @@ -82,12 +82,12 @@ val h2Version: String = "2.3.232" val postgresqlVersion: String = "42.7.3" -val rapidVersion: String = "0.4.0-SNAPSHOT" +val rapidVersion: String = "0.4.0" val scalaTestVersion: String = "3.2.19" lazy val root = project.in(file(".")) - .aggregate(core.jvm, sql, sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis, async, all) + .aggregate(core.jvm, sql, sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis, all) .settings( name := projectName, publish := {}, @@ -105,7 +105,7 @@ lazy val core = crossProject(JVMPlatform) "com.outr" %% "scribe-slf4j" % scribeVersion, "org.locationtech.spatial4j" % "spatial4j" % spatial4JVersion, "org.locationtech.jts" % "jts-core" % jtsVersion, - "com.outr" %% "rapid-core" % rapidVersion, + "com.outr" %%% "rapid-core" % rapidVersion, "org.scalatest" %%% "scalatest" % scalaTestVersion % Test, "com.outr" %%% "rapid-test" % rapidVersion % Test ), @@ -246,18 +246,8 @@ lazy val redis = project.in(file("redis")) fork := true ) -lazy val async = project.in(file("async")) - .dependsOn(core.jvm) - .settings( - name := s"$projectName-async", - fork := true, - libraryDependencies ++= Seq( - "com.outr" %% "rapid-core" % rapidVersion - ) - ) - lazy val all = project.in(file("all")) - .dependsOn(core.jvm, core.jvm % "test->test", sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis, async) + .dependsOn(core.jvm, core.jvm % "test->test", sqlite, postgresql, duckdb, h2, lucene, halodb, rocksdb, mapdb, redis) .settings( name := s"$projectName-all", fork := true, diff --git a/core/src/main/scala/lightdb/LightDB.scala b/core/src/main/scala/lightdb/LightDB.scala index 7bcf68c7..c68a5651 100644 --- a/core/src/main/scala/lightdb/LightDB.scala +++ b/core/src/main/scala/lightdb/LightDB.scala @@ -7,7 +7,7 @@ import lightdb.feature.{DBFeatureKey, FeatureSupport} import lightdb.store.{Store, StoreManager, StoreMode} import lightdb.upgrade.DatabaseUpgrade import lightdb.util.{Disposable, Initializable} -import rapid.Task +import rapid._ import scribe.{rapid => logger} import java.nio.file.Path diff --git a/core/src/main/scala/lightdb/backup/DatabaseBackup.scala b/core/src/main/scala/lightdb/backup/DatabaseBackup.scala index db23555a..5f866da5 100644 --- a/core/src/main/scala/lightdb/backup/DatabaseBackup.scala +++ b/core/src/main/scala/lightdb/backup/DatabaseBackup.scala @@ -4,7 +4,7 @@ import fabric.Json import fabric.io.JsonFormatter import lightdb.collection.Collection import lightdb.{KeyValue, LightDB} -import rapid.Task +import rapid._ import java.io.{File, FileOutputStream, PrintWriter} import java.util.zip.{ZipEntry, ZipOutputStream} diff --git a/core/src/main/scala/lightdb/backup/DatabaseRestore.scala b/core/src/main/scala/lightdb/backup/DatabaseRestore.scala index 66b65489..ea038f74 100644 --- a/core/src/main/scala/lightdb/backup/DatabaseRestore.scala +++ b/core/src/main/scala/lightdb/backup/DatabaseRestore.scala @@ -4,7 +4,7 @@ import fabric.io.JsonParser import lightdb.LightDB import lightdb.collection.Collection import lightdb.doc.{Document, DocumentModel} -import rapid.Task +import rapid._ import scribe.{rapid => logger} import java.io.File diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index be76d0e4..4c9e5187 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -12,7 +12,7 @@ import lightdb.util.Initializable import lightdb._ import lightdb.field.Field._ import lightdb.lock.LockManager -import rapid.{Forge, Task} +import rapid._ import scribe.{rapid => logger} import java.util.concurrent.ConcurrentHashMap diff --git a/core/src/main/scala/lightdb/doc/MaterializedBatchModel.scala b/core/src/main/scala/lightdb/doc/MaterializedBatchModel.scala index 7b38c000..b369a656 100644 --- a/core/src/main/scala/lightdb/doc/MaterializedBatchModel.scala +++ b/core/src/main/scala/lightdb/doc/MaterializedBatchModel.scala @@ -2,7 +2,7 @@ package lightdb.doc import lightdb.Id import lightdb.transaction.Transaction -import rapid.Task +import rapid._ import java.util.concurrent.ConcurrentHashMap import java.util.concurrent.atomic.AtomicInteger diff --git a/core/src/main/scala/lightdb/transaction/Transaction.scala b/core/src/main/scala/lightdb/transaction/Transaction.scala index 8e0f0b18..8ab63eb6 100644 --- a/core/src/main/scala/lightdb/transaction/Transaction.scala +++ b/core/src/main/scala/lightdb/transaction/Transaction.scala @@ -2,7 +2,7 @@ package lightdb.transaction import lightdb.doc.Document import lightdb.feature.FeatureSupport -import rapid.Task +import rapid._ final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[TransactionKey] { transaction => def commit(): Task[Unit] = features.map { diff --git a/core/src/main/scala/lightdb/trigger/CollectionTriggers.scala b/core/src/main/scala/lightdb/trigger/CollectionTriggers.scala index 149c8ab5..ec9f9064 100644 --- a/core/src/main/scala/lightdb/trigger/CollectionTriggers.scala +++ b/core/src/main/scala/lightdb/trigger/CollectionTriggers.scala @@ -3,7 +3,7 @@ package lightdb.trigger import lightdb.field.Field.UniqueIndex import lightdb.doc.Document import lightdb.transaction.Transaction -import rapid.Task +import rapid._ class CollectionTriggers[Doc <: Document[Doc]] extends CollectionTrigger[Doc] { private var list = List.empty[CollectionTrigger[Doc]] diff --git a/lucene/src/main/scala/lightdb/lucene/LucenePaginatedIterator.scala b/lucene/src/main/scala/lightdb/lucene/LucenePaginatedIterator.scala index 9360ddd2..35f8f2ce 100644 --- a/lucene/src/main/scala/lightdb/lucene/LucenePaginatedIterator.scala +++ b/lucene/src/main/scala/lightdb/lucene/LucenePaginatedIterator.scala @@ -33,7 +33,7 @@ case class LucenePaginatedIterator(searcher: IndexSearcher, val threshold = if (totalHits == -1) Int.MaxValue else 0 val after = currentDocs.lastOption.map(_.asInstanceOf[FieldDoc]).orNull - val collectorManager = new TopFieldCollectorManager(sort, pageSize, after, threshold, false) + val collectorManager = new TopFieldCollectorManager(sort, pageSize, after, threshold) val topDocs = searcher.search(query, collectorManager) if (currentPageIndex == 0) totalHits = topDocs.totalHits.value.toInt diff --git a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala index b8650b57..cd2f0fd0 100644 --- a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala +++ b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala @@ -9,6 +9,7 @@ import lightdb.materialized.MaterializedAggregate import lightdb.store.{Conversion, Store, StoreManager, StoreMode} import lightdb.transaction.Transaction import org.mapdb.{DB, DBMaker, HTreeMap, Serializer} +import rapid.Task import java.nio.file.{Files, Path} import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -28,16 +29,21 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String map.verify() - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () + override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task.unit - override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) + override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = upsert(doc) - override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = map.put(doc._id.value, toString(doc)) + override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task { + map.put(doc._id.value, toString(doc)) + doc + } - override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = map.containsKey(id.value) + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = Task { + map.containsKey(id.value) + } override def get[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Option[Doc] = { + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = Task { if (field == idField) { Option(map.get(value.asInstanceOf[Id[Doc]].value)).map(fromString) } else { @@ -46,34 +52,35 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String } override def delete[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Boolean = - map.remove(value.asInstanceOf[Id[Doc]].value) != null + (implicit transaction: Transaction[Doc]): Task[Boolean] = + Task(map.remove(value.asInstanceOf[Id[Doc]].value) != null) - override def count(implicit transaction: Transaction[Doc]): Int = map.size() + override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(map.size()) - override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = map.values() - .iterator() - .asScala - .map(fromString) + override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = rapid.Stream.fromIterator(Task { + map.values() + .iterator() + .asScala + .map(fromString) + }) override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = + (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = throw new UnsupportedOperationException("MapDBStore does not support searching") override def aggregate(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = throw new UnsupportedOperationException("MapDBStore does not support aggregation") - override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int = + override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] = throw new UnsupportedOperationException("MapDBStore does not support aggregation") - override def truncate()(implicit transaction: Transaction[Doc]): Int = { - val size = count + override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = count.map { size => map.clear() size } - override def dispose(): Unit = { + override def dispose(): Task[Unit] = Task { db.commit() db.close() } diff --git a/redis/src/main/scala/lightdb/redis/RedisStore.scala b/redis/src/main/scala/lightdb/redis/RedisStore.scala index 325ce7ad..ed701fe6 100644 --- a/redis/src/main/scala/lightdb/redis/RedisStore.scala +++ b/redis/src/main/scala/lightdb/redis/RedisStore.scala @@ -9,6 +9,7 @@ import lightdb.materialized.MaterializedAggregate import lightdb.store.{Conversion, Store, StoreMode} import lightdb.transaction.{Transaction, TransactionKey} import _root_.redis.clients.jedis.{Jedis, JedisPool, JedisPoolConfig} +import rapid.Task import scala.jdk.CollectionConverters.IteratorHasAsScala @@ -27,23 +28,25 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String private def getInstance(implicit transaction: Transaction[Doc]): Jedis = transaction.getOrCreate(InstanceKey, pool.getResource) - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () + override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task.unit - override def releaseTransaction(transaction: Transaction[Doc]): Unit = { + override def releaseTransaction(transaction: Transaction[Doc]): Task[Unit] = Task { super.releaseTransaction(transaction) transaction.get(InstanceKey).foreach(jedis => pool.returnResource(jedis)) } - override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) + override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = upsert(doc) - override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = + override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task { getInstance.hset(name, doc._id.value, toString(doc)) + doc + } - override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = - getInstance.hexists(name, id.value) + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = + Task(getInstance.hexists(name, id.value)) override def get[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Option[Doc] = { + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = Task { if (field == idField) { Option(getInstance.hget(name, value.asInstanceOf[Id[Doc]].value)).map(fromString) } else { @@ -52,30 +55,30 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String } override def delete[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Boolean = - getInstance.hdel(value.asInstanceOf[Id[Doc]].value) > 0L + (implicit transaction: Transaction[Doc]): Task[Boolean] = + Task(getInstance.hdel(value.asInstanceOf[Id[Doc]].value) > 0L) - override def count(implicit transaction: Transaction[Doc]): Int = getInstance.hlen(name).toInt + override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(getInstance.hlen(name).toInt) - override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = getInstance.hgetAll(name) - .values().iterator().asScala.map(fromString) + override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = rapid.Stream.fromIterator(Task { + getInstance.hgetAll(name).values().iterator().asScala.map(fromString) + }) override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = + (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = throw new UnsupportedOperationException("Redis does not support searching") override def aggregate(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = throw new UnsupportedOperationException("Redis does not support aggregation") - override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int = + override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] = throw new UnsupportedOperationException("Redis does not support aggregation") - override def truncate()(implicit transaction: Transaction[Doc]): Int = { - val size = count + override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = count.map { size => getInstance.del(name) size } - override def dispose(): Unit = pool.close() + override def dispose(): Task[Unit] = Task(pool.close()) } diff --git a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala index a6a0a57c..0af56c5a 100644 --- a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala +++ b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala @@ -11,6 +11,7 @@ import lightdb.materialized.MaterializedAggregate import lightdb.store.{Conversion, Store, StoreManager, StoreMode} import lightdb.transaction.Transaction import org.rocksdb.{FlushOptions, Options, RocksDB, RocksIterator} +import rapid.Task import java.nio.file.{Files, Path} @@ -26,19 +27,20 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Stri RocksDB.open(options, directory.toAbsolutePath.toString) } - override def prepareTransaction(transaction: Transaction[Doc]): Unit = () + override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task.unit - override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = upsert(doc) + override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = upsert(doc) - override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { + override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task { val json = doc.json(model.rw) db.put(doc._id.bytes, JsonFormatter.Compact(json).getBytes("UTF-8")) + doc } - override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = db.keyExists(id.bytes) + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = Task(db.keyExists(id.bytes)) override def get[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Option[Doc] = { + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = Task { if (field == idField) { Option(db.get(value.asInstanceOf[Id[Doc]].bytes)).map(bytes2Doc) } else { @@ -53,33 +55,36 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Stri } override def delete[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Boolean = { + (implicit transaction: Transaction[Doc]): Task[Boolean] = Task { db.delete(value.asInstanceOf[Id[Doc]].bytes) true } - override def count(implicit transaction: Transaction[Doc]): Int = iterator(db.newIterator()).size + override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(iterator(db.newIterator()).size) - override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = iterator(db.newIterator()) + override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = rapid.Stream + .fromIterator(Task(iterator(db.newIterator()))) .map(bytes2Doc) override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = + (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = throw new UnsupportedOperationException("RocksDBStore does not support searching") override def aggregate(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = throw new UnsupportedOperationException("RocksDBStore does not support aggregation") - override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Int = + override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] = throw new UnsupportedOperationException("RocksDBStore does not support aggregation") - override def truncate()(implicit transaction: Transaction[Doc]): Int = iterator(db.newIterator(), value = false) - .map(db.delete) - .size + override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = Task { + iterator(db.newIterator(), value = false) + .map(db.delete) + .size + } - override def dispose(): Unit = { + override def dispose(): Task[Unit] = Task { db.flush(new FlushOptions) db.close() } diff --git a/sql/src/main/scala/lightdb/sql/SQLState.scala b/sql/src/main/scala/lightdb/sql/SQLState.scala index 4a8379dd..96b8f610 100644 --- a/sql/src/main/scala/lightdb/sql/SQLState.scala +++ b/sql/src/main/scala/lightdb/sql/SQLState.scala @@ -3,6 +3,7 @@ package lightdb.sql import lightdb.doc.Document import lightdb.sql.connect.ConnectionManager import lightdb.transaction.{Transaction, TransactionFeature, TransactionKey} +import rapid.Task import java.sql.{Connection, PreparedStatement, ResultSet, Statement} import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue} @@ -77,28 +78,32 @@ case class SQLState[Doc <: Document[Doc]](connectionManager: ConnectionManager, resultSets = rs :: resultSets } - override def commit(): Unit = if (dirty) { - // TODO: SingleConnection shares - if (batchInsert.get() > 0) { - psInsert.executeBatch() - } - if (batchUpsert.get() > 0) { - psUpsert.executeBatch() - } - dirty = false - Try(connectionManager.getConnection(transaction).commit()).failed.foreach { t => - scribe.warn(s"Commit failed: ${t.getMessage}") + override def commit(): Task[Unit] = Task { + if (dirty) { + // TODO: SingleConnection shares + if (batchInsert.get() > 0) { + psInsert.executeBatch() + } + if (batchUpsert.get() > 0) { + psUpsert.executeBatch() + } + dirty = false + Try(connectionManager.getConnection(transaction).commit()).failed.foreach { t => + scribe.warn(s"Commit failed: ${t.getMessage}") + } } } - override def rollback(): Unit = if (dirty) { - dirty = false - Try(connectionManager.getConnection(transaction).rollback()).failed.foreach { t => - scribe.warn(s"Rollback failed: ${t.getMessage}") + override def rollback(): Task[Unit] = Task { + if (dirty) { + dirty = false + Try(connectionManager.getConnection(transaction).rollback()).failed.foreach { t => + scribe.warn(s"Rollback failed: ${t.getMessage}") + } } } - override def close(): Unit = { + override def close(): Task[Unit] = Task { super.close() if (batchInsert.get() > 0) { psInsert.executeBatch() diff --git a/sql/src/main/scala/lightdb/sql/SQLStore.scala b/sql/src/main/scala/lightdb/sql/SQLStore.scala index 58deab88..ae3259c2 100644 --- a/sql/src/main/scala/lightdb/sql/SQLStore.scala +++ b/sql/src/main/scala/lightdb/sql/SQLStore.scala @@ -18,6 +18,7 @@ import lightdb.util.ActionIterator import lightdb._ import lightdb.field.{Field, IndexingState} import lightdb.field.Field._ +import rapid.Task import java.sql.{Connection, PreparedStatement, ResultSet} import scala.language.implicitConversions @@ -28,7 +29,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: transaction { implicit transaction => initTransaction() - } + }.sync() protected def createTable()(implicit transaction: Transaction[Doc]): Unit = { val entries = fields.collect { @@ -58,7 +59,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: executeUpdate(s"ALTER TABLE $name ADD COLUMN ${field.name} ${def2Type(field.name, field.rw.definition)}") } - protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = { + protected def initTransaction()(implicit transaction: Transaction[Doc]): Task[Unit] = Task { val connection = connectionManager.getConnection val existingTables = tables(connection) if (!existingTables.contains(name.toLowerCase)) { @@ -109,10 +110,12 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } } - override def prepareTransaction(transaction: Transaction[Doc]): Unit = transaction.put( - key = StateKey[Doc], - value = SQLState(connectionManager, transaction, this, Collection.CacheQueries) - ) + override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task { + transaction.put( + key = StateKey[Doc], + value = SQLState(connectionManager, transaction, this, Collection.CacheQueries) + ) + } protected def field2Value(field: Field[Doc, _]): String = "?" @@ -133,7 +136,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: private[sql] lazy val insertSQL: String = createInsertSQL() private[sql] lazy val upsertSQL: String = createUpsertSQL() - override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { + override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task { val state = getState val indexingState = new IndexingState state.withInsertPreparedStatement { ps => @@ -147,9 +150,10 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: state.batchInsert.set(0) } } + doc } - override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Unit = { + override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task { val state = getState val indexingState = new IndexingState state.withUpsertPreparedStatement { ps => @@ -163,12 +167,13 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: state.batchUpsert.set(0) } } + doc } - override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Boolean = get(idField, id).nonEmpty + override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] = get(idField, id).map(_.nonEmpty) override def get[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Option[Doc] = { + (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = Task { val state = getState val b = new SQLQueryBuilder[Doc]( store = this, @@ -196,7 +201,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } override def delete[V](field: UniqueIndex[Doc, V], value: V) - (implicit transaction: Transaction[Doc]): Boolean = { + (implicit transaction: Transaction[Doc]): Task[Boolean] = Task { val connection = connectionManager.getConnection val ps = connection.prepareStatement(s"DELETE FROM $name WHERE ${field.name} = ?") try { @@ -207,7 +212,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } } - override def count(implicit transaction: Transaction[Doc]): Int = { + override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task { val rs = executeQuery(s"SELECT COUNT(*) FROM $name") try { rs.next() @@ -217,7 +222,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } } - override def iterator(implicit transaction: Transaction[Doc]): Iterator[Doc] = { + override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = rapid.Stream.fromIterator(Task { val state = getState val connection = connectionManager.getConnection val s = connection.createStatement() @@ -225,7 +230,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: val rs = s.executeQuery(s"SELECT * FROM $name") state.register(rs) rs2Iterator(rs, Conversion.Doc()) - } + }) private def getColumnNames(rs: ResultSet): List[String] = { val meta = rs.getMetaData @@ -238,7 +243,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: storeMode match { case StoreMode.Indexes(storage) => val id = Id[Doc](rs.getString("_id")) - storage(id) + storage(id).sync() case _ => throw new UnsupportedOperationException("This should not be possible") } case c: SQLConversion[Doc] => c.convertFromSQL(rs) @@ -338,7 +343,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: protected def fieldPart[V](field: Field[Doc, V]): SQLPart = SQLPart(field.name) override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = { + (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = Task { var extraFields = List.empty[SQLPart] val fields = conversion match { case Conversion.Value(field) => List(field) @@ -379,15 +384,17 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } else { None } - val iterator = rs2Iterator(rs, conversion) - val ps = rs.getStatement.asInstanceOf[PreparedStatement] - val iteratorWithScore = ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps)) + val stream = rapid.Stream.fromIterator[(V, Double)](Task { + val iterator = rs2Iterator(rs, conversion) + val ps = rs.getStatement.asInstanceOf[PreparedStatement] + ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps)) + }) SearchResults( model = model, offset = query.offset, limit = query.limit, total = total, - iteratorWithScore = iteratorWithScore, + streamWithScore = stream, facetResults = Map.empty, transaction = transaction ) @@ -451,13 +458,13 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } override def aggregate(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Iterator[MaterializedAggregate[Doc, Model]] = { + (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = { val b = aggregate2SQLQuery(query) val results = b.execute() val rs = results.rs val state = getState state.register(rs) - def createStream[R](f: ResultSet => R): Iterator[R] = { + def createStream[R](f: ResultSet => R): rapid.Stream[R] = rapid.Stream.fromIterator(Task { val iterator = new Iterator[R] { private var checkedNext = false private var nextValue = false @@ -479,7 +486,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } } iterator - } + }) createStream[MaterializedAggregate[Doc, Model]] { rs => val json = obj(query.functions.map { f => val o = rs.getObject(f.name) @@ -497,7 +504,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } override def aggregateCount(query: AggregateQuery[Doc, Model]) - (implicit transaction: Transaction[Doc]): Int = { + (implicit transaction: Transaction[Doc]): Task[Int] = Task { val b = aggregate2SQLQuery(query) b.queryTotal() } @@ -618,7 +625,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: protected def concatPrefix: String = "GROUP_CONCAT" - override def truncate()(implicit transaction: Transaction[Doc]): Int = { + override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = Task { val connection = connectionManager.getConnection val ps = connection.prepareStatement(s"DELETE FROM $name") try { @@ -628,5 +635,5 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: } } - override def dispose(): Unit = if (!connectionShared) connectionManager.dispose() + override def dispose(): Task[Unit] = connectionManager.dispose().when(!connectionShared) } \ No newline at end of file diff --git a/sql/src/main/scala/lightdb/sql/connect/ConnectionManager.scala b/sql/src/main/scala/lightdb/sql/connect/ConnectionManager.scala index c9c9ef4f..d4c3b64a 100644 --- a/sql/src/main/scala/lightdb/sql/connect/ConnectionManager.scala +++ b/sql/src/main/scala/lightdb/sql/connect/ConnectionManager.scala @@ -3,6 +3,7 @@ package lightdb.sql.connect import lightdb.doc.Document import lightdb.transaction.Transaction import lightdb.util.Disposable +import rapid.Task import java.sql.Connection @@ -13,5 +14,5 @@ trait ConnectionManager extends Disposable { def releaseConnection[Doc <: Document[Doc]](implicit transaction: Transaction[Doc]): Unit - def dispose(): Unit + def dispose(): Task[Unit] } \ No newline at end of file diff --git a/sql/src/main/scala/lightdb/sql/connect/DBCPConnectionManager.scala b/sql/src/main/scala/lightdb/sql/connect/DBCPConnectionManager.scala index de1c01be..a6f3af91 100644 --- a/sql/src/main/scala/lightdb/sql/connect/DBCPConnectionManager.scala +++ b/sql/src/main/scala/lightdb/sql/connect/DBCPConnectionManager.scala @@ -1,6 +1,7 @@ package lightdb.sql.connect import org.apache.commons.dbcp2.BasicDataSource +import rapid.Task case class DBCPConnectionManager(config: SQLConfig) extends DataSourceConnectionManager { protected lazy val dataSource: BasicDataSource = { @@ -17,6 +18,6 @@ case class DBCPConnectionManager(config: SQLConfig) extends DataSourceConnection ds } - override def dispose(): Unit = dataSource.close() + override def dispose(): Task[Unit] = Task(dataSource.close()) } diff --git a/sql/src/main/scala/lightdb/sql/connect/HikariConnectionManager.scala b/sql/src/main/scala/lightdb/sql/connect/HikariConnectionManager.scala index e69cc6fe..31c52286 100644 --- a/sql/src/main/scala/lightdb/sql/connect/HikariConnectionManager.scala +++ b/sql/src/main/scala/lightdb/sql/connect/HikariConnectionManager.scala @@ -1,6 +1,7 @@ package lightdb.sql.connect import com.zaxxer.hikari.{HikariConfig, HikariDataSource} +import rapid.Task case class HikariConnectionManager(config: SQLConfig) extends DataSourceConnectionManager { protected lazy val dataSource: HikariDataSource = { @@ -17,5 +18,5 @@ case class HikariConnectionManager(config: SQLConfig) extends DataSourceConnecti new HikariDataSource(hc) } - override def dispose(): Unit = dataSource.close() + override def dispose(): Task[Unit] = Task(dataSource.close()) } diff --git a/sql/src/main/scala/lightdb/sql/connect/SingleConnectionManager.scala b/sql/src/main/scala/lightdb/sql/connect/SingleConnectionManager.scala index 12b811e3..d6db434a 100644 --- a/sql/src/main/scala/lightdb/sql/connect/SingleConnectionManager.scala +++ b/sql/src/main/scala/lightdb/sql/connect/SingleConnectionManager.scala @@ -2,6 +2,7 @@ package lightdb.sql.connect import lightdb.doc.Document import lightdb.transaction.Transaction +import rapid.Task import java.sql.{Connection, DriverManager} @@ -12,10 +13,10 @@ case class SingleConnectionManager(connection: java.sql.Connection) extends Conn override def releaseConnection[Doc <: Document[Doc]](implicit transaction: Transaction[Doc]): Unit = {} - override def dispose(): Unit = if (!connection.isClosed) { + override def dispose(): Task[Unit] = Task { if (!connection.getAutoCommit) connection.commit() connection.close() - } + }.when(!connection.isClosed) } object SingleConnectionManager { diff --git a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala index 0075a398..812ddfc5 100644 --- a/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala +++ b/sqlite/src/main/scala/lightdb/sql/SQLiteStore.scala @@ -21,15 +21,16 @@ import java.io.File import java.nio.file.{Files, Path, StandardCopyOption} import java.sql.Connection import java.util.regex.Pattern +import rapid._ class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, model: Model, val connectionManager: ConnectionManager, val connectionShared: Boolean, val storeMode: StoreMode[Doc, Model]) extends SQLStore[Doc, Model](name, model) { - override protected def initTransaction()(implicit transaction: Transaction[Doc]): Unit = { + override protected def initTransaction()(implicit transaction: Transaction[Doc]): Task[Unit] = Task { val c = connectionManager.getConnection - if (hasSpatial) { + if (hasSpatial.sync()) { scribe.info(s"$name has spatial features. Enabling...") org.sqlite.Function.create(c, "DISTANCE", new org.sqlite.Function() { override def xFunc(): Unit = { @@ -84,7 +85,7 @@ class SQLiteStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin } }) super.initTransaction() - } + }.flatten override protected def tables(connection: Connection): Set[String] = SQLiteStore.tables(connection)