Skip to content

Commit

Permalink
Add fs2-aws-ciris with decode support for kinesis initial position (#375
Browse files Browse the repository at this point in the history
)

* Add fs2-aws-ciris with decode support for Either[InitialPositionInStream, Date]

* bump many versions
  • Loading branch information
barryoneill authored Jun 17, 2020
1 parent a12252d commit 9d6e295
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 25 deletions.
74 changes: 50 additions & 24 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,30 @@ organization := "io.laserdisc"
name := "fs2-aws"

lazy val scala212 = "2.12.10"
lazy val scala213 = "2.13.1"
lazy val scala213 = "2.13.2"
lazy val supportedScalaVersions = List(scala212, scala213)

crossScalaVersions in ThisBuild := supportedScalaVersions

scalaVersion in ThisBuild := scala213

val fs2Version = "2.2.2"
val AwsSdkVersion = "1.11.772"
val cirisVersion = "0.12.1"
val circeVersion = "0.13.0"
val fs2Version = "2.4.2"
val AwsSdkVersion = "1.11.804"
val cirisVersion = "0.12.1"
val circeVersion = "0.13.0"
val scalaTestVersion = "3.1.2"
val mockitoCoreVersion = "3.3.3"
val mockitoScalaTestVersion = "1.14.4"

lazy val root = (project in file("."))
.aggregate(`fs2-aws`, `fs2-aws-testkit`, `fs2-aws-dynamodb`, `fs2-aws-core`, `fs2-aws-examples`)
.aggregate(
`fs2-aws`,
`fs2-aws-testkit`,
`fs2-aws-dynamodb`,
`fs2-aws-core`,
`fs2-aws-examples`,
`fs2-aws-ciris`
)
.settings(
publishArtifact := false,
crossScalaVersions := Nil
Expand All @@ -27,11 +37,27 @@ lazy val `fs2-aws-core` = (project in file("fs2-aws-core"))
.settings(
name := "fs2-aws-core",
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % "2.3.0",
"co.fs2" %% "fs2-io" % "2.3.0",
"org.mockito" % "mockito-core" % "3.3.3" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.14.2" % Test,
"org.scalatest" %% "scalatest" % "3.1.2" % Test
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.mockito" % "mockito-core" % mockitoCoreVersion % Test,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test
),
coverageMinimum := 40,
coverageFailOnMinimum := true
)
.settings(commonSettings)
.settings(scalacOptions := commonOptions(scalaVersion.value))

lazy val `fs2-aws-ciris` = (project in file("fs2-aws-ciris"))
.dependsOn(`fs2-aws`)
.settings(
name := "fs2-aws-ciris",
libraryDependencies ++= Seq(
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.mockito" % "mockito-core" % mockitoCoreVersion % Test,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion % Test,
"is.cir" %% "ciris" % "1.1.0"
),
coverageMinimum := 40,
coverageFailOnMinimum := true
Expand All @@ -48,9 +74,9 @@ lazy val `fs2-aws-dynamodb` = (project in file("fs2-aws-dynamodb"))
libraryDependencies ++= Seq(
"co.fs2" %% "fs2-core" % fs2Version,
"co.fs2" %% "fs2-io" % fs2Version,
"org.mockito" % "mockito-core" % "3.3.3" % Test,
"org.scalatest" %% "scalatest" % "3.1.2" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.11.3" % Test,
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.mockito" % "mockito-core" % mockitoCoreVersion % Test,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion % Test,
"com.amazonaws" % "dynamodb-streams-kinesis-adapter" % "1.5.1",
"io.laserdisc" %% "scanamo-circe" % "1.0.8"
)
Expand All @@ -64,8 +90,8 @@ lazy val `fs2-aws-examples` = (project in file("fs2-aws-examples"))
name := "fs2-aws-examples",
coverageMinimum := 0,
libraryDependencies ++= Seq(
"org.mockito" % "mockito-core" % "3.3.3" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.11.3" % Test,
"org.mockito" % "mockito-core" % mockitoCoreVersion % Test,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion % Test,
"ch.qos.logback" % "logback-classic" % "1.2.3",
"ch.qos.logback" % "logback-core" % "1.2.3",
"org.slf4j" % "jcl-over-slf4j" % "1.7.30",
Expand All @@ -91,11 +117,11 @@ lazy val `fs2-aws` = (project in file("fs2-aws"))
"com.amazonaws" % "aws-java-sdk-s3" % AwsSdkVersion,
"com.amazonaws" % "aws-java-sdk-sqs" % AwsSdkVersion,
"com.amazonaws" % "amazon-kinesis-producer" % "0.14.0",
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.2.10",
"org.mockito" % "mockito-core" % "3.3.3" % Test,
"software.amazon.awssdk" % "sts" % "2.13.21",
"org.scalatest" %% "scalatest" % "3.1.2" % Test,
"org.mockito" %% "mockito-scala-scalatest" % "1.11.3" % Test,
"software.amazon.kinesis" % "amazon-kinesis-client" % "2.2.11",
"org.mockito" % "mockito-core" % mockitoCoreVersion % Test,
"software.amazon.awssdk" % "sts" % "2.13.38",
"org.scalatest" %% "scalatest" % scalaTestVersion % Test,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion % Test,
"com.amazonaws" % "aws-java-sdk-sqs" % AwsSdkVersion excludeAll ("commons-logging", "commons-logging"),
"com.amazonaws" % "amazon-sqs-java-messaging-lib" % "1.0.8" excludeAll ("commons-logging", "commons-logging"),
"eu.timepit" %% "refined" % "0.9.14"
Expand All @@ -115,9 +141,9 @@ lazy val `fs2-aws-testkit` = (project in file("fs2-aws-testkit"))
"io.circe" %% "circe-generic" % circeVersion,
"io.circe" %% "circe-generic-extras" % circeVersion,
"io.circe" %% "circe-parser" % circeVersion,
"org.mockito" % "mockito-core" % "3.3.3",
"org.scalatest" %% "scalatest" % "3.1.2",
"org.mockito" %% "mockito-scala-scalatest" % "1.11.3"
"org.scalatest" %% "scalatest" % scalaTestVersion,
"org.mockito" % "mockito-core" % mockitoCoreVersion,
"org.mockito" %% "mockito-scala-scalatest" % mockitoScalaTestVersion
)
)
.settings(commonSettings)
Expand Down
40 changes: 40 additions & 0 deletions fs2-aws-ciris/src/main/scala/fs2/aws/ciris/package.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
package fs2.aws

import java.util.Date

import cats.implicits._
import _root_.ciris.ConfigDecoder
import software.amazon.kinesis.common.InitialPositionInStream

import scala.util.matching.Regex

package object ciris {

/**
* Ciris decoder support for Either[InitialPositionInStream, Date], useful when building [[fs2.aws.kinesis.KinesisConsumerSettings]]
*
* Usage:
* `env("FOOBAR").as[Either[InitialPositionInStream, Date]]`
*
* <ul>
* <li>`"TRIM_HORIZON"` becomes `Left(InitialPositionInStream.TRIM_HORIZON)` (consume from beginning)</li>
* <li>`"LATEST"` becomes `(Left[InitialPositionInStream.LATEST)` (consume from latest)</li>
* <li>`"TS_123456"` becomes `Right(Date(123456))` (consume from timestamp)</li>
* </ul>
*
*/
implicit def kinesisInitialPositionDecoder[T](
implicit decoder: ConfigDecoder[T, String]
): ConfigDecoder[T, Either[InitialPositionInStream, Date]] = {

val DateRegex: Regex = """^TS_([0-9]+)$""".r

decoder.mapOption(typeName = "InitialPositionInStream") {
case "TRIM_HORIZON" => Some(Left(InitialPositionInStream.TRIM_HORIZON))
case "LATEST" => Some(Left(InitialPositionInStream.LATEST))
case DateRegex(millis) => Some(Right(new Date(millis.toLong)))
case _ => None
}
}

}
56 changes: 56 additions & 0 deletions fs2-aws-ciris/src/test/scala/fs2/aws/ciris/CirisDecoderSpec.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package fs2.aws.ciris;

import java.util.Date

import cats.effect.{ ContextShift, IO }
import ciris.{ ConfigException, ConfigValue }
import org.scalatest.Assertion
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
import software.amazon.kinesis.common.InitialPositionInStream

import scala.concurrent.ExecutionContext.Implicits.global;

class CirisDecoderSpec extends AnyWordSpec with Matchers {
implicit val cs: ContextShift[IO] = IO.contextShift(global)

"InitialPositionDecoderSpec" should {

"when decoding Either[InitialPositionInStream, Date]" can {

// same package, so `import fs2.aws.ciris._` not necessary here
def decode(testStr: String): Either[InitialPositionInStream, Date] =
ConfigValue
.default(testStr)
.as[Either[InitialPositionInStream, Date]]
.load[IO]
.unsafeRunSync()

def expectDecodeFailure(testString: String): Assertion =
intercept[ConfigException] {
decode(testString)
}.getMessage should include(
s"Unable to convert value $testString to InitialPositionInStream"
)

"decode supported strings as initial offsets" in {

decode("LATEST") should equal(Left(InitialPositionInStream.LATEST))
decode("TRIM_HORIZON") should equal(Left(InitialPositionInStream.TRIM_HORIZON))
decode("TS_1592404273000") should equal(Right(new Date(1592404273000L)))

}

"fail to decode valid strings" in {

expectDecodeFailure("FOOBAR")
expectDecodeFailure("TS_FOO")
expectDecodeFailure("TS_")
expectDecodeFailure("_1592404273000")

}
}

}

}
2 changes: 1 addition & 1 deletion project/build.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
sbt.version=1.3.10
sbt.version=1.3.11

0 comments on commit 9d6e295

Please sign in to comment.