diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala index 0df164e88..1119810f2 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/model.scala @@ -270,7 +270,8 @@ sealed trait CouchbaseWriteResult[T <: Document[_]] { /** * Emitted for a successful Couchbase write operation. */ -final case class CouchbaseWriteSuccess[T <: Document[_]] private (override val doc: T) extends CouchbaseWriteResult[T] { +final case class CouchbaseWriteSuccess[T <: Document[_]] private[couchbase] ( + override val doc: T) extends CouchbaseWriteResult[T] { val isSuccess: Boolean = true val isFailure: Boolean = false } @@ -278,7 +279,7 @@ final case class CouchbaseWriteSuccess[T <: Document[_]] private (override val d /** * Emitted for a failed Couchbase write operation. */ -final case class CouchbaseWriteFailure[T <: Document[_]] private (override val doc: T, failure: Throwable) +final case class CouchbaseWriteFailure[T <: Document[_]] private[couchbase] (override val doc: T, failure: Throwable) extends CouchbaseWriteResult[T] { val isSuccess: Boolean = false val isFailure: Boolean = true @@ -296,7 +297,7 @@ sealed trait CouchbaseDeleteResult { /** * Emitted for a successful Couchbase write operation. */ -final case class CouchbaseDeleteSuccess private (override val id: String) extends CouchbaseDeleteResult { +final case class CouchbaseDeleteSuccess private[couchbase] (override val id: String) extends CouchbaseDeleteResult { val isSuccess: Boolean = true val isFailure: Boolean = false } @@ -304,7 +305,7 @@ final case class CouchbaseDeleteSuccess private (override val id: String) extend /** * Emitted for a failed Couchbase write operation. */ -final case class CouchbaseDeleteFailure private (override val id: String, failure: Throwable) +final case class CouchbaseDeleteFailure private[couchbase] (override val id: String, failure: Throwable) extends CouchbaseDeleteResult { val isSuccess: Boolean = false val isFailure: Boolean = true diff --git a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala index 79039403d..faadb2e12 100644 --- a/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala +++ b/couchbase/src/main/scala/org/apache/pekko/stream/connectors/couchbase/scaladsl/CouchbaseFlow.scala @@ -29,7 +29,7 @@ import pekko.stream.connectors.couchbase.{ import pekko.stream.scaladsl.Flow import com.couchbase.client.java.document.{ Document, JsonDocument } -import scala.concurrent.ExecutionContext +import scala.concurrent.{ ExecutionContext, Future } /** * Scala API: Factory methods for Couchbase flows. @@ -100,8 +100,8 @@ object CouchbaseFlow { */ def upsertDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, - bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = - Flow + bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = { + val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] @@ -115,7 +115,8 @@ object CouchbaseFlow { } }) } - .mapMaterializedValue(_ => NotUsed) + flow.mapMaterializedValue(_ => NotUsed) + } /** * Create a flow to replace a Couchbase [[com.couchbase.client.java.document.JsonDocument JsonDocument]]. @@ -153,8 +154,8 @@ object CouchbaseFlow { */ def replaceDocWithResult[T <: Document[_]](sessionSettings: CouchbaseSessionSettings, writeSettings: CouchbaseWriteSettings, - bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = - Flow + bucketName: String): Flow[T, CouchbaseWriteResult[T], NotUsed] = { + val flow: Flow[T, CouchbaseWriteResult[T], Future[NotUsed]] = Flow .fromMaterializer { (materializer, _) => val session = CouchbaseSessionRegistry(materializer.system).sessionFor(sessionSettings, bucketName) Flow[T] @@ -168,7 +169,8 @@ object CouchbaseFlow { } }) } - .mapMaterializedValue(_ => NotUsed) + flow.mapMaterializedValue(_ => NotUsed) + } /** * Create a flow to delete documents from Couchbase by `id`. Emits the same `id`. diff --git a/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala b/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala index a51fa055a..745f65478 100644 --- a/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala +++ b/couchbase/src/test/scala/org/apache/pekko/stream/connectors/couchbase/testing/CouchbaseSupport.scala @@ -25,8 +25,9 @@ import com.couchbase.client.deps.io.netty.util.CharsetUtil import com.couchbase.client.java.ReplicateTo import com.couchbase.client.java.document.json.JsonObject import com.couchbase.client.java.document.{ BinaryDocument, JsonDocument, RawJsonDocument, StringDocument } +import com.fasterxml.jackson.databind.json.JsonMapper +import com.fasterxml.jackson.module.scala.DefaultScalaModule import org.slf4j.LoggerFactory -import play.api.libs.json.Json import scala.collection.immutable.Seq import scala.concurrent.ExecutionContext.Implicits.global @@ -35,6 +36,12 @@ import scala.concurrent.{ Await, Future } case class TestObject(id: String, value: String) +private[couchbase] object CouchbaseSupport { + val jacksonMapper = JsonMapper.builder() + .addModule(DefaultScalaModule) + .build() +} + trait CouchbaseSupport { private val log = LoggerFactory.getLogger(classOf[CouchbaseSupport]) @@ -64,7 +71,7 @@ trait CouchbaseSupport { } def toRawJsonDocument(testObject: TestObject): RawJsonDocument = { - val json = Json.toJson(testObject)(Json.writes[TestObject]).toString() + val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject) RawJsonDocument.create(testObject.id, json) } @@ -72,12 +79,12 @@ trait CouchbaseSupport { JsonDocument.create(testObject.id, JsonObject.create().put("id", testObject.id).put("value", testObject.value)) def toStringDocument(testObject: TestObject): StringDocument = { - val json = Json.toJson(testObject)(Json.writes[TestObject]).toString() + val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject) StringDocument.create(testObject.id, json) } def toBinaryDocument(testObject: TestObject): BinaryDocument = { - val json = Json.toJson(testObject)(Json.writes[TestObject]).toString() + val json = CouchbaseSupport.jacksonMapper.writeValueAsString(testObject) val toWrite = Unpooled.copiedBuffer(json, CharsetUtil.UTF_8) BinaryDocument.create(testObject.id, toWrite) } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 2da9a41c1..4cb841e53 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -125,13 +125,13 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided)) val Couchbase = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "com.couchbase.client" % "java-client" % CouchbaseVersion, // ApacheV2 "io.reactivex" % "rxjava-reactive-streams" % "1.2.1", // ApacheV2 "org.apache.pekko" %% "pekko-discovery" % PekkoVersion % Provided, // Apache V2 - "com.typesafe.play" %% "play-json" % "2.9.2" % Test, // Apache V2 - "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test // Apache V2 + "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion % Test, // Apache V2 + "com.fasterxml.jackson.core" % "jackson-databind" % JacksonDatabindVersion % Test, // Apache V2 + "com.fasterxml.jackson.module" %% "jackson-module-scala" % JacksonDatabindVersion % Test // Apache V2 )) val `Doc-examples` = Seq(