From 19e7b37c0ab048a41fd52eba6d5289bd7fa23069 Mon Sep 17 00:00:00 2001
From: Matt Hicks
Date: Wed, 1 Jan 2025 08:17:11 -0600
Subject: [PATCH] Major change and simplification of query conversion support

---
 .../lightdb/async/AsyncAggregateQuery.scala   |  55 ----
 .../scala/lightdb/async/AsyncCollection.scala | 102 -------
 .../lightdb/async/AsyncDatabaseUpgrade.scala  |  12 -
 .../scala/lightdb/async/AsyncLightDB.scala    | 120 --------
 .../main/scala/lightdb/async/AsyncQuery.scala | 288 ------------------
 .../lightdb/async/AsyncSearchResults.scala    |  28 --
 .../lightdb/async/AsyncStoredValue.scala      |  11 -
 .../async/AsyncTransactionConvenience.scala   |  69 -----
 core/src/main/scala/lightdb/Query.scala       | 283 ++++++-----------
 .../lightdb/aggregate/AggregateQuery.scala    |   2 +-
 .../scala/lightdb/collection/Collection.scala |   4 +-
 .../scala/lightdb/doc/DocumentModel.scala     |   2 +
 .../error/NonIndexedFieldException.scala      |   2 +-
 .../scala/lightdb/store/InMemoryIndexes.scala |   4 +-
 .../main/scala/lightdb/store/MapStore.scala   |   3 +-
 core/src/main/scala/lightdb/store/Store.scala |   2 +-
 .../lightdb/store/split/SplitStore.scala      |   4 +-
 .../trigger/BasicCollectionTrigger.scala      |   2 +-
 .../main/scala/lightdb/util/Aggregator.scala  |   2 +-
 .../test/scala/spec/AbstractBasicSpec.scala   |  26 +-
 .../test/scala/spec/AbstractFacetSpec.scala   |  17 +-
 .../test/scala/spec/AbstractSpatialSpec.scala |   4 +-
 .../scala/spec/AbstractSpecialCasesSpec.scala |   2 +-
 .../scala/lightdb/halodb/HaloDBStore.scala    |   2 +-
 .../scala/lightdb/lucene/LuceneStore.scala    |  10 +-
 .../main/scala/lightdb/mapdb/MapDBStore.scala |   2 +-
 .../main/scala/lightdb/redis/RedisStore.scala |   2 +-
 .../scala/lightdb/rocksdb/RocksDBStore.scala  |   3 +-
 sql/src/main/scala/lightdb/sql/SQLStore.scala |   6 +-
 29 files changed, 132 insertions(+), 937 deletions(-)
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncCollection.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncLightDB.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncQuery.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncSearchResults.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncStoredValue.scala
 delete mode 100644 async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala

diff --git a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala b/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala
deleted file mode 100644
index 94617fb5..00000000
--- a/async/src/main/scala/lightdb/async/AsyncAggregateQuery.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-package lightdb.async
-
-import lightdb.aggregate.{AggregateFilter, AggregateFunction, AggregateQuery}
-import lightdb.doc.{Document, DocumentModel}
-import lightdb.materialized.MaterializedAggregate
-import lightdb.transaction.Transaction
-import lightdb.{Query, SortDirection}
-import rapid.Task
-
-case class AsyncAggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: Query[Doc, Model],
-                                                                                  functions: List[AggregateFunction[_, _, Doc]],
-                                                                                  filter: Option[AggregateFilter[Doc]] = None,
-                                                                                  sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) {
-  def filter(f: Model => AggregateFilter[Doc], and: Boolean = false): AsyncAggregateQuery[Doc, Model] = {
-    val filter = f(query.model)
-    if (and && this.filter.nonEmpty) {
-      copy(filter = Some(this.filter.get && filter))
-    } else {
-      copy(filter = Some(filter))
-    }
-  }
- - def filters(f: Model => List[AggregateFilter[Doc]]): AsyncAggregateQuery[Doc, Model] = { - val filters = f(query.model) - if (filters.nonEmpty) { - var filter = filters.head - filters.tail.foreach { f => - filter = filter && f - } - this.filter(_ => filter) - } else { - this - } - } - - def sort(f: Model => AggregateFunction[_, _, Doc], - direction: SortDirection = SortDirection.Ascending): AsyncAggregateQuery[Doc, Model] = copy( - sort = sort ::: List((f(query.model), direction)) - ) - - private lazy val aggregateQuery = AggregateQuery( - query = query, - functions = functions, - filter = filter, - sort = sort - ) - - def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(query.store.aggregateCount(aggregateQuery)) - - def stream(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = { - rapid.Stream.fromIterator(Task(query.store.aggregate(aggregateQuery))) - } - - def toList(implicit transaction: Transaction[Doc]): Task[List[MaterializedAggregate[Doc, Model]]] = stream.toList -} diff --git a/async/src/main/scala/lightdb/async/AsyncCollection.scala b/async/src/main/scala/lightdb/async/AsyncCollection.scala deleted file mode 100644 index 66b18901..00000000 --- a/async/src/main/scala/lightdb/async/AsyncCollection.scala +++ /dev/null @@ -1,102 +0,0 @@ -package lightdb.async - -import lightdb._ -import lightdb.field.Field._ -import lightdb.collection.Collection -import lightdb.doc.{Document, DocumentModel} -import lightdb.transaction.Transaction -import rapid.Task - -import scala.util.{Failure, Success} - -case class AsyncCollection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](underlying: Collection[Doc, Model]) extends AnyVal { - def transaction[Return](f: Transaction[Doc] => Task[Return]): Task[Return] = { - val transaction = underlying.transaction.create() - f(transaction).guarantee(Task { - underlying.transaction.release(transaction) - }) - } - - /** - * Convenience feature for simple one-off operations removing the need to manually create a transaction around it. 
- */ - def t: AsyncTransactionConvenience[Doc, Model] = AsyncTransactionConvenience(this) - - def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.insert(doc)) - - def insert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.insert(docs)) - - def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] = Task(underlying.upsert(doc)) - - def upsert(docs: Seq[Doc])(implicit transaction: Transaction[Doc]): Task[Seq[Doc]] = Task(underlying.upsert(docs)) - - def get[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = - Task(underlying.get(f)) - - def apply[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Doc] = - Task(underlying(f)) - - def get(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = - Task(underlying.get(id)) - - def getAll(ids: Seq[Id[Doc]])(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = - rapid.Stream.fromIterator(Task(underlying.getAll(ids))) - - def apply(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Doc] = - Task(underlying(id)) - - def withLock(id: Id[Doc], doc: Task[Option[Doc]], establishLock: Boolean = true) - (f: Option[Doc] => Task[Option[Doc]]): Task[Option[Doc]] = if (establishLock) { - doc.map(d => if (establishLock) underlying.lock.acquire(id, d) else d).flatMap { existing => - f(existing) - .attempt - .flatMap { - case Success(modified) => - if (establishLock) underlying.lock.release(id, modified) - Task.pure(modified) - case Failure(err) => - if (establishLock) underlying.lock.release(id, existing) - Task.error(err) - } - } - } else { - doc.flatMap(f) - } - - def modify(id: Id[Doc], establishLock: Boolean = true, deleteOnNone: Boolean = false) - (f: Option[Doc] => Task[Option[Doc]]) - (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = withLock(id, get(id), establishLock) { existing => - f(existing).flatMap { - case Some(doc) => upsert(doc).map(doc => Some(doc)) - case None if deleteOnNone => delete(id).map(_ => None) - case None => Task.pure(None) - } - } - - def getOrCreate(id: Id[Doc], create: => Task[Doc], lock: Boolean = true) - (implicit transaction: Transaction[Doc]): Task[Doc] = modify(id, establishLock = lock) { - case Some(doc) => Task.pure(Some(doc)) - case None => create.map(Some.apply) - }.map(_.get) - - def delete[V](f: Model => (UniqueIndex[Doc, V], V))(implicit transaction: Transaction[Doc]): Task[Boolean] = - Task(underlying.delete(f)) - - def delete(id: Id[Doc])(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Task[Boolean] = - Task(underlying.delete(id)) - - def count(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.count) - - def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = - rapid.Stream.fromIterator(Task(underlying.iterator)) - - def list(implicit transaction: Transaction[Doc]): Task[List[Doc]] = stream.toList - - def query: AsyncQuery[Doc, Model] = AsyncQuery(this) - - def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = Task(underlying.truncate()) - - def reIndex(): Task[Boolean] = Task(underlying.reIndex()) - - def dispose(): Task[Unit] = Task(underlying.dispose()) -} \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala b/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala deleted file mode 100644 index 42a7ba69..00000000 --- 
a/async/src/main/scala/lightdb/async/AsyncDatabaseUpgrade.scala +++ /dev/null @@ -1,12 +0,0 @@ -package lightdb.async - -import rapid.Task - -trait AsyncDatabaseUpgrade { - def label: String = getClass.getSimpleName.replace("$", "") - def applyToNew: Boolean - def blockStartup: Boolean - def alwaysRun: Boolean - - def upgrade(db: AsyncLightDB): Task[Unit] -} \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncLightDB.scala b/async/src/main/scala/lightdb/async/AsyncLightDB.scala deleted file mode 100644 index ac97e4ce..00000000 --- a/async/src/main/scala/lightdb/async/AsyncLightDB.scala +++ /dev/null @@ -1,120 +0,0 @@ -package lightdb.async - -import fabric.rw.RW -import lightdb.collection.Collection -import lightdb.doc.{Document, DocumentModel} -import lightdb.feature.{DBFeatureKey, FeatureSupport} -import lightdb.{KeyValue, LightDB, Persistence, StoredValue} -import lightdb.store.{Store, StoreManager} -import lightdb.upgrade.DatabaseUpgrade -import rapid.Task - -import java.nio.file.Path - -trait AsyncLightDB extends FeatureSupport[DBFeatureKey] { db => - object underlying extends LightDB { - override def name: String = db.name - - override def directory: Option[Path] = db.directory - - override def storeManager: StoreManager = db.storeManager - - override protected def truncateOnInit: Boolean = db.truncateOnInit - - override lazy val upgrades: List[DatabaseUpgrade] = db.upgrades.map { u => - new DatabaseUpgrade { - override def label: String = u.label - - override def applyToNew: Boolean = u.applyToNew - - override def blockStartup: Boolean = u.blockStartup - - override def alwaysRun: Boolean = u.alwaysRun - - override def upgrade(ldb: LightDB): Unit = u.upgrade(db).sync() - } - } - } - - override def put[T](key: DBFeatureKey[T], value: T): Unit = underlying.put(key, value) - - def backingStore: AsyncCollection[KeyValue, KeyValue.type] = AsyncCollection(underlying.backingStore) - - /** - * Identifiable name for this database. Defaults to using the class name. - */ - def name: String = getClass.getSimpleName.replace("$", "") - - /** - * The base directory for this database. If None, the database is expected to operate entirely in memory. - */ - def directory: Option[Path] - - /** - * Default StoreManager to use for collections that do not specify a Store. - */ - def storeManager: StoreManager - - /** - * List of upgrades that should be applied at the start of this database. - */ - def upgrades: List[AsyncDatabaseUpgrade] - - /** - * Automatically truncates all collections in the database during initialization if this is set to true. - * Defaults to false. 
- */ - protected def truncateOnInit: Boolean = false - - def collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model, - name: Option[String] = None, - storeManager: Option[StoreManager] = None): AsyncCollection[Doc, Model] = - AsyncCollection(underlying.collection[Doc, Model](model, name, storeManager)) - - def reIndex(collections: List[Collection[_, _]] = underlying.collections): Task[Int] = rapid.Stream.emits(collections) - .par(maxThreads = 32) { collection => - AsyncCollection[KeyValue, KeyValue.type](collection.asInstanceOf[Collection[KeyValue, KeyValue.type]]).reIndex() - } - .count - - object stored { - def apply[T](key: String, - default: => T, - persistence: Persistence = Persistence.Stored, - collection: AsyncCollection[KeyValue, KeyValue.type] = backingStore) - (implicit rw: RW[T]): AsyncStoredValue[T] = AsyncStoredValue(underlying.stored[T]( - key = key, - default = default, - persistence = persistence, - collection = collection.underlying - )) - - def opt[T](key: String, - persistence: Persistence = Persistence.Stored, - collection: AsyncCollection[KeyValue, KeyValue.type] = backingStore) - (implicit rw: RW[T]): AsyncStoredValue[Option[T]] = AsyncStoredValue(underlying.stored.opt[T]( - key = key, - persistence = persistence, - collection = collection.underlying - )) - } - - final def init(): Task[Boolean] = Task(underlying.init()).flatMap { - case true => initialize().map(_ => true) - case false => Task.pure(false) - } - - protected def initialize(): Task[Unit] = Task.unit - - def collectionsByNames(collectionNames: String*): Task[List[Collection[_, _]]] = - Task(underlying.collectionsByNames(collectionNames: _*)) - - def dispose(): Task[Boolean] = Task { - if (underlying.disposed) { - false - } else { - underlying.dispose() - true - } - } -} diff --git a/async/src/main/scala/lightdb/async/AsyncQuery.scala b/async/src/main/scala/lightdb/async/AsyncQuery.scala deleted file mode 100644 index 1dde984c..00000000 --- a/async/src/main/scala/lightdb/async/AsyncQuery.scala +++ /dev/null @@ -1,288 +0,0 @@ -package lightdb.async - -import fabric.Json -import lightdb.aggregate.AggregateFunction -import lightdb._ -import lightdb.field.Field._ -import lightdb.collection.Collection -import lightdb.distance.Distance -import lightdb.doc.{Document, DocumentModel} -import lightdb.facet.FacetQuery -import lightdb.field.{Field, IndexingState} -import lightdb.filter._ -import lightdb.materialized.{MaterializedAndDoc, MaterializedIndex} -import lightdb.spatial.{DistanceAndDoc, Geo} -import lightdb.store.Conversion -import lightdb.transaction.Transaction -import lightdb.util.GroupedIterator -import rapid.Task - -case class AsyncQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](asyncCollection: AsyncCollection[Doc, Model], - filter: Option[Filter[Doc]] = None, - sort: List[Sort] = Nil, - offset: Int = 0, - limit: Option[Int] = None, - countTotal: Boolean = false, - scoreDocs: Boolean = false, - minDocScore: Option[Double] = None, - facets: List[FacetQuery[Doc]] = Nil) { query => - protected def collection: Collection[Doc, Model] = asyncCollection.underlying - - def toQuery: Query[Doc, Model] = Query[Doc, Model](asyncCollection.underlying.model, asyncCollection.underlying.store, filter, sort, offset, limit, countTotal, scoreDocs, minDocScore, facets) - - def scored: AsyncQuery[Doc, Model] = copy(scoreDocs = true) - - def minDocScore(min: Double): AsyncQuery[Doc, Model] = copy( - scoreDocs = true, - minDocScore = Some(min) - ) - - def clearFilters: AsyncQuery[Doc, Model] = 
copy(filter = None) - - def filter(f: Model => Filter[Doc]): AsyncQuery[Doc, Model] = { - val filter = f(collection.model) - val combined = this.filter match { - case Some(current) => current && filter - case None => filter - } - copy(filter = Some(combined)) - } - - def facet(f: Model => FacetField[Doc], - path: List[String] = Nil, - childrenLimit: Option[Int] = Some(10), - dimsLimit: Option[Int] = Some(10)): AsyncQuery[Doc, Model] = { - val facetField = f(collection.model) - val facetQuery = FacetQuery(facetField, path, childrenLimit, dimsLimit) - copy(facets = facetQuery :: facets) - } - - def facets(f: Model => List[FacetField[Doc]], - childrenLimit: Option[Int] = Some(10), - dimsLimit: Option[Int] = Some(10)): AsyncQuery[Doc, Model] = { - val facetFields = f(collection.model) - val facetQueries = facetFields.map(ff => FacetQuery(ff, Nil, childrenLimit, dimsLimit)) - copy(facets = facets ::: facetQueries) - } - - def clearSort: AsyncQuery[Doc, Model] = copy(sort = Nil) - - def sort(sort: Sort*): AsyncQuery[Doc, Model] = copy(sort = this.sort ::: sort.toList) - - def offset(offset: Int): AsyncQuery[Doc, Model] = copy(offset = offset) - - def limit(limit: Int): AsyncQuery[Doc, Model] = copy(limit = Some(limit)) - - def clearLimit: AsyncQuery[Doc, Model] = copy(limit = None) - - def countTotal(b: Boolean): AsyncQuery[Doc, Model] = copy(countTotal = b) - - object stream { - object scored { - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(V, Double)] = { - val io = search(conversion) - .map(_.scoredStream) - rapid.Stream.force(io) - } - - def docs(implicit transaction: Transaction[Doc]): rapid.Stream[(Doc, Double)] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(F, Double)] = - apply(Conversion.Value(f(collection.model))) - - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[(Id[Doc], Double)] = - value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[(Json, Double)] = - apply(Conversion.Json(f(collection.model))) - - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[(T, Double)] = - apply(Conversion.Converted(f)) - - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = - apply(Conversion.Materialized[Doc, Model](f(collection.model))) - - def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = { - val fields = collection.model.fields.filter(_.indexed) - apply(Conversion.Materialized[Doc, Model](fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedAndDoc[Doc, Model], Double)] = { - apply(Conversion.DocAndIndexes[Doc, Model]()) - } - - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): rapid.Stream[(DistanceAndDoc[Doc], Double)] = - apply(Conversion.Distance(f(collection.model), from, sort, radius)) - } - - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): rapid.Stream[V] = { - val task = search(conversion) - .map(_.stream) - rapid.Stream.force(task) - } - - def docs(implicit transaction: Transaction[Doc]): 
rapid.Stream[Doc] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): rapid.Stream[F] = - apply(Conversion.Value(f(collection.model))) - - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[Id[Doc]] = - value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = - apply(Conversion.Json(f(collection.model))) - - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[T] = - apply(Conversion.Converted(f)) - - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = - apply(Conversion.Materialized[Doc, Model](f(collection.model))) - - def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = { - val fields = collection.model.fields.filter(_.indexed) - apply(Conversion.Materialized[Doc, Model](fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAndDoc[Doc, Model]] = { - apply(Conversion.DocAndIndexes[Doc, Model]()) - } - - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): rapid.Stream[DistanceAndDoc[Doc]] = - apply(Conversion.Distance(f(collection.model), from, sort, radius)) - } - - object search { - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, V]] = - Task(collection.store.doSearch( - query = toQuery, - conversion = conversion - )).map { searchResults => - AsyncSearchResults( - model = collection.model, - offset = searchResults.offset, - limit = searchResults.limit, - total = searchResults.total, - scoredStream = rapid.Stream.fromIterator(Task(searchResults.iteratorWithScore)), - facetResults = searchResults.facetResults, - transaction = transaction - ) - } - - def docs(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, Doc]] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, F]] = - apply(Conversion.Value(f(collection.model))) - - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): Task[AsyncSearchResults[Doc, Model, Id[Doc]]] = - value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, Json]] = - apply(Conversion.Json(f(collection.model))) - - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, T]] = - apply(Conversion.Converted(f)) - - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = - apply(Conversion.Materialized(f(collection.model))) - - def indexes()(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = { - val fields = collection.model.fields.filter(_.indexed) - apply(Conversion.Materialized(fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, MaterializedAndDoc[Doc, Model]]] = { - 
apply(Conversion.DocAndIndexes()) - } - - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): Task[AsyncSearchResults[Doc, Model, DistanceAndDoc[Doc]]] = - apply(Conversion.Distance(f(collection.model), from, sort, radius)) - } - - /** - * Processes through each result record from the query modifying the data in the database. - * - * @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true) - * @param deleteOnNone whether to delete the record if the function returns None (defaults to true) - * @param safeModify whether to use safe modification. This results in loading the same object twice, but should never - * risk concurrent modification occurring. (defaults to true) - * @param maxConcurrent the number of concurrent threads to process with (defaults to 1 for single-threaded) - * @param f the processing function for records - */ - def process(establishLock: Boolean = true, - deleteOnNone: Boolean = true, - safeModify: Boolean = true, - maxConcurrent: Int = 1) - (f: Doc => Task[Option[Doc]]) - (implicit transaction: Transaction[Doc]): Task[Int] = stream - .docs - .par(maxThreads = maxConcurrent) { doc => - if (safeModify) { - asyncCollection.modify(doc._id, establishLock, deleteOnNone) { - case Some(doc) => f(doc) - case None => Task.pure(None) - } - } else { - asyncCollection.withLock(doc._id, Task.pure(Some(doc)), establishLock) { current => - val io: Task[Option[Doc]] = current match { - case Some(doc) => f(doc) - case None => Task.pure(None) - } - io.flatTap { - case Some(modified) if !current.contains(modified) => asyncCollection.upsert(modified) - case None if deleteOnNone => asyncCollection.delete(doc._id) - case _ => Task.unit - } - } - } - } - .count - - def toList(implicit transaction: Transaction[Doc]): Task[List[Doc]] = stream.docs.toList - - def first(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = stream.docs.take(1).lastOption - - def one(implicit transaction: Transaction[Doc]): Task[Doc] = stream.docs.take(1).last - - def count(implicit transaction: Transaction[Doc]): Task[Int] = copy(limit = Some(1), countTotal = true) - .search.docs.map(_.total.get) - - def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AsyncAggregateQuery[Doc, Model] = - AsyncAggregateQuery(toQuery, f(collection.model)) - - def grouped[F](f: Model => Field[Doc, F], - direction: SortDirection = SortDirection.Ascending) - (implicit transaction: Transaction[Doc]): rapid.Stream[(F, List[Doc])] = { - val field = f(collection.model) - val state = new IndexingState - val io = Task(sort(Sort.ByField(field, direction)) - .toQuery - .search - .docs - .iterator).map { iterator => - val grouped = GroupedIterator[Doc, F](iterator, doc => field.get(doc, field, state)) - rapid.Stream.fromIterator[(F, List[Doc])](Task(grouped)) - } - rapid.Stream.force(io) - } -} \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncSearchResults.scala b/async/src/main/scala/lightdb/async/AsyncSearchResults.scala deleted file mode 100644 index 79233951..00000000 --- a/async/src/main/scala/lightdb/async/AsyncSearchResults.scala +++ /dev/null @@ -1,28 +0,0 @@ -package lightdb.async - -import lightdb.doc.{Document, DocumentModel} -import lightdb.facet.FacetResult -import lightdb.field.Field.FacetField -import lightdb.transaction.Transaction -import rapid.Task - -case class AsyncSearchResults[Doc <: Document[Doc], Model 
<: DocumentModel[Doc], V](model: Model, - offset: Int, - limit: Option[Int], - total: Option[Int], - scoredStream: rapid.Stream[(V, Double)], - facetResults: Map[FacetField[Doc], FacetResult], - transaction: Transaction[Doc]) { - def stream: rapid.Stream[V] = scoredStream.map(_._1) - - def first: Task[Option[V]] = stream.take(1).lastOption - - def one: Task[V] = first.map { - case Some(v) => v - case None => throw new NullPointerException("No results for search") - } - - def facet(f: Model => FacetField[Doc]): FacetResult = facetResults(f(model)) - - def getFacet(f: Model => FacetField[Doc]): Option[FacetResult] = facetResults.get(f(model)) -} \ No newline at end of file diff --git a/async/src/main/scala/lightdb/async/AsyncStoredValue.scala b/async/src/main/scala/lightdb/async/AsyncStoredValue.scala deleted file mode 100644 index 0a230135..00000000 --- a/async/src/main/scala/lightdb/async/AsyncStoredValue.scala +++ /dev/null @@ -1,11 +0,0 @@ -package lightdb.async - -import lightdb.StoredValue -import rapid.Task - -case class AsyncStoredValue[T](underlying: StoredValue[T]) { - def get: Task[T] = Task(underlying.get()) - def exists: Task[Boolean] = Task(underlying.exists()) - def set(value: T): Task[T] = Task(underlying.set(value)) - def clear(): Task[Unit] = Task(underlying.clear()) -} diff --git a/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala b/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala deleted file mode 100644 index da9c1789..00000000 --- a/async/src/main/scala/lightdb/async/AsyncTransactionConvenience.scala +++ /dev/null @@ -1,69 +0,0 @@ -package lightdb.async - -import lightdb.doc.{Document, DocumentModel} -import lightdb._ -import lightdb.field.Field._ -import rapid.Task - -case class AsyncTransactionConvenience[Doc <: Document[Doc], Model <: DocumentModel[Doc]](collection: AsyncCollection[Doc, Model]) { - def insert(doc: Doc): Task[Doc] = collection.transaction { implicit transaction => - collection.insert(doc) - } - - def insert(docs: Seq[Doc]): Task[Seq[Doc]] = collection.transaction { implicit transaction => - collection.insert(docs) - } - - def upsert(doc: Doc): Task[Doc] = collection.transaction { implicit transaction => - collection.upsert(doc) - } - - def upsert(docs: Seq[Doc]): Task[Seq[Doc]] = collection.transaction { implicit transaction => - collection.upsert(docs) - } - - def get[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Option[Doc]] = collection.transaction { implicit transaction => - collection.get(f) - } - - def apply[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Doc] = collection.transaction { implicit transaction => - collection(f) - } - - def get(id: Id[Doc]): Task[Option[Doc]] = collection.transaction { implicit transaction => - collection.get(id) - } - - def apply(id: Id[Doc]): Task[Doc] = collection.transaction { implicit transaction => - collection(id) - } - - def list: Task[List[Doc]] = collection.transaction { implicit transaction => - collection.stream.toList - } - - def modify(id: Id[Doc], lock: Boolean = true, deleteOnNone: Boolean = false) - (f: Option[Doc] => Task[Option[Doc]]): Task[Option[Doc]] = collection.transaction { implicit transaction => - collection.modify(id, lock, deleteOnNone)(f) - } - - def getOrCreate(id: Id[Doc], create: => Task[Doc], lock: Boolean = true): Task[Doc] = collection.transaction { implicit transaction => - collection.getOrCreate(id, create, lock) - } - - def delete[V](f: Model => (UniqueIndex[Doc, V], V)): Task[Boolean] = collection.transaction { implicit transaction => - 
collection.delete(f) - } - - def delete(id: Id[Doc])(implicit ev: Model <:< DocumentModel[_]): Task[Boolean] = collection.transaction { implicit transaction => - collection.delete(id) - } - - def count: Task[Int] = collection.transaction { implicit transaction => - collection.count - } - - def truncate(): Task[Int] = collection.transaction { implicit transaction => - collection.truncate() - } -} diff --git a/core/src/main/scala/lightdb/Query.scala b/core/src/main/scala/lightdb/Query.scala index 9abae938..b44ec3c4 100644 --- a/core/src/main/scala/lightdb/Query.scala +++ b/core/src/main/scala/lightdb/Query.scala @@ -15,27 +15,29 @@ import lightdb.store.{Conversion, Store} import lightdb.transaction.Transaction import rapid.{Forge, Grouped, Task} -case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model, - store: Store[Doc, Model], - filter: Option[Filter[Doc]] = None, - sort: List[Sort] = Nil, - offset: Int = 0, - limit: Option[Int] = None, - countTotal: Boolean = false, - scoreDocs: Boolean = false, - minDocScore: Option[Double] = None, - facets: List[FacetQuery[Doc]] = Nil) { - query => - def scored: Query[Doc, Model] = copy(scoreDocs = true) - - def minDocScore(min: Double): Query[Doc, Model] = copy( +case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc], V](model: Model, + store: Store[Doc, Model], + conversion: Conversion[Doc, V], + filter: Option[Filter[Doc]] = None, + sort: List[Sort] = Nil, + offset: Int = 0, + limit: Option[Int] = None, + countTotal: Boolean = false, + scoreDocs: Boolean = false, + minDocScore: Option[Double] = None, + facets: List[FacetQuery[Doc]] = Nil) { query => + type Q = Query[Doc, Model, V] + + def scored: Q = copy(scoreDocs = true) + + def minDocScore(min: Double): Q = copy( scoreDocs = true, minDocScore = Some(min) ) - def clearFilters: Query[Doc, Model] = copy(filter = None) + def clearFilters: Q = copy(filter = None) - def filter(f: Model => Filter[Doc]): Query[Doc, Model] = { + def filter(f: Model => Filter[Doc]): Q = { val filter = f(model) val combined = this.filter match { case Some(current) => current && filter @@ -47,7 +49,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model def facet(f: Model => FacetField[Doc], path: List[String] = Nil, childrenLimit: Option[Int] = Some(10), - dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = { + dimsLimit: Option[Int] = Some(10)): Q = { val facetField = f(model) val facetQuery = FacetQuery(facetField, path, childrenLimit, dimsLimit) copy(facets = facetQuery :: facets) @@ -55,107 +57,96 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model def facets(f: Model => List[FacetField[Doc]], childrenLimit: Option[Int] = Some(10), - dimsLimit: Option[Int] = Some(10)): Query[Doc, Model] = { + dimsLimit: Option[Int] = Some(10)): Q = { val facetFields = f(model) val facetQueries = facetFields.map(ff => FacetQuery(ff, Nil, childrenLimit, dimsLimit)) copy(facets = facets ::: facetQueries) } - def clearSort: Query[Doc, Model] = copy(sort = Nil) + def clearSort: Q = copy(sort = Nil) - def sort(sort: Sort*): Query[Doc, Model] = copy(sort = this.sort ::: sort.toList) + def sort(sort: Sort*): Q = copy(sort = this.sort ::: sort.toList) - def offset(offset: Int): Query[Doc, Model] = copy(offset = offset) + def offset(offset: Int): Q = copy(offset = offset) - def limit(limit: Int): Query[Doc, Model] = copy(limit = Some(limit)) + def limit(limit: Int): Q = copy(limit = Some(limit)) - def clearLimit: Query[Doc, Model] = copy(limit = 
None) + def clearLimit: Q = copy(limit = None) - def countTotal(b: Boolean): Query[Doc, Model] = copy(countTotal = b) + def countTotal(b: Boolean): Q = copy(countTotal = b) - object search { - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = { - val storeMode = store.storeMode - if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode.isAll)) { - val notIndexed = filter.toList.flatMap(_.fields(model)).filter(!_.indexed) - if (storeMode.isIndexes) { - if (notIndexed.nonEmpty) { - throw NonIndexedFieldException(query, notIndexed) - } - } else { - if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) { - scribe.warn(s"Inefficient query filtering on non-indexed field(s): ${notIndexed.map(_.name).mkString(", ")}") - } + def conversion[T](conversion: Conversion[Doc, T]): Query[Doc, Model, T] = { + var q: Query[Doc, Model, T] = copy[Doc, Model, T](conversion = conversion) + conversion match { + case Conversion.Distance(field, from, sort, radius) => + if (sort) { + q = q.clearSort.sort(Sort.ByDistance(field, from)) } - } - store.doSearch( - query = query, - conversion = conversion - ) - } - - def docs(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, Doc]] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, F]] = - apply(Conversion.Value(f(model))) - - def id(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, Id[Doc]]] = - value(m => m._id) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, Json]] = - apply(Conversion.Json(f(model))) - - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, T]] = - apply(Conversion.Converted(f)) - - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = { - val fields = f(model) - apply(Conversion.Materialized(fields)) - } - - def indexes()(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, MaterializedIndex[Doc, Model]]] = { - val fields = model.fields.filter(_.indexed) - apply(Conversion.Materialized(fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, MaterializedAndDoc[Doc, Model]]] = { - apply(Conversion.DocAndIndexes()) + radius.foreach { r => + q = q.filter(_ => field.distance(from, r)) + } + case _ => // Ignore others } + q + } - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, DistanceAndDoc[Doc]]] = { - val field = f(model) - var q = Query.this - if (sort) { - q = q.clearSort.sort(Sort.ByDistance(field, from)) - } - radius.foreach { r => - q = q.filter(_ => field.distance(from, r)) + def docs: Query[Doc, Model, Doc] = conversion(Conversion.Doc()) + def value[F](f: Model => Field[Doc, F]): Query[Doc, Model, F] = conversion(Conversion.Value(f(model))) + def id: Query[Doc, Model, Id[Doc]] = value(_._id) + def json(f: Model => List[Field[Doc, _]] = _ => model.fields): Query[Doc, Model, Json] = + conversion(Conversion.Json(f(model))) + def converted[T](f: Doc => T): Query[Doc, Model, T] = conversion(Conversion.Converted(f)) + def materialized(f: Model => List[Field[Doc, _]] = _ => 
model.indexedFields): Query[Doc, Model, MaterializedIndex[Doc, Model]] = + conversion(Conversion.Materialized(f(model))) + def indexes: Query[Doc, Model, MaterializedIndex[Doc, Model]] = materialized() + def docAndIndexes: Query[Doc, Model, MaterializedAndDoc[Doc, Model]] = conversion(Conversion.DocAndIndexes()) + def distance[G <: Geo](f: Model => Field[Doc, List[G]], + from: Geo.Point, + sort: Boolean = true, + radius: Option[Distance] = None): Query[Doc, Model, DistanceAndDoc[Doc]] = + conversion(Conversion.Distance( + field = f(model), + from = from, + sort = sort, + radius = radius + )) + + def search(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = { + val storeMode = store.storeMode + if (Query.Validation || (Query.WarnFilteringWithoutIndex && storeMode.isAll)) { + val notIndexed = filter.toList.flatMap(_.fields(model)).filter(!_.indexed) + if (storeMode.isIndexes) { + if (notIndexed.nonEmpty) { + throw NonIndexedFieldException(query, notIndexed) + } + } else { + if (Query.WarnFilteringWithoutIndex && notIndexed.nonEmpty) { + scribe.warn(s"Inefficient query filtering on non-indexed field(s): ${notIndexed.map(_.name).mkString(", ")}") + } } - q.distanceSearch(field, from, sort, radius) } + store.doSearch(this) } + def stream(implicit transaction: Transaction[Doc]): rapid.Stream[V] = rapid.Stream.force(search.map(_.stream)) + + def streamScored(implicit transaction: Transaction[Doc]): rapid.Stream[(V, Double)] = + rapid.Stream.force(search.map(_.streamWithScore)) + /** * Processes through each result record from the query modifying the data in the database. * * @param establishLock whether to establish an id lock to avoid concurrent modification (defaults to true) - * @param deleteOnNone whether to delete the record if the function returns None (defaults to true) - * @param safeModify whether to use safe modification. This results in loading the same object twice, but should never - * risk concurrent modification occurring. (defaults to true) - * @param f the processing function for records + * @param deleteOnNone whether to delete the record if the function returns None (defaults to true) + * @param safeModify whether to use safe modification. This results in loading the same object twice, but should never + * risk concurrent modification occurring. 
(defaults to true) + * @param f the processing function for records */ def process(establishLock: Boolean = true, deleteOnNone: Boolean = true, safeModify: Boolean = true) (f: Forge[Doc, Option[Doc]]) - (implicit transaction: Transaction[Doc]): Unit = rapid.Stream.force(search.docs.map(_.stream)) + (implicit transaction: Transaction[Doc]): Unit = docs.stream .evalMap { doc => if (safeModify) { store.modify(doc._id, establishLock, deleteOnNone) { @@ -173,110 +164,13 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model } .drain - object stream { - object scored { - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(V, Double)] = { - val task = search(conversion) - .map(_.streamWithScore) - rapid.Stream.force(task) - } - - def docs(implicit transaction: Transaction[Doc]): rapid.Stream[(Doc, Double)] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(F, Double)] = - apply(Conversion.Value(f(model))) - - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[(Id[Doc], Double)] = - value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[(Json, Double)] = - apply(Conversion.Json(f(model))) + def toList(implicit transaction: Transaction[Doc]): Task[List[V]] = stream.toList - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[(T, Double)] = - apply(Conversion.Converted(f)) + def first(implicit transaction: Transaction[Doc]): Task[V] = limit(1).stream.first - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = - apply(Conversion.Materialized[Doc, Model](f(model))) + def firstOption(implicit transaction: Transaction[Doc]): Task[Option[V]] = limit(1).stream.firstOption - def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedIndex[Doc, Model], Double)] = { - val fields = model.fields.filter(_.indexed) - apply(Conversion.Materialized[Doc, Model](fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[(MaterializedAndDoc[Doc, Model], Double)] = { - apply(Conversion.DocAndIndexes[Doc, Model]()) - } - - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): rapid.Stream[(DistanceAndDoc[Doc], Double)] = - apply(Conversion.Distance(f(model), from, sort, radius)) - } - - def apply[V](conversion: Conversion[Doc, V]) - (implicit transaction: Transaction[Doc]): rapid.Stream[V] = { - val task = search(conversion) - .map(_.stream) - rapid.Stream.force(task) - } - - def docs(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = apply(Conversion.Doc()) - - def value[F](f: Model => Field[Doc, F]) - (implicit transaction: Transaction[Doc]): rapid.Stream[F] = - apply(Conversion.Value(f(model))) - - def id(implicit transaction: Transaction[Doc], ev: Model <:< DocumentModel[_]): rapid.Stream[Id[Doc]] = - value(m => ev(m)._id.asInstanceOf[UniqueIndex[Doc, Id[Doc]]]) - - def json(f: Model => List[Field[Doc, _]])(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = - apply(Conversion.Json(f(model))) - - def converted[T](f: Doc => T)(implicit transaction: Transaction[Doc]): rapid.Stream[T] = - 
apply(Conversion.Converted(f)) - - def materialized(f: Model => List[Field[Doc, _]]) - (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = - apply(Conversion.Materialized[Doc, Model](f(model))) - - def indexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedIndex[Doc, Model]] = { - val fields = model.fields.filter(_.indexed) - apply(Conversion.Materialized[Doc, Model](fields)) - } - - def docAndIndexes()(implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAndDoc[Doc, Model]] = { - apply(Conversion.DocAndIndexes[Doc, Model]()) - } - - def distance[G <: Geo](f: Model => Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean = true, - radius: Option[Distance] = None) - (implicit transaction: Transaction[Doc]): rapid.Stream[DistanceAndDoc[Doc]] = rapid.Stream - .force(search.distance(f, from, sort, radius).map(_.stream)) - } - - def toList(implicit transaction: Transaction[Doc]): Task[List[Doc]] = search.docs.flatMap(_.list) - - def first(implicit transaction: Transaction[Doc]): Task[Doc] = search.docs.flatMap(_.stream.first) - - def firstOption(implicit transaction: Transaction[Doc]): Task[Option[Doc]] = search.docs.flatMap(_.stream.firstOption) - - def count(implicit transaction: Transaction[Doc]): Task[Int] = copy(limit = Some(1), countTotal = true) - .search.docs.map(_.total.get) - - protected def distanceSearch[G <: Geo](field: Field[Doc, List[G]], - from: Geo.Point, - sort: Boolean, - radius: Option[Distance]) - (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, DistanceAndDoc[Doc]]] = { - search(Conversion.Distance(field, from, sort, radius)) - } + def count(implicit transaction: Transaction[Doc]): Task[Int] = limit(1).countTotal(true).search.map(_.total.get) def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AggregateQuery[Doc, Model] = AggregateQuery(this, f(model)) @@ -286,10 +180,7 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model (implicit transaction: Transaction[Doc]): rapid.Stream[Grouped[F, Doc]] = { val field = f(model) val state = new IndexingState - sort(Sort.ByField(field, direction)) - .stream - .docs - .groupSequential(doc => field.get(doc, field, state)) + sort(Sort.ByField(field, direction)).docs.stream.groupSequential(doc => field.get(doc, field, state)) } } diff --git a/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala b/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala index e03f8041..a33de691 100644 --- a/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala +++ b/core/src/main/scala/lightdb/aggregate/AggregateQuery.scala @@ -6,7 +6,7 @@ import lightdb.transaction.Transaction import lightdb.{Query, SortDirection} import rapid.Task -case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: Query[Doc, Model], +case class AggregateQuery[Doc <: Document[Doc], Model <: DocumentModel[Doc]](query: Query[Doc, Model, _], functions: List[AggregateFunction[_, _, Doc]], filter: Option[AggregateFilter[Doc]] = None, sort: List[(AggregateFunction[_, _, Doc], SortDirection)] = Nil) { diff --git a/core/src/main/scala/lightdb/collection/Collection.scala b/core/src/main/scala/lightdb/collection/Collection.scala index 21227952..d7b9e76b 100644 --- a/core/src/main/scala/lightdb/collection/Collection.scala +++ b/core/src/main/scala/lightdb/collection/Collection.scala @@ -8,7 +8,7 @@ import lightdb.doc.{Document, DocumentModel, JsonConversion} import lightdb.error.{DocNotFoundException, 
ModelMissingFieldsException} import lightdb.field.Field._ import lightdb.lock.LockManager -import lightdb.store.Store +import lightdb.store.{Conversion, Store} import lightdb.transaction.Transaction import lightdb.util.Initializable import rapid._ @@ -198,7 +198,7 @@ case class Collection[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: S def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = store.stream - lazy val query: Query[Doc, Model] = Query(model, store) + lazy val query: Query[Doc, Model, Doc] = Query(model, store, Conversion.Doc()) def truncate()(implicit transaction: Transaction[Doc]): Task[Int] = { trigger.truncate().flatMap(_ => store.truncate()) diff --git a/core/src/main/scala/lightdb/doc/DocumentModel.scala b/core/src/main/scala/lightdb/doc/DocumentModel.scala index b8bdb272..eb9792fe 100644 --- a/core/src/main/scala/lightdb/doc/DocumentModel.scala +++ b/core/src/main/scala/lightdb/doc/DocumentModel.scala @@ -32,6 +32,8 @@ trait DocumentModel[Doc <: Document[Doc]] { def fields: List[Field[Doc, _]] = _fields + def indexedFields: List[Field[Doc, _]] = fields.filter(_.indexed) + def facetFields: List[FF] = fields.collect { case ff: FF => ff } diff --git a/core/src/main/scala/lightdb/error/NonIndexedFieldException.scala b/core/src/main/scala/lightdb/error/NonIndexedFieldException.scala index 7b373fd1..9312a012 100644 --- a/core/src/main/scala/lightdb/error/NonIndexedFieldException.scala +++ b/core/src/main/scala/lightdb/error/NonIndexedFieldException.scala @@ -3,4 +3,4 @@ package lightdb.error import lightdb.Query import lightdb.field.Field -case class NonIndexedFieldException(query: Query[_, _], fields: List[Field[_, _]]) extends RuntimeException(s"Attempting to execute a query with non-indexed fields in an indexed store mode. Not indexed fields: ${fields.map(_.name).mkString(", ")}") +case class NonIndexedFieldException(query: Query[_, _, _], fields: List[Field[_, _]]) extends RuntimeException(s"Attempting to execute a query with non-indexed fields in an indexed store mode. Not indexed fields: ${fields.map(_.name).mkString(", ")}") diff --git a/core/src/main/scala/lightdb/store/InMemoryIndexes.scala b/core/src/main/scala/lightdb/store/InMemoryIndexes.scala index 6ebf4c0b..b2a2d018 100644 --- a/core/src/main/scala/lightdb/store/InMemoryIndexes.scala +++ b/core/src/main/scala/lightdb/store/InMemoryIndexes.scala @@ -43,10 +43,8 @@ trait InMemoryIndexes[Doc <: Document[Doc], Model <: DocumentModel[Doc]] extends // TODO: Support } - override def doSearch[V](query: Query[Doc, Model], - conversion: Conversion[Doc, V]) + override def doSearch[V](query: Query[Doc, Model, V]) (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = { - ??? 
} diff --git a/core/src/main/scala/lightdb/store/MapStore.scala b/core/src/main/scala/lightdb/store/MapStore.scala index 938f6fe0..af40956e 100644 --- a/core/src/main/scala/lightdb/store/MapStore.scala +++ b/core/src/main/scala/lightdb/store/MapStore.scala @@ -55,8 +55,7 @@ class MapStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String, override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = rapid.Stream.fromIterator(Task(map.valuesIterator)) - override def doSearch[V](query: Query[Doc, Model], - conversion: Conversion[Doc, V]) + override def doSearch[V](query: Query[Doc, Model, V]) (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = throw new UnsupportedOperationException("MapStore does not support searching") override def aggregate(query: AggregateQuery[Doc, Model]) diff --git a/core/src/main/scala/lightdb/store/Store.scala b/core/src/main/scala/lightdb/store/Store.scala index 718e5c5c..2118868e 100644 --- a/core/src/main/scala/lightdb/store/Store.scala +++ b/core/src/main/scala/lightdb/store/Store.scala @@ -65,7 +65,7 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name def jsonStream(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = stream.map(_.json(model.rw)) - def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) + def doSearch[V](query: Query[Doc, Model, V]) (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] def aggregate(query: AggregateQuery[Doc, Model]) diff --git a/core/src/main/scala/lightdb/store/split/SplitStore.scala b/core/src/main/scala/lightdb/store/split/SplitStore.scala index 16827a65..5b457d06 100644 --- a/core/src/main/scala/lightdb/store/split/SplitStore.scala +++ b/core/src/main/scala/lightdb/store/split/SplitStore.scala @@ -63,9 +63,9 @@ case class SplitStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](overrid override def jsonStream(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = storage.jsonStream - override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V]) + override def doSearch[V](query: Query[Doc, Model, V]) (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = - searching.doSearch[V](query, conversion) + searching.doSearch[V](query) override def aggregate(query: AggregateQuery[Doc, Model]) (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = diff --git a/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala b/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala index df3659e1..21d388d5 100644 --- a/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala +++ b/core/src/main/scala/lightdb/trigger/BasicCollectionTrigger.scala @@ -24,7 +24,7 @@ trait BasicCollectionTrigger[Doc <: Document[Doc], Model <: DocumentModel[Doc]] } override final def delete[V](index: Field.UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Unit] = { - collection.query.filter(_ => index === value).stream.docs.foreach(removing).drain + collection.query.filter(_ => index === value).docs.stream.foreach(removing).drain } override final def truncate(): Task[Unit] = collection.transaction { implicit transaction => diff --git a/core/src/main/scala/lightdb/util/Aggregator.scala b/core/src/main/scala/lightdb/util/Aggregator.scala index 3b2c21a0..2a414e19 100644 --- a/core/src/main/scala/lightdb/util/Aggregator.scala +++ b/core/src/main/scala/lightdb/util/Aggregator.scala @@ 
-18,7 +18,7 @@ object Aggregator {
                   (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] = {
     val fields = query.functions.map(_.field).distinct
     val groupFields = query.functions.filter(_.`type` == AggregateType.Group).map(_.field)
-    val stream = rapid.Stream.force(query.query.search.materialized(_ => fields).map(_.stream))
+    val stream = query.query.materialized(_ => fields).stream
     var groups = Map.empty[List[Any], Map[String, Json]]
     stream
       .foreach { m =>
diff --git a/core/src/test/scala/spec/AbstractBasicSpec.scala b/core/src/test/scala/spec/AbstractBasicSpec.scala
index 7067916c..0ebd709b 100644
--- a/core/src/test/scala/spec/AbstractBasicSpec.scala
+++ b/core/src/test/scala/spec/AbstractBasicSpec.scala
@@ -95,7 +95,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "stream the ids in the database" in {
       db.people.transaction { implicit transaction =>
-        db.people.query.stream.id.toList.map(_.toSet).map { ids =>
+        db.people.query.id.stream.toList.map(_.toSet).map { ids =>
           ids should be(names.map(_._id).toSet)
         }
       }
@@ -134,7 +134,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "search by age range" in {
       db.people.transaction { implicit transaction =>
-        db.people.query.filter(_.age BETWEEN 19 -> 22).stream.value(_._id).toList.map { ids =>
+        db.people.query.filter(_.age BETWEEN 19 -> 22).value(_._id).stream.toList.map { ids =>
           ids.toSet should be(Set(adam._id, nancy._id, oscar._id, uba._id))
         }
       }
@@ -148,7 +148,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "sort by age" in {
       db.people.transaction { implicit transaction =>
-        db.people.query.sort(Sort.ByField(Person.age).descending).stream.docs.toList.map { people =>
+        db.people.query.sort(Sort.ByField(Person.age).descending).toList.map { people =>
           people.map(_.name).take(3) should be(List("Ruth", "Zoey", "Quintin"))
         }
       }
@@ -246,7 +246,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .minShould(0)
           .should(_.search.words("nica 13", matchEndsWith = true), boost = Some(2.0))
           .should(_.age <=> (10, 15))
-      ).search.docs.flatMap { results =>
+      ).docs.search.flatMap { results =>
         for {
           people <- results.list
           scores <- results.scores
@@ -302,7 +302,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "query with indexes" in {
      db.people.transaction { implicit transaction =>
-        db.people.query.filter(_.name IN List("Allan", "Brenda", "Charlie")).search.indexes().flatMap(_.list).map { results =>
+        db.people.query.filter(_.name IN List("Allan", "Brenda", "Charlie")).indexes.search.flatMap(_.list).map { results =>
          results.map(_(_.name)).toSet should be(Set("Allan", "Brenda", "Charlie"))
          results.map(_(_.doc).name).toSet should be(Set("Allan", "Brenda", "Charlie"))
        }
@@ -310,7 +310,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "query with doc and indexes" in {
      db.people.transaction { implicit transaction =>
-        db.people.query.filter(_.name IN List("Allan", "Brenda", "Charlie")).stream.docAndIndexes().toList.map { results =>
+        db.people.query.filter(_.name IN List("Allan", "Brenda", "Charlie")).docAndIndexes.stream.toList.map { results =>
          results.map(_(_.name)).toSet should be(Set("Allan", "Brenda", "Charlie"))
          results.map(_.doc.name).toSet should be(Set("Allan", "Brenda", "Charlie"))
        }
@@ -363,7 +363,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "materialize empty nicknames" in {
       db.people.transaction { implicit transaction =>
-        db.people.query.filter(_.name === "Ian").stream.materialized(p => List(p.nicknames)).toList.map { people =>
+        db.people.query.filter(_.name === "Ian").materialized(p => List(p.nicknames)).toList.map { people =>
          people.map(m => m(_.nicknames)) should be(List(Set.empty))
        }
      }
@@ -382,9 +382,9 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
       db.people.transaction { implicit transaction =>
         val q = db.people.query.sort(Sort.ByField(Person.name)).limit(10)
         for {
-          l1 <- q.offset(0).search.docs.flatMap(_.list).map(_.map(_.name))
-          l2 <- q.offset(10).search.docs.flatMap(_.list).map(_.map(_.name))
-          l3 <- q.offset(20).search.docs.flatMap(_.list).map(_.map(_.name))
+          l1 <- q.offset(0).docs.search.flatMap(_.list).map(_.map(_.name))
+          l2 <- q.offset(10).docs.search.flatMap(_.list).map(_.map(_.name))
+          l3 <- q.offset(20).docs.search.flatMap(_.list).map(_.map(_.name))
         } yield {
           l1 should be(List("Allan", "Brenda", "Charlie", "Diana", "Evan", "Fiona", "Greg", "Hanna", "Ian", "Jenna"))
           l2 should be(List("Kevin", "Mike", "Nancy", "Not Ruth", "Oscar", "Penny", "Quintin", "Sam", "Tori", "Uba"))
@@ -437,7 +437,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "verify id count matches total count" in {
       db.people.transaction { implicit transaction =>
-        db.people.query.countTotal(true).search.id.flatMap { results =>
+        db.people.query.countTotal(true).id.search.flatMap { results =>
          results.list.map { list =>
            results.total should be(Some(CreateRecords + 24))
            list.length should be(CreateRecords + 24)
@@ -452,8 +452,8 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .sort(Sort.ByField(Person.age).descending)
           .limit(100)
           .countTotal(true)
-          .search
           .docs
+          .search
           .flatMap { results =>
             results.list.map { list =>
               list.length should be(100)
@@ -470,8 +470,8 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .limit(100)
           .offset(100)
           .countTotal(true)
-          .search
           .docs
+          .search
           .flatMap { results =>
             results.list.map { list =>
               list.length should be(100)
diff --git a/core/src/test/scala/spec/AbstractFacetSpec.scala b/core/src/test/scala/spec/AbstractFacetSpec.scala
index e486500d..0fee504f 100644
--- a/core/src/test/scala/spec/AbstractFacetSpec.scala
+++ b/core/src/test/scala/spec/AbstractFacetSpec.scala
@@ -45,8 +45,8 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
       db.entries.transaction { implicit transaction =>
         db.entries.query
           .facet(_.authorsFacet)
-          .search
           .docs
+          .search
           .map { results =>
             val authorsResult = results.facet(_.authorsFacet)
             authorsResult.childCount should be(6)
@@ -60,8 +60,8 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
       db.entries.transaction { implicit transaction =>
         db.entries.query
           .facet(_.publishDateFacet)
-          .search
           .docs
+          .search
           .map { results =>
             val publishDateResult = results.facet(_.publishDateFacet)
             publishDateResult.values.map(_.value) should be(List("2010", "2012", "1999"))
@@ -76,8 +76,8 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
         db.entries.query
           .filter(_.keywords has "support@one.com")
           .facet(_.keywordsFacet)
-          .search
           .docs
+          .search
           .map { results =>
             val keywordsResult = results.facet(_.keywordsFacet)
             keywordsResult.childCount should be(2)
@@ -99,7 +99,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.authorsFacet)
           .facet(_.publishDateFacet, path = List("2010"))
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.childCount should be(3)
@@ -121,7 +120,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.publishDateFacet)
           .filter(_.builder.mustNot(_.publishDateFacet.drillDown("2010")))
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.childCount should be(5)
@@ -143,7 +141,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.publishDateFacet, path = List("2010", "10"))
           .filter(_.publishDateFacet.drillDown("2010", "10"))
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.childCount should be(3)
@@ -165,7 +162,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.publishDateFacet, path = List("2010", "10", "20"))
           .filter(_.publishDateFacet.drillDown("2010", "10", "20"))
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.childCount should be(1)
@@ -186,7 +182,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.publishDateFacet, path = List("1999"))
           .filter(_.publishDateFacet.drillDown("1999").onlyThisLevel)
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.childCount should be(1)
@@ -205,7 +200,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
         db.entries.query
           .filter(_.keywordsFacet.drillDown("support@two.com"))
           .search
-          .docs
           .flatMap { results =>
             results.list.map(_.map(_.name).toSet should be(Set("One", "Three")))
           }
@@ -219,7 +213,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
             .should(_.keywordsFacet.drillDown("support"))
           )
           .stream
-          .docs
           .toList
           .map { list =>
             list.map(_.name).toSet should be(Set("Four", "Cinco"))
@@ -236,7 +229,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
         db.entries.query
           .filter(_.keywordsFacet.drillDown("support@two.com"))
           .stream
-          .docs
           .toList
           .map { results =>
             results.map(_.name).toSet should be(Set("Three"))
@@ -250,7 +242,6 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
           .facet(_.publishDateFacet)
           .filter(_.publishDateFacet.drillDown().onlyThisLevel)
           .search
-          .docs
           .map { results =>
             val authorResult = results.facet(_.authorsFacet)
             authorResult.totalCount should be(1)
@@ -270,7 +261,7 @@ abstract class AbstractFacetSpec extends AsyncWordSpec with AsyncTaskSpec with M
     }
     "query all documents verifying deletion of Four" in {
       db.entries.transaction { implicit transaction =>
-        db.entries.query.facet(_.authorsFacet).search.docs.map { results =>
+        db.entries.query.facet(_.authorsFacet).search.map { results =>
          results.getFacet(_.publishDateFacet) should be(None)
          val authorResult = results.facet(_.authorsFacet)
          authorResult.values.map(_.value) should be(List("Bob", "Lisa", "James", "Frank", "George"))
diff --git a/core/src/test/scala/spec/AbstractSpatialSpec.scala b/core/src/test/scala/spec/AbstractSpatialSpec.scala
index 9d77f30a..c2f763b4 100644
--- a/core/src/test/scala/spec/AbstractSpatialSpec.scala
+++ b/core/src/test/scala/spec/AbstractSpatialSpec.scala
@@ -80,7 +80,7 @@ abstract class AbstractSpatialSpec extends AsyncWordSpec with AsyncTaskSpec with
     }
     "sort by distance from Oklahoma City" in {
       DB.people.transaction { implicit transaction =>
-        DB.people.query.stream.distance(
+        DB.people.query.distance(
           _.point.list,
           from = oklahomaCity,
           radius = Some(1320.miles)
@@ -98,7 +98,7 @@ abstract class AbstractSpatialSpec extends AsyncWordSpec with AsyncTaskSpec with
     }
     "sort by distance from Noble using geo" in {
       DB.people.transaction { implicit transaction =>
-        DB.people.query.stream.distance(
+        DB.people.query.distance(
           _.geo,
           from = noble,
           radius = Some(10_000.miles)
diff --git a/core/src/test/scala/spec/AbstractSpecialCasesSpec.scala b/core/src/test/scala/spec/AbstractSpecialCasesSpec.scala
index 4058f05c..496f8f3a 100644
--- a/core/src/test/scala/spec/AbstractSpecialCasesSpec.scala
+++ b/core/src/test/scala/spec/AbstractSpecialCasesSpec.scala
@@ -46,7 +46,7 @@ trait AbstractSpecialCasesSpec extends AsyncWordSpec with AsyncTaskSpec with Mat
     }
     "verify the storage of data is correct" in {
       DB.specialOne.transaction { implicit transaction =>
-        DB.specialOne.query.sort(Sort.ByField(SpecialOne.name).asc).stream.json(ref => List(ref._id)).toList.map { list =>
+        DB.specialOne.query.sort(Sort.ByField(SpecialOne.name).asc).json(ref => List(ref._id)).toList.map { list =>
          list should be(List(obj("_id" -> "first"), obj("_id" -> "second")))
        }
      }
diff --git a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala
index 8c97c3f9..e5721eea 100644
--- a/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala
+++ b/halodb/src/main/scala/lightdb/halodb/HaloDBStore.scala
@@ -81,7 +81,7 @@ class HaloDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
   override def jsonStream(implicit transaction: Transaction[Doc]): rapid.Stream[Json] = rapid.Stream
     .fromIterator(Task(instance.newIterator().asScala.map(_.getValue).map(bytes2Json)))
 
-  override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
     Task.error(new UnsupportedOperationException("HaloDBStore does not support searching"))
 
diff --git a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
index dbafb5a2..3059a46d 100644
--- a/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
+++ b/lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
@@ -223,8 +223,8 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
   override def get[V](field: UniqueIndex[Doc, V], value: V)
                      (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = {
     val filter = Filter.Equals(field, value)
-    val query = Query[Doc, Model](model, this, filter = Some(filter), limit = Some(1))
-    doSearch[Doc](query, Conversion.Doc()).flatMap(_.list).map(_.headOption)
+    val query = Query[Doc, Model, Doc](model, this, Conversion.Doc(), filter = Some(filter), limit = Some(1))
+    doSearch[Doc](query).flatMap(_.list).map(_.headOption)
   }
 
   override def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Boolean] = Task {
@@ -237,9 +237,9 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
     Task(state.indexSearcher.count(new MatchAllDocsQuery))
 
   override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] =
-    rapid.Stream.force(doSearch[Doc](Query[Doc, Model](model, this), Conversion.Doc()).map(_.stream))
+    rapid.Stream.force(doSearch[Doc](Query[Doc, Model, Doc](model, this, Conversion.Doc())).map(_.stream))
 
-  override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = Task {
     val q: LuceneQuery = filter2Lucene(query.filter)
     val sortFields = query.sort match {
@@ -366,7 +366,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
       }
     }
     val stream = rapid.Stream.fromIterator[(V, Double)](Task {
-      conversion match {
+      query.conversion match {
        case Conversion.Value(field) => scoreDocs.iterator.map { scoreDoc =>
          value(scoreDoc, field) -> scoreDoc.score.toDouble
        }
diff --git a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala
index 8584f8d7..321cd480 100644
--- a/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala
+++ b/mapdb/src/main/scala/lightdb/mapdb/MapDBStore.scala
@@ -63,7 +63,7 @@ class MapDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String
       .map(fromString)
   })
 
-  override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
     throw new UnsupportedOperationException("MapDBStore does not support searching")
 
diff --git a/redis/src/main/scala/lightdb/redis/RedisStore.scala b/redis/src/main/scala/lightdb/redis/RedisStore.scala
index 0bbd05b0..cfd733b3 100644
--- a/redis/src/main/scala/lightdb/redis/RedisStore.scala
+++ b/redis/src/main/scala/lightdb/redis/RedisStore.scala
@@ -63,7 +63,7 @@ class RedisStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: String
     getInstance.hgetAll(name).values().iterator().asScala.map(fromString)
   })
 
-  override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
     throw new UnsupportedOperationException("Redis does not support searching")
 
diff --git a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala
index 4b410785..c3e748a5 100644
--- a/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala
+++ b/rocksdb/src/main/scala/lightdb/rocksdb/RocksDBStore.scala
@@ -65,8 +65,7 @@ class RocksDBStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Stri
     .fromIterator(Task(iterator(db.newIterator())))
     .map(bytes2Doc)
 
-  override def doSearch[V](query: Query[Doc, Model],
-                           conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] =
     throw new UnsupportedOperationException("RocksDBStore does not support searching")
 
diff --git a/sql/src/main/scala/lightdb/sql/SQLStore.scala b/sql/src/main/scala/lightdb/sql/SQLStore.scala
index 08fd8562..509945ba 100644
--- a/sql/src/main/scala/lightdb/sql/SQLStore.scala
+++ b/sql/src/main/scala/lightdb/sql/SQLStore.scala
@@ -342,10 +342,10 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name:
 
   protected def fieldPart[V](field: Field[Doc, V]): SQLPart = SQLPart(field.name)
 
-  override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
+  override def doSearch[V](query: Query[Doc, Model, V])
                           (implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = Task {
     var extraFields = List.empty[SQLPart]
-    val fields = conversion match {
+    val fields = query.conversion match {
       case Conversion.Value(field) => List(field)
       case Conversion.Doc() | Conversion.Converted(_) => this.fields
       case Conversion.Materialized(fields) => fields
@@ -385,7 +385,7 @@ abstract class SQLStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name:
       None
     }
     val stream = rapid.Stream.fromIterator[(V, Double)](Task {
-      val iterator = rs2Iterator(rs, conversion)
+      val iterator = rs2Iterator(rs, query.conversion)
       val ps = rs.getStatement.asInstanceOf[PreparedStatement]
       ActionIterator(iterator.map(v => v -> 0.0), onClose = () => state.returnPreparedStatement(b.sql, ps))
    })
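
Usage sketch (illustrative only, not part of the patch): the test updates above all follow the same migration, where the Conversion is now selected on the Query itself (docs, id, value(...), indexes, docAndIndexes, materialized(...), json(...), distance(...)) and search or stream is invoked afterwards, rather than the old search/stream-first chaining. A minimal example reusing the db.people fixture and Person model from AbstractBasicSpec:

  db.people.transaction { implicit transaction =>
    db.people.query
      .filter(_.age BETWEEN 19 -> 22)
      .value(_._id) // choose the Conversion first...
      .stream       // ...then stream it (or .search for SearchResults with totals and facets)
      .toList
      .map(_.toSet)
  }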