Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Scala3 couchbase support #128

Merged
merged 5 commits into from
May 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -270,15 +270,16 @@ sealed trait CouchbaseWriteResult[T <: Document[_]] {
/**
* Emitted for a successful Couchbase write operation.
*/
final case class CouchbaseWriteSuccess[T <: Document[_]] private (override val doc: T) extends CouchbaseWriteResult[T] {
final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] (
override val doc: T) extends CouchbaseWriteResult[T] {
val isSuccess: Boolean = true
val isFailure: Boolean = false
}

/**
* Emitted for a failed Couchbase write operation.
*/
final case class CouchbaseWriteFailure[T <: Document[_]] private (override val doc: T, failure: Throwable)
final case class CouchbaseWriteFailure[T <: Document[_]] private[couchbase] (override val doc: T, failure: Throwable)
extends CouchbaseWriteResult[T] {
val isSuccess: Boolean = false
val isFailure: Boolean = true
Expand All @@ -296,15 +297,15 @@ sealed trait CouchbaseDeleteResult {
/**
* Emitted for a successful Couchbase write operation.
*/
final case class CouchbaseDeleteSuccess private (override val id: String) extends CouchbaseDeleteResult {
final case class CouchbaseDeleteSuccess private[couchbase] (override val id: String) extends CouchbaseDeleteResult {
val isSuccess: Boolean = true
val isFailure: Boolean = false
}

/**
* Emitted for a failed Couchbase write operation.
*/
final case class CouchbaseDeleteFailure private (override val id: String, failure: Throwable)
final case class CouchbaseDeleteFailure private[couchbase] (override val id: String, failure: Throwable)
extends CouchbaseDeleteResult {
val isSuccess: Boolean = false
val isFailure: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ import pekko.stream.connectors.couchbase.{
import pekko.stream.scaladsl.Flow
import com.couchbase.client.java.document.{ Document, JsonDocument }

import scala.concurrent.ExecutionContext
import scala.concurrent.{ ExecutionContext, Future }

/**
* Scala API: Factory methods for Couchbase flows.
Expand Down Expand Up @@ -100,8 +100,8 @@ object CouchbaseFlow {
*/
def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
Flow
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
.fromMaterializer { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
Expand All @@ -115,7 +115,8 @@ object CouchbaseFlow {
}
})
}
.mapMaterializedValue(_ => NotUsed)
flow.mapMaterializedValue(_ => NotUsed)
}

/**
* Create a flow to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]].
Expand Down Expand Up @@ -153,8 +154,8 @@ object CouchbaseFlow {
*/
def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings,
writeSettings: CouchbaseWriteSettings,
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] =
Flow
bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = {
val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow
.fromMaterializer { (materializer, _) =>
val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName)
Flow[T]
Expand All @@ -168,7 +169,8 @@ object CouchbaseFlow {
}
})
}
.mapMaterializedValue(_ => NotUsed)
flow.mapMaterializedValue(_ => NotUsed)
}

/**
* Create a flow to delete documents from Couchbase by `id`. Emits the same `id`.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,9 @@ import com.couchbase.client.deps.io.netty.util.CharsetUtil
import com.couchbase.client.java.ReplicateTo
import com.couchbase.client.java.document.json.JsonObject
import com.couchbase.client.java.document.{ BinaryDocument, JsonDocument, RawJsonDocument, StringDocument }
import com.fasterxml.jackson.databind.json.JsonMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.slf4j.LoggerFactory
import play.api.libs.json.Json

import scala.collection.immutable.Seq
import scala.concurrent.ExecutionContext.Implicits.global
Expand All @@ -35,6 +36,12 @@ import scala.concurrent.{ Await, Future }

case class TestObject(id: String, value: String)

private[couchbase] object CouchbaseSupport {
val jacksonMapper = JsonMapper.builder()
.addModule(DefaultScalaModule)
.build()
}

trait CouchbaseSupport {

private val log = LoggerFactory.getLogger(classOf[CouchbaseSupport])
Expand Down Expand Up @@ -64,20 +71,20 @@ trait CouchbaseSupport {
}

def toRawJsonDocument(testObject: TestObject): RawJsonDocument = {
val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
RawJsonDocument.create(testObject.id, json)
}

def toJsonDocument(testObject: TestObject): JsonDocument =
JsonDocument.create(testObject.id, JsonObject.create().put("id", testObject.id).put("value", testObject.value))

def toStringDocument(testObject: TestObject): StringDocument = {
val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
StringDocument.create(testObject.id, json)
}

def toBinaryDocument(testObject: TestObject): BinaryDocument = {
val json = Json.toJson(testObject)(Json.writes[TestObject]).toString()
val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject)
val toWrite = Unpooled.copiedBuffer(json, CharsetUtil.UTF_8)
BinaryDocument.create(testObject.id, toWrite)
}
Expand Down
6 changes: 3 additions & 3 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,13 @@ object Dependencies {
"org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided))

val Couchbase = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"com.couchbase.client" % "java-client" % CouchbaseVersion, // ApacheV2
"io.reactivex" % "rxjava-reactive-streams" % "1.2.1", // ApacheV2
"org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided, // Apache V2
"com.typesafe.play" %% "play-json" % "2.9.2" % Test, // Apache V2
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test // Apache V2
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test, // Apache V2
"com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion % Test, // Apache V2
"com.fasterxml.jackson.module" %% "jackson-module-scala" % JacksonDatabindVersion % Test // Apache V2
))

val `Doc-examples` = Seq(
Expand Down