Skip to content

Commit

Permalink
enable scala3 build for more google connectors (#165)
Browse files Browse the repository at this point in the history
* enable scala3 build for more google connectors

* implicits

* disable part of scala3 build

* try full build again

* Update GooglePubSub.scala

* refactor suggested by @mdedetrich

* remove interim variables

* add comments

* Update GooglePubSub.scala
  • Loading branch information
pjfanning committed Aug 19, 2023
1 parent f5fbaeb commit 22332a0
Show file tree
Hide file tree
Showing 8 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,11 @@ object GooglePubSub {
Source
.tick(Duration.ZERO, pollInterval, subsequentRequest)
.mapMaterializedValue(cancellable.complete(_))))
.mapConcat(_.getReceivedMessagesList)
.mapConcat(
// TODO uptake any fix suggested for https://contributors.scala-lang.org/t/better-type-inference-for-scala-send-us-your-problematic-cases/2410/183
((response: StreamingPullResponse) =>
response.getReceivedMessagesList): pekko.japi.function.Function[StreamingPullResponse,
java.util.List[ReceivedMessage]])
.mapMaterializedValue(_ => cancellable)
}
.mapMaterializedValue(flattenCs(_))
Expand All @@ -95,7 +99,11 @@ object GooglePubSub {
Source
.tick(Duration.ZERO, pollInterval, request)
.mapAsync(1, client.pull(_))
.mapConcat(_.getReceivedMessagesList)
.mapConcat(
// TODO uptake any fix suggested for https://contributors.scala-lang.org/t/better-type-inference-for-scala-send-us-your-problematic-cases/2410/183
((response: PullResponse) =>
response.getReceivedMessagesList): pekko.japi.function.Function[PullResponse,
java.util.List[ReceivedMessage]])
.mapMaterializedValue(cancellable.complete(_))
.mapMaterializedValue(_ => cancellable)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ object GrpcPublisher {
* An extension that manages a single gRPC java publisher client per actor system.
*/
final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension {
implicit val publisher = GrpcPublisher.create(sys)
implicit val publisher: GrpcPublisher = GrpcPublisher.create(sys)
}

object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ object GrpcSubscriber {
* An extension that manages a single gRPC java subscriber client per actor system.
*/
final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension {
implicit val subscriber = GrpcSubscriber.create(sys)
implicit val subscriber: GrpcSubscriber = GrpcSubscriber.create(sys)
}

object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object GrpcPublisher {
* An extension that manages a single gRPC scala publisher client per actor system.
*/
final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension {
implicit val publisher = GrpcPublisher(sys: ActorSystem)
implicit val publisher: GrpcPublisher = GrpcPublisher(sys: ActorSystem)
}

object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ object GrpcSubscriber {
* An extension that manages a single gRPC scala subscriber client per actor system.
*/
final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension {
implicit val subscriber = GrpcSubscriber(sys: ActorSystem)
implicit val subscriber: GrpcSubscriber = GrpcSubscriber(sys: ActorSystem)
}

object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class IntegrationSpec

implicit val system: ActorSystem = ActorSystem("IntegrationSpec")

implicit val defaultPatience = PatienceConfig(timeout = 15.seconds, interval = 50.millis)
implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 50.millis)

"connector" should {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
package org.apache.pekko.stream.connectors.google

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT }
Expand Down
2 changes: 0 additions & 2 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ object Dependencies {
"com.fasterxml.jackson.datatype" % "jackson-datatype-jsr310" % JacksonDatabindVersion % Test,
"io.specto" % "hoverfly-java" % hoverflyVersion % Test) ++ Mockito)
val GoogleBigQueryStorage = Seq(
crossScalaVersions -= Scala3,
// see Pekko gRPC version in plugins.sbt
libraryDependencies ++= Seq(
// https://github.com/googleapis/java-bigquerystorage/tree/master/proto-google-cloud-bigquerystorage-v1
Expand All @@ -227,7 +226,6 @@ object Dependencies {
"com.github.tomakehurst" % "wiremock" % "2.27.2" % Test) ++ Mockito)

val GooglePubSubGrpc = Seq(
crossScalaVersions -= Scala3,
// see Pekko gRPC version in plugins.sbt
libraryDependencies ++= Seq(
// https://github.com/googleapis/java-pubsub/tree/master/proto-google-cloud-pubsub-v1/
Expand Down

0 comments on commit 22332a0

Please sign in to comment.