
Lots of improvements
- Added DocumentCollection.ref(name)
- Added optional Id key validation
- Lots of improvements to OperationsQueue support
- Addition of ProcessQueue for more powerful batch processing
- Improvements to Searchable support
darkfrog26 committed Oct 1, 2023
1 parent 41a9a95 commit dc56373
Showing 10 changed files with 112 additions and 98 deletions.
2 changes: 1 addition & 1 deletion build.sbt
@@ -7,7 +7,7 @@ val scala3 = "3.3.1"
 
 name := "scarango"
 ThisBuild / organization := "com.outr"
-ThisBuild / version := "3.15.1-SNAPSHOT"
+ThisBuild / version := "3.16.0-SNAPSHOT"
 ThisBuild / scalaVersion := scala213
 ThisBuild / crossScalaVersions := List(scala3, scala213)
 ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
5 changes: 5 additions & 0 deletions core/src/main/scala/com/outr/arango/Id.scala
@@ -38,6 +38,7 @@ case class Id[D](value: String,
 
 object Id {
   private val ExtractorRegex = """(.+)/(.+)""".r
+  private lazy val ValidSpecialChars = "_-.@()+,=;$!*'%".toSet
 
   implicit def rw[D]: RW[Id[D]] = RW.from(_._id, v => parse[D](v.asStr.value), DefType.Str)
   implicit def toJson[D](id: Id[D]): Json = rw[D].read(id)
@@ -46,6 +47,10 @@ object Id {
     case ExtractorRegex(collection, value) => Id[D](value, collection)
   }
 
+  def isValid(key: String): Boolean = key.forall(c =>
+    c.isLetterOrDigit || ValidSpecialChars.contains(c)
+  )
+
  def extract[D](json: fabric.Json): Id[D] = update(json)("_id").as[Id[D]]
 
   def update(json: fabric.Json): fabric.Json = {
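The validator simply checks each character against ArangoDB's document-key alphabet: letters, digits, and the special characters in ValidSpecialChars. A minimal sketch of the new helper in use; the results follow directly from that rule:

    import com.outr.arango.Id

    Id.isValid("user-42")   // true: letters, digits, and '-' are all permitted
    Id.isValid("a/b")       // false: '/' separates collection from key and may not appear in a key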
@@ -21,6 +21,7 @@ class DocumentCollection[D <: Document[D], M <: DocumentModel[D]](protected[aran
   override protected def afterRetrieval(value: Json): Json = model.allMutations.foldLeft(value)((v, m) => m.retrieve(v))
 
   def ref: DocumentRef[D, M] = DocumentRef[D, M](model, None)
+  def ref(name: String): DocumentRef[D, M] = DocumentRef(model, Some(name))
 
   lazy val update: UpdateBuilder[D, M] = UpdateBuilder(this)
   lazy val upsert: UpsertBuilder[D, M] = UpsertBuilder(this)
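The new overload pins the variable name the reference compiles to in generated AQL, whereas the no-arg `ref` uses an anonymous, auto-generated name. A minimal sketch against a hypothetical `people` collection:

    val anonymous = database.people.ref     // auto-generated variable name in the AQL
    val named = database.people.ref("p")    // always compiles to the variable `p`

`UpsertBuilder.withListSearch` (changed below) uses this to fix its list reference to the name `doc`.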
100 changes: 18 additions & 82 deletions driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
@@ -4,7 +4,8 @@ import cats.effect.IO
 import cats.implicits.toTraverseOps
 import com.outr.arango.collection.DocumentCollection
 import com.outr.arango.query._
-import com.outr.arango.{Document, DocumentModel}
+import com.outr.arango.upsert.Searchable
+import com.outr.arango.{Document, DocumentModel, DocumentRef}
 import fabric.rw.RW
 
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -13,101 +14,37 @@ import java.util.concurrent.atomic.AtomicInteger
 case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M],
                                                                     flushSize: Int,
                                                                     chunkSize: Int) { oq =>
-  private var queues = List.empty[OpQueue]
+  private var queues = List.empty[ProcessQueue[D]]
 
   /**
    * Provide queue operations on a collection. Call `flush()` at the end to make sure all batched data is pushed.
    */
   object op {
-    lazy val insert: OpQueue = OpQueue(stream => collection.stream.insert(stream, chunkSize).void)
-    lazy val upsert: OpQueue = OpQueue(stream => collection.stream.upsert(stream, chunkSize).void)
-    lazy val delete: OpQueue = OpQueue(stream => collection.stream.delete(stream.map(_._id), chunkSize).void)
-    def createUpsertReplace(searchFields: String*): OpQueue = OpQueue { stream =>
-      val searchQuery = QueryPart.Static(searchFields.map { field =>
-        s"$field: doc.$field"
-      }.mkString("{", ", ", "}"))
-      implicit def rw: RW[D] = collection.model.rw
-      stream.compile.toList.flatMap { list =>
-        val query =
-          aql"""
-            FOR doc IN $list
-            UPSERT $searchQuery
-            INSERT doc
-            REPLACE doc
-            IN $collection
-          """
-        collection.graph.execute(query)
-      }
-    }
+    def create(process: List[D] => IO[Unit]): ProcessQueue[D] = {
+      val q = ProcessQueue[D](
+        process = process,
+        flushSize = flushSize,
+        chunkSize = chunkSize
+      )
+      oq.synchronized {
+        queues = q :: queues
+      }
+      q
+    }
+
+    lazy val insert: ProcessQueue[D] = create(list => collection.batch.insert(list).void)
+    lazy val upsert: ProcessQueue[D] = create(list => collection.batch.upsert(list).void)
+    lazy val delete: ProcessQueue[D] = create(list => collection.batch.delete(list.map(_._id)).void)
+    def createUpsertReplace(f: DocumentRef[D, M] => List[Searchable]): ProcessQueue[D] = create { list =>
+      collection.upsert.withListSearch(list)(f).execute()
+    }
   }
 
   /**
    * Flushes the queue
    *
    * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
    *                  is below the flushSize threshold.
    */
   def flush(fullFlush: Boolean = true): IO[Unit] = queues.map(_.flush(fullFlush)).sequence.void
-
-  case class OpQueue(process: fs2.Stream[IO, D] => IO[Unit]) {
-    oq.synchronized {
-      queues = this :: queues
-    }
-
-    private lazy val queue = new ConcurrentLinkedQueue[D]
-    private lazy val counter = new AtomicInteger(0)
-
-    private lazy val _processed = new AtomicInteger(0)
-
-    def processed: Int = _processed.get()
-
-    private def take(n: Int): List[D] = if (n == 0) {
-      Nil
-    } else {
-      val d = queue.poll()
-      if (d == null) {
-        Nil
-      } else {
-        counter.decrementAndGet()
-        d :: take(n - 1)
-      }
-    }
-
-    /**
-     * Queue operations for the supplied docs. If this causes the flushSize to overflow, a flush will occur before this
-     * returns. Otherwise, this is a very fast operation.
-     */
-    def apply(docs: D*): IO[Unit] = IO {
-      docs.foreach(queue.add)
-      counter.addAndGet(docs.length)
-    }.flatMap { size =>
-      if (size >= flushSize) {
-        flush(fullFlush = false)
-      } else {
-        IO.unit
-      }
-    }
-
-    /**
-     * Flushes the queue
-     *
-     * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
-     *                  is below the flushSize threshold.
-     */
-    def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
-      if (list.isEmpty) {
-        IO.unit
-      } else {
-        val stream = fs2.Stream(list: _*).covary[IO]
-        process(stream).flatMap { _ =>
-          _processed.addAndGet(list.length)
-          if (counter.get() >= flushSize || fullFlush) {
-            flush()
-          } else {
-            IO.unit
-          }
-        }
-      }
-    }
-  }
 }
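With the queueing mechanics extracted into ProcessQueue (added below), `op` reduces to a factory: `create` registers a queue for collection-wide flushing, the built-in `insert`/`upsert`/`delete` queues delegate to `collection.batch`, and `createUpsertReplace` now reuses `UpsertBuilder.withListSearch` instead of assembling raw AQL from field-name strings. A hedged usage sketch, borrowing the `Person` model from the test suite below:

    // `p` is the DocumentRef for Person; the queue upserts keyed on the `name` field.
    val queue = database.people.op.createUpsertReplace { p =>
      List(Searchable(Person.name, p.name))
    }
    val io = for {
      _ <- queue(Person("Adam", 30), Person("Bethany", 30)) // fast enqueue; auto-flushes past flushSize
      _ <- queue.flush()                                    // full flush drains whatever remains
    } yield queue.processed                                 // count of records actually processed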
70 changes: 70 additions & 0 deletions driver/src/main/scala/com/outr/arango/queue/ProcessQueue.scala
@@ -0,0 +1,70 @@
+package com.outr.arango.queue
+
+import cats.effect.IO
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * ProcessQueue provides a convenience capability to batch process in chunks.
+ *
+ * @param process the function to process a chunk of the queue
+ * @param flushSize the number of records before a flush occurs
+ * @param chunkSize the max number of records per chunk sent to the process function
+ */
+case class ProcessQueue[T](process: List[T] => IO[Unit], flushSize: Int, chunkSize: Int) {
+  private lazy val queue = new ConcurrentLinkedQueue[T]
+  private lazy val counter = new AtomicInteger(0)
+
+  private lazy val _processed = new AtomicInteger(0)
+
+  def processed: Int = _processed.get()
+
+  private def take(n: Int): List[T] = if (n == 0) {
+    Nil
+  } else {
+    val d = queue.poll()
+    if (d == null) {
+      Nil
+    } else {
+      counter.decrementAndGet()
+      d :: take(n - 1)
+    }
+  }
+
+  /**
+   * Queue operations for the supplied docs. If this causes the flushSize to overflow, a flush will occur before this
+   * returns. Otherwise, this is a very fast operation.
+   */
+  def apply(docs: T*): IO[Unit] = IO {
+    docs.foreach(queue.add)
+    counter.addAndGet(docs.length)
+  }.flatMap { size =>
+    if (size >= flushSize) {
+      flush(fullFlush = false)
+    } else {
+      IO.unit
+    }
+  }
+
+  /**
+   * Flushes the queue
+   *
+   * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
+   *                  is below the flushSize threshold.
+   */
+  def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
+    if (list.isEmpty) {
+      IO.unit
+    } else {
+      process(list).flatMap { _ =>
+        _processed.addAndGet(list.length)
+        if (counter.get() >= flushSize || fullFlush) {
+          flush()
+        } else {
+          IO.unit
+        }
+      }
+    }
+  }
+}
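ProcessQueue is independent of ArangoDB types, so it can be exercised on its own. A self-contained sketch with a println-based processor and arbitrary sizes:

    import cats.effect.{IO, IOApp}
    import com.outr.arango.queue.ProcessQueue

    object ProcessQueueExample extends IOApp.Simple {
      private val queue = ProcessQueue[Int](
        process = chunk => IO.println(s"processing ${chunk.length} records"),
        flushSize = 25, // a partial flush starts automatically once 25 records are queued
        chunkSize = 10  // process never receives more than 10 records at a time
      )

      override def run: IO[Unit] = for {
        _ <- queue(1 to 100: _*) // enqueues, auto-flushing until under flushSize
        _ <- queue.flush()       // full flush processes the remainder
        _ <- IO.println(s"processed: ${queue.processed}") // prints: processed: 100
      } yield ()
    }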
12 changes: 7 additions & 5 deletions driver/src/main/scala/com/outr/arango/upsert/Searchable.scala
@@ -3,8 +3,14 @@ package com.outr.arango.upsert
 import com.outr.arango.Field
 import com.outr.arango.query.{Query, QueryPart}
 
+sealed trait Searchable {
+  def toSearch: QueryPart
+}
+
 object Searchable {
-  case class Filter[F](field1: Field[F], condition: String, field2: Field[F]) extends Searchable {
+  def apply[F](field1: Field[F], field2: Field[F]): Searchable = Filter[F](field1, field2)
+
+  case class Filter[F](field1: Field[F], field2: Field[F]) extends Searchable {
     override val toSearch: QueryPart = Query.merge(
       List(
         Query(field1.fieldName),
@@ -14,8 +20,4 @@ object Searchable {
       separator = " "
     )
   }
-}
-
-sealed trait Searchable {
-  def toSearch: QueryPart
 }
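With `condition` gone, `Filter` now implies the `==` comparison that call sites previously passed explicitly (see the AdvancedSpec change below), and the new companion `apply` shortens those call sites. The two forms here are equivalent; `p` is the document reference supplied by `withListSearch` or `createUpsertReplace`:

    List(Searchable(Person.name, p.name))         // via the new apply
    List(Searchable.Filter(Person.name, p.name))  // explicit Filter, same search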
@@ -36,8 +36,7 @@ case class UpsertBuilder[D <: Document[D], M <: DocumentModel[D]](collection: Do
     (f: DocumentRef[T, TM] => List[Searchable]): UpsertBuilder[D, M] = {
     copy(
       list = Some(() => {
-        val ref = collection.ref
-        addRef(ref)
+        val ref = collection.ref("doc")
         val entries = f(ref).map(_.toSearch)
         (ref, list.map(_.json(collection.model.rw)), entries)
       })
@@ -106,6 +105,7 @@ case class UpsertBuilder[D <: Document[D], M <: DocumentModel[D]](collection: Do
     val updateReplaceQuery = upsert.map {
       case Upsert.Update(value) => aql"""UPDATE ${QueryPart.Static(value)} IN $collection"""
       case Upsert.Replace(replacement) => aql"""REPLACE $replacement IN $collection"""
+      case _ => throw new RuntimeException("Should not be possible, but Scala 3 says it is...")
     }.getOrElse {
       val ref = listValue.get._1
       aql"""REPLACE ${QueryPart.Ref(ref)} IN $collection"""
@@ -1,6 +1,6 @@
 package com.outr.arango.upsert
 
-import fabric.rw.RW
+import fabric.rw._
 
 case class UpsertResult[D](original: Option[D], newValue: D)
 
4 changes: 1 addition & 3 deletions driver/src/main/scala/com/outr/arango/util/Helpers.scala
@@ -7,11 +7,9 @@ import com.arangodb.{ArangoDBException, entity, model}
 import com.outr.arango._
 import com.outr.arango.core._
 import com.outr.arango.query.QueryOptions
-import fabric.Json
+import fabric._
 
 import java.util.concurrent.{CompletableFuture, CompletionException}
-import scala.jdk.CollectionConverters._
-import scala.jdk.FutureConverters._
 import scala.language.implicitConversions
 
 object Helpers {
10 changes: 6 additions & 4 deletions driver/src/test/scala/spec/AdvancedSpec.scala
@@ -259,11 +259,13 @@ class AdvancedSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with Ope
         Person("Nine", 9),
         Person("Ten", 10),
       )
-      database.people.op
-        .upsert(people: _*)
+      val upsert = database.people.op.createUpsertReplace(p => List(
+        Searchable(Person.name, p.name)
+      ))
+      upsert(people: _*)
         .flatMap(_ => flushQueue())
         .map { _ =>
-          database.people.op.upsert.processed should be(10)
+          upsert.processed should be(10)
         }
     }
     "verify the DBQueue properly inserted the records" in {
@@ -340,7 +342,7 @@
     "upsert multiple bios" in {
       database.people.upsert
         .withListSearch(List(Person("Bethany", 30), Person("Adam", 30), Person("Tom", 30))) { p =>
-          List(Searchable.Filter(Person.name, "==", p.name))
+          List(Searchable.Filter(Person.name, p.name))
         }
         .withNoUpdate
         .toList
Expand Down
