Skip to content

Commit

Permalink
Merge pull request #41 from paualarco/2.12-compatibility
Browse files Browse the repository at this point in the history
2.12 compatibility
  • Loading branch information
paualarco authored May 16, 2020
2 parents 47df858 + ba0f45f commit d279d8c
Show file tree
Hide file tree
Showing 33 changed files with 704 additions and 914 deletions.
24 changes: 9 additions & 15 deletions build.sbt
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import sbt.Keys.version
import scala.xml.Elem
import scala.xml.transform.{RewriteRule, RuleTransformer}

lazy val doNotPublishArtifact = Seq(
publishArtifact := false,
Expand All @@ -12,9 +10,9 @@ lazy val doNotPublishArtifact = Seq(
lazy val sharedSettings = Seq(
organization := "io.monix",
homepage := Some(url("https://monix.io/monix-connect")),
scalaVersion := "2.13.1",
crossScalaVersions := Seq("2.13.1"),
scalafmtOnCompile := false,
scalaVersion := "2.12.8",
crossScalaVersions := Seq("2.12.10", "2.13.1"),
scalafmtOnCompile := true,
scalacOptions ++= Seq(
// warnings
"-unchecked", // able additional warnings where generated code depends on assumptions
Expand All @@ -33,7 +31,7 @@ lazy val sharedSettings = Seq(
"-Ywarn-unused:imports", // Warn if an import selector is not referenced.
"-Ywarn-dead-code", // Warn when dead code is identified.
// Turns all warnings into errors ;-)
//"-Xfatal-warnings", //Turning of fatal warings for the moment
//"-Xfatal-warnings", //Turning of fatal warnings for the moment
// Enables linter options
"-Xlint:adapted-args", // warn if an argument list is modified to match the receiver
"-Xlint:nullary-unit", // warn when nullary methods return Unit
Expand Down Expand Up @@ -93,7 +91,7 @@ lazy val sharedSettings = Seq(
|See the License for the specific language governing permissions and
|limitations under the License."""
.stripMargin)),
//todo add scm

developers := List(
Developer(
id = "paualarco",
Expand All @@ -107,7 +105,7 @@ lazy val sharedSettings = Seq(
)

lazy val unidocSettings = Seq(
unidocProjectFilter in (ScalaUnidoc, unidoc) := inProjects(akka, common, dynamodb, hdfs, parquet, s3, redis),
//unidocProjectFilter in (ScalaUnidoc, unidoc) := inProjects(akka, dynamodb, hdfs, s3, redis),
scalacOptions in (ScalaUnidoc, unidoc) +=
"-Xfatal-warnings",
scalacOptions in (ScalaUnidoc, unidoc) --=
Expand Down Expand Up @@ -136,17 +134,15 @@ lazy val monix = (project in file("."))
.configs(IntegrationTest, IT)
.settings(sharedSettings)
.settings(name := "monix-connect")
.aggregate(akka, common, dynamodb, hdfs, parquet, redis, s3)
.dependsOn(akka, common, dynamodb, hdfs, parquet, redis, s3)
.aggregate(akka, dynamodb, hdfs, parquet, redis, s3)
.dependsOn(akka, dynamodb, hdfs, parquet, redis, s3)
//.settings(unidocSettings) //todo enable unidoc settings
//.enablePlugins(ScalaUnidocPlugin)

lazy val akka = monixConnector("akka", Dependencies.Akka)

lazy val common = monixConnector("common", Dependencies.Common)

lazy val dynamodb = monixConnector("dynamodb", Dependencies.DynamoDb)
.dependsOn(common % "compile->compile; test->test")

lazy val hdfs = monixConnector("hdfs", Dependencies.Hdfs)

Expand Down Expand Up @@ -176,6 +172,4 @@ def monixConnector(
.settings(sharedSettings)
.settings(additionalSettings: _*)
.configure(profile)
.configs(IntegrationTest, IT)

//todo add release settings
.configs(IntegrationTest, IT)
34 changes: 0 additions & 34 deletions common/src/main/scala/monix/connect/common/Operators.scala

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ import software.amazon.awssdk.services.dynamodb.model._
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient

import scala.concurrent.duration._
import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._

class DynamoDbConsumerSpec
extends AnyWordSpecLike with Matchers with ScalaFutures with DynamoDbFixture with BeforeAndAfterAll {
Expand Down Expand Up @@ -68,7 +68,7 @@ class DynamoDbConsumerSpec

//then
r shouldBe a[PutItemResponse]
val getResponse: GetItemResponse = client.getItem(getItemRequest(tableName, city, citizenId)).asScala.futureValue
val getResponse: GetItemResponse = toScala(client.getItem(getItemRequest(tableName, city, citizenId))).futureValue
getResponse.item().values().asScala.head.n().toDouble shouldBe debt
}

Expand All @@ -85,7 +85,7 @@ class DynamoDbConsumerSpec
//then
r shouldBe a[PutItemResponse]
requestAttr.map { case (city, citizenId, debt) =>
val getResponse: GetItemResponse = client.getItem(getItemRequest(tableName, city, citizenId)).asScala.futureValue
val getResponse: GetItemResponse = toScala(client.getItem(getItemRequest(tableName, city, citizenId))).futureValue
getResponse.item().values().asScala.head.n().toDouble shouldBe debt
}
}
Expand All @@ -98,7 +98,7 @@ class DynamoDbConsumerSpec
val city = "Barcelona"
val citizenId = 11292
val debt: Int = 1015
client.putItem(putItemRequest(tableName, city, citizenId, debt)).asScala.futureValue
toScala(client.putItem(putItemRequest(tableName, city, citizenId, debt))).futureValue
val request: GetItemRequest = getItemRequest(tableName, city, citizenId)

//when
Expand Down
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
package monix.connect.dynamodb

import monix.eval.Task
import monix.execution.Scheduler
import org.scalacheck.Gen
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{AttributeDefinition, AttributeValue, CreateTableRequest, DeleteTableRequest, DeleteTableResponse, GetItemRequest, KeySchemaElement, KeyType, ProvisionedThroughput, PutItemRequest, ScalarAttributeType}

import scala.jdk.CollectionConverters._
import scala.jdk.FutureConverters._
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}

trait DynamoDbFixture {
Expand Down Expand Up @@ -76,18 +76,18 @@ trait DynamoDbFixture {
.build()
}

protected def createTable(table: String)(implicit client: DynamoDbAsyncClient): Unit = {
protected def createTable(table: String)(implicit client: DynamoDbAsyncClient, scheduler: Scheduler): Unit = {
val request: CreateTableRequest =
createTableRequest(tableName = table, schema = keySchema, attributeDefinition = tableDefinition)
Try(Task.fromFuture(client.createTable(request).asScala)) match {
Try(Task.from(client.createTable(request))) match {
case Success(_) => println(s"Table ${table} was created")
case Failure(exception) => println("Failed to create table cities with exception: " + exception)
}
}

def deleteTable(tableName: String)(implicit client: DynamoDbAsyncClient): Task[DeleteTableResponse] = {
def deleteTable(tableName: String)(implicit client: DynamoDbAsyncClient, scheduler: Scheduler): Task[DeleteTableResponse] = {
val deleteRequest: DeleteTableRequest = DeleteTableRequest.builder().tableName(tableName).build()
Task.deferFuture(client.deleteTable(deleteRequest).asScala)
Task.from(client.deleteTable(deleteRequest))
}

def genRequestAttributes: Gen[(String, Int, Double)] = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package monix.connect.dynamodb
import monix.eval.Task
import monix.execution.Scheduler.Implicits.global
import monix.reactive.Observable
import monix.connect.dynamodb.Transformer
import org.scalacheck.Gen
import org.scalatest.concurrent.ScalaFutures
import org.scalatest.BeforeAndAfterAll
Expand All @@ -13,9 +12,9 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model._
import DynamoDbOp._

import scala.jdk.CollectionConverters._
import scala.concurrent.duration._
import scala.jdk.FutureConverters._
import scala.collection.JavaConverters._
import scala.compat.java8.FutureConverters._

class DynamoDbTransformerSpec
extends AnyWordSpecLike with Matchers with ScalaFutures with DynamoDbFixture with BeforeAndAfterAll {
Expand Down Expand Up @@ -71,7 +70,7 @@ class DynamoDbTransformerSpec

//then
r shouldBe a[PutItemResponse]
val getResponse: GetItemResponse = client.getItem(getItemRequest(tableName, city, citizenId)).asScala.futureValue
val getResponse: GetItemResponse = toScala(client.getItem(getItemRequest(tableName, city, citizenId))).futureValue
getResponse.item().values().asScala.head.n().toDouble shouldBe debt
}

Expand All @@ -94,7 +93,7 @@ class DynamoDbTransformerSpec
//then
r shouldBe a[List[PutItemResponse]]
requestAttr.map { case (city, citizenId, debt) =>
val getResponse: GetItemResponse = client.getItem(getItemRequest(tableName, city, citizenId)).asScala.futureValue
val getResponse: GetItemResponse = toScala(client.getItem(getItemRequest(tableName, city, citizenId))).futureValue
getResponse.item().values().asScala.head.n().toDouble shouldBe debt
}
}
Expand All @@ -107,7 +106,7 @@ class DynamoDbTransformerSpec
val city = "London"
val citizenId = 613371
val debt: Int = 550
client.putItem(putItemRequest(tableName, city, citizenId, debt)).asScala.futureValue
toScala(client.putItem(putItemRequest(tableName, city, citizenId, debt))).futureValue
val request: GetItemRequest = getItemRequest(tableName, city, citizenId)
val transformer: Transformer[GetItemRequest, Task[GetItemResponse]] = DynamoDb.transformer

Expand Down
8 changes: 2 additions & 6 deletions dynamodb/src/main/scala/monix/connect/dynamodb/DynamoDb.scala
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@
package monix.connect.dynamodb

import monix.reactive.{Consumer, Observable}
import monix.execution.Scheduler
import monix.eval.Task
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{DynamoDbRequest, DynamoDbResponse}

import scala.jdk.FutureConverters._

object DynamoDb {

/**
Expand All @@ -39,8 +36,7 @@ object DynamoDb {
def consumer[In <: DynamoDbRequest, Out <: DynamoDbResponse](
implicit
dynamoDbOp: DynamoDbOp[In, Out],
client: DynamoDbAsyncClient,
scheduler: Scheduler): Consumer[In, Out] = new DynamoDbSubscriber()
client: DynamoDbAsyncClient): Consumer[In, Out] = new DynamoDbSubscriber()

/**
* A monix transformer that executes any given [[DynamoDbRequest]] into its subsequent [[DynamoDbResponse]].
Expand All @@ -55,6 +51,6 @@ object DynamoDb {
implicit
dynamoDbOp: DynamoDbOp[In, Out],
client: DynamoDbAsyncClient): Observable[In] => Observable[Task[Out]] = { inObservable: Observable[In] =>
inObservable.map(in => Task.fromFuture(dynamoDbOp.execute(in).asScala))
inObservable.map(in => Task.from(dynamoDbOp.execute(in)))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
import software.amazon.awssdk.services.dynamodb.model.{DynamoDbRequest, DynamoDbResponse}

import scala.concurrent.Future
import scala.jdk.FutureConverters._

private[dynamodb] class DynamoDbSubscriber[In <: DynamoDbRequest, Out <: DynamoDbResponse]()(
implicit
Expand All @@ -41,10 +40,7 @@ private[dynamodb] class DynamoDbSubscriber[In <: DynamoDbRequest, Out <: DynamoD
private var dynamoDbResponse: Task[Out] = _

def onNext(dynamoDbRequest: In): Future[Ack] = {
dynamoDbResponse = Task.fromFuture(
dynamoDbOp
.execute(dynamoDbRequest)
.asScala)
dynamoDbResponse = Task.from(dynamoDbOp.execute(dynamoDbRequest))

dynamoDbResponse.onErrorRecover { case _ => monix.execution.Ack.Stop }
.map(_ => monix.execution.Ack.Continue)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ private[parquet] class ParquetPublisher[T](reader: ParquetReader[T]) {
*/
private def readRecords(sub: Subscriber[T]): Task[Unit] = {
val t = Task(reader.read())
t.onErrorHandleWith(ex => Task(sub.onError(ex)))
t.onErrorHandleWith { case ex: Throwable => Task(sub.onError(ex)) }
t.flatMap { r =>
if (r != null) {
Task.deferFuture(sub.onNext(r)).flatMap {
Expand Down
22 changes: 7 additions & 15 deletions project/Dependencies.scala
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ object Dependencies {
}

private val CommonProjectDependencies = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix
"io.monix" %% "monix-reactive" % DependencyVersions.Monix,
"org.scala-lang.modules" %% "scala-collection-compat" % "2.1.6", //todo use as replacement for `collection.JavaConverters`
"org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0"
// "com.typesafe.scala-logging" %% "scala-logging" % "3.9.2",
//"org.slf4j" % "log4j-over-slf4j" % "1.7.30"
)
Expand All @@ -44,28 +46,20 @@ object Dependencies {
)

private val AkkaMain = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix,
"com.typesafe.akka" %% "akka-stream" % DependencyVersions.AkkaStreams
)

val Akka = AkkaMain ++ CommonTestDependencies.map(_ % Test)

private val CommonMain = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix
)

val Common = CommonMain ++ CommonTestDependencies.map(_ % Test)
val Akka = AkkaMain ++ CommonProjectDependencies ++ CommonTestDependencies.map(_ % Test)

private val DynamoDbDependencies = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix,
"com.amazonaws" % "aws-java-sdk-core" % DependencyVersions.AWS,
// "com.amazonaws" % "aws-java-sdk-dynamodb" % DependencyVersions.AWS, //todo compatibility with java sdk aws
"software.amazon.awssdk" % "dynamodb" % DependencyVersions.DynamoDb,
"org.typelevel" %% "cats-core" % DependencyVersions.Cats,
"com.github.pureconfig" %% "pureconfig" % DependencyVersions.PureConfig
)

val DynamoDb = DynamoDbDependencies ++ CommonTestDependencies.map(_ % Test) ++ CommonTestDependencies.map(
val DynamoDb = DynamoDbDependencies ++ CommonProjectDependencies ++ CommonTestDependencies.map(_ % Test) ++ CommonTestDependencies.map(
_ % IntegrationTest)

private val HdfsDependecies = Seq(
Expand All @@ -91,22 +85,20 @@ object Dependencies {
val Parquet = ParquetDependecies ++ CommonProjectDependencies ++ CommonTestDependencies.map(_ % Test)

private val S3Dependecies = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix,
"software.amazon.awssdk" % "s3" % DependencyVersions.S3,
"org.typelevel" %% "cats-core" % DependencyVersions.Cats,
"com.amazonaws" % "aws-java-sdk-core" % DependencyVersions.AWS % IntegrationTest,
"com.amazonaws" % "aws-java-sdk-s3" % DependencyVersions.AWS % IntegrationTest,
"org.scalatestplus" %% "scalacheck-1-14" % "3.1.1.1" % Test
)
val S3 = S3Dependecies ++ CommonTestDependencies.map(_ % Test) ++ CommonTestDependencies.map(_ % IntegrationTest)
val S3 = S3Dependecies ++ CommonProjectDependencies ++ CommonTestDependencies.map(_ % Test) ++ CommonTestDependencies.map(_ % IntegrationTest)

private val RedisDependencies = Seq(
"io.monix" %% "monix-reactive" % DependencyVersions.Monix,
"io.lettuce" % "lettuce-core" % "5.1.2.RELEASE",
"org.typelevel" %% "cats-core" % DependencyVersions.Cats,
"com.github.pureconfig" %% "pureconfig" % DependencyVersions.PureConfig
)

val Redis = RedisDependencies ++ CommonTestDependencies.map(_ % Test)
val Redis = RedisDependencies ++ CommonProjectDependencies ++ CommonTestDependencies.map(_ % Test)

}
Loading

0 comments on commit d279d8c

Please sign in to comment.