diff --git a/.gitignore b/.gitignore index 13d9760..e610234 100644 --- a/.gitignore +++ b/.gitignore @@ -408,6 +408,7 @@ project/plugins/project/ .metals/ metals.sbt .bloop/ +.bsp/ project/secret # mdoc @@ -415,3 +416,4 @@ website/node_modules website/build website/i18n/en.json website/static/api + diff --git a/build.sbt b/build.sbt index f53835b..5db5079 100644 --- a/build.sbt +++ b/build.sbt @@ -60,9 +60,10 @@ lazy val zioCache = crossProject(JSPlatform, JVMPlatform, NativePlatform) .settings(buildInfoSettings("zio.cache")) .settings( libraryDependencies ++= Seq( - "dev.zio" %% "zio" % zioVersion, - "dev.zio" %% "zio-test" % zioVersion % Test, - "dev.zio" %% "zio-test-sbt" % zioVersion % Test + "dev.zio" %% "zio" % zioVersion, + "org.scala-lang.modules" %% "scala-collection-compat" % "2.7.0", + "dev.zio" %% "zio-test" % zioVersion % Test, + "dev.zio" %% "zio-test-sbt" % zioVersion % Test ) ) .settings(testFrameworks += new TestFramework("zio.test.sbt.ZTestFramework")) diff --git a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala index a54a7a8..3173221 100644 --- a/zio-cache/shared/src/main/scala/zio/cache/Cache.scala +++ b/zio-cache/shared/src/main/scala/zio/cache/Cache.scala @@ -281,17 +281,6 @@ object Cache { } } - /** - * A `MapKey` represents a key in the cache. It contains mutable references - * to the previous key and next key in the `KeySet` to support an efficient - * implementation of a sorted set of keys. - */ - private final class MapKey[Key]( - val value: Key, - var previous: MapKey[Key] = null, - var next: MapKey[Key] = null - ) - /** * A `MapValue` represents a value in the cache. A value may either be * `Pending` with a `Promise` that will contain the result of computing the @@ -346,60 +335,4 @@ object Cache { new AtomicBoolean(false) ) } - - /** - * A `KeySet` is a sorted set of keys in the cache ordered by last access. - * For efficiency, the set is implemented in terms of a doubly linked list - * and is not safe for concurrent access. - */ - private final class KeySet[Key] { - private[this] var head: MapKey[Key] = null - private[this] var tail: MapKey[Key] = null - - /** - * Adds the specified key to the set. - */ - def add(key: MapKey[Key]): Unit = - if (key ne tail) { - if (tail ne null) { - val previous = key.previous - val next = key.next - if (next ne null) { - key.next = null - if (previous ne null) { - previous.next = next - next.previous = previous - } else { - head = next - head.previous = null - } - } - tail.next = key - key.previous = tail - tail = key - } else { - head = key - tail = key - } - } - - /** - * Removes the lowest priority key from the set. - */ - def remove(): MapKey[Key] = { - val key = head - if (key ne null) { - val next = key.next - if (next ne null) { - key.next = null - head = next - head.previous = null - } else { - head = null - tail = null - } - } - key - } - } } diff --git a/zio-cache/shared/src/main/scala/zio/cache/KeySet.scala b/zio-cache/shared/src/main/scala/zio/cache/KeySet.scala new file mode 100644 index 0000000..e250477 --- /dev/null +++ b/zio-cache/shared/src/main/scala/zio/cache/KeySet.scala @@ -0,0 +1,57 @@ +package zio.cache + +/** + * A `KeySet` is a sorted set of keys in the cache ordered by last access. + * For efficiency, the set is implemented in terms of a doubly linked list + * and is not safe for concurrent access. 
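+ *
+ * Illustrative ordering (assuming keys a, b, and c were added in that order):
+ * calling `add(a)` again moves `a` to the tail as most recently accessed, so
+ * a subsequent `remove()` returns `b`, the least recently accessed key.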
+ */
+private[cache] final class KeySet[Key] {
+  private[this] var head: MapKey[Key] = null
+  private[this] var tail: MapKey[Key] = null
+
+  /**
+   * Adds the specified key to the set.
+   */
+  def add(key: MapKey[Key]): Unit =
+    if (key ne tail) {
+      if (tail ne null) {
+        val previous = key.previous
+        val next = key.next
+        if (next ne null) {
+          key.next = null
+          if (previous ne null) {
+            previous.next = next
+            next.previous = previous
+          } else {
+            head = next
+            head.previous = null
+          }
+        }
+        tail.next = key
+        key.previous = tail
+        tail = key
+      } else {
+        head = key
+        tail = key
+      }
+    }
+
+  /**
+   * Removes the lowest priority key from the set.
+   */
+  def remove(): MapKey[Key] = {
+    val key = head
+    if (key ne null) {
+      val next = key.next
+      if (next ne null) {
+        key.next = null
+        head = next
+        head.previous = null
+      } else {
+        head = null
+        tail = null
+      }
+    }
+    key
+  }
+}
diff --git a/zio-cache/shared/src/main/scala/zio/cache/ManagedCache.scala b/zio-cache/shared/src/main/scala/zio/cache/ManagedCache.scala
new file mode 100644
index 0000000..5819f4d
--- /dev/null
+++ b/zio-cache/shared/src/main/scala/zio/cache/ManagedCache.scala
@@ -0,0 +1,386 @@
+package zio.cache
+
+import zio.ZManaged.Finalizer
+import zio._
+import zio.internal.MutableConcurrentQueue
+
+import java.time.{Clock, Duration, Instant}
+import java.util
+import java.util.concurrent.atomic.{AtomicBoolean, AtomicInteger, LongAdder}
+import scala.jdk.CollectionConverters._
+
+sealed abstract class ManagedCache[-Key, +Error, +Value] {
+
+  /**
+   * Returns statistics for this cache.
+   */
+  def cacheStats: UIO[CacheStats]
+
+  /**
+   * Returns whether a resource associated with the specified key exists in
+   * the cache. Note that `contains` may return true while the resource is
+   * still being created and is not yet fully available.
+   */
+  def contains(key: Key): UIO[Boolean]
+
+  /**
+   * Returns statistics for the specified entry.
+   */
+  def entryStats(key: Key): UIO[Option[EntryStats]]
+
+  /**
+   * Gets the value from the cache if it exists or otherwise computes it. The
+   * release action signals to the cache that the value is no longer being
+   * used and can potentially be finalized, subject to the policies of the
+   * cache.
+   */
+  def get(key: Key): Managed[Error, Value]
+
+  /**
+   * Forces an immediate use of the lookup function to recompute the managed
+   * resource associated with the specified key. Once the new resource is
+   * computed, the old resource associated with the key is released (after
+   * all fibers using it are done with it). While the new resource is being
+   * computed, concurrent calls to `get` will use the old resource if it has
+   * not expired.
+   */
+  def refresh(key: Key): IO[Error, Unit]
+
+  /**
+   * Invalidates the resource associated with the specified key.
+   */
+  def invalidate(key: Key): UIO[Unit]
+
+  /**
+   * Invalidates all values in the cache.
+   */
+  def invalidateAll: UIO[Unit]
+
+  /**
+   * Returns the approximate number of values in the cache.
+   */
+  def size: UIO[Int]
+}
+object ManagedCache {
+
+  /**
+   * Constructs a new cache with the specified capacity, time to live, and
+   * lookup function.
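+   *
+   * A minimal usage sketch (illustrative; `openConnection` stands in for any
+   * user-supplied managed acquisition):
+   * {{{
+   * ManagedCache.make(
+   *   capacity   = 100,
+   *   timeToLive = Duration.ofMinutes(5),
+   *   lookup     = ManagedLookup((host: String) => openConnection(host))
+   * )
+   * }}}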
+   */
+  def make[Key, Environment, Error, Value](
+    capacity: Int,
+    timeToLive: Duration,
+    lookup: ManagedLookup[Key, Environment, Error, Value]
+  ): URManaged[Environment, ManagedCache[Key, Error, Value]] =
+    makeWith(capacity, lookup)(_ => timeToLive)
+
+  /**
+   * Constructs a new cache with the specified capacity, time to live, and
+   * lookup function, where the time to live can depend on the `Exit` value
+   * returned by the lookup function.
+   */
+  def makeWith[Key, Environment, Error, Value](
+    capacity: Int,
+    managedLookup: ManagedLookup[Key, Environment, Error, Value]
+  )(timeToLive: Exit[Error, Value] => Duration): URManaged[Environment, ManagedCache[Key, Error, Value]] =
+    makeWith(capacity, managedLookup, Clock.systemUTC())(timeToLive)
+
+  // Test utility: this variant allows injecting a mocked `java.time.Clock`.
+  private[cache] def makeWith[Key, Environment, Error, Value](
+    capacity: Int,
+    managedLookup: ManagedLookup[Key, Environment, Error, Value],
+    clock: Clock
+  )(timeToLive: Exit[Error, Value] => Duration): URManaged[Environment, ManagedCache[Key, Error, Value]] =
+    ZManaged.make(buildWith(capacity, managedLookup, clock)(timeToLive))(_.invalidateAll)
+
+  private def buildWith[Key, Environment, Error, Value](
+    capacity: Int,
+    managedLookup: ManagedLookup[Key, Environment, Error, Value],
+    clock: Clock
+  )(timeToLive: Exit[Error, Value] => Duration): URIO[Environment, ManagedCache[Key, Error, Value]] =
+    ZIO.environment[Environment].map { environment =>
+      val cacheState = CacheState.initial[Key, Error, Value]()
+      import cacheState._
+
+      def trackAccess(key: MapKey[Key]): Array[MapValue[Key, Error, Value]] = {
+        val cleanedKey = scala.collection.mutable.ArrayBuilder.make[MapValue[Key, Error, Value]]
+        accesses.offer(key)
+        if (updating.compareAndSet(false, true)) {
+          var loop = true
+          while (loop) {
+            val key = accesses.poll(null)
+            if (key ne null) {
+              keys.add(key)
+            } else {
+              loop = false
+            }
+          }
+          var size = map.size
+          loop = size > capacity
+          while (loop) {
+            val key = keys.remove()
+            if (key ne null) {
+              val removed = map.remove(key.value)
+              if (removed ne null) {
+                size -= 1
+                cleanedKey += removed
+                loop = size > capacity
+              }
+            } else {
+              loop = false
+            }
+          }
+          updating.set(false)
+        }
+        cleanedKey.result()
+      }
+
+      def trackHit(): Unit =
+        hits.increment()
+
+      def trackMiss(): Unit =
+        misses.increment()
+
+      new ManagedCache[Key, Error, Value] {
+        private def ensureMapSizeNotExceeded(key: MapKey[Key]): UIO[Unit] =
+          ZIO.foreachPar_(trackAccess(key)) { cleanedMapValue =>
+            cleanMapValue(cleanedMapValue)
+          }
+
+        override def cacheStats: UIO[CacheStats] =
+          ZIO.succeed(CacheStats(hits.longValue, misses.longValue, map.size))
+
+        override def contains(k: Key): UIO[Boolean] =
+          ZIO.succeed(map.containsKey(k))
+
+        override def entryStats(k: Key): UIO[Option[EntryStats]] =
+          ZIO.succeed {
+            val value = map.get(k)
+            if (value eq null) None
+            else {
+              value match {
+                case MapValue.Pending(_, _) =>
+                  None
+                case MapValue.Complete(_, _, _, entryState, _) =>
+                  Option(EntryStats(entryState.loaded))
+                case MapValue.Refreshing(_, MapValue.Complete(_, _, _, entryState, _)) =>
+                  Option(EntryStats(entryState.loaded))
+              }
+            }
+          }
+
+        override def get(k: Key): Managed[Error, Value] = Managed.unwrap {
+          lookupValueOf(k).memoize.flatMap { lookupValue =>
+            UIO.effectSuspendTotal {
+              var key: MapKey[Key] = null
+              var value = map.get(k)
+              if (value eq null) {
+                key = new MapKey(k)
+                value = map.putIfAbsent(k, MapValue.Pending(key, lookupValue))
+              }
+              if (value eq null) {
+                trackMiss()
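+                // Cache miss: our `Pending` entry won the `putIfAbsent` race, so this
+                // fiber computes the resource; enforcing the capacity bound first may
+                // evict (and release) least recently used entries.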
ensureMapSizeNotExceeded(key) *> lookupValue + } else { + value match { + case MapValue.Pending(key, managed) => + trackHit() + ensureMapSizeNotExceeded(key) *> managed + case complete @ MapValue.Complete(key, _, _, _, timeToLive) => + trackHit() + if (hasExpired(timeToLive)) { + map.remove(k, value) + ensureMapSizeNotExceeded(key) *> complete.releaseOwner *> ZIO.succeed(get(k)) + } else { + ensureMapSizeNotExceeded(key).as(complete.toManaged) + } + case MapValue.Refreshing(promiseInProgress, complete @ MapValue.Complete(mapKey, _, _, _, ttl)) => + trackHit() + if (hasExpired(ttl)) { + ensureMapSizeNotExceeded(mapKey) *> promiseInProgress + } else { + ensureMapSizeNotExceeded(mapKey).as(complete.toManaged) + } + } + } + } + } + } + + override def refresh(k: Key): IO[Error, Unit] = lookupValueOf(k).memoize.flatMap { managed => + var value = map.get(k) + var newKey: MapKey[Key] = null + if (value eq null) { + newKey = new MapKey[Key](k) + value = map.putIfAbsent(k, MapValue.Pending(newKey, managed)) + } + val finalManaged = if (value eq null) { + ensureMapSizeNotExceeded(newKey) *> managed + } else { + value match { + case MapValue.Pending(_, managedEffect) => + managedEffect + case completeResult @ MapValue.Complete(_, _, _, _, ttl) => + if (hasExpired(ttl)) { + ZIO.succeed(get(k)) + } else { + if (map.replace(k, completeResult, MapValue.Refreshing(managed, completeResult))) { + managed + } else { + ZIO.succeed(get(k)) + } + } + case MapValue.Refreshing(managed, _) => managed + } + } + finalManaged.flatMap(_.use_(ZIO.unit)) + } + + override def invalidate(k: Key): UIO[Unit] = + map.remove(k) match { + case complete @ MapValue.Complete(_, _, _, _, _) => complete.releaseOwner + case MapValue.Refreshing(_, complete) => complete.releaseOwner + case _ => UIO.unit + } + + override def invalidateAll: UIO[Unit] = + ZIO.foreachPar_(map.keySet().asScala)(invalidate) + + override def size: UIO[Int] = + ZIO.succeed(map.size) + + private def cleanMapValue(mapValue: MapValue[Key, Error, Value]): UIO[Unit] = + mapValue match { + case complete @ MapValue.Complete(_, _, _, _, _) => complete.releaseOwner + case MapValue.Refreshing(_, complete) => complete.releaseOwner + case _ => ZIO.unit + } + + private def lookupValueOf(key: Key): UIO[Managed[Error, Value]] = for { + managedEffect <- (for { + reservation <- managedLookup(key) + .provide(environment) + .reserve + exit <- reservation.acquire.run + } yield (exit, reservation.release)) + .onInterrupt(ZIO.effectTotal(map.remove(key))) + .flatMap { case (exit, release) => + val now = Instant.now(clock) + val expiredAt = now.plus(timeToLive(exit)) + exit match { + case Exit.Success(value) => + val exitWithReleaser: Exit[Nothing, (Value, Finalizer)] = + Exit.succeed(value -> release) + val completedResult = MapValue + .Complete( + key = new MapKey(key), + exit = exitWithReleaser, + ownerCount = new AtomicInteger(1), + entryStats = EntryStats(now), + timeToLive = expiredAt + ) + val previousValue = map.put(key, completedResult) + ZIO.succeed( + Managed.unwrap(cleanMapValue(previousValue).as(completedResult.toManaged)) + ) + case failure @ Exit.Failure(_) => + val completedResult = + MapValue.Complete( + key = new MapKey(key), + exit = failure, + ownerCount = new AtomicInteger(0), + entryStats = EntryStats(now), + timeToLive = expiredAt + ) + val previousValue = map.put(key, completedResult) + release(failure) *> ZIO.succeed( + Managed.unwrap(cleanMapValue(previousValue).as(completedResult.toManaged)) + ) + } + } + .memoize + } yield Managed.unwrap(managedEffect) + 
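+
+        // Sharing note: `lookupValueOf` acquires the resource via a manual `reserve`
+        // so that its `Exit` and `Finalizer` can be stored in the `Complete` entry;
+        // each managed handed out by `get` increments `ownerCount`, and the finalizer
+        // only runs once the count drops back to zero.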
+        private def hasExpired(timeToLive: Instant) =
+          Instant.now(clock).isAfter(timeToLive)
+      }
+    }
+
+  /**
+   * A `MapValue` represents a value in the cache. A value may either be
+   * `Pending`, with a memoized effect that will produce the managed value
+   * once it has been computed, `Complete`, with an `Exit` value that
+   * contains the result of computing the lookup function, or `Refreshing`,
+   * which pairs an in-flight recomputation with the previous `Complete`
+   * value.
+   */
+  private sealed trait MapValue[Key, +Error, +Value] extends Product with Serializable
+
+  private object MapValue {
+    final case class Pending[Key, Error, Value](
+      key: MapKey[Key],
+      managed: UIO[Managed[Error, Value]]
+    ) extends MapValue[Key, Error, Value]
+
+    final case class Complete[Key, +Error, +Value](
+      key: MapKey[Key],
+      exit: Exit[Error, (Value, Finalizer)],
+      ownerCount: AtomicInteger,
+      entryStats: EntryStats,
+      timeToLive: Instant
+    ) extends MapValue[Key, Error, Value] {
+      def toManaged: Managed[Error, Value] =
+        exit.fold(
+          cause => ZManaged.done(Exit.Failure(cause)),
+          { case (value, _) =>
+            ZManaged.makeExit_(ZIO.effectTotal(ownerCount.incrementAndGet()).as(value)) { _ =>
+              releaseOwner
+            }
+          }
+        )
+
+      def releaseOwner: UIO[Unit] =
+        exit.fold(
+          _ => UIO.unit,
+          { case (_, finalizer) =>
+            ZIO.effectTotal(ownerCount.decrementAndGet()).flatMap { numOwner =>
+              finalizer(Exit.unit).when(numOwner == 0)
+            }
+          }
+        )
+    }
+
+    final case class Refreshing[Key, Error, Value](
+      managedEffect: UIO[Managed[Error, Value]],
+      complete: Complete[Key, Error, Value]
+    ) extends MapValue[Key, Error, Value]
+  }
+
+  /**
+   * The `CacheState` represents the mutable state underlying the cache.
+   */
+  private final case class CacheState[Key, Error, Value](
+    map: util.Map[Key, MapValue[Key, Error, Value]],
+    keys: KeySet[Key],
+    accesses: MutableConcurrentQueue[MapKey[Key]],
+    hits: LongAdder,
+    misses: LongAdder,
+    updating: AtomicBoolean
+  )
+
+  private object CacheState {
+
+    /**
+     * Constructs an initial cache state.
+     */
+    def initial[Key, Error, Value](): CacheState[Key, Error, Value] =
+      CacheState(
+        Platform.newConcurrentMap,
+        new KeySet,
+        MutableConcurrentQueue.unbounded,
+        new LongAdder,
+        new LongAdder,
+        new AtomicBoolean(false)
+      )
+  }
+}
diff --git a/zio-cache/shared/src/main/scala/zio/cache/ManagedLookup.scala b/zio-cache/shared/src/main/scala/zio/cache/ManagedLookup.scala
new file mode 100644
index 0000000..7e4bdcb
--- /dev/null
+++ b/zio-cache/shared/src/main/scala/zio/cache/ManagedLookup.scala
@@ -0,0 +1,16 @@
+package zio.cache
+
+import zio.ZManaged
+
+/**
+ * Similar to `Lookup`, but the lookup function returns a managed resource
+ * rather than an effect.
+ */
+final case class ManagedLookup[-Key, -Environment, +Error, +Value](lookup: Key => ZManaged[Environment, Error, Value])
+    extends (Key => ZManaged[Environment, Error, Value]) {
+
+  /**
+   * Computes a value for the specified key or fails with an error.
+   */
+  def apply(key: Key): ZManaged[Environment, Error, Value] =
+    lookup(key)
+}
diff --git a/zio-cache/shared/src/main/scala/zio/cache/MapKey.scala b/zio-cache/shared/src/main/scala/zio/cache/MapKey.scala
new file mode 100644
index 0000000..2b6d7a8
--- /dev/null
+++ b/zio-cache/shared/src/main/scala/zio/cache/MapKey.scala
@@ -0,0 +1,12 @@
+package zio.cache
+
+/**
+ * A `MapKey` represents a key in the cache. It contains mutable references
+ * to the previous key and next key in the `KeySet` to support an efficient
+ * implementation of a sorted set of keys.
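+ *
+ * For example, in a list `a <-> b <-> c`, `b.previous` points at `a` and
+ * `b.next` at `c`; `KeySet.add` relinks these references in constant time.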
+ */ +private[cache] final class MapKey[Key]( + val value: Key, + var previous: MapKey[Key] = null, + var next: MapKey[Key] = null +) diff --git a/zio-cache/shared/src/test/scala/zio/cache/ManagedCacheSpec.scala b/zio-cache/shared/src/test/scala/zio/cache/ManagedCacheSpec.scala new file mode 100644 index 0000000..de17251 --- /dev/null +++ b/zio-cache/shared/src/test/scala/zio/cache/ManagedCacheSpec.scala @@ -0,0 +1,973 @@ +package zio.cache + +import zio._ +import zio.clock.Clock +import zio.duration._ +import zio.random.Random +import zio.test.Assertion._ +import zio.test._ + +object ManagedCacheSpec extends DefaultRunnableSpec { + override def spec: ZSpec[Environment, Any] = suite("SharedManaged")( + testM("cacheStats should correctly keep track of cache size, hits and misses") { + checkM(Gen.anyInt) { salt => + val capacity = 100 + val managedCache = + ManagedCache.make( + capacity = capacity, + timeToLive = Duration.Infinity, + lookup = ManagedLookup((key: Int) => Managed.succeed(hash(salt)(key))) + ) + managedCache.use { cache => + for { + _ <- ZIO.foreachPar_((1 to capacity).map(_ / 2))(cache.get(_).use_(ZIO.unit)) + cacheStats <- cache.cacheStats + } yield assert(cacheStats)(equalTo(CacheStats(hits = 49, misses = 51, size = 51))) + } + } + }, + testM("invalidate should properly remove and clean a resource from the cache") { + val capacity = 100 + for { + observablesResource <- ZIO.foreach((0 until capacity).toList)(_ => ObservableResourceForTest.makeUnit) + managedCache = + ManagedCache.make( + capacity = capacity, + timeToLive = Duration.Infinity, + lookup = ManagedLookup((key: Int) => observablesResource(key).managed) + ) + result <- + managedCache.use { cache => + for { + _ <- ZIO.foreachPar_((0 until capacity))(cache.get(_).use_(ZIO.unit)) + _ <- cache.invalidate(42) + cacheContainsKey42 <- cache.contains(42) + cacheStats <- cache.cacheStats + key42ResourceCleaned <- observablesResource(42).assertAcquiredOnceAndCleaned + allOtherStillNotCleaned <- + ZIO.foreach(observablesResource.zipWithIndex.filterNot(_._2 == 42).map(_._1))( + _.assertAcquiredOnceAndNotCleaned + ) + } yield assert(cacheStats)(equalTo(CacheStats(hits = 0, misses = 100, size = 99))) && assert( + cacheContainsKey42 + )( + isFalse + ) && key42ResourceCleaned && allOtherStillNotCleaned.reduce(_ && _) + } + } yield result + }, + testM("invalidateAll should properly remove and clean all resource from the cache") { + val capacity = 100 + for { + observablesResource <- ZIO.foreach((0 until capacity).toList)(_ => ObservableResourceForTest.makeUnit) + managedCache = + ManagedCache.make( + capacity, + Duration.Infinity, + ManagedLookup((key: Int) => observablesResource(key).managed) + ) + result <- managedCache.use { cache => + for { + _ <- ZIO.foreachPar_((0 until capacity))(cache.get(_).use_(ZIO.unit)) + _ <- cache.invalidateAll + contains <- ZIO.foreachPar(0 to capacity)(cache.contains(_)) + cacheStats <- cache.cacheStats + allCleaned <- ZIO.foreach(observablesResource)(_.assertAcquiredOnceAndCleaned) + } yield assert(cacheStats)(equalTo(CacheStats(hits = 0, misses = 100, size = 0))) && assert( + contains + )(forall(isFalse)) && + allCleaned.reduce(_ && _) + } + } yield result + }, + suite(".get")( + testM("should not put anything in the cache before returned managed by get is used") { + for { + subResource <- ObservableResourceForTest.makeUnit + managedCache = ManagedCache.make( + capacity = 1, + timeToLive = 60.second, + lookup = ManagedLookup((_: Unit) => subResource.managed) + ) + checkInside <- + managedCache.use 
{ cache => + for { + notAcquiredBeforeAnything <- subResource.assertNotAcquired + _ = cache.get(key = ()) + notAcquireYetAfterGettingManagedFromCache <- subResource.assertNotAcquired + keyPresent <- cache.contains(key = ()) + } yield notAcquiredBeforeAnything && assert(keyPresent)( + isFalse + ) && notAcquireYetAfterGettingManagedFromCache + } + } yield checkInside + }, + testM("when use sequentially, should properly call correct lookup") { + checkM(Gen.anyInt) { salt => + val managedCache = + ManagedCache.make(100, Duration.Infinity, ManagedLookup((key: Int) => Managed.succeed(hash(salt)(key)))) + managedCache.use { cache => + for { + actual <- ZIO.foreach(1 to 100)(cache.get(_).use(ZIO.succeed(_))) + expected = (1 to 100).map(hash(salt)) + } yield assert(actual)(equalTo(expected)) + } + } + }, + testM("when use concurrent, should properly call correct lookup") { + checkM(Gen.anyInt) { salt => + val managedCache = + ManagedCache.make(100, Duration.Infinity, ManagedLookup((key: Int) => Managed.succeed(hash(salt)(key)))) + managedCache.use { cache => + for { + actual <- ZIO.foreachPar(1 to 100)(cache.get(_).use(ZIO.succeed(_))) + expected = (1 to 100).map(hash(salt)) + } yield assert(actual)(equalTo(expected)) + } + } + }, + testM(".get should clean and remove old resource to respect cache capacity") { + checkM(Gen.anyInt) { salt => + val managedCache = + ManagedCache.make(10, Duration.Infinity, ManagedLookup((key: Int) => Managed.succeed(hash(salt)(key)))) + managedCache.use { cache => + for { + actual <- ZIO.foreach(1 to 100)(cache.get(_).use(ZIO.succeed(_))) + expected = (1 to 100).map(hash(salt)) + cacheStats <- cache.cacheStats + } yield assert(actual)(equalTo(expected)) && assert(cacheStats.size)(equalTo(10)) + } + } + }, + testM("sequential use on managed returned by a single call to .get should create only one resource") { + for { + subResource <- ObservableResourceForTest.makeUnit + managedCache = ManagedCache.make( + capacity = 1, + timeToLive = 60.second, + lookup = ManagedLookup((_: Unit) => subResource.managed) + ) + checkInside <- + managedCache.use { cache => + for { + notAcquiredBeforeAnything <- subResource.assertNotAcquired + resourceManagedProxy = cache.get(key = ()) + notAcquireYetAfterGettingManagedFromCache <- subResource.assertNotAcquired + _ <- resourceManagedProxy.use(ZIO.succeed(_)) + acquireOnceAfterUse <- subResource.assertAcquiredOnceAndNotCleaned + _ <- resourceManagedProxy.use(ZIO.succeed(_)) + acquireOnceAfterSecondUse <- subResource.assertAcquiredOnceAndNotCleaned + } yield notAcquiredBeforeAnything && notAcquireYetAfterGettingManagedFromCache && acquireOnceAfterUse && acquireOnceAfterSecondUse + } + finallyCleaned <- subResource.assertAcquiredOnceAndCleaned + } yield checkInside && finallyCleaned + }, + testM("sequentially use of .get should create only one resource") { + for { + subResource <- ObservableResourceForTest.makeUnit + managedCache = ManagedCache.make( + capacity = 1, + timeToLive = 60.second, + lookup = ManagedLookup((_: Unit) => subResource.managed) + ) + checkInside <- managedCache.use { cache => + for { + notAcquiredBeforeAnything <- subResource.assertNotAcquired + _ <- cache.get(key = ()).use(ZIO.succeed(_)) + acquireOnceAfterUse <- subResource.assertAcquiredOnceAndNotCleaned + _ <- cache.get(key = ()).use(ZIO.succeed(_)) + acquireOnceAfterSecondUse <- subResource.assertAcquiredOnceAndNotCleaned + } yield notAcquiredBeforeAnything && acquireOnceAfterUse && acquireOnceAfterSecondUse + } + finallyCleaned <- 
subResource.assertAcquiredOnceAndCleaned
+        } yield checkInside && finallyCleaned
+      },
+      testM("sequential use of a failing managed should cache the error and immediately call the resource finalizer") {
+        for {
+          watchableLookup <-
+            WatchableLookup.makeZIO[Unit, Throwable, Nothing](_ => ZIO.fail(new RuntimeException("fail")))
+          managedCache = ManagedCache.make(
+            capacity = 1,
+            timeToLive = 60.second,
+            lookup = ManagedLookup(watchableLookup.lookup)
+          )
+          testResult <-
+            managedCache.use { cache =>
+              for {
+                notAcquiredBeforeAnything <- watchableLookup.assertCalledNum(key = ())(equalTo(0))
+                resourceManagedProxy = cache.get(key = ())
+                notAcquireYetAfterGettingManagedFromCache <- watchableLookup.assertCalledNum(key = ())(equalTo(0))
+                _ <- resourceManagedProxy.use(ZIO.succeed(_)).either
+                acquireAndCleanedRightAway <- watchableLookup.assertAllCleanedForKey(())
+                _ <- resourceManagedProxy.use(ZIO.succeed(_)).either
+                didNotTryToCreateAnOtherResource <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+              } yield notAcquiredBeforeAnything && notAcquireYetAfterGettingManagedFromCache && acquireAndCleanedRightAway && didNotTryToCreateAnOtherResource
+            }
+        } yield testResult
+      },
+      testM("concurrent use of the managed returned by a single call to .get should create only one resource") {
+        for {
+          subResource <- ObservableResourceForTest.makeUnit
+          managedCache = ManagedCache.make(
+            capacity = 1,
+            timeToLive = 60.second,
+            lookup = ManagedLookup((_: Unit) => subResource.managed)
+          )
+          checkInside <-
+            managedCache.use { cache =>
+              val managed = cache.get(key = ())
+              for {
+                Reservation(acquire1, release1) <- managed.reserve
+                Reservation(acquire2, release2) <- managed.reserve
+                acquisition <- subResource.assertNotAcquired
+                _ <- acquire2
+                acquireOnceAfterFirstUse <- subResource.assertAcquiredOnceAndNotCleaned
+                _ <- acquire1
+                acquireOnceAfterSecondUse <- subResource.assertAcquiredOnceAndNotCleaned
+                _ <- release2(Exit.unit)
+                _ <- release1(Exit.unit)
+                stillNotCleanedForPotentialNextUse <- subResource.assertAcquiredOnceAndNotCleaned
+              } yield acquisition && acquireOnceAfterFirstUse && acquireOnceAfterSecondUse && stillNotCleanedForPotentialNextUse
+            }
+          finallyCleaned <- subResource.assertAcquiredOnceAndCleaned
+        } yield checkInside && finallyCleaned
+      },
+      testM("concurrent use of a failing managed should cache the error and immediately call the resource finalizer") {
+        for {
+          watchableLookup <-
+            WatchableLookup.makeZIO[Unit, Throwable, Nothing](_ => ZIO.fail(new RuntimeException("fail")))
+          managedCache = ManagedCache.make(
+            capacity = 1,
+            timeToLive = 60.second,
+            lookup = ManagedLookup(watchableLookup.lookup)
+          )
+          testResult <-
+            managedCache.use { cache =>
+              for {
+                notAcquiredBeforeAnything <- watchableLookup.assertCalledNum(key = ())(equalTo(0))
+                resourceManagedProxy = cache.get(key = ())
+                notAcquireYetAfterGettingManagedFromCache <- watchableLookup.assertCalledNum(key = ())(equalTo(0))
+                _ <- resourceManagedProxy.use(ZIO.succeed(_)).either <&> resourceManagedProxy.use(ZIO.succeed(_)).either
+                acquireAndCleanedRightAway <- watchableLookup.assertAllCleanedForKey(())
+                didNotTryToCreateAnOtherResource <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+              } yield notAcquiredBeforeAnything && notAcquireYetAfterGettingManagedFromCache && acquireAndCleanedRightAway && didNotTryToCreateAnOtherResource
+            }
+        } yield testResult
+      },
+      testM(
+        "when two managed returned by two .get calls live longer than the cache, the underlying resource should be cleaned only once it's not in use anymore"
+      ) {
+        for {
+          subResource <- ObservableResourceForTest.makeUnit
+          managedCache = ManagedCache.make(1, 60.second, ManagedLookup((_: Unit) => subResource.managed))
+          (release1, release2) <- managedCache.use { cache =>
+            for {
+              Reservation(acquire1, release1) <- cache.get(key = ()).reserve
+              Reservation(acquire2, release2) <- cache.get(key = ()).reserve
+              _ <- acquire2
+              _ <- acquire1
+            } yield (release1, release2)
+          }
+          afterSharedManagerLife <- subResource.assertAcquiredOnceAndNotCleaned
+          _ <- release1(Exit.unit)
+          afterFirstSubClean <- subResource.assertAcquiredOnceAndNotCleaned
+          _ <- release2(Exit.unit)
+          afterSecondSubClean <- subResource.assertAcquiredOnceAndCleaned
+        } yield afterSharedManagerLife && afterFirstSubClean && afterSecondSubClean
+      },
+      testM(
+        "when two managed obtained by a single managed returned by a single .get call live longer than the cache, the underlying resource should be cleaned only once it's not in use anymore"
+      ) {
+        for {
+          subResource <- ObservableResourceForTest.makeUnit
+          managedCache = ManagedCache.make(1, 60.second, ManagedLookup((_: Unit) => subResource.managed))
+          (release1, release2) <- managedCache.use { cache =>
+            val manager = cache.get(key = ())
+            for {
+              Reservation(acquire1, release1) <- manager.reserve
+              Reservation(acquire2, release2) <- manager.reserve
+              _ <- acquire2
+              _ <- acquire1
+            } yield (release1, release2)
+          }
+          afterSharedManagerLife <- subResource.assertAcquiredOnceAndNotCleaned
+          _ <- release1(Exit.unit)
+          afterFirstSubClean <- subResource.assertAcquiredOnceAndNotCleaned
+          _ <- release2(Exit.unit)
+          afterSecondSubClean <- subResource.assertAcquiredOnceAndCleaned
+        } yield afterSharedManagerLife && afterFirstSubClean && afterSecondSubClean
+      },
+      testM("should clean old resources if cache size is exceeded") {
+        val genTestInput = for {
+          cacheSize <- Gen.int(1, 5)
+          numCreatedKey <- Gen.int(cacheSize, cacheSize + 3)
+        } yield (cacheSize, numCreatedKey)
+        checkM(genTestInput) { case (cacheSize, numCreatedKey) =>
+          for {
+            watchableLookup <- WatchableLookup.make[Int, Unit]((_: Int) => ())
+            managedCache = ManagedCache.make(cacheSize, 60.second, ManagedLookup(watchableLookup.lookup))
+            testResult <-
+              managedCache.use { cache =>
+                for {
+                  _ <- ZIO.foreach_((0 until numCreatedKey).toList) { key =>
+                    cache.get(key).use_(ZIO.unit)
+                  }
+                  createdResource <- watchableLookup.createdResources
+                  oldestResourceCleaned <-
+                    assertAllM(
+                      (0 until numCreatedKey - cacheSize).flatMap(createdResource).map(_.assertAcquiredOnceAndCleaned)
+                    )
+                  newestResourceNotCleanedYet <- assertAllM(
+                    (numCreatedKey - cacheSize until numCreatedKey)
+                      .flatMap(createdResource)
+                      .map(_.assertAcquiredOnceAndNotCleaned)
+                  )
+                } yield oldestResourceCleaned && newestResourceNotCleanedYet
+              }
+          } yield testResult
+        }
+      }
+    ),
+    suite("`refresh` method")(
+      testM("should update the cache with a new value") {
+        def inc(n: Int) = n * 10
+
+        def retrieve(multiplier: Ref[Int])(key: Int) =
+          multiplier
+            .updateAndGet(inc)
+            .map(key * _)
+
+        val seed = 1
+        val key = 123
+        for {
+          ref <- Ref.make(seed)
+          managedCache = ManagedCache.make(
+            1,
+            Duration.Infinity,
+            ManagedLookup((key: Int) => Managed.fromEffect(retrieve(ref)(key)))
+          )
+          result <- managedCache.use { cache =>
+            for {
+              val1 <- cache.get(key).use(ZIO.succeed(_))
+              _ <- cache.refresh(key)
+              val2 <- cache.get(key).use(ZIO.succeed(_))
+              val3 <- cache.get(key).use(ZIO.succeed(_))
+            } yield assert(val2)(equalTo(val3) && equalTo(inc(val1)))
+          }
+        } yield result
+      },
+      testM("should clean old 
resource when making a new one") { + for { + watchableLookup <- WatchableLookup.makeUnit + _ <- ZIO.unit + managedCache = ManagedCache.make( + 1, + Duration.Infinity, + ManagedLookup(watchableLookup.lookup) + ) + result <- managedCache.use { cache => + for { + _ <- cache.get(key = ()).use_(ZIO.unit) + _ <- cache.refresh(key = ()) + createdResources <- watchableLookup.createdResources.map(_.apply(key = ())) + firstResourceCleaned <- createdResources.head.assertAcquiredOnceAndCleaned + secondResourceNotCleaned <- createdResources(1).assertAcquiredOnceAndNotCleaned + } yield firstResourceCleaned && secondResourceNotCleaned + } + } yield result + }, + testM("should update the cache with a new value even if the last `get` or `refresh` failed") { + val error = new RuntimeException("Must be a multiple of 3") + + def inc(n: Int) = n + 1 + + def retrieve(number: Ref[Int])(key: Int) = + number + .updateAndGet(inc) + .flatMap { + case n if n % 3 == 0 => + ZIO.fail(error) + case n => + ZIO.succeed(key * n) + } + + val seed = 2 + val key = 1 + for { + ref <- Ref.make(seed) + managedCache = ManagedCache.make( + capacity = 1, + timeToLive = Duration.Infinity, + lookup = ManagedLookup((key: Int) => Managed.fromEffect(retrieve(ref)(key))) + ) + result <- managedCache.use { cache => + for { + failure1 <- cache.get(key).use(ZIO.succeed(_)).either + _ <- cache.refresh(key) + val1 <- cache.get(key).use(ZIO.succeed(_)).either + _ <- cache.refresh(key) + failure2 <- cache.refresh(key).either + _ <- cache.refresh(key) + val2 <- cache.get(key).use(ZIO.succeed(_)).either + } yield assert(failure1)(isLeft(equalTo(error))) && + assert(failure2)(isLeft(equalTo(error))) && + assert(val1)(isRight(equalTo(4))) && + assert(val2)(isRight(equalTo(7))) + } + } yield result + }, + testM("should create and acquire subresource if the key doesn't exist in the cache") { + val capacity = 100 + val managedCache = ManagedCache.make(capacity, Duration.Infinity, ManagedLookup((_: Int) => Managed.unit)) + managedCache.use { cache => + for { + count0 <- cache.size + _ <- ZIO.foreach_(1 to capacity)(cache.refresh(_)) + count1 <- cache.size + } yield assert(count0)(equalTo(0)) && assert(count1)(equalTo(capacity)) + } + }, + testM("should clean old resource if cache size is exceeded") { + val genTestInput = for { + cacheSize <- Gen.int(1, 5) + numCreatedKey <- Gen.int(cacheSize, cacheSize + 3) + } yield (cacheSize, numCreatedKey) + checkM(genTestInput) { case (cacheSize, numCreatedKey) => + for { + watchableLookup <- WatchableLookup.make[Int, Unit]((_: Int) => ()) + managedCache = ManagedCache.make(cacheSize, 60.second, ManagedLookup(watchableLookup.lookup)) + testResult <- + managedCache.use { cache => + for { + _ <- ZIO.foreach_((0 until numCreatedKey).toList) { key => + cache.refresh(key) + } + createdResource <- watchableLookup.createdResources + oldestResourceCleaned <- + assertAllM( + (0 until numCreatedKey - cacheSize).flatMap(createdResource).map(_.assertAcquiredOnceAndCleaned) + ) + newestResourceNotCleanedYet <- assertAllM( + (numCreatedKey - cacheSize until numCreatedKey) + .flatMap(createdResource) + .map(_.assertAcquiredOnceAndNotCleaned) + ) + } yield oldestResourceCleaned && newestResourceNotCleanedYet + } + } yield testResult + } + } + ), + suite("expiration")( + suite("get")( + testM( + "managed returned by .get should recall lookup function if resource is too old and release the previous resource" + ) { + for { + watchableLookup <- WatchableLookup.makeUnit + fakeClock <- MockedJavaClock.make + result <- + ManagedCache + 
.makeWith(capacity = 10, managedLookup = ManagedLookup(watchableLookup.lookup), clock = fakeClock) {
+                (_: Exit[Nothing, Unit]) => 10.second
+              }
+              .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) =>
+                val subManaged = managedCache.get(())
+                for {
+                  _ <- subManaged.use_(ZIO.unit)
+                  _ <- fakeClock.advance(5.second)
+                  _ <- subManaged.use_(ZIO.unit)
+                  oneResourceCreatedAfter5second <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+                  _ <- fakeClock.advance(4.second)
+                  _ <- subManaged.use_(ZIO.unit)
+                  oneResourceCreatedAfter9second <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+                  _ <- fakeClock.advance(2.second)
+                  _ <- subManaged.use_(ZIO.unit)
+                  twoResourceCreatedAfter11second <- watchableLookup.assertCalledNum(key = ())(equalTo(2))
+                  previousResourceCleaned <- watchableLookup.assertFirstNCreatedResourceCleaned(key = (), 1)
+                } yield oneResourceCreatedAfter5second && oneResourceCreatedAfter9second && twoResourceCreatedAfter11second && previousResourceCleaned
+              }
+          } yield result
+        },
+        testM(
+          "get should recall lookup function if resource is too old and release the old resource (when using the managed given by .get multiple times)"
+        ) {
+          for {
+            watchableLookup <- WatchableLookup.makeUnit
+            fakeClock <- MockedJavaClock.make
+            result <- ManagedCache
+              .makeWith(10, ManagedLookup(watchableLookup.lookup), fakeClock) { (_: Exit[Nothing, Unit]) =>
+                10.second
+              }
+              .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) =>
+                val useGetManaged = managedCache.get(key = ()).use_(ZIO.unit)
+                for {
+                  _ <- useGetManaged
+                  _ <- fakeClock.advance(5.second)
+                  _ <- useGetManaged
+                  after5second <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+                  _ <- fakeClock.advance(4.second)
+                  _ <- useGetManaged
+                  after9second <- watchableLookup.assertCalledNum(key = ())(equalTo(1))
+                  _ <- fakeClock.advance(2.second)
+                  _ <- useGetManaged
+                  after11second <- watchableLookup.assertCalledNum(key = ())(equalTo(2))
+                  previousResourcesCleaned <- watchableLookup.assertFirstNCreatedResourceCleaned(key = (), 1)
+                } yield after5second && after9second && after11second && previousResourcesCleaned
+              }
+          } yield result
+        },
+        testM(
+          "when a resource gets expired but is still in use, it should wait until the resource is no longer in use and then clean it immediately"
+        ) {
+          for {
+            watchableLookup <- WatchableLookup.makeUnit
+            fakeClock <- MockedJavaClock.make
+            result <- ManagedCache
+              .makeWith(10, ManagedLookup(watchableLookup.lookup), fakeClock) { (_: Exit[Nothing, Unit]) =>
+                10.second
+              }
+              .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) =>
+                for {
+                  Reservation(acquire, release) <- managedCache.get(()).reserve
+                  _ <- acquire
+                  _ <- fakeClock.advance(11.second)
+                  _ <- managedCache.get(()).use_(ZIO.unit)
+                  twoResourcesCreated <- watchableLookup.assertCalledNum(key = ())(equalTo(2))
+                  firstCreatedResource <- watchableLookup.firstCreatedResource(key = ())
+                  notCleanedBeforeItFinishToBeUse <- firstCreatedResource.assertAcquiredOnceAndNotCleaned
+                  _ <- release(Exit.unit)
+                  finallyCleanedAfterItsUsed <- firstCreatedResource.assertAcquiredOnceAndCleaned
+                } yield twoResourcesCreated && notCleanedBeforeItFinishToBeUse && finallyCleanedAfterItsUsed
+              }
+          } yield result
+        }
+      ),
+      suite("refresh")(
+        testM("should not clean the resource if it's not yet expired until the new resource is ready") {
+          for {
+            watchableLookup <- WatchableLookup.makeUnit
+            fakeClock <- MockedJavaClock.make
+            result <- ManagedCache
+              .makeWith(10, ManagedLookup(watchableLookup.lookup), fakeClock) { (_: Exit[Nothing, Unit]) =>
+                10.second
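+                // Deterministic expiry: entries become stale once the mocked clock
+                // advances past this time to live.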
} + .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) => + for { + _ <- managedCache.get(key = ()).use_(ZIO.unit) + _ <- fakeClock.advance(9.second) + _ <- watchableLookup.lock + refreshFiber <- managedCache.refresh(key = ()).fork + _ <- watchableLookup + .getCalledNum(key = ()) + .repeat( + (Schedule.recurWhile[Int](_ < 1) >>> Schedule.elapsed).whileOutput(_ < 100.millis) + ) + _ <- ZIO.sleep(100.millis).provideLayer(Clock.live) + secondLookupCalled <- watchableLookup.assertCalledNum(key = ())(equalTo(2)) + firstCreatedResource <- watchableLookup.firstCreatedResource(key = ()) + firstResourceNotYetCleaned <- firstCreatedResource.assertAcquiredOnceAndNotCleaned + _ <- watchableLookup.unlock + _ <- refreshFiber.join + firsResourceFinallyCleaned <- firstCreatedResource.assertAcquiredOnceAndCleaned + } yield secondLookupCalled && firstResourceNotYetCleaned && firsResourceFinallyCleaned + } + } yield result + }, + testM("should clean the resource if it's expired and not in used") { + for { + watchableLookup <- WatchableLookup.makeUnit + fakeClock <- MockedJavaClock.make + result <- ManagedCache + .makeWith(10, ManagedLookup(watchableLookup.lookup), fakeClock) { (_: Exit[Nothing, Unit]) => + 10.second + } + .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) => + for { + _ <- managedCache.get(key = ()).use_(ZIO.unit) + _ <- fakeClock.advance(11.second) + _ <- watchableLookup.lock + refreshFiber <- managedCache.refresh(key = ()).fork + _ <- watchableLookup + .getCalledNum(key = ()) + .repeat( + (Schedule.recurWhile[Int](_ < 1) >>> Schedule.elapsed).whileOutput(_ < 100.millis) + ) + _ <- ZIO.sleep(100.millis).provideLayer(Clock.live) + secondLookupCalled <- watchableLookup.assertCalledNum(key = ())(equalTo(2)) + firstResourceCleaned <- watchableLookup.assertFirstNCreatedResourceCleaned(key = (), 1) + _ <- watchableLookup.unlock + _ <- refreshFiber.join + } yield secondLookupCalled && firstResourceCleaned + } + } yield result + }, + testM("should wait to clean expired resource until it's not in use anymore") { + for { + watchableLookup <- WatchableLookup.makeUnit + fakeClock <- MockedJavaClock.make + result <- ManagedCache + .makeWith(10, ManagedLookup(watchableLookup.lookup), fakeClock) { (_: Exit[Nothing, Unit]) => + 10.second + } + .use { (managedCache: ManagedCache[Unit, Nothing, Unit]) => + for { + Reservation(acquire, release) <- managedCache.get(key = ()).reserve + _ <- acquire + _ <- fakeClock.advance(11.second) + _ <- managedCache.refresh(key = ()) + secondLookupCalled <- watchableLookup.assertCalledNum(key = ())(equalTo(2)) + firstCreatedResource <- watchableLookup.firstCreatedResource(key = ()) + firstResourceNotYetCleaned <- firstCreatedResource.assertAcquiredOnceAndNotCleaned + _ <- release(Exit.unit) + firsResourceFinallyCleaned <- firstCreatedResource.assertAcquiredOnceAndCleaned + } yield secondLookupCalled && firstResourceNotYetCleaned && firsResourceFinallyCleaned + } + } yield result + } + ) + ), + suite("property base testing")( + testM( + "after any suite of balanced resource use, cleaning the cache should release all underlying resources" + ) { + import PropertyBaseTestingUtil._ + checkM(balancedSequenceOfAcquireReleaseAndRefresh, Gen.int(1, 20)) { (resourceOperations, cacheSize) => + for { + watchableLookup <- WatchableLookup.make[Key, Unit]((_: Key) => ()) + releasers <- + ManagedCache + .makeWith(cacheSize, managedLookup = ManagedLookup(watchableLookup.lookup)) { + (_: Exit[Nothing, Unit]) => + 10.second + } + .use(applyResourceOperations(_, 
resourceOperations))
+            allCleaned <- watchableLookup.assertAllCleaned
+          } yield allCleaned && assert(releasers)(isEmpty)
+        }
+      },
+      testM(
+        "after any suite of resource use, cleaning the cache should clean only the resources not in use; the others should be cleaned once they are no longer in use"
+      ) {
+        import PropertyBaseTestingUtil._
+        checkM(sequenceOfAcquireReleaseAndRefreshLettingSomeResourceUsed, Gen.int(1, 20)) {
+          case (ResourceOperationsAndResult(resourceOperations, resourceCleaned, resourceNotCleaned), cacheSize) =>
+            for {
+              watchableLookup <- WatchableLookup.make[Key, Unit]((_: Key) => ())
+              notUsedReleasers <-
+                ManagedCache
+                  .make(capacity = cacheSize, timeToLive = 10.second, lookup = ManagedLookup(watchableLookup.lookup))
+                  .use(applyResourceOperations(_, resourceOperations))
+              allResourceNotInUseAnymoreCleaned <-
+                assertAllM(resourceCleaned.map(watchableLookup.assertAllCleanedForKey))
+
+              allResourceInUseAnymoreNotCleaned <-
+                assertAllM(resourceNotCleaned.map(watchableLookup.assertAtLeastOneResourceNotCleanedForKey))
+              _ <- ZIO.foreach(notUsedReleasers.values)(_.apply(Exit.unit))
+              allCleanedAfterAllResourceAreNotInUseAnymore <- watchableLookup.assertAllCleaned
+            } yield allResourceNotInUseAnymoreCleaned && allResourceInUseAnymoreNotCleaned && allCleanedAfterAllResourceAreNotInUseAnymore
+        }
+      },
+      testM(
+        "after any suite of balanced resource use, cleaning the cache should release all underlying resources even if some resource acquisition did fail"
+      ) {
+        import PropertyBaseTestingUtil._
+        case class TestInput(
+          operations: List[ResourceOperation],
+          cacheSize: Int,
+          sequenceOfFailureOrSuccess: List[Boolean]
+        )
+        val genTestInput = for {
+          operations <- balancedSequenceOfAcquireReleaseAndRefresh
+          cacheSize <- Gen.int(1, 10)
+          sequenceOfFailureOrSuccess <- Gen.listOfN(operations.length)(Gen.boolean)
+        } yield TestInput(operations, cacheSize, sequenceOfFailureOrSuccess)
+
+        checkM(genTestInput) { case TestInput(resourceOperations, cacheSize, sequenceOfFailureOrSuccess) =>
+          for {
+            lookupCallNum <- Ref.make(0)
+            watchableLookup <- WatchableLookup.makeZIO[Key, Throwable, Unit] { (_: Key) =>
+              lookupCallNum.getAndUpdate(_ + 1).flatMap { index =>
+                ZIO.fail(new RuntimeException("fail")).unless(sequenceOfFailureOrSuccess(index))
+              }
+            }
+            releasers <-
+              ManagedCache
+                .make(capacity = cacheSize, timeToLive = 10.second, lookup = ManagedLookup(watchableLookup.lookup))
+                .use(applyResourceOperations(_, resourceOperations, ignoreCacheError = true))
+            allCleaned <- watchableLookup.assertAllCleaned
+          } yield allCleaned && assert(releasers)(isEmpty)
+        }
+      }
+    )
+  )
+
+  type Releaser = Exit[Any, Any] => UIO[Any]
+
+  sealed trait ObservableResourceForTest[E, V] {
+    def assertNotAcquired: UIO[TestResult]
+
+    def assertAcquiredOnceAndNotCleaned: UIO[TestResult]
+
+    def assertAcquiredOnceAndCleaned: UIO[TestResult]
+
+    def managed: Managed[E, V]
+  }
+
+  object ObservableResourceForTest {
+    def makeUnit: UIO[ObservableResourceForTest[Nothing, Unit]] = make(value = ())
+
+    def make[V](value: V): UIO[ObservableResourceForTest[Nothing, V]] = makeZIO(ZIO.succeed(value))
+
+    def makeZIO[E, V](effect: IO[E, V]): UIO[ObservableResourceForTest[E, V]] = for {
+      resourceAcquisitionCount <- Ref.make(0)
+      resourceAcquisitionReleasing <- Ref.make(0)
+      getState = resourceAcquisitionCount.get <*> resourceAcquisitionReleasing.get
+    } yield new ObservableResourceForTest[E, V] {
+
+      override def assertNotAcquired: UIO[TestResult] = getState.map { case (numAcquisition, numCleaned) =>
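+        // numAcquisition counts acquisitions of the resource; numCleaned counts
+        // how many times its finalizer has run.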
assert(numAcquisition)(equalTo(0).label(s"Resource acquired when it should not have")) && + assert(numCleaned)(equalTo(0).label(s"Resource cleaned when it should not have")) + } + + override def assertAcquiredOnceAndNotCleaned: UIO[TestResult] = getState.map { + case (numAcquisition, numCleaned) => + assert(numAcquisition)(equalTo(1).label(s"Resource not acquired once")) && + assert(numCleaned)(equalTo(0).label(s"Resource cleaned when it should not have")) + } + + override def assertAcquiredOnceAndCleaned: UIO[TestResult] = getState.map { case (numAcquisition, numCleaned) => + assert(numAcquisition)(equalTo(1).label(s"Resource not acquired once")) && + assert(numCleaned)(equalTo(1).label(s"Resource not cleaned when it should have")) + } + + override def managed: Managed[E, V] = ZManaged.makeReserve( + ZIO.succeed( + Reservation( + acquire = resourceAcquisitionCount.update(_ + 1) *> effect, + release = { _ => resourceAcquisitionReleasing.update(_ + 1) } + ) + ) + ) + } + } + + sealed trait WatchableLookup[K, E, V] { + def lookup(key: K): Managed[E, V] + + def createdResources: UIO[Map[K, List[ObservableResourceForTest[E, V]]]] + + def assertCalledNum(key: K)(sizeAssertion: Assertion[Int]): UIO[TestResult] = + assertM(getCalledNum(key))(sizeAssertion) + + def getCalledNum(key: K): UIO[Int] = createdResources.map(_.get(key).fold(ifEmpty = 0)(_.length)) + + def assertFirstNCreatedResourceCleaned(key: K, num: Int): UIO[TestResult] = + createdResources.flatMap { resources => + resourcesCleaned(resources.getOrElse(key, List.empty).take(num)) + } + + private def resourcesCleaned(resources: Iterable[ObservableResourceForTest[E, V]]): UIO[TestResult] = + assertAllM(resources.map(_.assertAcquiredOnceAndCleaned)) + + def assertAllCleaned: UIO[TestResult] = createdResources.flatMap { resources => + resourcesCleaned(resources.values.flatten) + } + + def assertAllCleanedForKey(key: K): UIO[TestResult] = createdResources.flatMap { resources => + resourcesCleaned(resources.getOrElse(key, List.empty)) + } + + def assertAtLeastOneResourceNotCleanedForKey(key: K): UIO[TestResult] = createdResources.flatMap { resources => + assertOneOfM(resources.getOrElse(key, List.empty).map(_.assertAcquiredOnceAndNotCleaned)) + } + + def lock: UIO[Unit] + + def unlock: UIO[Unit] + + def firstCreatedResource(key: K): UIO[ObservableResourceForTest[E, V]] = + createdResources.map(_.apply(key = key).head) + } + + object WatchableLookup { + def makeZIO[K, E, V](concreteLookup: K => IO[E, V]): UIO[WatchableLookup[K, E, V]] = for { + blocked <- Ref.make(false) + resources <- Ref.make(Map.empty[K, List[ObservableResourceForTest[E, V]]]) + } yield new WatchableLookup[K, E, V] { + override def lookup(key: K): Managed[E, V] = Managed.unwrap(for { + observableResource <- ObservableResourceForTest.makeZIO(concreteLookup(key)) + _ <- resources.update { resourceMap => + val newResource = resourceMap.get(key).getOrElse(List.empty) ++ List(observableResource) + resourceMap.updated(key, newResource) + } + _ <- blocked.get + .repeat(Schedule.recurWhile[Boolean](identity) && Schedule.exponential(10.millis, 2.0)) + .provideLayer(Clock.live) + } yield observableResource.managed) + + override val createdResources: UIO[Map[K, List[ObservableResourceForTest[E, V]]]] = resources.get + + override val lock = blocked.set(true) + override val unlock = blocked.set(false) + } + + def make[K, V](concreteLookup: K => V): UIO[WatchableLookup[K, Nothing, V]] = makeZIO( + concreteLookup.andThen(ZIO.succeed(_)) + ) + + def makeUnit: UIO[WatchableLookup[Unit, 
Nothing, Unit]] = make((_: Unit) => ()) + } + + private def hash(x: Int): Int => Int = + y => (x ^ y).hashCode + + object PropertyBaseTestingUtil { + type Key = Char + type ResourceIdForKey = Int + sealed trait ResourceOperation + case class ResourceId(key: Key, resourceIdForKey: ResourceIdForKey) + case class Acquire(id: ResourceId) extends ResourceOperation + case class Release(id: ResourceId) extends ResourceOperation + case class Refresh(key: Key) extends ResourceOperation + case class Invalidate(key: Key) extends ResourceOperation + + case class ResourceOperationsAndResult( + operations: List[ResourceOperation], + keyWithCleanedResource: Set[Key], + keyWithUncleanedResource: Set[Key] + ) + + private def sequenceOfAcquireReleaseAndRefreshRec( + previousResourceOperation: List[ResourceOperation], + allKey: Set[Key], + notOpenYet: Set[ResourceId], + openedButNotCleaned: Set[ResourceId], + acceptResourceNotCleaned: Boolean + ): Gen[Random, ResourceOperationsAndResult] = if ( + notOpenYet.isEmpty && (openedButNotCleaned.isEmpty || acceptResourceNotCleaned) + ) { + val keyWithUncleanedResource = openedButNotCleaned.map(_.key) + Gen.const( + ResourceOperationsAndResult( + operations = previousResourceOperation, + keyWithCleanedResource = allKey -- keyWithUncleanedResource, + keyWithUncleanedResource = keyWithUncleanedResource + ) + ) + } else { + val acquireOrRelease = + Gen.elements[ResourceOperation]( + (notOpenYet.toList.map(Acquire.apply) ++ openedButNotCleaned.toList.map(Release.apply)): _* + ) + val refresh = Gen.elements[ResourceOperation](allKey.map(Refresh.apply).toList: _*) + val invalidatePresent = Gen.elements[ResourceOperation](openedButNotCleaned.map { totalId => + Invalidate(totalId.key) + }.toList: _*) + val invalidateNotPresent = + Gen.elements[ResourceOperation]((allKey -- openedButNotCleaned.map(_.key)).map(Invalidate.apply).toList: _*) + for { + nextOp <- + Gen.weighted((acquireOrRelease, 8.0), (refresh, 2.0), (invalidatePresent, 2.0), (invalidateNotPresent, 1.0)) + (newOpened, newNotOpenYet) = nextOp match { + case Acquire(id) => (notOpenYet - id, openedButNotCleaned + id) + case Release(id) => (notOpenYet, openedButNotCleaned - id) + case Refresh(_) => (notOpenYet, openedButNotCleaned) + case Invalidate(_) => (notOpenYet, openedButNotCleaned) + } + result <- + sequenceOfAcquireReleaseAndRefreshRec( + previousResourceOperation = previousResourceOperation ++ List(nextOp), + allKey = allKey, + notOpenYet = newOpened, + openedButNotCleaned = newNotOpenYet, + acceptResourceNotCleaned = acceptResourceNotCleaned + ) + } yield result + } + + val balancedSequenceOfAcquireReleaseAndRefresh: Gen[Random with Sized, List[ResourceOperation]] = { + val someKey = Gen.alphaChar + val numResourcesCreatedPerKey = Gen.int(1, 10) + Gen.mapOf(someKey, numResourcesCreatedPerKey).flatMap { numPairByKey => + sequenceOfAcquireReleaseAndRefreshRec( + previousResourceOperation = List.empty, + allKey = numPairByKey.keySet, + notOpenYet = numPairByKey.flatMap { case (key, numPair) => (1 to numPair).map(ResourceId(key, _)) }.toSet, + openedButNotCleaned = Set.empty, + acceptResourceNotCleaned = false + ).map(_.operations) + } + } + + val sequenceOfAcquireReleaseAndRefreshLettingSomeResourceUsed + : Gen[Random with Sized, ResourceOperationsAndResult] = { + val someKey = Gen.alphaChar + val numResourcesCreatedPerKey = Gen.int(1, 10) + Gen.mapOf(someKey, numResourcesCreatedPerKey).flatMap { numPairByKey => + sequenceOfAcquireReleaseAndRefreshRec( + previousResourceOperation = List.empty, + allKey = 
numPairByKey.keySet, + notOpenYet = numPairByKey.flatMap { case (key, numPair) => (1 to numPair).map(ResourceId(key, _)) }.toSet, + openedButNotCleaned = Set.empty, + acceptResourceNotCleaned = true + ) + } + } + + def applyResourceOperations[V]( + managedCache: ManagedCache[Key, Throwable, V], + resourceOperations: List[ResourceOperation], + ignoreCacheError: Boolean = false + ): IO[TestFailure[Nothing], Map[ResourceId, Releaser]] = + for { + notUsedReleasers <- + ZIO.foldLeft(resourceOperations)(Map.empty[ResourceId, Releaser]) { (releasers, resourceOperation) => + resourceOperation match { + case Acquire(totalId @ ResourceId(key, _)) => + managedCache + .get(key) + .reserve + .flatMap { case Reservation(acquire, release) => + acquire.as(releasers.updated(totalId, release)) + } + .catchAll { error => + if (ignoreCacheError) { + ZIO.succeed(releasers) + } else { + ZIO.fail(TestFailure.die(error)) + } + } + case Release(totalId) => + releasers.get(totalId) match { + case None => + ZIO + .fail(TestFailure.die(new RuntimeException("release before acquire"))) + .unless(ignoreCacheError) + .as(releasers) + case Some(releaser) => + releaser(Exit.unit).as(releasers - totalId) + } + case Refresh(key) => + managedCache + .refresh(key) + .catchAll { error => + ZIO.fail(TestFailure.die(error)).unless(ignoreCacheError) + } + .as(releasers) + case Invalidate(key) => + managedCache + .invalidate(key) + .as(releasers) + } + } + } yield notUsedReleasers + } + + def assertAllM(results: Iterable[UIO[TestResult]]): UIO[TestResult] = + ZIO.foldLeft(results)(assertCompletes)((l, r) => r.map(l && _)) + + def assertOneOfM(results: Iterable[UIO[TestResult]]): UIO[TestResult] = + ZIO.foldLeft(results)(assertCompletes.negate)((l, r) => r.map(l || _)) +} diff --git a/zio-cache/shared/src/test/scala/zio/cache/MockedJavaClock.scala b/zio-cache/shared/src/test/scala/zio/cache/MockedJavaClock.scala new file mode 100644 index 0000000..0d61420 --- /dev/null +++ b/zio-cache/shared/src/test/scala/zio/cache/MockedJavaClock.scala @@ -0,0 +1,24 @@ +package zio.cache + +import zio.duration.Duration +import zio.{Ref, Runtime, UIO} + +import java.time.{Clock, Instant, ZoneId} + +abstract class MockedJavaClock extends Clock { + def advance(duration: Duration): UIO[Unit] +} + +object MockedJavaClock { + def make: UIO[MockedJavaClock] = for { + ref <- Ref.make(Instant.now()) + } yield new MockedJavaClock { + override def getZone: ZoneId = ??? + override def instant(): Instant = Runtime.default.unsafeRun { + ref.get + } + + override def withZone(zone: ZoneId): Clock = ??? + override def advance(duration: Duration): UIO[Unit] = ref.update(_.plusNanos(duration.toNanos)) + } +}
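
An end-to-end sketch of the API introduced by this diff (illustrative only: `session` is a hypothetical lookup, using the ZIO 1.x APIs seen above):

    import zio._
    import zio.cache.{ManagedCache, ManagedLookup}
    import java.time.Duration

    object ManagedCacheExample extends zio.App {
      // Hypothetical managed resource: acquire a session, release it when done.
      def session(userId: Int): Managed[Nothing, String] =
        ZManaged.make(UIO(s"session-$userId"))(_ => UIO.unit)

      def run(args: List[String]): URIO[ZEnv, ExitCode] =
        ManagedCache
          .make(capacity = 10, timeToLive = Duration.ofMinutes(1), lookup = ManagedLookup(session))
          .use(cache => cache.get(42).use(s => UIO(println(s))))
          .exitCode
    }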