diff --git a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala index 3e1bba9c9..5683081c6 100644 --- a/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala +++ b/modules/fs2/src/test/scala/com/snowplowanalytics/snowplow/enrich/fs2/PayloadGen.scala @@ -12,21 +12,22 @@ */ package com.snowplowanalytics.snowplow.enrich.fs2 -import java.nio.file.{Path, Paths} +import java.nio.file.Paths import java.util.Base64 -import cats.effect.{Blocker, IO} +import cats.effect.{IO, Blocker} import cats.effect.concurrent.Ref +import _root_.io.circe.Json import _root_.io.circe.literal._ -import fs2.{Chunk, Stream} -import fs2.io.file.{createDirectory, writeAll} + +import fs2.{Stream, Chunk} +import fs2.io.file.{writeAll, createDirectory} import org.apache.http.message.BasicNameValuePair import org.joda.time.{DateTimeZone, LocalDate} - -import org.scalacheck.{Arbitrary, Gen} +import org.scalacheck.{ Gen, Arbitrary } import cats.effect.testing.specs2.CatsIO import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload @@ -57,12 +58,51 @@ object PayloadGen extends CatsIO { payload = json"""{"latitude":$latitude,"longitude":$longitude}""" schemaKey = "iglu:com.snowplowanalytics.snowplow/geolocation_context/jsonschema/1-1-0" } yield json"""{"schema":$schemaKey, "data": $payload}""" + val performanceTimingGen = for { + navigationStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + redirectStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + redirectEnd <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + fetchStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + domainLookupStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + domainLookupEnd <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + connectStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + secureConnectionStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + connectEnd <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + requestStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + responseStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + responseEnd <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + unloadEventStart <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + unloadEventEnd <- Gen.option(Gen.chooseNum(0, Long.MaxValue)) + } yield { + val fields = List( + "navigationStart" -> navigationStart, + "redirectStart" -> redirectStart, + "redirectEnd" -> redirectEnd, + "fetchStart" -> fetchStart, + "domainLookupStart" -> domainLookupStart, + "domainLookupEnd" -> domainLookupEnd, + "connectStart" -> connectStart, + "secureConnectionStart" -> secureConnectionStart, + "connectEnd" -> connectEnd, + "requestStart" -> requestStart, + "responseStart" -> responseStart, + "responseEnd" -> responseEnd, + "unloadEventStart" -> unloadEventStart, + "unloadEventEnd" -> unloadEventEnd + ).collect { case (key, Some(v)) => key -> Json.fromLong(v) } + val schemaKey = "iglu:org.w3/PerformanceTiming/jsonschema/1-0-0" + val payload = Json.fromFields(fields) + json"""{"schema":$schemaKey, "data": $payload}""" + } + val contextListGen = Gen + .sequence[List[Option[Json]], Option[Json]](List(geolocationGen, performanceTimingGen).map(Gen.option)) + .map { list => list.collect { case Some(json) => json } } val contextsGen = for { - geo <- Gen.option(geolocationGen).map(_.toList) + datums <- contextListGen schemaKey = "iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1" - } yield json"""{"schema":$schemaKey, "data": $geo}""" + } yield json"""{"schema":$schemaKey, "data": $datums}""" - val localDateGen: Gen[LocalDate] = Gen.calendar.map(LocalDate.fromCalendarFields).suchThat(_.year().get() < 3000) + val localDateGen: Gen[LocalDate] = Gen.calendar.map(x => scala.util.Try(LocalDate.fromCalendarFields(x)).getOrElse(LocalDate.now())).suchThat(x => x.year().get() < 3000) val ipGen: Gen[String] = for { part1 <- Gen.choose(2, 255) part2 <- Gen.choose(0, 255) @@ -95,19 +135,24 @@ object PayloadGen extends CatsIO { case Some(x) => x } - def write(dir: Path, cardinality: Long): IO[Unit] = - for { - counter <- Ref.of[IO, Int](0) - dir <- Blocker[IO].use(b => createDirectory[IO](b, dir)) - filename = counter.updateAndGet(_ + 1).map(i => Paths.get(s"${dir.toAbsolutePath}/payload.$i.thrift")) - _ <- Blocker[IO].use { b => - val result = - for { - payload <- payloadStream.take(cardinality) - fileName <- Stream.eval(filename) - _ <- Stream.chunk(Chunk.bytes(payload.toRaw)).through(writeAll[IO](fileName, b)) - } yield () - result.compile.drain - } - } yield () + def write(out: String, cardinality: Long): IO[Unit] = + Blocker[IO].use { b => + import cats.implicits._ + for { + counter <- Ref.of[IO, Int](-1) + rootDir <- createDirectory[IO](b, Paths.get(out)) + fileName = for { + id <- counter.updateAndGet(_ + 1) + shard = Paths.get(rootDir.toAbsolutePath.toString, ((id / 10000) + 1).toString) + dirPath <- if (id % 10000 == 0) createDirectory[IO](b, shard).flatTap(x => IO(println(x))) else IO.pure(shard) + } yield Paths.get(dirPath.toString, s"payload.$id.thrift") + s = payloadStream.take(cardinality).evalMap { payload => + for { + file <- fileName + _ <- Stream.chunk[IO, Byte](Chunk.bytes(payload.toRaw)).through(writeAll[IO](file, b)).compile.drain + } yield () + } + _ <- s.compile.drain + } yield () + } }