Skip to content

Commit

Permalink
support scala3 (google-common) (#141)
Browse files Browse the repository at this point in the history
* support scala3 (google-common)

* Update ResumableUpload.scala

* add huawei

* Update Dependencies.scala

* Update BigQuery.scala

* annotation
  • Loading branch information
pjfanning committed Aug 6, 2023
1 parent 181a70f commit b408cb7
Show file tree
Hide file tree
Showing 27 changed files with 181 additions and 88 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ object BigQueryException {

implicit val fromResponseUnmarshaller: FromResponseUnmarshaller[Throwable] =
Unmarshaller
.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
import SprayJsonSupport._
val HttpResponse(status, _, entity, _) = response: @nowarn("msg=match may not be exhaustive")
Unmarshaller
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import pekko.annotation.ApiMayChange
import pekko.http.javadsl.marshalling.Marshaller
import pekko.http.javadsl.model.{ HttpEntity, RequestEntity }
import pekko.http.javadsl.unmarshalling.Unmarshaller
import pekko.http.scaladsl.{ model => sm }
import pekko.http.scaladsl.{ marshalling, model => sm, unmarshalling }
import pekko.japi.Pair
import pekko.stream.connectors.google.GoogleSettings
import pekko.stream.connectors.google.javadsl.Google
Expand All @@ -45,7 +45,6 @@ import pekko.util.OptionConverters._
import java.time.Duration
import java.util.concurrent.CompletionStage
import java.{ lang, util }

import scala.annotation.nowarn
import scala.concurrent.duration.{ FiniteDuration, MILLISECONDS }

Expand Down Expand Up @@ -220,7 +219,8 @@ object BigQuery extends Google {
selectedFields: util.List[String],
unmarshaller: Unmarshaller[HttpEntity, TableDataListResponse[Out]])
: Source[Out, CompletionStage[TableDataListResponse[Out]]] = {
implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity]
implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, TableDataListResponse[Out]] =
unmarshaller.asScalaCastInput[sm.HttpEntity]
ScalaBigQuery
.tableData(datasetId, tableId, startIndex.toScala, maxResults.toScala, selectedFields.asScala.toList)
.mapMaterializedValue(_.asJava)
Expand All @@ -245,7 +245,8 @@ object BigQuery extends Google {
retryPolicy: InsertAllRetryPolicy,
templateSuffix: util.Optional[String],
marshaller: Marshaller[TableDataInsertAllRequest[In], RequestEntity]): Sink[util.List[In], NotUsed] = {
implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val m: marshalling.Marshaller[TableDataInsertAllRequest[In], sm.RequestEntity] =
marshaller.asScalaCastOutput[sm.RequestEntity]
ss.Flow[util.List[In]]
.map(_.asScala.toList)
.to(ScalaBigQuery.insertAll[In](datasetId, tableId, retryPolicy, templateSuffix.toScala))
Expand All @@ -269,7 +270,8 @@ object BigQuery extends Google {
retryFailedRequests: Boolean,
marshaller: Marshaller[TableDataInsertAllRequest[In], RequestEntity])
: Flow[TableDataInsertAllRequest[In], TableDataInsertAllResponse, NotUsed] = {
implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val m: marshalling.Marshaller[TableDataInsertAllRequest[In], sm.RequestEntity] =
marshaller.asScalaCastOutput[sm.RequestEntity]
ScalaBigQuery.insertAll[In](datasetId, tableId, retryFailedRequests).asJava
}

Expand All @@ -290,7 +292,8 @@ object BigQuery extends Google {
dryRun: Boolean,
useLegacySql: Boolean,
unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]]): Source[Out, CompletionStage[QueryResponse[Out]]] = {
implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity]
implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] =
unmarshaller.asScalaCastInput[sm.HttpEntity]
ScalaBigQuery.query(query, dryRun, useLegacySql).mapMaterializedValue(_.asJava).asJava
}

Expand All @@ -309,7 +312,8 @@ object BigQuery extends Google {
query: QueryRequest,
unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]])
: Source[Out, Pair[CompletionStage[JobReference], CompletionStage[QueryResponse[Out]]]] = {
implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity]
implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] =
unmarshaller.asScalaCastInput[sm.HttpEntity]
ScalaBigQuery
.query(query)
.mapMaterializedValue {
Expand Down Expand Up @@ -339,7 +343,8 @@ object BigQuery extends Google {
timeout: util.Optional[Duration],
location: util.Optional[String],
unmarshaller: Unmarshaller[HttpEntity, QueryResponse[Out]]): Source[Out, CompletionStage[QueryResponse[Out]]] = {
implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity]
implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, QueryResponse[Out]] =
unmarshaller.asScalaCastInput[sm.HttpEntity]
ScalaBigQuery
.queryResults(jobId,
startIndex.toScala,
Expand Down Expand Up @@ -396,7 +401,7 @@ object BigQuery extends Google {
def insertAllAsync[In](datasetId: String,
tableId: String,
marshaller: Marshaller[In, RequestEntity]): Flow[In, Job, NotUsed] = {
implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val m: marshalling.Marshaller[In, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity]
ScalaBigQuery.insertAllAsync[In](datasetId, tableId).asJava[In]
}

Expand All @@ -415,7 +420,7 @@ object BigQuery extends Google {
tableId: String,
labels: util.Optional[util.Map[String, String]],
marshaller: Marshaller[In, RequestEntity]): Flow[In, Job, NotUsed] = {
implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val m: marshalling.Marshaller[In, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity]
ScalaBigQuery.insertAllAsync[In](datasetId, tableId, labels.toScala.map(_.asScala.toMap)).asJava[In]
}

Expand All @@ -436,8 +441,8 @@ object BigQuery extends Google {
job: Job,
marshaller: Marshaller[Job, RequestEntity],
unmarshaller: Unmarshaller[HttpEntity, Job]): Sink[ByteString, CompletionStage[Job]] = {
implicit val m = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val um = unmarshaller.asScalaCastInput[sm.HttpEntity]
implicit val m: marshalling.Marshaller[Job, sm.RequestEntity] = marshaller.asScalaCastOutput[sm.RequestEntity]
implicit val um: unmarshalling.Unmarshaller[sm.HttpEntity, Job] = unmarshaller.asScalaCastInput[sm.HttpEntity]
ScalaBigQuery.createLoadJob(job).mapMaterializedValue(_.asJava).asJava[ByteString]
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val pullResponseUnmarshaller: FromResponseUnmarshaller[PullResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PullResponse]
Expand Down Expand Up @@ -211,7 +211,7 @@ private[pubsub] trait PubSubApi {
.mapMaterializedValue(_ => NotUsed)

private implicit val acknowledgeResponseUnmarshaller: FromResponseUnmarshaller[Done] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) =>
response.discardEntityBytes().future
Expand Down Expand Up @@ -261,7 +261,7 @@ private[pubsub] trait PubSubApi {
publish(topic, parallelism, None)

private implicit val publishResponseUnmarshaller: FromResponseUnmarshaller[PublishResponse] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case StatusCodes.Success(_) if response.entity.contentType == ContentTypes.`application/json` =>
Unmarshal(response.entity).to[PublishResponse]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ object Formats extends DefaultJsonProtocol {
domain: String,
projectTeam: ProjectTeam,
etag: String)
private implicit val ObjectAccessControlsJsonFormat = jsonFormat13(ObjectAccessControls)
private implicit val ObjectAccessControlsJsonFormat: RootJsonFormat[ObjectAccessControls] =
jsonFormat13(ObjectAccessControls.apply)

/**
* Google API storage response object
Expand Down Expand Up @@ -79,7 +80,8 @@ object Formats extends DefaultJsonProtocol {
timeStorageClassUpdated: String,
updated: String)

private implicit val storageObjectReadOnlyJson = jsonFormat18(StorageObjectReadOnlyJson)
private implicit val storageObjectReadOnlyJson: RootJsonFormat[StorageObjectReadOnlyJson] =
jsonFormat18(StorageObjectReadOnlyJson.apply)

// private sub class of StorageObjectJson used to workaround 22 field jsonFormat issue
private final case class StorageObjectWriteableJson(
Expand All @@ -98,7 +100,8 @@ object Formats extends DefaultJsonProtocol {
temporaryHold: Option[Boolean],
acl: Option[List[ObjectAccessControls]])

private implicit val storageObjectWritableJson = jsonFormat14(StorageObjectWriteableJson)
private implicit val storageObjectWritableJson: RootJsonFormat[StorageObjectWriteableJson] =
jsonFormat14(StorageObjectWriteableJson.apply)

private implicit object StorageObjectJsonFormat extends RootJsonFormat[StorageObjectJson] {
override def read(value: JsValue): StorageObjectJson = {
Expand Down Expand Up @@ -175,7 +178,8 @@ object Formats extends DefaultJsonProtocol {
}
}

private implicit val bucketListResultJsonReads = jsonFormat4(BucketListResultJson.apply)
private implicit val bucketListResultJsonReads: RootJsonFormat[BucketListResultJson] =
jsonFormat4(BucketListResultJson.apply)

implicit object RewriteResponseReads extends RootJsonReader[RewriteResponse] {
override def read(json: JsValue): RewriteResponse = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ import scala.concurrent.{ ExecutionContext, Future }
getBucketPath(bucket) / "o" / objectName

implicit def unmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, T] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T]
Expand All @@ -254,7 +254,7 @@ import scala.concurrent.{ ExecutionContext, Future }
}.withDefaultRetry

implicit def optionUnmarshaller[T: FromEntityUnmarshaller]: Unmarshaller[HttpResponse, Option[T]] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response match {
case HttpResponse(status, _, entity, _) if status.isSuccess() && !status.isRedirection() =>
Unmarshal(entity).to[T].map(Some(_))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ trait WithMaterializerGlobal
with ScalaFutures
with IntegrationPatience
with Matchers {
implicit val actorSystem = ActorSystem("test")
implicit val actorSystem: ActorSystem = ActorSystem("test")
implicit val ec: ExecutionContext = actorSystem.dispatcher

override protected def afterAll(): Unit = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ class GCStorageStreamIntegrationSpec
with ScalaFutures
with LogCapturing {

private implicit val defaultPatience =
private implicit val defaultPatience: PatienceConfig =
PatienceConfig(timeout = 60.seconds, interval = 60.millis)

var folderName: String = _
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ public interface XUploadContentType {
ContentType getContentType();

static XUploadContentType create(ContentType contentType) {
return X$minusUpload$minusContent$minusType$.MODULE$.apply(
(org.apache.pekko.http.scaladsl.model.ContentType) contentType);
return X$minusUpload$minusContent$minusType$.MODULE$.apply(contentType.toString());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ object GoogleSettings {

}

final case class GoogleSettings @InternalApi private (projectId: String,
final case class GoogleSettings @InternalApi private[connectors] (projectId: String,
credentials: Credentials,
requestSettings: RequestSettings) {
def getProjectId = projectId
Expand Down Expand Up @@ -134,7 +134,7 @@ object RequestSettings {
apply(userIp.toScala, quotaUser.toScala, prettyPrint, chunkSize, retrySettings, forwardProxy.toScala)
}

final case class RequestSettings @InternalApi private (
final case class RequestSettings @InternalApi private[connectors] (
userIp: Option[String],
quotaUser: Option[String],
prettyPrint: Boolean,
Expand Down Expand Up @@ -252,7 +252,7 @@ object ForwardProxy {
credentials: Option[BasicHttpCredentials],
trustPem: Option[String])(implicit system: ClassicActorSystemProvider): ForwardProxy = {
ForwardProxy(
trustPem.fold(Http(system).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
trustPem.fold(Http(system.classicSystem).defaultClientHttpsContext)(ForwardProxyHttpsContext(_)),
ForwardProxyPoolSettings(scheme, host, port, credentials)(system.classicSystem))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ private[connectors] object ResumableUpload {
Sink
.fromMaterializer { (mat, attr) =>
import mat.executionContext
implicit val materializer = mat
implicit val materializer: Materializer = mat
implicit val settings: GoogleSettings = GoogleAttributes.resolveSettings(mat, attr)
val uploadChunkSize = settings.requestSettings.uploadChunkSize

Expand Down Expand Up @@ -96,25 +96,24 @@ private[connectors] object ResumableUpload {

private def initiateSession(request: HttpRequest)(implicit mat: Materializer,
settings: GoogleSettings): Future[Uri] = {
implicit val system: ActorSystem = mat.system
import implicits._

implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
}.withDefaultRetry
implicit val um: FromResponseUnmarshaller[Uri] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.discardEntityBytes().future.map { _ =>
response.header[Location].fold(throw InvalidResponseException(ErrorInfo("No Location header")))(_.uri)
}
}.withDefaultRetry

GoogleHttp().singleAuthenticatedRequest[Uri](request)
GoogleHttp(mat.system).singleAuthenticatedRequest[Uri](request)
}

private final case class DoNotRetry(ex: Throwable) extends Throwable(ex) with NoStackTrace

private def uploadChunk[T: FromResponseUnmarshaller](
request: HttpRequest)(implicit mat: Materializer): Flow[Either[T, MaybeLast[Chunk]], Try[Option[T]], NotUsed] = {
implicit val system: ActorSystem = mat.system

val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case PermanentRedirect =>
response.discardEntityBytes().future.map(_ => None)
Expand All @@ -127,7 +126,8 @@ private[connectors] object ResumableUpload {
val uri = request.uri
Flow[HttpRequest]
.map((_, ()))
.via(GoogleHttp().cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(um))
.via(GoogleHttp(mat.system).cachedHostConnectionPoolWithContext(uri.authority.host.address, uri.effectivePort)(
um))
.map(_._1.recoverWith { case DoNotRetry(ex) => Failure(ex) })
}

Expand All @@ -147,30 +147,30 @@ private[connectors] object ResumableUpload {
request: HttpRequest,
chunk: Future[MaybeLast[Chunk]])(
implicit mat: Materializer, settings: GoogleSettings): Future[Either[T, MaybeLast[Chunk]]] = {
implicit val system: ActorSystem = mat.system
import implicits._

implicit val um = Unmarshaller.withMaterializer { implicit ec => implicit mat => response: HttpResponse =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
response.discardEntityBytes().future.map { _ =>
Right(
response
.header[Range]
.flatMap(_.ranges.headOption)
.collect {
case Slice(_, last) => last + 1
}.getOrElse(0L))
}
case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage))
}
}.withDefaultRetry
implicit val um: FromResponseUnmarshaller[Either[T, Long]] =
Unmarshaller.withMaterializer { implicit ec => implicit mat => (response: HttpResponse) =>
response.status match {
case OK | Created => Unmarshal(response).to[T].map(Left(_))
case PermanentRedirect =>
response.discardEntityBytes().future.map { _ =>
Right(
response
.header[Range]
.flatMap(_.ranges.headOption)
.collect {
case Slice(_, last) => last + 1
}.getOrElse(0L))
}
case _ => throw InvalidResponseException(ErrorInfo(response.status.value, response.status.defaultMessage))
}
}.withDefaultRetry

import mat.executionContext
chunk.flatMap {
case maybeLast @ MaybeLast(Chunk(bytes, position)) =>
GoogleHttp()
GoogleHttp(mat.system)
.singleAuthenticatedRequest[Either[T, Long]](request.addHeader(statusRequestHeader))
.map {
case Left(result) if maybeLast.isLast => Left(result)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ object Credentials {
*/
def apply(c: Config)(implicit system: ClassicActorSystemProvider): Credentials = c.getString("provider") match {
case "application-default" =>
val log = Logging(system.classicSystem, getClass)
val log = Logging(system.classicSystem, classOf[Credentials])
try {
val creds = parseServiceAccount(c)
log.info("Using service account credentials")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ import pekko.http.scaladsl.model.HttpMethods.POST
import pekko.http.scaladsl.model.{ FormData, HttpRequest }
import pekko.stream.Materializer
import pekko.stream.connectors.google.http.GoogleHttp
import pekko.stream.connectors.google.jwt.JwtSprayJson
import pekko.stream.connectors.google.{ implicits, RequestSettings }
import pdi.jwt.JwtAlgorithm.RS256
import pdi.jwt.{ JwtClaim, JwtSprayJson }
import pdi.jwt.JwtClaim
import spray.json.DefaultJsonProtocol._
import spray.json.JsonFormat

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ import pekko.stream.connectors.google.util.Retry
import spray.json.DefaultJsonProtocol._
import spray.json.RootJsonFormat

final case class GoogleOAuth2Exception private (override val info: ErrorInfo) extends ExceptionWithErrorInfo(info)
final case class GoogleOAuth2Exception private[google] (override val info: ErrorInfo)
extends ExceptionWithErrorInfo(info)

private[google] object GoogleOAuth2Exception {

Expand Down
Loading

0 comments on commit b408cb7

Please sign in to comment.