Skip to content

Commit

Permalink
support scala3 for google-cloud-storage connector (#159)
Browse files Browse the repository at this point in the history
* support scala3 for google-cloud-storage connector

fix import

Update Formats.scala

Update Dependencies.scala

* try to fix code that was somehow reverted in a merge

* more changes

* Update GCStorageStream.scala

* clock implicits

* another merge issue
  • Loading branch information
pjfanning committed Jun 10, 2023
1 parent 48fb70b commit d42b033
Show file tree
Hide file tree
Showing 12 changed files with 66 additions and 64 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ object GrpcBigQueryStorageReader {
* An extension that manages a single gRPC scala reader client per actor system.
*/
final class GrpcBigQueryStorageReaderExt private (sys: ExtendedActorSystem) extends Extension {
implicit val reader = GrpcBigQueryStorageReader()(sys)
implicit val reader: GrpcBigQueryStorageReader = GrpcBigQueryStorageReader()(sys)
}

object GrpcBigQueryStorageReaderExt extends ExtensionId[GrpcBigQueryStorageReaderExt] with ExtensionIdProvider {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ import scala.concurrent.Future

class ExampleReader {

implicit val sys = ActorSystem("ExampleReader")
implicit val sys: ActorSystem = ActorSystem("ExampleReader")

// #read-all
val sourceOfSources: Source[(ReadSession.Schema, Seq[Source[ReadRowsResponse.Rows, NotUsed]]), Future[NotUsed]] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ object ExampleApp {
|pekko.loglevel = INFO
""".stripMargin)

implicit val sys = ActorSystem("ExampleApp", config)
implicit val sys: ActorSystem = ActorSystem("ExampleApp", config)

import sys.dispatcher

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,15 +69,15 @@ private[pubsub] trait PubSubApi {
def PubSubGoogleApisPort: Int
def isEmulated: Boolean

private implicit val instantFormat = new RootJsonFormat[Instant] {
private implicit val instantFormat: RootJsonFormat[Instant] = new RootJsonFormat[Instant] {
override def read(jsValue: JsValue): Instant = jsValue match {
case JsString(time) => Instant.parse(time)
case _ => deserializationError("Instant required as a string of RFC3339 UTC Zulu format.")
}
override def write(instant: Instant): JsValue = JsString(instant.toString)
}

private implicit val pubSubMessageFormat =
private implicit val pubSubMessageFormat: RootJsonFormat[PubSubMessage] =
new RootJsonFormat[PubSubMessage] {
override def read(json: JsValue): PubSubMessage = {
val fields = json.asJsObject.fields
Expand All @@ -98,7 +98,7 @@ private[pubsub] trait PubSubApi {
++ m.attributes.map(attributes => "attributes" -> attributes.toJson): _*)
}

private implicit val publishMessageFormat = new RootJsonFormat[PublishMessage] {
private implicit val publishMessageFormat: RootJsonFormat[PublishMessage] = new RootJsonFormat[PublishMessage] {
def read(json: JsValue): PublishMessage = {
val data = json.asJsObject.fields("data").convertTo[String]
val attributes = json.asJsObject.fields("attributes").convertTo[immutable.Map[String, String]]
Expand All @@ -112,37 +112,39 @@ private[pubsub] trait PubSubApi {
m.attributes.map(a => "attributes" -> a.toJson): _*)
}

private implicit val pubSubRequestFormat = new RootJsonFormat[PublishRequest] {
private implicit val pubSubRequestFormat: RootJsonFormat[PublishRequest] = new RootJsonFormat[PublishRequest] {
def read(json: JsValue): PublishRequest =
PublishRequest(json.asJsObject.fields("messages").convertTo[immutable.Seq[PublishMessage]])
def write(pr: PublishRequest): JsValue = JsObject("messages" -> pr.messages.toJson)
}
private implicit val gcePubSubResponseFormat = new RootJsonFormat[PublishResponse] {
private implicit val gcePubSubResponseFormat: RootJsonFormat[PublishResponse] = new RootJsonFormat[PublishResponse] {
def read(json: JsValue): PublishResponse =
PublishResponse(json.asJsObject.fields("messageIds").convertTo[immutable.Seq[String]])
def write(pr: PublishResponse): JsValue = JsObject("messageIds" -> pr.messageIds.toJson)
}

private implicit val receivedMessageFormat = new RootJsonFormat[ReceivedMessage] {
private implicit val receivedMessageFormat: RootJsonFormat[ReceivedMessage] = new RootJsonFormat[ReceivedMessage] {
def read(json: JsValue): ReceivedMessage =
ReceivedMessage(json.asJsObject.fields("ackId").convertTo[String],
json.asJsObject.fields("message").convertTo[PubSubMessage])
def write(rm: ReceivedMessage): JsValue =
JsObject("ackId" -> rm.ackId.toJson, "message" -> rm.message.toJson)
}
private implicit val pubSubPullResponseFormat = new RootJsonFormat[PullResponse] {
private implicit val pubSubPullResponseFormat: RootJsonFormat[PullResponse] = new RootJsonFormat[PullResponse] {
def read(json: JsValue): PullResponse =
PullResponse(json.asJsObject.fields.get("receivedMessages").map(_.convertTo[immutable.Seq[ReceivedMessage]]))
def write(pr: PullResponse): JsValue =
pr.receivedMessages.map(rm => JsObject("receivedMessages" -> rm.toJson)).getOrElse(JsObject.empty)
}

private implicit val acknowledgeRequestFormat = new RootJsonFormat[AcknowledgeRequest] {
def read(json: JsValue): AcknowledgeRequest =
AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*)
def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson)
}
private implicit val pullRequestFormat = DefaultJsonProtocol.jsonFormat2(PullRequest.apply)
private implicit val acknowledgeRequestFormat: RootJsonFormat[AcknowledgeRequest] =
new RootJsonFormat[AcknowledgeRequest] {
def read(json: JsValue): AcknowledgeRequest =
AcknowledgeRequest(json.asJsObject.fields("ackIds").convertTo[immutable.Seq[String]]: _*)
def write(ar: AcknowledgeRequest): JsValue = JsObject("ackIds" -> ar.ackIds.toJson)
}
private implicit val pullRequestFormat: RootJsonFormat[PullRequest] =
DefaultJsonProtocol.jsonFormat2(PullRequest.apply)

private def scheme: String = if (isEmulated) "http" else "https"

Expand Down Expand Up @@ -173,7 +175,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PullResponse]
Expand Down Expand Up @@ -211,7 +213,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) =>
response.discardEntityBytes().future
Expand Down Expand Up @@ -261,7 +263,7 @@ private[pubsub] trait PubSubApi {
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PublishResponse]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import java.time.OffsetDateTime
import org.apache.pekko
import pekko.http.scaladsl.model.{ ContentType, ContentTypes }
import pekko.stream.connectors.googlecloud.storage._
import spray.json.{ DefaultJsonProtocol, JsObject, JsValue, RootJsonFormat, RootJsonReader }
import spray.json.{ enrichAny, DefaultJsonProtocol, JsObject, JsValue, RootJsonFormat, RootJsonReader }

import scala.util.Try

Expand Down Expand Up @@ -49,7 +49,8 @@ object Formats extends DefaultJsonProtocol {
domain: String,
projectTeam: ProjectTeam,
etag: String)
private implicit val ObjectAccessControlsJsonFormat = jsonFormat13(ObjectAccessControls)
private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] =
jsonFormat13(ObjectAccessControls.apply)

/**
* Google API storage response object
Expand Down Expand Up @@ -79,7 +80,8 @@ object Formats extends DefaultJsonProtocol {
timeStorageClassUpdated: String,
updated: String)

private implicit val storageObjectReadOnlyJson = jsonFormat18(StorageObjectReadOnlyJson)
private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] =
jsonFormat18(StorageObjectReadOnlyJson.apply)

// private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue
private final case class StorageObjectWriteableJson(
Expand All @@ -98,7 +100,8 @@ object Formats extends DefaultJsonProtocol {
temporaryHold: Option[Boolean],
acl: Option[List[ObjectAccessControls]])

private implicit val storageObjectWritableJson = jsonFormat14(StorageObjectWriteableJson)
private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] =
jsonFormat14(StorageObjectWriteableJson.apply)

private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] {
override def read(value: JsValue): StorageObjectJson = {
Expand Down Expand Up @@ -175,7 +178,8 @@ object Formats extends DefaultJsonProtocol {
}
}

private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson.apply)
private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] =
jsonFormat4(BucketListResultJson.apply)

implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] {
override def read(json: JsValue): RewriteResponse = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package org.apache.pekko.stream.connectors.googlecloud.storage.impl

import org.apache.pekko
import pekko.actor.ActorSystem
import pekko.annotation.InternalApi
import pekko.dispatch.ExecutionContexts
import pekko.dispatch.ExecutionContexts.parasitic
Expand Down Expand Up @@ -289,7 +290,7 @@ import scala.concurrent.{ ExecutionContext, Future }

@nowarn("msg=deprecated")
private def resolveSettings(mat: Materializer, attr: Attributes) = {
implicit val sys = mat.system
implicit val sys: ActorSystem = mat.system
val legacySettings = attr
.get[GCStorageSettingsValue]
.map(_.settings)
Expand Down Expand Up @@ -334,7 +335,7 @@ import scala.concurrent.{ ExecutionContext, Future }
}

private def resolveGCSSettings(mat: Materializer, attr: Attributes): GCSSettings = {
implicit val sys = mat.system
implicit val sys: ActorSystem = mat.system
attr
.get[GCSSettingsValue]
.map(_.settings)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait WithMaterializerGlobal
with ScalaFutures
with IntegrationPatience
with Matchers {
implicit val actorSystem = ActorSystem("test")
implicit val actorSystem: ActorSystem = ActorSystem("test")
implicit val ec: ExecutionContext = actorSystem.dispatcher

override protected def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[connectors] object ResumableUpload {
Sink
.fromMaterializer { (mat, attr) =>
import mat.executionContext
implicit val materializer = mat
implicit val materializer: Materializer = mat
implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
val uploadChunkSize = settings.requestSettings.uploadChunkSize

Expand Down Expand Up @@ -96,25 +96,24 @@ private[connectors] object ResumableUpload {

private def initiateSession(request: HttpRequest)(implicit mat: Materializer,
settings: GoogleSettings): Future[Uri] = {
implicit val system: ActorSystem = mat.system
import implicits._

implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
}.withDefaultRetry
implicit val um: FromResponseUnmarshaller[Uri] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
}.withDefaultRetry

GoogleHttp().singleAuthenticatedRequest[Uri](request)
GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request)
}

private final case class DoNotRetry(ex: Throwable) extends Throwable(ex) with NoStackTrace

private def uploadChunk[T: FromResponseUnmarshaller](
request: HttpRequest)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = {
implicit val system: ActorSystem = mat.system

val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case PermanentRedirect =>
response.discardEntityBytes().future.map(_ => None)
Expand All @@ -127,7 +126,8 @@ private[connectors] object ResumableUpload {
val uri = request.uri
Flow[HttpRequest]
.map((_, ()))
.via(GoogleHttp().cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(um))
.via(GoogleHttp(mat.system).cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(
um))
.map(_._1.recoverWith { case DoNotRetry(ex) => Failure(ex) })
}

Expand All @@ -147,30 +147,30 @@ private[connectors] object ResumableUpload {
request: HttpRequest,
chunk: Future[MaybeLast[Chunk]])(
implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, MaybeLast[Chunk]]] = {
implicit val system: ActorSystem = mat.system
import implicits._

implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
response.discardEntityBytes().future.map { _ =>
Right(
response
.header[Range]
.flatMap(_.ranges.headOption)
.collect {
case Slice(_, last) => last + 1
}.getOrElse(0L))
}
case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage))
}
}.withDefaultRetry
implicit val um: FromResponseUnmarshaller[Either[T, Long]] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
response.discardEntityBytes().future.map { _ =>
Right(
response
.header[Range]
.flatMap(_.ranges.headOption)
.collect {
case Slice(_, last) => last + 1
}.getOrElse(0L))
}
case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage))
}
}.withDefaultRetry

import mat.executionContext
chunk.flatMap {
case maybeLast @ MaybeLast(Chunk(bytes, position)) =>
GoogleHttp()
GoogleHttp(mat.system)
.singleAuthenticatedRequest[Either[T, Long]](request.addHeader(statusRequestHeader))
.map {
case Left(result) if maybeLast.isLast => Left(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ class GoogleOAuth2Spec

implicit val executionContext: ExecutionContext = system.dispatcher
implicit val settings: GoogleSettings = GoogleSettings(system)
implicit val clock = Clock.systemUTC()
implicit val clock: Clock = Clock.systemUTC()

lazy val privateKey = {
val inputStream = getClass.getClassLoader.getResourceAsStream("private_pcks8.pem")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ class OAuth2CredentialsSpec
import system.dispatcher

implicit val settings: RequestSettings = GoogleSettings().requestSettings
implicit val clock = Clock.systemUTC()
implicit val clock: Clock = Clock.systemUTC()

final object AccessTokenProvider {
@volatile var accessTokenPromise: Promise[AccessToken] = Promise.failed(new RuntimeException)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ class FcmSenderSpec

implicit val executionContext: ExecutionContext = system.dispatcher

implicit val conf = FcmSettings()
implicit val conf: FcmSettings = FcmSettings()
implicit val settings: GoogleSettings = GoogleSettings().copy(projectId = "projectId")

"FcmSender" should {
Expand Down
5 changes: 0 additions & 5 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,6 @@ object Dependencies {
"org.apache.logging.log4j" % "log4j-to-slf4j" % "2.17.1" % Test) ++ JacksonDatabindDependencies)

val GoogleCommon = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
Expand All @@ -209,7 +208,6 @@ object Dependencies {
) ++ Mockito)

val GoogleBigQuery = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-jackson" % PekkoHttpVersion % Provided,
Expand All @@ -236,7 +234,6 @@ object Dependencies {
"org.apache.pekko" %% "pekko-discovery" % PekkoVersion) ++ Mockito)

val GooglePubSub = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
Expand All @@ -255,13 +252,11 @@ object Dependencies {
"org.apache.pekko" %% "pekko-discovery" % PekkoVersion))

val GoogleFcm = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion) ++ Mockito)

val GoogleStorage = Seq(
crossScalaVersions -= Scala3,
libraryDependencies ++= Seq(
"org.apache.pekko" %% "pekko-http" % PekkoHttpVersion,
"org.apache.pekko" %% "pekko-http-spray-json" % PekkoHttpVersion,
Expand Down

0 comments on commit d42b033

Please sign in to comment.