Fix flaky tests in storage (#4576)
imsdu authored Dec 8, 2023
1 parent ea075ab commit 83f748c
Showing 1 changed file with 80 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,159 +4,157 @@ import
import akka.http.scaladsl.model.MediaTypes.{`application/octet-stream`, `image/jpeg`}
import akka.testkit.TestKit
import akka.util.Timeout
import cats.effect.IO
import cats.effect.unsafe.implicits._
import cats.effect.{IO, Ref}
import cats.syntax.all._
import{Digest, FileAttributes}
import org.mockito.{IdiomaticMockito, Mockito}
import org.scalatest.concurrent.{Eventually, ScalaFutures}
import org.scalatest.{BeforeAndAfter, Ignore, Inspectors}
import org.scalactic.source.Position
import org.scalatest.concurrent.Eventually
import org.scalatest.{BeforeAndAfter, Inspectors}

import java.nio.file.{Path, Paths}
import java.time.{Clock, Instant, ZoneId}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.concurrent.{ExecutionContext, Future}

class AttributesCacheSpec
extends TestKit(ActorSystem("AttributesCacheSpec"))
with CatsEffectSpec
with IdiomaticMockito
with BeforeAndAfter
with Inspectors
with Randomness
with Eventually
with ScalaFutures {
with Eventually {

implicit override def patienceConfig: PatienceConfig = PatienceConfig(20.second, 100.milliseconds)

implicit val config: DigestConfig =
implicit val config: DigestConfig =
DigestConfig("SHA-256", maxInMemory = 10, concurrentComputations = 3, 20, 5.seconds)
implicit val computation: AttributesComputation[String] = mock[AttributesComputation[String]]
implicit val timeout: Timeout = Timeout(1.minute)
implicit val executionContext: ExecutionContext =

before {
implicit val timeout: Timeout = Timeout(1.minute)
implicit val executionContext: ExecutionContext =

trait Ctx {
val path: Path = Paths.get(randomString())
val digest = Digest(config.algorithm, randomString())
val attributes = FileAttributes(s"file://$path", genInt().toLong, digest, `image/jpeg`)
def attributesEmpty(p: Path = path) = FileAttributes(p.toAkkaUri, 0L, Digest.empty, `application/octet-stream`)
val counter = new AtomicInteger(0)
val computedPaths = Ref.unsafe[IO, Map[Path, Int]](Map.empty[Path, Int])

def computedPathsSize: IO[Int] =

def wasCalledOnce(path: Path)(implicit pos: Position) = computedPaths.get
.map {
.value shouldEqual 1

implicit val clock: Clock = new Clock {
override def getZone: ZoneId = ZoneId.systemDefault()
override def withZone(zoneId: ZoneId): Clock = Clock.systemUTC()
// For every attribute computation done, it passes one second
override def instant(): Instant = Instant.ofEpochSecond(counter.get + 1L)
override def instant(): Instant = Instant.EPOCH
val attributesCache = AttributesCache[String]
computation(path, config.algorithm) shouldReturn
IO { counter.incrementAndGet(); attributes }

val defaultComputation: AttributesComputation[String] = (path: Path, _: String) =>
computedPaths.update(_.updatedWith(path)(_.fold(1)(_ + 1).some)).as(attributes)

def computedAttributes(path: Path, algorithm: String): FileAttributes = {
val digest = Digest(algorithm, "COMPUTED")
FileAttributes(path.toAkkaUri, 42L, digest, `image/jpeg`)


"An AttributesCache" should {

"trigger a computation and fetch file after" in new Ctx {
implicit val computation: AttributesComputation[String] = defaultComputation
val attributesCache = AttributesCache[String]
attributesCache.asyncComputePut(path, config.algorithm)
eventually(counter.get shouldEqual 1)
computation(path, config.algorithm) wasCalled once
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributes
computation(path, config.algorithm) wasCalled once
eventually { computedPathsSize.accepted shouldEqual 1 }
attributesCache.get(path).accepted shouldEqual attributes

"get file that triggers attributes computation" in new Ctx {
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty()
eventually(counter.get shouldEqual 1)
computation(path, config.algorithm) wasCalled once
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributes
computation(path, config.algorithm) wasCalled once
implicit val computation: AttributesComputation[String] = defaultComputation
val attributesCache = AttributesCache[String]
attributesCache.get(path).accepted shouldEqual attributesEmpty()
eventually(computedPathsSize.accepted shouldEqual 1)
attributesCache.get(path).accepted shouldEqual attributes

//FIXME Flaky test
"verify 2 concurrent computations" ignore new Ctx {
val list = List.tabulate(10) { i =>
val path = Paths.get(i.toString)
val digest = Digest(config.algorithm, i.toString)
path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`)
"verify 2 concurrent computations" in new Ctx {
val list = List.tabulate(10) { i => Paths.get(i.toString) }
val time = System.currentTimeMillis()

forAll(list) { case (path, attr) =>
computation(path, config.algorithm) shouldReturn
IO.fromFuture(IO.pure(Future {
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path)
implicit val delayedComputation: AttributesComputation[String] = (path: Path, algorithm: String) =>
IO.sleep(1000.millis) >> defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm))
val attributesCache = AttributesCache[String]

forAll(list) { path =>
attributesCache.get(path).accepted shouldEqual attributesEmpty(path)

eventually(counter.get() shouldEqual 10)
eventually(computedPathsSize.accepted shouldEqual 10)

forAll(list) { case (path, _) =>
eventually(computation(path, config.algorithm) wasCalled once)
forAll(list) { path => wasCalledOnce(path) }

val diff = System.currentTimeMillis() - time
diff should be > 4000L
diff should be < 6500L

forAll(list) { case (path, attr) =>
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr
forAll(list) { path =>
attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm)

//FIXME Flaky test
"verify remove oldest" ignore new Ctx {
val list = List.tabulate(20) { i =>
val path = Paths.get(i.toString)
val digest = Digest(config.algorithm, i.toString)
path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`)
"verify remove oldest" in new Ctx {
val list = List.tabulate(20) { i => Paths.get(i.toString) }

forAll(list) { case (path, attr) =>
computation(path, config.algorithm) shouldReturn
IO { counter.incrementAndGet(); attr }
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path)
implicit val computation: AttributesComputation[String] = (path: Path, algorithm: String) =>
defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm))
val attributesCache = AttributesCache[String]

forAll(list) { path =>
attributesCache.get(path).accepted shouldEqual attributesEmpty(path)

eventually(counter.get() shouldEqual 20)
eventually(computedPathsSize.accepted shouldEqual 20)

forAll(list.takeRight(10)) { case (path, attr) =>
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr
forAll(list.takeRight(10)) { path =>
attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm)

forAll(list.take(10)) { case (path, _) =>
attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path)
forAll(list.take(10)) { path =>
attributesCache.get(path).accepted shouldEqual attributesEmpty(path)

"verify failure is skipped" in new Ctx {
val list = List.tabulate(5) { i =>
val path = Paths.get(i.toString)
val digest = Digest(config.algorithm, i.toString)
path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`)
val list = List.tabulate(5) { i => Paths.get(i.toString) }

forAll(list) { case (path, attr) =>
if (attr.bytes == 0L)
computation(path, config.algorithm) shouldReturn IO.raiseError(new RuntimeException)
implicit val computation: AttributesComputation[String] = (path: Path, algorithm: String) => {
if (path.endsWith("0"))
IO.raiseError(new RuntimeException)
computation(path, config.algorithm) shouldReturn IO(attr)

attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path)
defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm))
val attributesCache = AttributesCache[String]

forAll(list) { path => attributesCache.get(path).accepted shouldEqual attributesEmpty(path) }

forAll(list.drop(1)) { case (path, attr) =>
eventually(attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr)
forAll(list.drop(1)) { path =>
attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm)
0 comments on commit 83f748c

