Skip to content

Commit

Permalink
Scala3 support for more google connectors (#143)
Browse files Browse the repository at this point in the history
* support scala3 (google-common)

* support scala3 on more google connectors

* continue

* imports

* more implicit issues

* Update BigQueryCollectionFormats.scala

* further issues

* class access issues

* Update BigQueryRestBasicFormats.scala

* use compactPrint because toString causes tests to fail in Scala3

* review issue
  • Loading branch information
pjfanning committed Aug 19, 2023
1 parent fc4417a commit e0202ad
Show file tree
Hide file tree
Showing 24 changed files with 128 additions and 110 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -221,7 +221,7 @@ object ElasticsearchFlow {
}

private final class SprayJsonWriter[T](implicit writer: JsonWriter[T]) extends MessageWriter[T] {
override def convert(message: T): String = message.toJson.toString()
override def convert(message: T): String = message.toJson.compactPrint
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -158,9 +158,9 @@ class ElasticsearchV5Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt
// #string
val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source(
immutable.Seq(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via(
ElasticsearchFlow.create(
constructElasticsearchParams(indexName, "_doc", ApiVersion.V5),
settings = baseWriteSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,9 +147,9 @@ class ElasticsearchV7Spec extends ElasticsearchSpecBase with ElasticsearchSpecUt

val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source(
immutable.Seq(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via(
ElasticsearchFlow.create(
constructElasticsearchParams(indexName, "_doc", ApiVersion.V7),
settings = baseWriteSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,9 +163,9 @@ class OpensearchV1Spec extends ElasticsearchSpecBase with ElasticsearchSpecUtils
// #string
val write: Future[immutable.Seq[WriteResult[String, NotUsed]]] = Source(
immutable.Seq(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.toString()),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.toString()),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.toString()))).via(
WriteMessage.createIndexMessage("1", Book("Das Parfum").toJson.compactPrint),
WriteMessage.createIndexMessage("2", Book("Faust").toJson.compactPrint),
WriteMessage.createIndexMessage("3", Book("Die unendliche Geschichte").toJson.compactPrint))).via(
ElasticsearchFlow.create(
constructElasticsearchParams(indexName, "_doc", OpensearchApiVersion.V1),
settings = baseWriteSettings,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ import scala.collection.immutable.Seq
* @param labels the labels associated with this dataset
* @param location the geographic location where the dataset should reside
*/
final case class Dataset private (datasetReference: DatasetReference,
final case class Dataset private[bigquery] (datasetReference: DatasetReference,
friendlyName: Option[String],
labels: Option[Map[String, String]],
location: Option[String]) {
Expand Down Expand Up @@ -87,7 +87,7 @@ object Dataset {
* @param datasetId A unique ID for this dataset, without the project name
* @param projectId The ID of the project containing this dataset
*/
final case class DatasetReference private (datasetId: Option[String], projectId: Option[String]) {
final case class DatasetReference private[bigquery] (datasetId: Option[String], projectId: Option[String]) {

def getDatasetId = datasetId.toJava
def getProjectId = projectId.toJava
Expand Down Expand Up @@ -126,7 +126,7 @@ object DatasetReference {
* @param nextPageToken a token that can be used to request the next results page
* @param datasets an array of the dataset resources in the project
*/
final case class DatasetListResponse private (nextPageToken: Option[String], datasets: Option[Seq[Dataset]]) {
final case class DatasetListResponse private[bigquery] (nextPageToken: Option[String], datasets: Option[Seq[Dataset]]) {

def getNextPageToken = nextPageToken.toJava
def getDatasets = datasets.map(_.asJava).toJava
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,8 @@ import scala.annotation.nowarn
* @param location specifies where the error occurred, if present
* @param message A human-readable description of the error
*/
final case class ErrorProto private (reason: Option[String], location: Option[String], message: Option[String]) {
final case class ErrorProto private[bigquery] (reason: Option[String], location: Option[String],
message: Option[String]) {

@nowarn("msg=never used")
@JsonCreator
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ import scala.collection.immutable.Seq
* @param jobReference reference describing the unique-per-user name of the job
* @param status the status of this job
*/
final case class Job private (configuration: Option[JobConfiguration],
final case class Job private[bigquery] (configuration: Option[JobConfiguration],
jobReference: Option[JobReference],
status: Option[JobStatus]) {

Expand Down Expand Up @@ -83,7 +83,8 @@ object Job {
* @param load configures a load job
* @param labels the labels associated with this job
*/
final case class JobConfiguration private (load: Option[JobConfigurationLoad], labels: Option[Map[String, String]]) {
final case class JobConfiguration private[bigquery] (load: Option[JobConfigurationLoad],
labels: Option[Map[String, String]]) {
def getLoad = load.toJava
def getLabels = labels.toJava

Expand Down Expand Up @@ -144,7 +145,7 @@ object JobConfiguration {
* @param writeDisposition specifies the action that occurs if the destination table already exists
* @param sourceFormat the format of the data files
*/
final case class JobConfigurationLoad private (schema: Option[TableSchema],
final case class JobConfigurationLoad private[bigquery] (schema: Option[TableSchema],
destinationTable: Option[TableReference],
createDisposition: Option[CreateDisposition],
writeDisposition: Option[WriteDisposition],
Expand Down Expand Up @@ -210,7 +211,7 @@ object JobConfigurationLoad {
implicit val configurationLoadFormat: JsonFormat[JobConfigurationLoad] = jsonFormat5(apply)
}

final case class CreateDisposition private (value: String) extends StringEnum
final case class CreateDisposition private[bigquery] (value: String) extends StringEnum
object CreateDisposition {

/**
Expand All @@ -227,7 +228,7 @@ object CreateDisposition {
implicit val format: JsonFormat[CreateDisposition] = StringEnum.jsonFormat(apply)
}

final case class WriteDisposition private (value: String) extends StringEnum
final case class WriteDisposition private[bigquery] (value: String) extends StringEnum
object WriteDisposition {

/**
Expand Down Expand Up @@ -269,7 +270,8 @@ object SourceFormat {
* @param jobId the ID of the job
* @param location the geographic location of the job
*/
final case class JobReference private (projectId: Option[String], jobId: Option[String], location: Option[String]) {
final case class JobReference private[bigquery] (projectId: Option[String], jobId: Option[String],
location: Option[String]) {

@nowarn("msg=never used")
@JsonCreator
Expand Down Expand Up @@ -323,7 +325,8 @@ object JobReference {
* @param errors the first errors encountered during the running of the job
* @param state running state of the job
*/
final case class JobStatus private (errorResult: Option[ErrorProto], errors: Option[Seq[ErrorProto]], state: JobState) {
final case class JobStatus private[bigquery] (errorResult: Option[ErrorProto], errors: Option[Seq[ErrorProto]],
state: JobState) {

def getErrorResult = errorResult.toJava
def getErrors = errors.map(_.asJava).toJava
Expand Down Expand Up @@ -360,7 +363,7 @@ object JobStatus {
implicit val format: JsonFormat[JobStatus] = jsonFormat3(apply)
}

final case class JobState private (value: String) extends StringEnum
final case class JobState private[bigquery] (value: String) extends StringEnum
object JobState {

/**
Expand All @@ -380,7 +383,7 @@ object JobState {
implicit val format: JsonFormat[JobState] = StringEnum.jsonFormat(apply)
}

final case class JobCancelResponse private (job: Job) {
final case class JobCancelResponse private[bigquery] (job: Job) {
def getJob = job
def withJob(job: Job) =
copy(job = job)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ import pekko.util.ccompat.JavaConverters._
import pekko.util.JavaDurationConverters._
import pekko.util.OptionConverters._
import com.fasterxml.jackson.annotation.{ JsonCreator, JsonIgnoreProperties, JsonProperty }
import spray.json.{ RootJsonFormat, RootJsonReader }
import spray.json.{ JsonFormat, RootJsonFormat, RootJsonReader }

import java.time.Duration
import java.{ lang, util }

import scala.annotation.nowarn
import scala.annotation.unchecked.uncheckedVariance
import scala.collection.immutable.Seq
Expand All @@ -46,7 +45,7 @@ import scala.concurrent.duration.FiniteDuration
* @param maximumBytesBilled limits the number of bytes billed for this query
* @param requestId a unique user provided identifier to ensure idempotent behavior for queries
*/
final case class QueryRequest private (query: String,
final case class QueryRequest private[bigquery] (query: String,
maxResults: Option[Int],
defaultDataset: Option[DatasetReference],
timeout: Option[FiniteDuration],
Expand Down Expand Up @@ -192,7 +191,7 @@ object QueryRequest {
* @tparam T the data model for each row
*/
@JsonIgnoreProperties(ignoreUnknown = true)
final case class QueryResponse[+T] private (schema: Option[TableSchema],
final case class QueryResponse[+T] private[bigquery] (schema: Option[TableSchema],
jobReference: JobReference,
totalRows: Option[Long],
pageToken: Option[String],
Expand Down Expand Up @@ -329,7 +328,7 @@ object QueryResponse {

implicit def reader[T <: AnyRef](
implicit reader: BigQueryRootJsonReader[T]): RootJsonReader[QueryResponse[T]] = {
implicit val format = lift(reader)
implicit val format: JsonFormat[T] = lift(reader)
jsonFormat10(QueryResponse[T])
}
implicit val paginated: Paginated[QueryResponse[Any]] = _.pageToken
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,8 @@ import scala.collection.immutable.Seq
* @tparam T the data model of each row
*/
@JsonIgnoreProperties(ignoreUnknown = true)
final case class TableDataListResponse[+T] private (totalRows: Long, pageToken: Option[String], rows: Option[Seq[T]]) {
final case class TableDataListResponse[+T] private[bigquery] (totalRows: Long, pageToken: Option[String],
rows: Option[Seq[T]]) {

@nowarn("msg=never used")
@JsonCreator
Expand Down Expand Up @@ -82,7 +83,7 @@ object TableDataListResponse {

implicit def reader[T <: AnyRef](
implicit reader: BigQueryRootJsonReader[T]): RootJsonReader[TableDataListResponse[T]] = {
implicit val format = lift(reader)
implicit val format: JsonFormat[T] = lift(reader)
jsonFormat3(TableDataListResponse[T])
}
implicit val paginated: Paginated[TableDataListResponse[Any]] = _.pageToken
Expand All @@ -99,7 +100,7 @@ object TableDataListResponse {
* @tparam T the data model of each row
*/
@JsonInclude(Include.NON_NULL)
final case class TableDataInsertAllRequest[+T] private (skipInvalidRows: Option[Boolean],
final case class TableDataInsertAllRequest[+T] private[bigquery] (skipInvalidRows: Option[Boolean],
ignoreUnknownValues: Option[Boolean],
templateSuffix: Option[String],
rows: Seq[Row[T]]) {
Expand Down Expand Up @@ -179,7 +180,7 @@ object TableDataInsertAllRequest {
* @param json the record this row contains
* @tparam T the data model of the record
*/
final case class Row[+T] private (insertId: Option[String], json: T) {
final case class Row[+T] private[bigquery] (insertId: Option[String], json: T) {

def getInsertId = insertId.toJava
def getJson = json
Expand Down Expand Up @@ -212,7 +213,7 @@ object Row {
* TableDataInsertAllResponse model
* @see [[https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll#response-body BigQuery reference]]
*/
final case class TableDataInsertAllResponse private (insertErrors: Option[Seq[InsertError]]) {
final case class TableDataInsertAllResponse private[bigquery] (insertErrors: Option[Seq[InsertError]]) {
def getInsertErrors = insertErrors.map(_.asJava).toJava

def withInsertErrors(insertErrors: Option[Seq[InsertError]]) =
Expand All @@ -239,7 +240,7 @@ object TableDataInsertAllResponse {
* InsertError model
* @see [[https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll#response-body BigQuery reference]]
*/
final case class InsertError private (index: Int, errors: Option[Seq[ErrorProto]]) {
final case class InsertError private[bigquery] (index: Int, errors: Option[Seq[ErrorProto]]) {
def getIndex = index
def getErrors = errors.map(_.asJava).toJava

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ import scala.collection.immutable.Seq
* @param numRows the number of rows of data in this table
* @param location the geographic location where the table resides
*/
final case class Table private (tableReference: TableReference,
final case class Table private[bigquery] (tableReference: TableReference,
labels: Option[Map[String, String]],
schema: Option[TableSchema],
numRows: Option[Long],
Expand Down Expand Up @@ -109,7 +109,8 @@ object Table {
* @param datasetId the ID of the dataset containing this table
* @param tableId the ID of the table
*/
final case class TableReference private (projectId: Option[String], datasetId: String, tableId: Option[String]) {
final case class TableReference private[bigquery] (projectId: Option[String], datasetId: String,
tableId: Option[String]) {

def getProjectId = projectId.toJava
def getDatasetId = datasetId
Expand Down Expand Up @@ -152,7 +153,7 @@ object TableReference {
*
* @param fields describes the fields in a table
*/
final case class TableSchema private (fields: Seq[TableFieldSchema]) {
final case class TableSchema private[bigquery] (fields: Seq[TableFieldSchema]) {

@nowarn("msg=never used")
@JsonCreator
Expand Down Expand Up @@ -200,7 +201,7 @@ object TableSchema {
* @param mode the field mode
* @param fields describes the nested schema fields if the type property is set to `RECORD`
*/
final case class TableFieldSchema private (name: String,
final case class TableFieldSchema private[bigquery] (name: String,
`type`: TableFieldSchemaType,
mode: Option[TableFieldSchemaMode],
fields: Option[Seq[TableFieldSchema]]) {
Expand Down Expand Up @@ -278,7 +279,7 @@ object TableFieldSchema {
jsonFormat(apply, "name", "type", "mode", "fields"))
}

final case class TableFieldSchemaType private (value: String) extends StringEnum
final case class TableFieldSchemaType private[bigquery] (value: String) extends StringEnum
object TableFieldSchemaType {

/**
Expand Down Expand Up @@ -328,7 +329,7 @@ object TableFieldSchemaType {
implicit val format: JsonFormat[TableFieldSchemaType] = StringEnum.jsonFormat(apply)
}

final case class TableFieldSchemaMode private (value: String) extends StringEnum
final case class TableFieldSchemaMode private[bigquery] (value: String) extends StringEnum
object TableFieldSchemaMode {

/**
Expand Down Expand Up @@ -356,7 +357,7 @@ object TableFieldSchemaMode {
* @param tables tables in the requested dataset
* @param totalItems the total number of tables in the dataset
*/
final case class TableListResponse private (nextPageToken: Option[String],
final case class TableListResponse private[bigquery] (nextPageToken: Option[String],
tables: Option[Seq[Table]],
totalItems: Option[Int]) {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,18 +44,26 @@ trait BigQueryCollectionFormats {

import collection.{ immutable => imm }

implicit def immIterableFormat[T: BigQueryJsonFormat] = viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*))
implicit def immSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*))
implicit def immIndexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*))
implicit def immLinearSeqFormat[T: BigQueryJsonFormat] = viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*))
implicit def vectorFormat[T: BigQueryJsonFormat] = viaSeq[Vector[T], T](seq => Vector(seq: _*))
implicit def immIterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Iterable[T]] =
viaSeq[imm.Iterable[T], T](seq => imm.Iterable(seq: _*))
implicit def immSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.Seq[T]] =
viaSeq[imm.Seq[T], T](seq => imm.Seq(seq: _*))
implicit def immIndexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.IndexedSeq[T]] =
viaSeq[imm.IndexedSeq[T], T](seq => imm.IndexedSeq(seq: _*))
implicit def immLinearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[imm.LinearSeq[T]] =
viaSeq[imm.LinearSeq[T], T](seq => imm.LinearSeq(seq: _*))
implicit def vectorFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Vector[T]] =
viaSeq[Vector[T], T](seq => Vector(seq: _*))

import collection._

implicit def iterableFormat[T: BigQueryJsonFormat] = viaSeq[Iterable[T], T](seq => Iterable(seq: _*))
implicit def seqFormat[T: BigQueryJsonFormat] = viaSeq[Seq[T], T](seq => Seq(seq: _*))
implicit def indexedSeqFormat[T: BigQueryJsonFormat] = viaSeq[IndexedSeq[T], T](seq => IndexedSeq(seq: _*))
implicit def linearSeqFormat[T: BigQueryJsonFormat] = viaSeq[LinearSeq[T], T](seq => LinearSeq(seq: _*))
implicit def iterableFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Iterable[T]] =
viaSeq[Iterable[T], T](seq => Iterable(seq: _*))
implicit def seqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[Seq[T]] = viaSeq[Seq[T], T](seq => Seq(seq: _*))
implicit def indexedSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[IndexedSeq[T]] =
viaSeq[IndexedSeq[T], T](seq => IndexedSeq(seq: _*))
implicit def linearSeqFormat[T: BigQueryJsonFormat]: BigQueryJsonFormat[collection.LinearSeq[T]] =
viaSeq[collection.LinearSeq[T], T](seq => collection.LinearSeq(seq: _*))

/**
* A BigQueryJsonFormat construction helper that creates a BigQueryJsonFormat for an Iterable type I from a builder function
Expand Down
Loading

0 comments on commit e0202ad

Please sign in to comment.