From b408cb7e44dbb48d8a44175f0a7b79a5c05f260e Mon Sep 17 00:00:00 2001 From: PJ Fanning Date: Thu, 8 Jun 2023 15:59:33 +0100 Subject: [PATCH] support scala3 (google-common) (#141) * support scala3 (google-common) * Update ResumableUpload.scala * add huawei * Update Dependencies.scala * Update BigQuery.scala * annotation --- .../bigquery/BigQueryException.scala | 2 +- .../bigquery/javadsl/BigQuery.scala | 29 ++++--- .../googlecloud/pubsub/impl/PubSubApi.scala | 6 +- .../googlecloud/storage/impl/Formats.scala | 12 ++- .../storage/impl/GCStorageStream.scala | 4 +- .../storage/WithMaterializerGlobal.scala | 2 +- .../impl/GCStorageStreamIntegrationSpec.scala | 2 +- .../google/javadsl/XUploadContentType.java | 3 +- .../connectors/google/GoogleSettings.scala | 6 +- .../connectors/google/ResumableUpload.scala | 58 ++++++------- .../connectors/google/auth/Credentials.scala | 2 +- .../connectors/google/auth/GoogleOAuth2.scala | 3 +- .../google/auth/GoogleOAuth2Exception.scala | 3 +- .../connectors/google/http/GoogleHttp.scala | 10 +-- .../connectors/google/javadsl/Google.scala | 4 +- .../connectors/google/jwt/JwtSprayJson.scala | 81 +++++++++++++++++++ .../scaladsl/`X-Upload-Content-Type`.scala | 2 +- .../google/ResumableUploadSpec.scala | 4 +- .../google/auth/GoogleOAuth2Spec.scala | 2 +- .../google/auth/OAuth2CredentialsSpec.scala | 2 +- .../google/http/GoogleHttpSpec.scala | 5 +- .../google/firebase/fcm/impl/FcmSender.scala | 2 +- .../firebase/fcm/v1/impl/FcmSender.scala | 2 +- .../firebase/fcm/v1/impl/FcmSenderSpec.scala | 2 +- .../pushkit/impl/PushKitSenderSpec.scala | 2 +- .../docs/scaladsl/MqttActorSystemsSpec.scala | 6 +- project/Dependencies.scala | 13 ++- 27 files changed, 181 insertions(+), 88 deletions(-) create mode 100644 google-common/src/main/scala/org/apache/pekko/stream/connectors/google/jwt/JwtSprayJson.scala diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryException.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryException.scala index 426f1dd8d..d0ea423d3 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryException.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/BigQueryException.scala @@ -39,7 +39,7 @@ object BigQueryException { implicit val fromResponseUnmarshaller: FromResponseUnmarshaller[Throwable] = Unmarshaller - .withMaterializer { implicit ec => implicit mat => response: HttpResponse => + .withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => import SprayJsonSupport._ val HttpResponse(status, _, entity, _) = response: @nowarn("msg=match may not be exhaustive") Unmarshaller diff --git a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala index 6e9dd836f..109811e7f 100644 --- a/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala +++ b/google-cloud-bigquery/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/bigquery/javadsl/BigQuery.scala @@ -19,7 +19,7 @@ import pekko.annotation.ApiMayChange import pekko.http.javadsl.marshalling.Marshaller import pekko.http.javadsl.model.{ HttpEntity, RequestEntity } import pekko.http.javadsl.unmarshalling.Unmarshaller -import pekko.http.scaladsl.{ model => sm } +import pekko.http.scaladsl.{ marshalling, model => sm, unmarshalling } import pekko.japi.Pair import pekko.stream.connectors.google.GoogleSettings import pekko.stream.connectors.google.javadsl.Google @@ -45,7 +45,6 @@ import pekko.util.OptionConverters._ import java.time.Duration import java.util.concurrent.CompletionStage import java.{ lang, util } - import scala.annotation.nowarn import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS } @@ -220,7 +219,8 @@ object BigQuery extends Google { selectedFields: util.List[String], unmarshaller: Unmarshaller[HttpEntity, TableDataListResponse[Out]]) : Source[Out, CompletionStage[TableDataListResponse[Out]]] = { - implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity] + implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, TableDataListResponse[Out]] = + unmarshaller.asScalaCastInput[sm.HttpEntity] ScalaBigQuery .tableData(datasetId, tableId, startIndex.toScala, maxResults.toScala, selectedFields.asScala.toList) .mapMaterializedValue(_.asJava) @@ -245,7 +245,8 @@ object BigQuery extends Google { retryPolicy: InsertAllRetryPolicy, templateSuffix: util.Optional[String], marshaller: Marshaller[TableDataInsertAllRequest[In], RequestEntity]): Sink[util.List[In], NotUsed] = { - implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity] + implicit val m: marshalling.Marshaller[TableDataInsertAllRequest[In], sm.RequestEntity] = + marshaller.asScalaCastOutput[sm.RequestEntity] ss.Flow[util.List[In]] .map(_.asScala.toList) .to(ScalaBigQuery.insertAll[In](datasetId, tableId, retryPolicy, templateSuffix.toScala)) @@ -269,7 +270,8 @@ object BigQuery extends Google { retryFailedRequests: Boolean, marshaller: Marshaller[TableDataInsertAllRequest[In], RequestEntity]) : Flow[TableDataInsertAllRequest[In], TableDataInsertAllResponse, NotUsed] = { - implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity] + implicit val m: marshalling.Marshaller[TableDataInsertAllRequest[In], sm.RequestEntity] = + marshaller.asScalaCastOutput[sm.RequestEntity] ScalaBigQuery.insertAll[In](datasetId, tableId, retryFailedRequests).asJava } @@ -290,7 +292,8 @@ object BigQuery extends Google { dryRun: Boolean, useLegacySql: Boolean, unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]]): Source[Out, CompletionStage[QueryResponse[Out]]] = { - implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity] + implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] = + unmarshaller.asScalaCastInput[sm.HttpEntity] ScalaBigQuery.query(query, dryRun, useLegacySql).mapMaterializedValue(_.asJava).asJava } @@ -309,7 +312,8 @@ object BigQuery extends Google { query: QueryRequest, unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]]) : Source[Out, Pair[CompletionStage[JobReference], CompletionStage[QueryResponse[Out]]]] = { - implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity] + implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] = + unmarshaller.asScalaCastInput[sm.HttpEntity] ScalaBigQuery .query(query) .mapMaterializedValue { @@ -339,7 +343,8 @@ object BigQuery extends Google { timeout: util.Optional[Duration], location: util.Optional[String], unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]]): Source[Out, CompletionStage[QueryResponse[Out]]] = { - implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity] + implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] = + unmarshaller.asScalaCastInput[sm.HttpEntity] ScalaBigQuery .queryResults(jobId, startIndex.toScala, @@ -396,7 +401,7 @@ object BigQuery extends Google { def insertAllAsync[In](datasetId: String, tableId: String, marshaller: Marshaller[In, RequestEntity]): Flow[In, Job, NotUsed] = { - implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity] + implicit val m: marshalling.Marshaller[In, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity] ScalaBigQuery.insertAllAsync[In](datasetId, tableId).asJava[In] } @@ -415,7 +420,7 @@ object BigQuery extends Google { tableId: String, labels: util.Optional[util.Map[String, String]], marshaller: Marshaller[In, RequestEntity]): Flow[In, Job, NotUsed] = { - implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity] + implicit val m: marshalling.Marshaller[In, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity] ScalaBigQuery.insertAllAsync[In](datasetId, tableId, labels.toScala.map(_.asScala.toMap)).asJava[In] } @@ -436,8 +441,8 @@ object BigQuery extends Google { job: Job, marshaller: Marshaller[Job, RequestEntity], unmarshaller: Unmarshaller[HttpEntity, Job]): Sink[ByteString, CompletionStage[Job]] = { - implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity] - implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity] + implicit val m: marshalling.Marshaller[Job, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity] + implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, Job] = unmarshaller.asScalaCastInput[sm.HttpEntity] ScalaBigQuery.createLoadJob(job).mapMaterializedValue(_.asJava).asJava[ByteString] } diff --git a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala index 696406f89..66b1e27a7 100644 --- a/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala +++ b/google-cloud-pub-sub/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/pubsub/impl/PubSubApi.scala @@ -173,7 +173,7 @@ private[pubsub] trait PubSubApi { .mapMaterializedValue(_ => NotUsed) private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response.status match { case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` => Unmarshal(response.entity).to[PullResponse] @@ -211,7 +211,7 @@ private[pubsub] trait PubSubApi { .mapMaterializedValue(_ => NotUsed) private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response.status match { case StatusCodes.Success(_) => response.discardEntityBytes().future @@ -261,7 +261,7 @@ private[pubsub] trait PubSubApi { publish(topic, parallelism, None) private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response.status match { case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` => Unmarshal(response.entity).to[PublishResponse] diff --git a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala index 5c20c3931..ee2da5dcc 100644 --- a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala +++ b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/Formats.scala @@ -49,7 +49,8 @@ object Formats extends DefaultJsonProtocol { domain: String, projectTeam: ProjectTeam, etag: String) - private implicit val ObjectAccessControlsJsonFormat = jsonFormat13(ObjectAccessControls) + private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] = + jsonFormat13(ObjectAccessControls.apply) /** * Google API storage response object @@ -79,7 +80,8 @@ object Formats extends DefaultJsonProtocol { timeStorageClassUpdated: String, updated: String) - private implicit val storageObjectReadOnlyJson = jsonFormat18(StorageObjectReadOnlyJson) + private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] = + jsonFormat18(StorageObjectReadOnlyJson.apply) // private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue private final case class StorageObjectWriteableJson( @@ -98,7 +100,8 @@ object Formats extends DefaultJsonProtocol { temporaryHold: Option[Boolean], acl: Option[List[ObjectAccessControls]]) - private implicit val storageObjectWritableJson = jsonFormat14(StorageObjectWriteableJson) + private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] = + jsonFormat14(StorageObjectWriteableJson.apply) private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] { override def read(value: JsValue): StorageObjectJson = { @@ -175,7 +178,8 @@ object Formats extends DefaultJsonProtocol { } } - private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson.apply) + private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] = + jsonFormat4(BucketListResultJson.apply) implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] { override def read(json: JsValue): RewriteResponse = { diff --git a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala index 0ba705a5c..b67496ea3 100644 --- a/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala +++ b/google-cloud-storage/src/main/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStream.scala @@ -242,7 +242,7 @@ import scala.concurrent.{ ExecutionContext, Future } getBucketPath(bucket) / "o" / objectName implicit def unmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, T] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response match { case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() => Unmarshal(entity).to[T] @@ -254,7 +254,7 @@ import scala.concurrent.{ ExecutionContext, Future } }.withDefaultRetry implicit def optionUnmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, Option[T]] = - Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response match { case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() => Unmarshal(entity).to[T].map(Some(_)) diff --git a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala index eb046720a..787885b91 100644 --- a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala +++ b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/WithMaterializerGlobal.scala @@ -29,7 +29,7 @@ trait WithMaterializerGlobal with ScalaFutures with IntegrationPatience with Matchers { - implicit val actorSystem = ActorSystem("test") + implicit val actorSystem: ActorSystem = ActorSystem("test") implicit val ec: ExecutionContext = actorSystem.dispatcher override protected def afterAll(): Unit = { diff --git a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala index 0715faa16..d891f8241 100644 --- a/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala +++ b/google-cloud-storage/src/test/scala/org/apache/pekko/stream/connectors/googlecloud/storage/impl/GCStorageStreamIntegrationSpec.scala @@ -53,7 +53,7 @@ class GCStorageStreamIntegrationSpec with ScalaFutures with LogCapturing { - private implicit val defaultPatience = + private implicit val defaultPatience: PatienceConfig = PatienceConfig(timeout = 60.seconds, interval = 60.millis) var folderName: String = _ diff --git a/google-common/src/main/java/org/apache/pekko/stream/connectors/google/javadsl/XUploadContentType.java b/google-common/src/main/java/org/apache/pekko/stream/connectors/google/javadsl/XUploadContentType.java index 6edef7564..10e48b59d 100644 --- a/google-common/src/main/java/org/apache/pekko/stream/connectors/google/javadsl/XUploadContentType.java +++ b/google-common/src/main/java/org/apache/pekko/stream/connectors/google/javadsl/XUploadContentType.java @@ -22,7 +22,6 @@ public interface XUploadContentType { ContentType getContentType(); static XUploadContentType create(ContentType contentType) { - return X$minusUpload$minusContent$minusType$.MODULE$.apply( - (org.apache.pekko.http.scaladsl.model.ContentType) contentType); + return X$minusUpload$minusContent$minusType$.MODULE$.apply(contentType.toString()); } } diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala index ef91cfd78..6ca40022d 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/GoogleSettings.scala @@ -90,7 +90,7 @@ object GoogleSettings { } -final case class GoogleSettings @InternalApi private (projectId: String, +final case class GoogleSettings @InternalApi private[connectors] (projectId: String, credentials: Credentials, requestSettings: RequestSettings) { def getProjectId = projectId @@ -134,7 +134,7 @@ object RequestSettings { apply(userIp.toScala, quotaUser.toScala, prettyPrint, chunkSize, retrySettings, forwardProxy.toScala) } -final case class RequestSettings @InternalApi private ( +final case class RequestSettings @InternalApi private[connectors] ( userIp: Option[String], quotaUser: Option[String], prettyPrint: Boolean, @@ -252,7 +252,7 @@ object ForwardProxy { credentials: Option[BasicHttpCredentials], trustPem: Option[String])(implicit system: ClassicActorSystemProvider): ForwardProxy = { ForwardProxy( - trustPem.fold(Http(system).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)), + trustPem.fold(Http(system.classicSystem).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)), ForwardProxyPoolSettings(scheme, host, port, credentials)(system.classicSystem)) } diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala index f50c921b9..8dbc91cc2 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/ResumableUpload.scala @@ -56,7 +56,7 @@ private[connectors] object ResumableUpload { Sink .fromMaterializer { (mat, attr) => import mat.executionContext - implicit val materializer = mat + implicit val materializer: Materializer = mat implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr) val uploadChunkSize = settings.requestSettings.uploadChunkSize @@ -96,25 +96,24 @@ private[connectors] object ResumableUpload { private def initiateSession(request: HttpRequest)(implicit mat: Materializer, settings: GoogleSettings): Future[Uri] = { - implicit val system: ActorSystem = mat.system import implicits._ - implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => - response.discardEntityBytes().future.map { _ => - response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) - } - }.withDefaultRetry + implicit val um: FromResponseUnmarshaller[Uri] = + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + response.discardEntityBytes().future.map { _ => + response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri) + } + }.withDefaultRetry - GoogleHttp().singleAuthenticatedRequest[Uri](request) + GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request) } private final case class DoNotRetry(ex: Throwable) extends Throwable(ex) with NoStackTrace private def uploadChunk[T: FromResponseUnmarshaller]( request: HttpRequest)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = { - implicit val system: ActorSystem = mat.system - val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => + val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => response.status match { case PermanentRedirect => response.discardEntityBytes().future.map(_ => None) @@ -127,7 +126,8 @@ private[connectors] object ResumableUpload { val uri = request.uri Flow[HttpRequest] .map((_, ())) - .via(GoogleHttp().cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(um)) + .via(GoogleHttp(mat.system).cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)( + um)) .map(_._1.recoverWith { case DoNotRetry(ex) => Failure(ex) }) } @@ -147,30 +147,30 @@ private[connectors] object ResumableUpload { request: HttpRequest, chunk: Future[MaybeLast[Chunk]])( implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, MaybeLast[Chunk]]] = { - implicit val system: ActorSystem = mat.system import implicits._ - implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse => - response.status match { - case OK | Created => Unmarshal(response).to[T].map(Left(_)) - case PermanentRedirect => - response.discardEntityBytes().future.map { _ => - Right( - response - .header[Range] - .flatMap(_.ranges.headOption) - .collect { - case Slice(_, last) => last + 1 - }.getOrElse(0L)) - } - case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) - } - }.withDefaultRetry + implicit val um: FromResponseUnmarshaller[Either[T, Long]] = + Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) => + response.status match { + case OK | Created => Unmarshal(response).to[T].map(Left(_)) + case PermanentRedirect => + response.discardEntityBytes().future.map { _ => + Right( + response + .header[Range] + .flatMap(_.ranges.headOption) + .collect { + case Slice(_, last) => last + 1 + }.getOrElse(0L)) + } + case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage)) + } + }.withDefaultRetry import mat.executionContext chunk.flatMap { case maybeLast @ MaybeLast(Chunk(bytes, position)) => - GoogleHttp() + GoogleHttp(mat.system) .singleAuthenticatedRequest[Either[T, Long]](request.addHeader(statusRequestHeader)) .map { case Left(result) if maybeLast.isLast => Left(result) diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala index 1bb2fd350..72c0e0226 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/Credentials.scala @@ -36,7 +36,7 @@ object Credentials { */ def apply(c: Config)(implicit system: ClassicActorSystemProvider): Credentials = c.getString("provider") match { case "application-default" => - val log = Logging(system.classicSystem, getClass) + val log = Logging(system.classicSystem, classOf[Credentials]) try { val creds = parseServiceAccount(c) log.info("Using service account credentials") diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2.scala index 4099f91c1..4455748c4 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2.scala @@ -21,9 +21,10 @@ import pekko.http.scaladsl.model.HttpMethods.POST import pekko.http.scaladsl.model.{ FormData, HttpRequest } import pekko.stream.Materializer import pekko.stream.connectors.google.http.GoogleHttp +import pekko.stream.connectors.google.jwt.JwtSprayJson import pekko.stream.connectors.google.{ implicits, RequestSettings } import pdi.jwt.JwtAlgorithm.RS256 -import pdi.jwt.{ JwtClaim, JwtSprayJson } +import pdi.jwt.JwtClaim import spray.json.DefaultJsonProtocol._ import spray.json.JsonFormat diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Exception.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Exception.scala index ae9690008..9c88c182f 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Exception.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Exception.scala @@ -22,7 +22,8 @@ import pekko.stream.connectors.google.util.Retry import spray.json.DefaultJsonProtocol._ import spray.json.RootJsonFormat -final case class GoogleOAuth2Exception private (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info) +final case class GoogleOAuth2Exception private[google] (override val info: ErrorInfo) + extends ExceptionWithErrorInfo(info) private[google] object GoogleOAuth2Exception { diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala index d536f8b00..0bafea003 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttp.scala @@ -14,7 +14,7 @@ package org.apache.pekko.stream.connectors.google.http import org.apache.pekko -import pekko.actor.ClassicActorSystemProvider +import pekko.actor.{ ClassicActorSystemProvider, ExtendedActorSystem, Scheduler } import pekko.annotation.InternalApi import pekko.dispatch.ExecutionContexts import pekko.http.scaladsl.Http.HostConnectionPool @@ -26,7 +26,7 @@ import pekko.stream.connectors.google.{ GoogleAttributes, GoogleSettings, Reques import pekko.stream.connectors.google.util.Retry import pekko.stream.scaladsl.{ Flow, FlowWithContext, Keep, RetryFlow } -import scala.concurrent.Future +import scala.concurrent.{ ExecutionContextExecutor, Future } import scala.util.{ Failure, Success, Try } @InternalApi @@ -45,9 +45,9 @@ private[connectors] object GoogleHttp { @InternalApi private[connectors] final class GoogleHttp private (val http: HttpExt) extends AnyVal { - private implicit def system = http.system - private implicit def ec = system.dispatcher - private implicit def scheduler = system.scheduler + private implicit def system: ExtendedActorSystem = http.system + private implicit def ec: ExecutionContextExecutor = system.dispatcher + private implicit def scheduler: Scheduler = system.scheduler /** * Sends a single [[HttpRequest]] and returns the raw [[HttpResponse]]. diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/javadsl/Google.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/javadsl/Google.scala index 20785a9fa..1325bced9 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/javadsl/Google.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/javadsl/Google.scala @@ -18,7 +18,7 @@ import pekko.NotUsed import pekko.actor.ClassicActorSystemProvider import pekko.http.javadsl.model.{ HttpRequest, HttpResponse } import pekko.http.javadsl.unmarshalling.Unmarshaller -import pekko.http.scaladsl.{ model => sm } +import pekko.http.scaladsl.{ model => sm, unmarshalling } import pekko.stream.connectors.google.GoogleSettings import pekko.stream.connectors.google.scaladsl.{ Google => ScalaGoogle } import pekko.stream.javadsl.{ Sink, Source } @@ -59,7 +59,7 @@ private[connectors] trait Google { */ final def paginatedRequest[Out <: Paginated](request: HttpRequest, unmarshaller: Unmarshaller[HttpResponse, Out]): Source[Out, NotUsed] = { - implicit val um = unmarshaller.asScala + implicit val um: unmarshalling.Unmarshaller[HttpResponse, Out] = unmarshaller.asScala ScalaGoogle.paginatedRequest[Out](request).asJava } diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/jwt/JwtSprayJson.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/jwt/JwtSprayJson.scala new file mode 100644 index 000000000..8362fa88e --- /dev/null +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/jwt/JwtSprayJson.scala @@ -0,0 +1,81 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * license agreements; and to You under the Apache License, version 2.0: + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * This file is part of the Apache Pekko project, derived from Akka. + */ + +package org.apache.pekko.stream.connectors.google.jwt + +import java.time.Clock +import org.apache.pekko +import pekko.annotation.InternalApi +import pdi.jwt._ +import pdi.jwt.exceptions.JwtNonStringException +import spray.json._ + +/** + * Implementation of `JwtCore` using `JsObject` from spray-json. + */ +@InternalApi +private[google] trait JwtSprayJsonParser[H, C] extends JwtJsonCommon[JsObject, H, C] { + protected def parse(value: String): JsObject = value.parseJson.asJsObject + + protected def stringify(value: JsObject): String = value.compactPrint + + protected def getAlgorithm(header: JsObject): Option[JwtAlgorithm] = + header.fields.get("alg").flatMap { + case JsString("none") => None + case JsString(algo) => Option(JwtAlgorithm.fromString(algo)) + case JsNull => None + case _ => throw new JwtNonStringException("alg") + } + +} + +@InternalApi +private[google] object JwtSprayJson extends JwtSprayJsonParser[JwtHeader, JwtClaim] { + import DefaultJsonProtocol._ + + def apply(clock: Clock): JwtSprayJson = new JwtSprayJson(clock) + + override def parseHeader(header: String): JwtHeader = { + val jsObj = parse(header) + JwtHeader( + algorithm = getAlgorithm(jsObj), + typ = safeGetField[String](jsObj, "typ"), + contentType = safeGetField[String](jsObj, "cty"), + keyId = safeGetField[String](jsObj, "kid")) + } + + override def parseClaim(claim: String): JwtClaim = { + val jsObj = parse(claim) + val content = JsObject( + jsObj.fields - "iss" - "sub" - "aud" - "exp" - "nbf" - "iat" - "jti") + JwtClaim( + content = stringify(content), + issuer = safeGetField[String](jsObj, "iss"), + subject = safeGetField[String](jsObj, "sub"), + audience = safeGetField[Set[String]](jsObj, "aud") + .orElse(safeGetField[String](jsObj, "aud").map(s => Set(s))), + expiration = safeGetField[Long](jsObj, "exp"), + notBefore = safeGetField[Long](jsObj, "nbf"), + issuedAt = safeGetField[Long](jsObj, "iat"), + jwtId = safeGetField[String](jsObj, "jti")) + } + + private[this] def safeRead[A: JsonReader](js: JsValue) = + safeReader[A].read(js).fold(_ => None, a => Option(a)) + + private[this] def safeGetField[A: JsonReader](js: JsObject, name: String) = + js.fields.get(name).flatMap(safeRead[A]) +} + +@InternalApi +private[google] class JwtSprayJson private (override val clock: Clock) + extends JwtSprayJsonParser[JwtHeader, JwtClaim] { + override def parseHeader(header: String): JwtHeader = JwtSprayJson.parseHeader(header) + override def parseClaim(header: String): JwtClaim = JwtSprayJson.parseClaim(header) +} diff --git a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/scaladsl/`X-Upload-Content-Type`.scala b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/scaladsl/`X-Upload-Content-Type`.scala index 291a26ab0..e7b85c461 100644 --- a/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/scaladsl/`X-Upload-Content-Type`.scala +++ b/google-common/src/main/scala/org/apache/pekko/stream/connectors/google/scaladsl/`X-Upload-Content-Type`.scala @@ -34,7 +34,7 @@ object `X-Upload-Content-Type` extends ModeledCustomHeaderCompanion[`X-Upload-Co contentType => Success(`X-Upload-Content-Type`(contentType))) } -final case class `X-Upload-Content-Type` private (contentType: ContentType) +final case class `X-Upload-Content-Type` private[connectors] (contentType: ContentType) extends ModeledCustomHeader[`X-Upload-Content-Type`] with XUploadContentType { override def value(): String = contentType.toString() diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/ResumableUploadSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/ResumableUploadSpec.scala index 819834bb8..bca269015 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/ResumableUploadSpec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/ResumableUploadSpec.scala @@ -18,7 +18,7 @@ import pekko.actor.ActorSystem import pekko.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._ import pekko.http.scaladsl.model.HttpMethods.POST import pekko.http.scaladsl.model.{ ContentTypes, HttpRequest, Uri } -import pekko.http.scaladsl.unmarshalling.Unmarshaller +import pekko.http.scaladsl.unmarshalling.{ FromResponseUnmarshaller, Unmarshaller } import pekko.stream.connectors.google.scaladsl.`X-Upload-Content-Type` import pekko.stream.scaladsl.Source import pekko.testkit.TestKit @@ -79,7 +79,7 @@ class ResumableUploadSpec .willReturn(created().header("Content-Type", "application/json").body("{}")))) import implicits._ - implicit val um = + implicit val um: FromResponseUnmarshaller[JsValue] = Unmarshaller.messageUnmarshallerFromEntityUnmarshaller(sprayJsValueUnmarshaller).withDefaultRetry val result = Source diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala index 1bd56b790..753e3004a 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/GoogleOAuth2Spec.scala @@ -46,7 +46,7 @@ class GoogleOAuth2Spec implicit val executionContext: ExecutionContext = system.dispatcher implicit val settings: GoogleSettings = GoogleSettings(system) - implicit val clock = Clock.systemUTC() + implicit val clock: Clock = Clock.systemUTC() lazy val privateKey = { val inputStream = getClass.getClassLoader.getResourceAsStream("private_pcks8.pem") diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala index 7d7e2342a..446336dd9 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/auth/OAuth2CredentialsSpec.scala @@ -45,7 +45,7 @@ class OAuth2CredentialsSpec import system.dispatcher implicit val settings: RequestSettings = GoogleSettings().requestSettings - implicit val clock = Clock.systemUTC() + implicit val clock: Clock = Clock.systemUTC() final object AccessTokenProvider { @volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException) diff --git a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala index 56a75389c..cc37d94f2 100644 --- a/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala +++ b/google-common/src/test/scala/org/apache/pekko/stream/connectors/google/http/GoogleHttpSpec.scala @@ -67,10 +67,11 @@ class GoogleHttpSpec anyInt, any[HttpsConnectionContext], any[ConnectionPoolSettings], - any[LoggingAdapter])).thenReturn(Flow[Any] + any[LoggingAdapter])).thenReturn( + Flow[Any] .zipWith(response)(Keep.right) .map(Try(_)) - .map((_, mock[Nothing])) + .map((_, mock[Nothing](scala.reflect.ClassTag.Nothing))) .mapMaterializedValue(_ => mock[HostConnectionPool]), Nil: _*): @nowarn("msg=dead code") http diff --git a/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/impl/FcmSender.scala b/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/impl/FcmSender.scala index 3a366face..859543def 100644 --- a/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/impl/FcmSender.scala +++ b/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/impl/FcmSender.scala @@ -55,7 +55,7 @@ private[fcm] class FcmSender { } implicit private val unmarshaller: FromResponseUnmarshaller[FcmSuccessResponse] = Unmarshaller.withMaterializer { - implicit ec => implicit mat => response: HttpResponse => + implicit ec => implicit mat => (response: HttpResponse) => if (response.status.isSuccess) { Unmarshal(response.entity).to[FcmSuccessResponse] } else { diff --git a/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSender.scala b/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSender.scala index 479855e10..66e96560c 100644 --- a/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSender.scala +++ b/google-fcm/src/main/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSender.scala @@ -50,7 +50,7 @@ private[fcm] class FcmSender { } implicit private val unmarshaller: FromResponseUnmarshaller[FcmSuccessResponse] = Unmarshaller.withMaterializer { - implicit ec => implicit mat => response: HttpResponse => + implicit ec => implicit mat => (response: HttpResponse) => if (response.status.isSuccess) { Unmarshal(response.entity).to[FcmSuccessResponse] } else { diff --git a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala index a2d4d4922..ca6c18820 100644 --- a/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala +++ b/google-fcm/src/test/scala/org/apache/pekko/stream/connectors/google/firebase/fcm/v1/impl/FcmSenderSpec.scala @@ -57,7 +57,7 @@ class FcmSenderSpec implicit val executionContext: ExecutionContext = system.dispatcher - implicit val conf = FcmSettings() + implicit val conf: FcmSettings = FcmSettings() implicit val settings: GoogleSettings = GoogleSettings().copy(projectId = "projectId") "FcmSender" should { diff --git a/huawei-push-kit/src/test/scala/org/apache/pekko/stream/connectors/huawei/pushkit/impl/PushKitSenderSpec.scala b/huawei-push-kit/src/test/scala/org/apache/pekko/stream/connectors/huawei/pushkit/impl/PushKitSenderSpec.scala index cc83e885f..16a7f78ce 100644 --- a/huawei-push-kit/src/test/scala/org/apache/pekko/stream/connectors/huawei/pushkit/impl/PushKitSenderSpec.scala +++ b/huawei-push-kit/src/test/scala/org/apache/pekko/stream/connectors/huawei/pushkit/impl/PushKitSenderSpec.scala @@ -55,7 +55,7 @@ class PushKitSenderSpec implicit val executionContext: ExecutionContext = system.dispatcher - implicit val config = HmsSettings() + implicit val config: HmsSettings = HmsSettings() "HmsSender" should { diff --git a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala index ccf6e788a..301dde93c 100644 --- a/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala +++ b/mqtt-streaming/src/test/scala/docs/scaladsl/MqttActorSystemsSpec.scala @@ -14,6 +14,7 @@ package docs.scaladsl import org.apache.pekko +import pekko.actor import pekko.actor.typed.scaladsl.Behaviors import pekko.stream.connectors.mqtt.streaming.MqttSessionSettings import pekko.stream.connectors.mqtt.streaming.scaladsl.{ ActorMqttClientSession, ActorMqttServerSession } @@ -21,7 +22,8 @@ import org.scalatest.wordspec.AnyWordSpec class MqttTypedActorSystemSpec extends AnyWordSpec { - implicit val actorSystem = pekko.actor.typed.ActorSystem(Behaviors.ignore, "MqttTypedActorSystemSpec") + implicit val actorSystem: actor.typed.ActorSystem[Nothing] = + actor.typed.ActorSystem(Behaviors.ignore, "MqttTypedActorSystemSpec") "A typed actor system" should { "allow client creation" in { @@ -41,7 +43,7 @@ class MqttTypedActorSystemSpec extends AnyWordSpec { class MqttClassicActorSystemSpec extends AnyWordSpec { - implicit val actorSystem = pekko.actor.ActorSystem("MqttClassicActorSystemSpec") + implicit val actorSystem: actor.ActorSystem = actor.ActorSystem("MqttClassicActorSystemSpec") "A typed actor system" should { "allow client creation" in { diff --git a/project/Dependencies.scala b/project/Dependencies.scala index 0989ee015..8006d1e7d 100644 --- a/project/Dependencies.scala +++ b/project/Dependencies.scala @@ -186,13 +186,13 @@ object Dependencies { "org.apache.logging.log4j" % "log4j-to-slf4j" % "2.17.1" % Test) ++ JacksonDatabindDependencies) val GoogleCommon = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, - "com.github.jwt-scala" %% "jwt-spray-json" % "7.1.4", - "com.google.auth" % "google-auth-library-credentials" % "0.24.1", - "io.specto" % "hoverfly-java" % hoverflyVersion % Test) ++ Mockito) + "com.github.jwt-scala" %% "jwt-json-common" % "7.1.5", // ApacheV2 + "com.google.auth" % "google-auth-library-credentials" % "0.24.1", // BSD 3-clause + "io.specto" % "hoverfly-java" % hoverflyVersion % Test // ApacheV2 + ) ++ Mockito) val GoogleBigQuery = Seq( crossScalaVersions -= Scala3, @@ -239,7 +239,6 @@ object Dependencies { "org.apache.pekko" %% "pekko-discovery" % PekkoVersion)) val GoogleFcm = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion) ++ Mockito) @@ -283,11 +282,11 @@ object Dependencies { "org.slf4j" % "log4j-over-slf4j" % log4jOverSlf4jVersion % Test)) val HuaweiPushKit = Seq( - crossScalaVersions -= Scala3, libraryDependencies ++= Seq( "org.apache.pekko" %% "pekko-http" % PekkoHttpVersion, "org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion, - "com.github.jwt-scala" %% "jwt-spray-json" % "7.1.4") ++ Mockito) + "com.github.jwt-scala" %% "jwt-json-common" % "7.1.5" // ApacheV2 + ) ++ Mockito) val InfluxDB = Seq( libraryDependencies ++= Seq(