From 7bf93e1bb499d2db0271003469085f30cfd4c0f3 Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Sun, 11 Jun 2023 12:20:34 +0100 Subject: [PATCH] enable scala3 build for more google connectors (#165) * enable scala3 build for more google connectors * implicits * disable part of scala3 build * try full build again * Update GooglePubSub.scala * refactor suggested by @mdedetrich * remove interim variables * add comments * Update GooglePubSub.scala --- .../pubsub/grpc/javadsl/GooglePubSub.scala | 12 ++++++++++-- .../pubsub/grpc/javadsl/GrpcPublisher.scala | 2 +- .../pubsub/grpc/javadsl/GrpcSubscriber.scala | 2 +- .../pubsub/grpc/scaladsl/GrpcPublisher.scala | 2 +- .../pubsub/grpc/scaladsl/GrpcSubscriber.scala | 2 +- .../test/scala/docs/scaladsl/IntegrationSpec.scala | 2 +- .../stream/connectors/google/ResumableUpload.scala | 1 - project/Dependencies.scala | 2 -- 8 files changed, 15 insertions(+), 10 deletions(-) diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala index 930fa2bc7..b254474f0 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GooglePubSub.scala @@ -69,7 +69,11 @@ object GooglePubSub { Source .tick(Duration.ZERO, pollInterval, subsequentRequest) .mapMaterializedValue(cancellable.complete(_)))) - .mapConcat(_.getReceivedMessagesList) + .mapConcat( + // TODO uptake any fix suggested for https://contributors.scala-lang.org/t/better-type-inference-for-scala-send-us-your-problematic-cases/2410/183 + ((response: StreamingPullResponse) => + response.getReceivedMessagesList): pekko.japi.function.Function[StreamingPullResponse, + java.util.List[ReceivedMessage]]) .mapMaterializedValue(_ => cancellable) } .mapMaterializedValue(flattenCs(_)) @@ -95,7 +99,11 @@ object GooglePubSub { Source .tick(Duration.ZERO, pollInterval, request) .mapAsync(1, client.pull(_)) - .mapConcat(_.getReceivedMessagesList) + .mapConcat( + // TODO uptake any fix suggested for https://contributors.scala-lang.org/t/better-type-inference-for-scala-send-us-your-problematic-cases/2410/183 + ((response: PullResponse) => + response.getReceivedMessagesList): pekko.japi.function.Function[PullResponse, + java.util.List[ReceivedMessage]]) .mapMaterializedValue(cancellable.complete(_)) .mapMaterializedValue(_ => cancellable) } diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala index 5dacfc18a..9fd70656a 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcPublisher.scala @@ -74,7 +74,7 @@ object GrpcPublisher { * An extension that manages a single gRPC java publisher client per actor system. */ final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension { - implicit val publisher = GrpcPublisher.create(sys) + implicit val publisher: GrpcPublisher = GrpcPublisher.create(sys) } object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala index e66f95aac..de2e738ad 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/javadsl/GrpcSubscriber.scala @@ -75,7 +75,7 @@ object GrpcSubscriber { * An extension that manages a single gRPC java subscriber client per actor system. */ final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension { - implicit val subscriber = GrpcSubscriber.create(sys) + implicit val subscriber: GrpcSubscriber = GrpcSubscriber.create(sys) } object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala index 896cd60c6..286c6d78e 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcPublisher.scala @@ -65,7 +65,7 @@ object GrpcPublisher { * An extension that manages a single gRPC scala publisher client per actor system. */ final class GrpcPublisherExt private (sys: ExtendedActorSystem) extends Extension { - implicit val publisher = GrpcPublisher(sys: ActorSystem) + implicit val publisher: GrpcPublisher = GrpcPublisher(sys: ActorSystem) } object GrpcPublisherExt extends ExtensionId[GrpcPublisherExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala index 955a3ed1b..0d70d861d 100644 --- a/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala +++ b/google-cloud-pub-sub-grpc/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/grpc/scaladsl/GrpcSubscriber.scala @@ -65,7 +65,7 @@ object GrpcSubscriber { * An extension that manages a single gRPC scala subscriber client per actor system. */ final class GrpcSubscriberExt private (sys: ExtendedActorSystem) extends Extension { - implicit val subscriber = GrpcSubscriber(sys: ActorSystem) + implicit val subscriber: GrpcSubscriber = GrpcSubscriber(sys: ActorSystem) } object GrpcSubscriberExt extends ExtensionId[GrpcSubscriberExt] with ExtensionIdProvider { diff --git a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala index 38eba06e4..fcea88765 100644 --- a/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala +++ b/google-cloud-pub-sub-grpc/src/test/scala/docs/scaladsl/IntegrationSpec.scala @@ -52,7 +52,7 @@ class IntegrationSpec implicit val system: ActorSystem = ActorSystem("IntegrationSpec") - implicit val defaultPatience = PatienceConfig(timeout = 15.seconds, interval = 50.millis) + implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 15.seconds, interval = 50.millis) "connector" should { diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala index 8dbc91cc2..969cef4c1 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala @@ -14,7 +14,6 @@ package org.apache.pekko.stream.connectors.google import org.apache.pekko -import pekko.actor.ActorSystem import pekko.NotUsed import pekko.annotation.InternalApi import pekko.http.scaladsl.model.HttpMethods.{ POST, PUT } diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 7cea7f4d2..82f5160c8 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -217,7 +217,6 @@ object Dependencies { "io.specto" % "hoverfly-java" % hoverflyVersion % Test // ApacheV2 ) ++ Mockito) val GoogleBigQueryStorage = Seq( - crossScalaVersions -= Scala3, // see Pekko gRPC version in plugins.sbt libraryDependencies ++= Seq( // https://github.com/googleapis/java-bigquerystorage/tree/master/proto-google-cloud-bigquerystorage-v1 @@ -240,7 +239,6 @@ object Dependencies { ) ++ Mockito) val GooglePubSubGrpc = Seq( - crossScalaVersions -= Scala3, // see Pekko gRPC version in plugins.sbt libraryDependencies ++= Seq( // https://github.com/googleapis/java-pubsub/tree/master/proto-google-cloud-pubsub-v1/