Skip to content

Commit

Permalink
HaloDB tests work
Browse files Browse the repository at this point in the history
  • Loading branch information
darkfrog26 committed Dec 29, 2024
1 parent 0eae763 commit 4cccfc0
Show file tree
Hide file tree
Showing 12 changed files with 405 additions and 364 deletions.
17 changes: 7 additions & 10 deletions core/src/main/scala/lightdb/Query.scala
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,7 @@ import lightdb.materialized.{MaterializedAndDoc, MaterializedIndex}
import lightdb.spatial.{DistanceAndDoc, Geo}
import lightdb.store.{Conversion, Store, StoreMode}
import lightdb.transaction.Transaction
import lightdb.util.GroupedIterator
import rapid.{Forge, Task}
import rapid.{Forge, Grouped, Task}

case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model,
store: Store[Doc, Model],
Expand Down Expand Up @@ -283,18 +282,16 @@ case class Query[Doc <: Document[Doc], Model <: DocumentModel[Doc]](model: Model
def aggregate(f: Model => List[AggregateFunction[_, _, Doc]]): AggregateQuery[Doc, Model] =
AggregateQuery(this, f(model))

// TODO: Support this via stream
/*def grouped[F](f: Model => Field[Doc, F],
def grouped[F](f: Model => Field[Doc, F],
direction: SortDirection = SortDirection.Ascending)
(implicit transaction: Transaction[Doc]): GroupedIterator[Doc, F] = {
(implicit transaction: Transaction[Doc]): rapid.Stream[Grouped[F, Doc]] = {
val field = f(model)
val state = new IndexingState
val iterator = sort(Sort.ByField(field, direction))
.search
sort(Sort.ByField(field, direction))
.stream
.docs
.iterator
GroupedIterator[Doc, F](iterator, doc => field.get(doc, field, state))
}*/
.groupSequential(doc => field.get(doc, field, state))
}
}

object Query {
Expand Down
21 changes: 12 additions & 9 deletions core/src/main/scala/lightdb/doc/MaterializedBatchModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package lightdb.doc

import lightdb.Id
import lightdb.transaction.Transaction
import rapid.Task

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
Expand All @@ -12,7 +13,7 @@ trait MaterializedBatchModel[Doc <: Document[Doc], MaterialDoc <: Document[Mater

private val map = new ConcurrentHashMap[Transaction[MaterialDoc], TransactionState]

private def changed(docState: DocState[MaterialDoc])(implicit transaction: Transaction[MaterialDoc]): Unit = {
private def changed(docState: DocState[MaterialDoc])(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = Task {
map.compute(transaction, (_, current) => {
val state = Option(current).getOrElse(new TransactionState)
state.changed(docState)
Expand All @@ -22,28 +23,30 @@ trait MaterializedBatchModel[Doc <: Document[Doc], MaterialDoc <: Document[Mater
if (state.size > maxBatchSize) {
val list = state.process()
process(list)
} else {
Task.unit
}
}
}.flatten

protected def process(list: List[List[DocState[MaterialDoc]]]): Unit
protected def process(list: List[List[DocState[MaterialDoc]]]): Task[Unit]

override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit =
override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] =
changed(DocState.Added(doc))

override protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)
(implicit transaction: Transaction[MaterialDoc]): Unit =
(implicit transaction: Transaction[MaterialDoc]): Task[Unit] =
changed(DocState.Modified(newDoc))

override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit =
override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] =
changed(DocState.Removed(doc))

override protected def transactionStart(transaction: Transaction[MaterialDoc]): Unit = {}
override protected def transactionStart(transaction: Transaction[MaterialDoc]): Task[Unit] = Task.unit

override protected def transactionEnd(transaction: Transaction[MaterialDoc]): Unit = {
override protected def transactionEnd(transaction: Transaction[MaterialDoc]): Task[Unit] = Task {
Option(map.get(transaction)).foreach { state =>
if (state.size > 0) {
val list = state.process()
process(list)
process(list).sync()
}
map.remove(transaction)
}
Expand Down
24 changes: 10 additions & 14 deletions core/src/main/scala/lightdb/doc/MaterializedModel.scala
Original file line number Diff line number Diff line change
Expand Up @@ -6,29 +6,25 @@ import lightdb.transaction.Transaction
import lightdb.trigger.BasicCollectionTrigger
import rapid.Task

import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.atomic.AtomicInteger
import scala.jdk.CollectionConverters._

trait MaterializedModel[Doc <: Document[Doc], MaterialDoc <: Document[MaterialDoc], MaterialModel <: DocumentModel[MaterialDoc]] extends DocumentModel[Doc] { mm =>
def materialCollection: Collection[MaterialDoc, MaterialModel]

protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit
protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit
protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Unit
protected def transactionStart(transaction: Transaction[MaterialDoc]): Unit
protected def transactionEnd(transaction: Transaction[MaterialDoc]): Unit
protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit]
protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit]
protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit]
protected def transactionStart(transaction: Transaction[MaterialDoc]): Task[Unit]
protected def transactionEnd(transaction: Transaction[MaterialDoc]): Task[Unit]

override def init[Model <: DocumentModel[Doc]](collection: Collection[Doc, Model]): Task[Unit] = {
super.init(collection).map { _ =>
materialCollection.trigger += new BasicCollectionTrigger[MaterialDoc, MaterialModel] {
override def collection: Collection[MaterialDoc, MaterialModel] = materialCollection

override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = Task(mm.adding(doc))
override protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = Task(mm.modifying(oldDoc, newDoc))
override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = Task(mm.removing(doc))
override def transactionStart(transaction: Transaction[MaterialDoc]): Task[Unit] = Task(mm.transactionStart(transaction))
override def transactionEnd(transaction: Transaction[MaterialDoc]): Task[Unit] = Task(mm.transactionEnd(transaction))
override protected def adding(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = mm.adding(doc)
override protected def modifying(oldDoc: MaterialDoc, newDoc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = mm.modifying(oldDoc, newDoc)
override protected def removing(doc: MaterialDoc)(implicit transaction: Transaction[MaterialDoc]): Task[Unit] = mm.removing(doc)
override def transactionStart(transaction: Transaction[MaterialDoc]): Task[Unit] = mm.transactionStart(transaction)
override def transactionEnd(transaction: Transaction[MaterialDoc]): Task[Unit] = mm.transactionEnd(transaction)
}
}
}
Expand Down
31 changes: 13 additions & 18 deletions core/src/main/scala/lightdb/transaction/Transaction.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,21 @@ package lightdb.transaction

import lightdb.doc.Document
import lightdb.feature.FeatureSupport
import rapid.Task

final class Transaction[Doc <: Document[Doc]] extends FeatureSupport[TransactionKey] { transaction =>
def commit(): Unit = {
features.foreach {
case f: TransactionFeature => f.commit()
case _ => // Ignore
}
}
def commit(): Task[Unit] = features.map {
case f: TransactionFeature => f.commit()
case _ => Task.unit // Ignore
}.tasks.unit

def rollback(): Unit = {
features.foreach {
case f: TransactionFeature => f.rollback()
case _ => // Ignore
}
}
def rollback(): Task[Unit] = features.map {
case f: TransactionFeature => f.rollback()
case _ => Task.unit // Ignore
}.tasks.unit

def close(): Unit = {
features.foreach {
case f: TransactionFeature => f.close()
case _ => // Ignore
}
}
def close(): Task[Unit] = features.map {
case f: TransactionFeature => f.close()
case _ => Task.unit // Ignore
}.tasks.unit
}
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package lightdb.transaction

import rapid.Task

trait TransactionFeature {
def commit(): Unit = {}
def commit(): Task[Unit] = Task.unit

def rollback(): Unit = {}
def rollback(): Task[Unit] = Task.unit

def close(): Unit = {}
def close(): Task[Unit] = Task.unit
}
32 changes: 0 additions & 32 deletions core/src/main/scala/lightdb/util/GroupedIterator.scala

This file was deleted.

20 changes: 6 additions & 14 deletions core/src/main/scala/lightdb/util/Initializable.scala
Original file line number Diff line number Diff line change
Expand Up @@ -2,30 +2,22 @@ package lightdb.util

import rapid._

import java.util.concurrent.atomic.AtomicInteger

/**
* Provides simple initialization support to avoid initialization being invoked more
* than once. FlatMap on `init` to safely guarantee initialization was successful.
*/
trait Initializable {
private val status = new AtomicInteger(0)
@volatile private var initialized = false
private lazy val singleton = initialize().map { _ =>
initialized = true
}.singleton

def isInitialized: Boolean = status.get() == 2
def isInitialized: Boolean = initialized

/**
* Calls initialize() exactly one time. Safe to call multiple times.
*/
final def init(): Task[Boolean] = Task {
if (status.compareAndSet(0, 1)) {
initialize().map { _ =>
status.set(2)
true
}
} else {
Task.pure(false)
}
}.flatten
final def init(): Task[Unit] = singleton

/**
* Define initialization functionality here, but never call directly.
Expand Down
Loading

0 comments on commit 4cccfc0

Please sign in to comment.