Skip to content

Commit

Permalink
Fix everything
Browse files Browse the repository at this point in the history
  • Loading branch information
pondzix committed Feb 14, 2024
1 parent 99ad846 commit b47cb47
Show file tree
Hide file tree
Showing 36 changed files with 147 additions and 123 deletions.
12 changes: 2 additions & 10 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ jobs:
env:
PGPASSWORD: supersecret1
- name: Run tests
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt coverage +test
run: SBT_OPTS="-Xms1G -Xmx8G -Xss4M -XX:MaxMetaspaceSize=1024M" sbt +test
env:
OER_KEY: ${{ secrets.OER_KEY }}
- name: Check Scala formatting
Expand All @@ -62,12 +62,4 @@ jobs:
sbt "project kafka" IntegrationTest/test
docker-compose -f integration-tests/enrich-kafka/docker-compose.yml down
- name: Run integration tests for enrich-nsq
run: sbt "project nsqDistroless" IntegrationTest/test
- name: Generate coverage report
run: sbt coverageReport
- name: Aggregate coverage data
run: sbt coverageAggregate
- name: Submit coveralls data
run: sbt coveralls
env:
COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }}
run: sbt "project nsqDistroless" IntegrationTest/test
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ object S3Client {

def mk[F[_]: Async]: Resource[F, Client[F]] =
for {
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion).build()))
s3Client <- Resource.fromAutoCloseable(Sync[F].delay(S3AsyncClient.builder().region(getRegion()).build()))
store <- Resource.eval(S3Store.builder[F](s3Client).build.toEither.leftMap(_.head).pure[F].rethrow)
} yield new Client[F] {
def canDownload(uri: URI): Boolean =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.apache.thrift.TSerializer

import java.util.Base64

import com.snowplowanalytics.iglu.core.{ SelfDescribingData, SchemaKey, SchemaVer }
import com.snowplowanalytics.iglu.core.{SchemaKey, SchemaVer, SelfDescribingData}
import com.snowplowanalytics.iglu.core.circe.CirceIgluCodecs._

import com.snowplowanalytics.snowplow.enrich.common.loaders.CollectorPayload
Expand All @@ -40,7 +40,9 @@ object CollectorPayloadGen {
generateRaw(nbGoodEvents, nbBadRows).map(_.toThrift).map(new TSerializer().serialize)

def generateRaw[F[_]: Sync](nbGoodEvents: Long, nbBadRows: Long): Stream[F, CollectorPayload] =
Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++ Stream.repeatEval(runGen(collectorPayloadGen(false))).take(nbBadRows)
Stream.repeatEval(runGen(collectorPayloadGen(true))).take(nbGoodEvents) ++ Stream
.repeatEval(runGen(collectorPayloadGen(false)))
.take(nbBadRows)

private def collectorPayloadGen(valid: Boolean): Gen[CollectorPayload] =
for {
Expand Down Expand Up @@ -74,46 +76,74 @@ object CollectorPayloadGen {
aid <- Gen.const("enrich-kinesis-integration-tests").withKey("aid")
e <- Gen.const("ue").withKey("e")
tv <- Gen.oneOf("scala-tracker_1.0.0", "js_2.0.0", "go_1.2.3").withKey("tv")
uePx <-
if(valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
uePx <- if (valid)
ueGen.map(_.toString).map(str => base64Encoder.encodeToString(str.getBytes)).withKey("ue_px")
else
Gen.const("foo").withKey("ue_px")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1,0,4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
SchemaKey("com.snowplowanalytics.snowplow", "payload_data", "jsonschema", SchemaVer.Full(1, 0, 4)),
List(asObject(List(p, aid, e, uePx, tv))).asJson
).asJson.toString

private def ueGen =
for {
sdj <- Gen.oneOf(changeFormGen, clientSessionGen)
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1,0,0)),
SchemaKey("com.snowplowanalytics.snowplow", "unstruct_event", "jsonschema", SchemaVer.Full(1, 0, 0)),
sdj.asJson
).asJson


private def changeFormGen =
for {
formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
formId <- strGen(32, Gen.alphaNumChar).withKey("formId")
elementId <- strGen(32, Gen.alphaNumChar).withKey("elementId")
nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
`type` <- Gen.option(Gen.oneOf(List("button", "checkbox", "color", "date", "datetime", "datetime-local", "email", "file", "hidden", "image", "month", "number", "password", "radio", "range", "reset", "search", "submit", "tel", "text", "time", "url", "week"))).withKeyOpt("type")
value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
nodeName <- Gen.oneOf(List("INPUT", "TEXTAREA", "SELECT")).withKey("nodeName")
`type` <- Gen
.option(
Gen.oneOf(
List(
"button",
"checkbox",
"color",
"date",
"datetime",
"datetime-local",
"email",
"file",
"hidden",
"image",
"month",
"number",
"password",
"radio",
"range",
"reset",
"search",
"submit",
"tel",
"text",
"time",
"url",
"week"
)
)
)
.withKeyOpt("type")
value <- Gen.option(strGen(16, Gen.alphaNumChar)).withKeyNull("value")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1,0,0)),
SchemaKey("com.snowplowanalytics.snowplow", "change_form", "jsonschema", SchemaVer.Full(1, 0, 0)),
asObject(List(formId, elementId, nodeName, `type`, value))
)

private def clientSessionGen =
for {
userId <- Gen.uuid.withKey("userId")
sessionId <- Gen.uuid.withKey("sessionId")
sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex")
userId <- Gen.uuid.withKey("userId")
sessionId <- Gen.uuid.withKey("sessionId")
sessionIndex <- Gen.choose(0, 2147483647).withKey("sessionIndex")
previousSessionId <- Gen.option(Gen.uuid).withKeyNull("previousSessionId")
storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism")
storageMechanism <- Gen.oneOf(List("SQLITE", "COOKIE_1", "COOKIE_3", "LOCAL_STORAGE", "FLASH_LSO")).withKey("storageMechanism")
} yield SelfDescribingData(
SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1,0,1)),
SchemaKey("com.snowplowanalytics.snowplow", "client_session", "jsonschema", SchemaVer.Full(1, 0, 1)),
asObject(List(userId, sessionId, sessionIndex, previousSessionId, storageMechanism))
)

Expand Down Expand Up @@ -159,7 +189,7 @@ object CollectorPayloadGen {

implicit class GenOps[A](gen: Gen[A]) {
def withKey[B](name: String)(implicit enc: Encoder[A]): Gen[Option[(String, Json)]] =
gen.map { a => Some((name -> a.asJson)) }
gen.map(a => Some((name -> a.asJson)))
}

implicit class GenOptOps[A](gen: Gen[Option[A]]) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import java.net.URI

import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.control.NonFatal

import cats.Applicative
import cats.implicits._
Expand Down Expand Up @@ -259,8 +258,7 @@ object Assets {
def worthRetrying[F[_]: Applicative](e: Throwable): F[Boolean] =
e match {
case _: Clients.RetryableFailure => Applicative[F].pure(true)
case _: IllegalArgumentException => Applicative[F].pure(false)
case NonFatal(_) => Applicative[F].pure(false)
case _ => Applicative[F].pure(false)
}

def onError[F[_]: Sync](error: Throwable, details: RetryDetails): F[Unit] =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ object Enrich {
payload.fold(_.asJson.noSpaces, _.map(_.toBadRowPayload.asJson.noSpaces).getOrElse("None"))

/** Log an error, turn the problematic `CollectorPayload` into `BadRow` and notify Sentry if configured */
def sendToSentry[F[_]: Sync: Clock](
def sendToSentry[F[_]: Sync](
original: Array[Byte],
sentry: Option[SentryClient],
processor: Processor,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@ import cats.implicits._

import fs2.Stream

import scala.concurrent.ExecutionContext

import cats.effect.kernel.{Async, Resource, Sync}
import cats.effect.ExitCode

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.fs2.io

import java.nio.file.{Files, Path}

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.io.{Source => SSource}

import cats.data.EitherT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.fs2.blackbox

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import org.specs2.mutable.Specification

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ trait Adapter {
formatter: FormatterFunc,
platform: String
): RawEventParameters = {
val params = formatter(parameters - ("nuid", "aid", "cv", "p"))
val params = formatter(parameters -- List("nuid", "aid", "cv", "p"))
val json = toUnstructEvent(SelfDescribingData(schema, params)).noSpaces
buildUnstructEventParams(tracker, platform, parameters, json)
}
Expand All @@ -182,7 +182,7 @@ trait Adapter {
"p" -> parameters.getOrElse("p", Option(platform)), // Required field
"ue_pr" -> Option(json)
) ++
parameters.filterKeys(AcceptedQueryParameters)
parameters.view.filterKeys(AcceptedQueryParameters).toMap

/**
* Creates a Snowplow unstructured event by nesting the provided JValue in a self-describing
Expand Down Expand Up @@ -375,7 +375,7 @@ trait Adapter {
*/
private[registry] def camelCase(snakeOrDash: String) =
snakeCaseOrDashTokenCapturingRegex.replaceAllIn(
Character.toLowerCase(snakeOrDash.charAt(0)) + snakeOrDash.substring(1),
Character.toString(Character.toLowerCase(snakeOrDash.charAt(0))) + snakeOrDash.substring(1),
m => m.group(1).capitalize
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry

import scala.annotation.tailrec

import cats.{Applicative, Functor, Monad}
import cats.{Applicative, Monad}
import cats.data.{NonEmptyList, ValidatedNel}
import cats.implicits._

Expand Down Expand Up @@ -531,7 +531,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
)
schemaVal = lookupSchema(
hitType.some,
unstructEventData.mapValues(_.schemaKey)
unstructEventData.view.mapValues(_.schemaKey).toMap
).toValidatedNel
simpleContexts = buildContexts(params, contextData, fieldToSchemaMap)
compositeContexts = buildCompositeContexts(
Expand Down Expand Up @@ -675,7 +675,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// composite params have digits in their key
composite <- originalParams
.collect { case (k, Some(v)) => (k, v) }
.view
.filterKeys(k => k.exists(_.isDigit))
.toMap
.asRight
brokenDown <- composite.toList.sorted.map {
case (k, v) => breakDownCompField(k, v, indicator)
Expand All @@ -684,7 +686,9 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
// we additionally make sure we have a rectangular dataset
grouped = (partitioned._2 ++ removeConsecutiveDuplicates(partitioned._1)).flatten
.groupBy(_._1)
.view
.mapValues(_.map(_._2))
.toMap
translated <- {
val m = grouped
.foldLeft(
Expand Down Expand Up @@ -821,7 +825,7 @@ case class GoogleAnalyticsAdapter(schemas: GoogleAnalyticsSchemas) extends Adapt
case head => head :: transpose(l.collect { case _ :: tail => tail })
}

private def traverseMap[G[_]: Functor: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
private def traverseMap[G[_]: Applicative, K, V](m: Map[K, G[V]]): G[Map[K, V]] =
m.toList
.traverse {
case (name, vnel) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ package com.snowplowanalytics.snowplow.enrich.common.adapters.registry
import java.net.URI
import java.nio.charset.StandardCharsets.UTF_8

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._
import scala.util.{Try, Success => TS, Failure => TF}

import cats.Monad
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ object RedirectAdapter extends Adapter {
case (None, Some(Some(co))) if co == "" => newCo.asRight
case (None, Some(Some(co))) => addToExistingCo(json, co).map(str => Map("co" -> str))
case (Some(Some(cx)), _) => addToExistingCx(json, cx).map(str => Map("cx" -> str))
case other => throw new IllegalStateException(s"Illegal state: $other")
}
} else
// Add URI redirect as an unstructured event
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,7 +287,7 @@ object EnrichmentManager {
def setCollectorTstamp(event: EnrichedEvent, timestamp: Option[DateTime]): Either[FailureDetails.EnrichmentFailure, Unit] =
EE.formatCollectorTstamp(timestamp).map { t =>
event.collector_tstamp = t
().asRight
()
}

def setUseragent(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ final case class AnonIpEnrichment(ipv4Octets: AnonIPv4Octets.AnonIPv4Octets, ipv
.map {
case _: Inet4Address => anonymizeIpV4(ip)
case ipv6: Inet6Address => anonymizeIpV6(ipv6.getHostAddress)
case _ => throw new IllegalStateException(s"Illegal state")
}
.getOrElse(tryAnonymizingInvalidIp(ip))
}.orNull
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
*/
package com.snowplowanalytics.snowplow.enrich.common.enrichments.registry

import scala.collection.JavaConverters._
import scala.jdk.CollectionConverters._

import cats.data.ValidatedNel
import cats.syntax.either._
Expand Down Expand Up @@ -64,7 +64,7 @@ object YauaaEnrichment extends ParseableEnrichment {
s match {
case _ if s.isEmpty => s
case _ if s.length == 1 => s.toLowerCase
case _ => s.charAt(0).toLower + s.substring(1)
case _ => Character.toString(s.charAt(0).toLower) + s.substring(1)
}
}

Expand Down Expand Up @@ -112,7 +112,9 @@ final case class YauaaEnrichment(cacheSize: Option[Int]) extends Enrichment {
parsedUA.getAvailableFieldNamesSorted.asScala
.map(field => decapitalize(field) -> parsedUA.getValue(field))
.toMap
.view
.filterKeys(validFields)
.toMap
}

/** Yauaa 7.x added many new fields which are not in the 1-0-4 schema */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ object ApiRequestEnrichment extends ParseableEnrichment {
UUID.nameUUIDFromBytes(contentKey.getBytes).toString
}

def create[F[_]: Async: Clock](
def create[F[_]: Async](
schemaKey: SchemaKey,
inputs: List[Input],
api: HttpApi,
Expand Down
Loading

0 comments on commit b47cb47

Please sign in to comment.