diff --git a/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/attributes/AttributesCacheSpec.scala b/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/attributes/AttributesCacheSpec.scala index 57c14cd943..0b89b5b59e 100644 --- a/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/attributes/AttributesCacheSpec.scala +++ b/storage/src/test/scala/ch/epfl/bluebrain/nexus/storage/attributes/AttributesCacheSpec.scala @@ -4,159 +4,157 @@ import akka.actor.ActorSystem import akka.http.scaladsl.model.MediaTypes.{`application/octet-stream`, `image/jpeg`} import akka.testkit.TestKit import akka.util.Timeout -import cats.effect.IO -import cats.effect.unsafe.implicits._ +import cats.effect.{IO, Ref} +import cats.syntax.all._ import ch.epfl.bluebrain.nexus.storage.File.{Digest, FileAttributes} import ch.epfl.bluebrain.nexus.storage._ import ch.epfl.bluebrain.nexus.storage.config.AppConfig.DigestConfig import ch.epfl.bluebrain.nexus.storage.utils.Randomness import ch.epfl.bluebrain.nexus.testkit.scalatest.ce.CatsEffectSpec -import org.mockito.{IdiomaticMockito, Mockito} -import org.scalatest.concurrent.{Eventually, ScalaFutures} -import org.scalatest.{BeforeAndAfter, Ignore, Inspectors} +import org.scalactic.source.Position +import org.scalatest.concurrent.Eventually +import org.scalatest.{BeforeAndAfter, Inspectors} import java.nio.file.{Path, Paths} import java.time.{Clock, Instant, ZoneId} -import java.util.concurrent.atomic.AtomicInteger +import scala.concurrent.ExecutionContext import scala.concurrent.duration._ -import scala.concurrent.{ExecutionContext, Future} -@Ignore class AttributesCacheSpec extends TestKit(ActorSystem("AttributesCacheSpec")) with CatsEffectSpec - with IdiomaticMockito with BeforeAndAfter with Inspectors with Randomness - with Eventually - with ScalaFutures { + with Eventually { implicit override def patienceConfig: PatienceConfig = PatienceConfig(20.second, 100.milliseconds) - implicit val config: DigestConfig = + implicit val config: DigestConfig = DigestConfig("SHA-256", maxInMemory = 10, concurrentComputations = 3, 20, 5.seconds) - implicit val computation: AttributesComputation[String] = mock[AttributesComputation[String]] - implicit val timeout: Timeout = Timeout(1.minute) - implicit val executionContext: ExecutionContext = ExecutionContext.global - before { - Mockito.reset(computation) - } + implicit val timeout: Timeout = Timeout(1.minute) + implicit val executionContext: ExecutionContext = ExecutionContext.global trait Ctx { val path: Path = Paths.get(randomString()) val digest = Digest(config.algorithm, randomString()) val attributes = FileAttributes(s"file://$path", genInt().toLong, digest, `image/jpeg`) def attributesEmpty(p: Path = path) = FileAttributes(p.toAkkaUri, 0L, Digest.empty, `application/octet-stream`) - val counter = new AtomicInteger(0) + val computedPaths = Ref.unsafe[IO, Map[Path, Int]](Map.empty[Path, Int]) + + def computedPathsSize: IO[Int] = computedPaths.get.map(_.size) + + def wasCalledOnce(path: Path)(implicit pos: Position) = computedPaths.get + .map { + _.get(path) + } + .accepted + .value shouldEqual 1 implicit val clock: Clock = new Clock { override def getZone: ZoneId = ZoneId.systemDefault() override def withZone(zoneId: ZoneId): Clock = Clock.systemUTC() // For every attribute computation done, it passes one second - override def instant(): Instant = Instant.ofEpochSecond(counter.get + 1L) + override def instant(): Instant = Instant.EPOCH } - val attributesCache = AttributesCache[String] - computation(path, config.algorithm) shouldReturn - IO { counter.incrementAndGet(); attributes } + + val defaultComputation: AttributesComputation[String] = (path: Path, _: String) => + computedPaths.update(_.updatedWith(path)(_.fold(1)(_ + 1).some)).as(attributes) + + def computedAttributes(path: Path, algorithm: String): FileAttributes = { + val digest = Digest(algorithm, "COMPUTED") + FileAttributes(path.toAkkaUri, 42L, digest, `image/jpeg`) + } + } "An AttributesCache" should { "trigger a computation and fetch file after" in new Ctx { + implicit val computation: AttributesComputation[String] = defaultComputation + val attributesCache = AttributesCache[String] attributesCache.asyncComputePut(path, config.algorithm) - eventually(counter.get shouldEqual 1) - computation(path, config.algorithm) wasCalled once - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributes - computation(path, config.algorithm) wasCalled once + eventually { computedPathsSize.accepted shouldEqual 1 } + wasCalledOnce(path) + attributesCache.get(path).accepted shouldEqual attributes + wasCalledOnce(path) } "get file that triggers attributes computation" in new Ctx { - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty() - eventually(counter.get shouldEqual 1) - computation(path, config.algorithm) wasCalled once - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributes - computation(path, config.algorithm) wasCalled once + implicit val computation: AttributesComputation[String] = defaultComputation + val attributesCache = AttributesCache[String] + attributesCache.get(path).accepted shouldEqual attributesEmpty() + eventually(computedPathsSize.accepted shouldEqual 1) + wasCalledOnce(path) + attributesCache.get(path).accepted shouldEqual attributes + wasCalledOnce(path) } - //FIXME Flaky test - "verify 2 concurrent computations" ignore new Ctx { - val list = List.tabulate(10) { i => - val path = Paths.get(i.toString) - val digest = Digest(config.algorithm, i.toString) - path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`) - } + "verify 2 concurrent computations" in new Ctx { + val list = List.tabulate(10) { i => Paths.get(i.toString) } val time = System.currentTimeMillis() - forAll(list) { case (path, attr) => - computation(path, config.algorithm) shouldReturn - IO.fromFuture(IO.pure(Future { - Thread.sleep(1000) - counter.incrementAndGet() - attr - })) - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path) + implicit val delayedComputation: AttributesComputation[String] = (path: Path, algorithm: String) => + IO.sleep(1000.millis) >> defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm)) + val attributesCache = AttributesCache[String] + + forAll(list) { path => + attributesCache.get(path).accepted shouldEqual attributesEmpty(path) } - eventually(counter.get() shouldEqual 10) + eventually(computedPathsSize.accepted shouldEqual 10) - forAll(list) { case (path, _) => - eventually(computation(path, config.algorithm) wasCalled once) - } + forAll(list) { path => wasCalledOnce(path) } val diff = System.currentTimeMillis() - time diff should be > 4000L diff should be < 6500L - forAll(list) { case (path, attr) => - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr + forAll(list) { path => + attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm) } } - //FIXME Flaky test - "verify remove oldest" ignore new Ctx { - val list = List.tabulate(20) { i => - val path = Paths.get(i.toString) - val digest = Digest(config.algorithm, i.toString) - path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`) - } + "verify remove oldest" in new Ctx { + val list = List.tabulate(20) { i => Paths.get(i.toString) } - forAll(list) { case (path, attr) => - computation(path, config.algorithm) shouldReturn - IO { counter.incrementAndGet(); attr } - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path) + implicit val computation: AttributesComputation[String] = (path: Path, algorithm: String) => + defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm)) + val attributesCache = AttributesCache[String] + + forAll(list) { path => + attributesCache.get(path).accepted shouldEqual attributesEmpty(path) } - eventually(counter.get() shouldEqual 20) + eventually(computedPathsSize.accepted shouldEqual 20) - forAll(list.takeRight(10)) { case (path, attr) => - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr + forAll(list.takeRight(10)) { path => + attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm) } - forAll(list.take(10)) { case (path, _) => - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path) + forAll(list.take(10)) { path => + attributesCache.get(path).accepted shouldEqual attributesEmpty(path) } } "verify failure is skipped" in new Ctx { - val list = List.tabulate(5) { i => - val path = Paths.get(i.toString) - val digest = Digest(config.algorithm, i.toString) - path -> FileAttributes(path.toAkkaUri, i.toLong, digest, `image/jpeg`) - } + val list = List.tabulate(5) { i => Paths.get(i.toString) } - forAll(list) { case (path, attr) => - if (attr.bytes == 0L) - computation(path, config.algorithm) shouldReturn IO.raiseError(new RuntimeException) + implicit val computation: AttributesComputation[String] = (path: Path, algorithm: String) => { + if (path.endsWith("0")) + IO.raiseError(new RuntimeException) else - computation(path, config.algorithm) shouldReturn IO(attr) - - attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attributesEmpty(path) + defaultComputation(path, algorithm) >> IO.pure(computedAttributes(path, algorithm)) } + val attributesCache = AttributesCache[String] + + forAll(list) { path => attributesCache.get(path).accepted shouldEqual attributesEmpty(path) } - forAll(list.drop(1)) { case (path, attr) => - eventually(attributesCache.get(path).unsafeToFuture().futureValue shouldEqual attr) + forAll(list.drop(1)) { path => + eventually( + attributesCache.get(path).accepted shouldEqual computedAttributes(path, config.algorithm) + ) } } }