Skip to content

Commit

Permalink
Lucene tests fail
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 30, 2024
1 parent 4cccfc0 commit 0312b83
Show file tree
Hide file tree
Showing 4 changed files with 67 additions and 63 deletions.
32 changes: 16 additions & 16 deletions core/src/main/scala/lightdb/store/Store.scala
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import lightdb.field.Field._
import lightdb.lock.LockManager
import lightdb.trigger.CollectionTriggers
import rapid.{Forge, Task}
import scribe.{rapid => logger}

import java.io.File
import java.util.concurrent.ConcurrentHashMap
Expand Down Expand Up @@ -105,22 +106,21 @@ abstract class Store[Doc <: Document[Doc], Model <: DocumentModel[Doc]](val name
f(transaction).guarantee(release(transaction))
}

/**
 * Opens a new transaction for this store: optionally logs, prepares store-specific
 * state, registers the transaction in the live set, and fires the start trigger.
 */
def create(): Task[Transaction[Doc]] =
  logger.info(s"Creating new Transaction for $name").when(Collection.LogTransactions).flatMap { _ =>
    val transaction = new Transaction[Doc]
    prepareTransaction(transaction).flatMap { _ =>
      // track the live transaction so it can be found and released later
      set.add(transaction)
      trigger.transactionStart(transaction).map(_ => transaction)
    }
  }

/**
 * Tears down a transaction: optionally logs, fires the end trigger, performs
 * store-specific release, closes the transaction, then deregisters it.
 */
def release(transaction: Transaction[Doc]): Task[Unit] =
  logger.info(s"Releasing Transaction for $name").when(Collection.LogTransactions).flatMap { _ =>
    trigger.transactionEnd(transaction)
  }.flatMap { _ =>
    releaseTransaction(transaction)
  }.flatMap { _ =>
    transaction.close()
  }.map { _ =>
    set.remove(transaction)
    ()
  }

def releaseAll(): Task[Int] = Task {
val list = set.iterator().asScala.toList
Expand Down
4 changes: 2 additions & 2 deletions core/src/test/scala/spec/AbstractBasicSpec.scala
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
}
}
}
"do a database backup" in {
/*"do a database backup" in {
DatabaseBackup.archive(db, new File(s"backups/$specName.zip")).map(_ should be(49))
}
"insert a lot more names" in {
Expand Down Expand Up @@ -469,7 +469,7 @@ abstract class AbstractBasicSpec extends AsyncWordSpec with AsyncTaskSpec with M
}
}
}
}
}*/
"truncate the collection" in {
db.people.transaction { implicit transaction =>
db.people.truncate().map(_ should be(CreateRecords + 24))
Expand Down
7 changes: 4 additions & 3 deletions lucene/src/main/scala/lightdb/lucene/LuceneState.scala
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import lightdb.lucene.index.Index
import lightdb.transaction.{Transaction, TransactionFeature}
import org.apache.lucene.facet.taxonomy.TaxonomyReader
import org.apache.lucene.search.IndexSearcher
import rapid.Task

case class LuceneState[Doc <: Document[Doc]](index: Index, hasFacets: Boolean) extends TransactionFeature {
private var oldIndexSearchers = List.empty[IndexSearcher]
Expand Down Expand Up @@ -35,17 +36,17 @@ case class LuceneState[Doc <: Document[Doc]](index: Index, hasFacets: Boolean) e

def taxonomyReader: TaxonomyReader = _taxonomyReader

/** Flushes pending index changes, then releases the cached searcher so a fresh one is opened. */
override def commit(): Task[Unit] =
  Task(index.commit()).map(_ => releaseIndexSearcher())

/** Discards uncommitted index changes, then releases the cached searcher. */
override def rollback(): Task[Unit] =
  Task(index.rollback()).map(_ => releaseIndexSearcher())

override def close(): Unit = {
override def close(): Task[Unit] = Task {
commit()
oldIndexSearchers.foreach(index.releaseIndexSearch)
oldTaxonomyReaders.foreach(index.releaseTaxonomyReader)
Expand Down
87 changes: 45 additions & 42 deletions lucene/src/main/scala/lightdb/lucene/LuceneStore.scala
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import org.apache.lucene.queryparser.classic.QueryParser
import org.apache.lucene.util.{BytesRef, Version}
import org.apache.lucene.facet.{DrillDownQuery, FacetsCollector, FacetsCollectorManager, FacetsConfig, FacetField => LuceneFacetField}
import org.apache.lucene.store.FSDirectory
import rapid.Task

import java.nio.file.{Files, Path}
import scala.language.implicitConversions
Expand Down Expand Up @@ -73,18 +74,18 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
}
}

/**
 * Attaches Lucene-specific state to the transaction so subsequent operations
 * within it can reach the index (and facet support, when enabled).
 */
override def prepareTransaction(transaction: Transaction[Doc]): Task[Unit] = Task {
  transaction.put(key = StateKey[Doc], value = LuceneState[Doc](index, hasFacets))
}

/** Indexes a new document without replacing any existing entry for the same _id. */
override def insert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] =
  addDoc(doc, upsert = false)

/** Indexes the document, replacing any existing entry with the same _id. */
override def upsert(doc: Doc)(implicit transaction: Transaction[Doc]): Task[Doc] =
  addDoc(doc, upsert = true)

private def createGeoFields(field: Field[Doc, _],
json: Json,
Expand Down Expand Up @@ -201,45 +202,48 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
}
}

/**
 * Converts the document into Lucene fields and writes it to the index.
 *
 * @param doc    the document to index
 * @param upsert when true, replaces any existing document carrying the same _id term;
 *               when false, adds the document unconditionally
 * @return the document, unchanged
 */
private def addDoc(doc: Doc, upsert: Boolean): Task[Doc] = Task {
  // Only index when there are fields beyond the first one (assumed to be the _id
  // field — TODO confirm). lengthCompare avoids fields.tail, which throws an
  // UnsupportedOperationException if the field list is ever empty.
  if (fields.lengthCompare(1) > 0) {
    val id = this.id(doc)
    val state = new IndexingState
    val luceneFields = fields.flatMap { field =>
      createLuceneFields(field, doc, state)
    }
    val document = new LuceneDocument
    luceneFields.foreach(document.add)

    if (upsert) {
      // updateDocument atomically deletes any document matching the _id term, then adds this one
      index.indexWriter.updateDocument(new Term("_id", id.value), facetsPrepareDoc(document))
    } else {
      index.indexWriter.addDocument(facetsPrepareDoc(document))
    }
  }
  doc
}

/** True when a document with the given id is present in the index. */
override def exists(id: Id[Doc])(implicit transaction: Transaction[Doc]): Task[Boolean] =
  get(idField, id).map(_.isDefined)

/** Looks up at most one document whose unique-index field equals the given value. */
override def get[V](field: UniqueIndex[Doc, V], value: V)
                   (implicit transaction: Transaction[Doc]): Task[Option[Doc]] = {
  val query = Query[Doc, Model](
    model,
    this,
    filter = Some(Filter.Equals(field, value)),
    limit = Some(1)
  )
  doSearch[Doc](query, Conversion.Doc())
    .flatMap(_.list)
    .map(_.headOption)
}

/** Deletes every document matching the unique-index equality; always reports true. */
override def delete[V](field: UniqueIndex[Doc, V], value: V)(implicit transaction: Transaction[Doc]): Task[Boolean] = Task {
  val luceneQuery = filter2Lucene(Some(field === value))
  index.indexWriter.deleteDocuments(luceneQuery)
  true
}

/** Counts every document visible to the transaction's current searcher. */
override def count(implicit transaction: Transaction[Doc]): Task[Int] = Task {
  state.indexSearcher.count(new MatchAllDocsQuery)
}

/** Streams every document by running an unfiltered, unlimited search. */
override def stream(implicit transaction: Transaction[Doc]): rapid.Stream[Doc] = {
  val results = doSearch[Doc](Query[Doc, Model](model, this), Conversion.Doc())
  rapid.Stream.force(results.map(_.stream))
}

override def doSearch[V](query: Query[Doc, Model], conversion: Conversion[Doc, V])
(implicit transaction: Transaction[Doc]): SearchResults[Doc, Model, V] = {
(implicit transaction: Transaction[Doc]): Task[SearchResults[Doc, Model, V]] = Task {
val q: LuceneQuery = filter2Lucene(query.filter)
val sortFields = query.sort match {
case Nil => List(SortField.FIELD_SCORE)
Expand Down Expand Up @@ -352,7 +356,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
val docId = scoreDoc.doc
val id = Id[Doc](storedFields.document(docId).get("_id"))
val score = scoreDoc.score.toDouble
storage(id) -> score
storage(id).sync() -> score
}
def docIterator(): Iterator[(Doc, Double)] = scoreDocs.iterator.map(loadScoreDoc)
def jsonIterator(fields: List[Field[Doc, _]]): Iterator[(ScoreDoc, Json, Double)] = {
Expand All @@ -364,7 +368,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
(scoreDoc, json, score)
}
}
val iterator: Iterator[(V, Double)] = conversion match {
def iterator: Iterator[(V, Double)] = conversion match {
case Conversion.Value(field) => scoreDocs.iterator.map { scoreDoc =>
value(scoreDoc, field) -> scoreDoc.score.toDouble
}
Expand All @@ -382,7 +386,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
case Conversion.Distance(field, from, sort, radius) => idsAndScores.iterator.map {
case (id, score) =>
val state = new IndexingState
val doc = apply(id)(transaction)
val doc = apply(id)(transaction).sync()
val distance = field.get(doc, field, state).map(d => Spatial.distance(from, d))
DistanceAndDoc(doc, distance) -> score
}
Expand All @@ -392,7 +396,7 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
offset = query.offset,
limit = query.limit,
total = Some(total),
iteratorWithScore = iterator,
streamWithScore = rapid.Stream.fromIterator(Task(iterator)),
facetResults = facetResults,
transaction = transaction
)
Expand Down Expand Up @@ -529,19 +533,18 @@ class LuceneStore[Doc <: Document[Doc], Model <: DocumentModel[Doc]](name: Strin
}

/** Delegates aggregation to the shared Aggregator implementation. */
override def aggregate(query: AggregateQuery[Doc, Model])
                      (implicit transaction: Transaction[Doc]): rapid.Stream[MaterializedAggregate[Doc, Model]] =
  Aggregator(query, model)

/** Counts aggregate results by running the aggregation and counting its stream. */
override def aggregateCount(query: AggregateQuery[Doc, Model])(implicit transaction: Transaction[Doc]): Task[Int] =
  aggregate(query).count

/** Removes every document from the index, returning how many were present beforehand. */
override def truncate()(implicit transaction: Transaction[Doc]): Task[Int] =
  this.count.flatMap { total =>
    Task(index.indexWriter.deleteAll()).map(_ => total)
  }

/** Releases the underlying Lucene index resources. */
override def dispose(): Task[Unit] = Task(index.dispose())
}
Expand Down

0 comments on commit 0312b83

Please sign in to comment.