diff --git a/build.sbt b/build.sbt
index 91268bc8..1efc1986 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,7 +7,7 @@ val scala3 = "3.3.1"
 
 name := "scarango"
 ThisBuild / organization := "com.outr"
-ThisBuild / version := "3.15.1-SNAPSHOT"
+ThisBuild / version := "3.16.0-SNAPSHOT"
 ThisBuild / scalaVersion := scala213
 ThisBuild / crossScalaVersions := List(scala3, scala213)
 ThisBuild / scalacOptions ++= Seq("-unchecked", "-deprecation")
diff --git a/core/src/main/scala/com/outr/arango/Id.scala b/core/src/main/scala/com/outr/arango/Id.scala
index 3cf006ce..40bf2a5c 100644
--- a/core/src/main/scala/com/outr/arango/Id.scala
+++ b/core/src/main/scala/com/outr/arango/Id.scala
@@ -38,6 +38,7 @@ case class Id[D](value: String,
 
 object Id {
   private val ExtractorRegex = """(.+)/(.+)""".r
+  private lazy val ValidSpecialChars = "_-.@()+,=;$!*'%".toSet
 
   implicit def rw[D]: RW[Id[D]] = RW.from(_._id, v => parse[D](v.asStr.value), DefType.Str)
   implicit def toJson[D](id: Id[D]): Json = rw[D].read(id)
@@ -46,6 +47,10 @@ object Id {
     case ExtractorRegex(collection, value) => Id[D](value, collection)
   }
 
+  def isValid(key: String): Boolean = key.forall(c =>
+    c.isLetterOrDigit || ValidSpecialChars.contains(c)
+  )
+
   def extract[D](json: fabric.Json): Id[D] = update(json)("_id").as[Id[D]]
 
   def update(json: fabric.Json): fabric.Json = {
diff --git a/driver/src/main/scala/com/outr/arango/collection/DocumentCollection.scala b/driver/src/main/scala/com/outr/arango/collection/DocumentCollection.scala
index 2cad0019..27b6864b 100644
--- a/driver/src/main/scala/com/outr/arango/collection/DocumentCollection.scala
+++ b/driver/src/main/scala/com/outr/arango/collection/DocumentCollection.scala
@@ -21,6 +21,7 @@ class DocumentCollection[D <: Document[D], M <: DocumentModel[D]](protected[aran
   override protected def afterRetrieval(value: Json): Json = model.allMutations.foldLeft(value)((v, m) => m.retrieve(v))
 
   def ref: DocumentRef[D, M] = DocumentRef[D, M](model, None)
+  def ref(name: String): DocumentRef[D, M] = DocumentRef(model, Some(name))
 
   lazy val update: UpdateBuilder[D, M] = UpdateBuilder(this)
   lazy val upsert: UpsertBuilder[D, M] = UpsertBuilder(this)
diff --git a/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala b/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
index 42f2b21d..2e801dc0 100644
--- a/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
+++ b/driver/src/main/scala/com/outr/arango/queue/OperationsQueue.scala
@@ -4,7 +4,8 @@ import cats.effect.IO
 import cats.implicits.toTraverseOps
 import com.outr.arango.collection.DocumentCollection
 import com.outr.arango.query._
-import com.outr.arango.{Document, DocumentModel}
+import com.outr.arango.upsert.Searchable
+import com.outr.arango.{Document, DocumentModel, DocumentRef}
 import fabric.rw.RW
 
 import java.util.concurrent.ConcurrentLinkedQueue
@@ -13,79 +14,28 @@ import java.util.concurrent.atomic.AtomicInteger
 case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection: DocumentCollection[D, M],
                                                                     flushSize: Int,
                                                                     chunkSize: Int) { oq =>
-  private var queues = List.empty[OpQueue]
+  private var queues = List.empty[ProcessQueue[D]]
 
   /**
    * Provide queue operations on a collection. Call `flush()` at the end to make sure all batched data is pushed.
    */
   object op {
-    lazy val insert: OpQueue = OpQueue(stream => collection.stream.insert(stream, chunkSize).void)
-    lazy val upsert: OpQueue = OpQueue(stream => collection.stream.upsert(stream, chunkSize).void)
-    lazy val delete: OpQueue = OpQueue(stream => collection.stream.delete(stream.map(_._id), chunkSize).void)
-    def createUpsertReplace(searchFields: String*): OpQueue = OpQueue { stream =>
-      val searchQuery = QueryPart.Static(searchFields.map { field =>
-        s"$field: doc.$field"
-      }.mkString("{", ", ", "}"))
-      implicit def rw: RW[D] = collection.model.rw
-      stream.compile.toList.flatMap { list =>
-        val query =
-          aql"""
-             FOR doc IN $list
-             UPSERT $searchQuery
-             INSERT doc
-             REPLACE doc
-             IN $collection
-           """
-        collection.graph.execute(query)
+    def create(process: List[D] => IO[Unit]): ProcessQueue[D] = {
+      val q = ProcessQueue[D](
+        process = process,
+        flushSize = flushSize,
+        chunkSize = chunkSize
+      )
+      oq.synchronized {
+        queues = q :: queues
       }
+      q
     }
-
-    /**
-     * Flushes the queue
-     *
-     * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
-     *                  is below the flushSize threshold.
-     */
-    def flush(fullFlush: Boolean = true): IO[Unit] = queues.map(_.flush(fullFlush)).sequence.void
-  }
-
-  case class OpQueue(process: fs2.Stream[IO, D] => IO[Unit]) {
-    oq.synchronized {
-      queues = this :: queues
-    }
-
-    private lazy val queue = new ConcurrentLinkedQueue[D]
-    private lazy val counter = new AtomicInteger(0)
-
-    private lazy val _processed = new AtomicInteger(0)
-
-    def processed: Int = _processed.get()
-
-    private def take(n: Int): List[D] = if (n == 0) {
-      Nil
-    } else {
-      val d = queue.poll()
-      if (d == null) {
-        Nil
-      } else {
-        counter.decrementAndGet()
-        d :: take(n - 1)
-      }
-    }
-
-    /**
-     * Queue operations for the supplied docs. If this causes the flushSize to overflow, a flush will occur before this
-     * returns. Otherwise, this is a very fast operation.
-     */
-    def apply(docs: D*): IO[Unit] = IO {
-      docs.foreach(queue.add)
-      counter.addAndGet(docs.length)
-    }.flatMap { size =>
-      if (size >= flushSize) {
-        flush(fullFlush = false)
-      } else {
-        IO.unit
-      }
+    lazy val insert: ProcessQueue[D] = create(list => collection.batch.insert(list).void)
+    lazy val upsert: ProcessQueue[D] = create(list => collection.batch.upsert(list).void)
+    lazy val delete: ProcessQueue[D] = create(list => collection.batch.delete(list.map(_._id)).void)
+    def createUpsertReplace(f: DocumentRef[D, M] => List[Searchable]): ProcessQueue[D] = create { list =>
+      collection.upsert.withListSearch(list)(f).execute()
     }
 
     /**
@@ -94,20 +44,6 @@ case class OperationsQueue[D <: Document[D], M <: DocumentModel[D]](collection:
      * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
      *                  is below the flushSize threshold.
      */
-    def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
-      if (list.isEmpty) {
-        IO.unit
-      } else {
-        val stream = fs2.Stream(list: _*).covary[IO]
-        process(stream).flatMap { _ =>
-          _processed.addAndGet(list.length)
-          if (counter.get() >= flushSize || fullFlush) {
-            flush()
-          } else {
-            IO.unit
-          }
-        }
-      }
-    }
+    def flush(fullFlush: Boolean = true): IO[Unit] = queues.map(_.flush(fullFlush)).sequence.void
   }
 }
\ No newline at end of file
diff --git a/driver/src/main/scala/com/outr/arango/queue/ProcessQueue.scala b/driver/src/main/scala/com/outr/arango/queue/ProcessQueue.scala
new file mode 100644
index 00000000..dd94811d
--- /dev/null
+++ b/driver/src/main/scala/com/outr/arango/queue/ProcessQueue.scala
@@ -0,0 +1,70 @@
+package com.outr.arango.queue
+
+import cats.effect.IO
+
+import java.util.concurrent.ConcurrentLinkedQueue
+import java.util.concurrent.atomic.AtomicInteger
+
+/**
+ * ProcessQueue provides a convenience capability to batch process in chunks.
+ *
+ * @param process the function to process a chunk of the queue
+ * @param flushSize the number of records before a flush occurs
+ * @param chunkSize the max number of records per chunk sent to the process function
+ */
+case class ProcessQueue[T](process: List[T] => IO[Unit], flushSize: Int, chunkSize: Int) {
+  private lazy val queue = new ConcurrentLinkedQueue[T]
+  private lazy val counter = new AtomicInteger(0)
+
+  private lazy val _processed = new AtomicInteger(0)
+
+  def processed: Int = _processed.get()
+
+  private def take(n: Int): List[T] = if (n == 0) {
+    Nil
+  } else {
+    val d = queue.poll()
+    if (d == null) {
+      Nil
+    } else {
+      counter.decrementAndGet()
+      d :: take(n - 1)
+    }
+  }
+
+  /**
+   * Queue operations for the supplied docs. If this causes the flushSize to overflow, a flush will occur before this
+   * returns. Otherwise, this is a very fast operation.
+   */
+  def apply(docs: T*): IO[Unit] = IO {
+    docs.foreach(queue.add)
+    counter.addAndGet(docs.length)
+  }.flatMap { size =>
+    if (size >= flushSize) {
+      flush(fullFlush = false)
+    } else {
+      IO.unit
+    }
+  }
+
+  /**
+   * Flushes the queue
+   *
+   * @param fullFlush if true, all operations are applied. If false, flushing only occurs until the operation count
+   *                  is below the flushSize threshold.
+   */
+  def flush(fullFlush: Boolean = true): IO[Unit] = IO(take(chunkSize)).flatMap { list =>
+    if (list.isEmpty) {
+      IO.unit
+    } else {
+      process(list).flatMap { _ =>
+        _processed.addAndGet(list.length)
+        if (counter.get() >= flushSize || fullFlush) {
+          flush()
+        } else {
+          IO.unit
+        }
+      }
+    }
+  }
+}
diff --git a/driver/src/main/scala/com/outr/arango/upsert/Searchable.scala b/driver/src/main/scala/com/outr/arango/upsert/Searchable.scala
index 913b5474..99436894 100644
--- a/driver/src/main/scala/com/outr/arango/upsert/Searchable.scala
+++ b/driver/src/main/scala/com/outr/arango/upsert/Searchable.scala
@@ -3,8 +3,14 @@ package com.outr.arango.upsert
 import com.outr.arango.Field
 import com.outr.arango.query.{Query, QueryPart}
 
+sealed trait Searchable {
+  def toSearch: QueryPart
+}
+
 object Searchable {
-  case class Filter[F](field1: Field[F], condition: String, field2: Field[F]) extends Searchable {
+  def apply[F](field1: Field[F], field2: Field[F]): Searchable = Filter[F](field1, field2)
+
+  case class Filter[F](field1: Field[F], field2: Field[F]) extends Searchable {
     override val toSearch: QueryPart = Query.merge(
       List(
         Query(field1.fieldName),
@@ -14,8 +20,4 @@ object Searchable {
       separator = " "
     )
   }
-}
-
-sealed trait Searchable {
-  def toSearch: QueryPart
 }
\ No newline at end of file
diff --git a/driver/src/main/scala/com/outr/arango/upsert/UpsertBuilder.scala b/driver/src/main/scala/com/outr/arango/upsert/UpsertBuilder.scala
index bc81265f..1dddfb1a 100644
--- a/driver/src/main/scala/com/outr/arango/upsert/UpsertBuilder.scala
+++ b/driver/src/main/scala/com/outr/arango/upsert/UpsertBuilder.scala
@@ -36,8 +36,7 @@ case class UpsertBuilder[D <: Document[D], M <: DocumentModel[D]](collection: Do
     (f: DocumentRef[T, TM] => List[Searchable]): UpsertBuilder[D, M] = {
     copy(
       list = Some(() => {
-        val ref = collection.ref
-        addRef(ref)
+        val ref = collection.ref("doc")
         val entries = f(ref).map(_.toSearch)
         (ref, list.map(_.json(collection.model.rw)), entries)
       })
@@ -106,6 +105,7 @@ case class UpsertBuilder[D <: Document[D], M <: DocumentModel[D]](collection: Do
     val updateReplaceQuery = upsert.map {
       case Upsert.Update(value) => aql"""UPDATE ${QueryPart.Static(value)} IN $collection"""
       case Upsert.Replace(replacement) => aql"""REPLACE $replacement IN $collection"""
+      case _ => throw new RuntimeException("Should not be possible, but Scala 3 says it is...")
     }.getOrElse {
       val ref = listValue.get._1
       aql"""REPLACE ${QueryPart.Ref(ref)} IN $collection"""
diff --git a/driver/src/main/scala/com/outr/arango/upsert/UpsertResult.scala b/driver/src/main/scala/com/outr/arango/upsert/UpsertResult.scala
index e71f43fb..eedc7e69 100644
--- a/driver/src/main/scala/com/outr/arango/upsert/UpsertResult.scala
+++ b/driver/src/main/scala/com/outr/arango/upsert/UpsertResult.scala
@@ -1,6 +1,6 @@
 package com.outr.arango.upsert
 
-import fabric.rw.RW
+import fabric.rw._
 
 case class UpsertResult[D](original: Option[D], newValue: D)
 
diff --git a/driver/src/main/scala/com/outr/arango/util/Helpers.scala b/driver/src/main/scala/com/outr/arango/util/Helpers.scala
index 674dec0f..8211f440 100644
--- a/driver/src/main/scala/com/outr/arango/util/Helpers.scala
+++ b/driver/src/main/scala/com/outr/arango/util/Helpers.scala
@@ -7,11 +7,9 @@ import com.arangodb.{ArangoDBException, entity, model}
 import com.outr.arango._
 import com.outr.arango.core._
 import com.outr.arango.query.QueryOptions
-import fabric.Json
+import fabric._
 
-import java.util.concurrent.{CompletableFuture, CompletionException}
 import scala.jdk.CollectionConverters._
-import scala.jdk.FutureConverters._
 import scala.language.implicitConversions
 
 object Helpers {
diff --git a/driver/src/test/scala/spec/AdvancedSpec.scala b/driver/src/test/scala/spec/AdvancedSpec.scala
index 32fe9f0a..6a9aab02 100644
--- a/driver/src/test/scala/spec/AdvancedSpec.scala
+++ b/driver/src/test/scala/spec/AdvancedSpec.scala
@@ -259,11 +259,13 @@ class AdvancedSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with Ope
         Person("Nine", 9),
         Person("Ten", 10),
       )
-      database.people.op
-        .upsert(people: _*)
+      val upsert = database.people.op.createUpsertReplace(p => List(
+        Searchable(Person.name, p.name)
+      ))
+      upsert(people: _*)
         .flatMap(_ => flushQueue())
        .map { _ =>
-          database.people.op.upsert.processed should be(10)
+          upsert.processed should be(10)
         }
     }
     "verify the DBQueue properly inserted the records" in {
@@ -340,7 +342,7 @@ class AdvancedSpec extends AsyncWordSpec with AsyncIOSpec with Matchers with Ope
     "upsert multiple bios" in {
       database.people.upsert
         .withListSearch(List(Person("Bethany", 30), Person("Adam", 30), Person("Tom", 30))) { p =>
-          List(Searchable.Filter(Person.name, "==", p.name))
+          List(Searchable.Filter(Person.name, p.name))
        }
         .withNoUpdate
         .toList
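
Usage sketch (not part of the diff): the op API now hands back ProcessQueue values, so a queue can be held and inspected directly instead of going through the removed OpQueue. The snippet below mirrors the AdvancedSpec changes above; `database.people` and `Person` come from the test suite, while the document values are illustrative only:

    import cats.effect.IO
    import com.outr.arango.upsert.Searchable

    // Obtain a queued upsert-replace handle; `p` is a DocumentRef used to build the search conditions.
    val upsertQueue = database.people.op.createUpsertReplace { p =>
      List(Searchable(Person.name, p.name))
    }

    val run: IO[Unit] = for {
      _ <- upsertQueue(Person("One", 1), Person("Two", 2)) // buffered; auto-flushes once flushSize is reached
      _ <- database.people.op.flush()                      // drains every queue created through op.create
      _ <- IO(println(upsertQueue.processed))              // count of documents actually processed so far
    } yield ()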
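
ProcessQueue itself has no ArangoDB dependency, so the chunking behavior can be exercised in isolation. A minimal sketch, with the element type, process function, and sizes all assumed for illustration:

    import cats.effect.IO
    import com.outr.arango.queue.ProcessQueue

    val queue = ProcessQueue[Int](
      process = chunk => IO(println(s"chunk of ${chunk.length}")), // receives at most chunkSize items per call
      flushSize = 1000,                                            // queued items that trigger an automatic flush
      chunkSize = 100                                              // upper bound per process invocation
    )
    val io: IO[Unit] = queue(1, 2, 3) >> queue.flush() // enqueue, then fully drain; queue.processed updates as chunks complete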